From f4b0fc4e2ec84343c8fb6aba7a186bab267ee8ce Mon Sep 17 00:00:00 2001 From: Laura Abro Date: Tue, 29 Apr 2025 11:09:29 -0300 Subject: [PATCH] use background task to avoid timeout --- worker/orca-agent/Dockerfile | 8 +- worker/orca-agent/requirements.txt | 3 +- worker/orca-agent/setup.py | 8 -- worker/orca-agent/src/server/routes/task.py | 67 +++++++++++-- .../server/services/repo_summary_service.py | 12 +-- worker/src/orcaSettings.ts | 2 +- worker/src/task/1-task.ts | 82 ++++------------ worker/src/task/5-routes.ts | 94 ++++++++++++++++++- worker/src/utils/constant.ts | 29 +++--- 9 files changed, 198 insertions(+), 107 deletions(-) delete mode 100644 worker/orca-agent/setup.py diff --git a/worker/orca-agent/Dockerfile b/worker/orca-agent/Dockerfile index c3772b9..1734ac2 100644 --- a/worker/orca-agent/Dockerfile +++ b/worker/orca-agent/Dockerfile @@ -20,7 +20,7 @@ RUN git config --global --add safe.directory /app # Copy the rest of your application code into the container COPY . . -ENV MIDDLE_SERVER_URL=https://builder247.api.koii.network +ENV MIDDLE_SERVER_URL=https://ik8kcow8ksw8gwgoo0ggosko.dev.koii.network # Configure logging and output ENV PYTHONUNBUFFERED=1 @@ -40,9 +40,9 @@ CMD ["gunicorn", \ "--capture-output", \ "--enable-stdio-inheritance", \ "--logger-class=gunicorn.glogging.Logger", \ - "--timeout", "600", \ - "--graceful-timeout", "600", \ - "--keep-alive", "5", \ + "--timeout", "1800", \ + "--graceful-timeout", "1800", \ + "--keep-alive", "15", \ "-w", "1", \ "-b", "0.0.0.0:8080", \ "main:app"] diff --git a/worker/orca-agent/requirements.txt b/worker/orca-agent/requirements.txt index dd40c02..a74b126 100644 --- a/worker/orca-agent/requirements.txt +++ b/worker/orca-agent/requirements.txt @@ -15,6 +15,5 @@ base58>=2.1.0 tenacity>=9.0.0 sqlmodel>=0.0.22 openai>=0.28.0 -colorama>=0.4.6 -prometheus-swarm>=0.1.5 +colorama>=0.4.67prometheus-swarm>=0.1.7 prometheus-test>=0.1.2 diff --git a/worker/orca-agent/setup.py b/worker/orca-agent/setup.py deleted file mode 100644 index 1e17926..0000000 --- a/worker/orca-agent/setup.py +++ /dev/null @@ -1,8 +0,0 @@ -from setuptools import setup, find_packages - -setup( - name="task-flow", - version="0.1", - packages=find_packages(include=["src", "src.*"]), - python_requires=">=3.6", -) diff --git a/worker/orca-agent/src/server/routes/task.py b/worker/orca-agent/src/server/routes/task.py index b42295d..417d662 100644 --- a/worker/orca-agent/src/server/routes/task.py +++ b/worker/orca-agent/src/server/routes/task.py @@ -1,7 +1,41 @@ +import os +import requests from flask import Blueprint, jsonify, request from src.server.services import repo_summary_service +from concurrent.futures import ThreadPoolExecutor +from prometheus_swarm.database import get_db +from src.server.services.repo_summary_service import logger bp = Blueprint("task", __name__) +executor = ThreadPoolExecutor(max_workers=2) + + +def post_pr_url(agent_result, task_id, signature, round_number): + try: + result = agent_result.result() # Get the result from the future + logger.info(f"Result: {result}") + result_data = result.get("result", {}) + logger.info(f"Result data: {result_data}") + # Make a POST request with the result + response = requests.post( + f"http://host.docker.internal:30017/task/{task_id}/add-todo-pr", + json={ + "prUrl": result_data.get("data", {}).get("pr_url"), + "signature": signature, + "roundNumber": round_number, + "success": result.get("success", False), + "message": result_data.get("error", ""), + }, + ) + response.raise_for_status() # Raise an error for bad responses + except Exception as e: + # Handle exceptions (e.g., log the error) + logger.error(f"Failed to send result: {e}") + logger.error(f"Exception type: {type(e)}") + if hasattr(e, "__traceback__"): + import traceback + + logger.error(f"Traceback: {''.join(traceback.format_tb(e.__traceback__))}") @bp.post("/worker-task/") @@ -10,18 +44,37 @@ def start_task(round_number): logger.info(f"Task started for round: {round_number}") data = request.get_json() + task_id = data["task_id"] + podcall_signature = data["podcall_signature"] + repo_url = data["repo_url"] logger.info(f"Task data: {data}") - required_fields = ["taskId", "round_number", "repo_url"] + required_fields = ["task_id", "round_number", "repo_url", "podcall_signature"] if any(data.get(field) is None for field in required_fields): return jsonify({"error": "Missing data"}), 401 - result = repo_summary_service.handle_task_creation( - task_id=data["taskId"], - round_number=int(round_number), - repo_url=data["repo_url"], - ) + # Get db instance in the main thread where we have app context + db = get_db() - return result + if os.getenv("TEST_MODE") == "true": + result = repo_summary_service.handle_task_creation( + task_id=task_id, + round_number=int(round_number), + repo_url=repo_url, + db=db, # Pass db instance + ) + return jsonify(result) + else: + agent_result = executor.submit( + repo_summary_service.handle_task_creation, + task_id=task_id, + round_number=round_number, + repo_url=repo_url, + db=db, # Pass db instance + ) + agent_result.add_done_callback( + lambda future: post_pr_url(future, task_id, podcall_signature, round_number) + ) + return jsonify({"status": "Task is being processed"}), 200 if __name__ == "__main__": diff --git a/worker/orca-agent/src/server/services/repo_summary_service.py b/worker/orca-agent/src/server/services/repo_summary_service.py index 2914663..4b12c9a 100644 --- a/worker/orca-agent/src/server/services/repo_summary_service.py +++ b/worker/orca-agent/src/server/services/repo_summary_service.py @@ -1,6 +1,5 @@ """Task service module.""" -from flask import jsonify from prometheus_swarm.database import get_db from prometheus_swarm.clients import setup_client from src.workflows.repoSummarizer.workflow import RepoSummarizerWorkflow @@ -12,10 +11,11 @@ from src.database.models import Submission load_dotenv() -def handle_task_creation(task_id, round_number, repo_url): +def handle_task_creation(task_id, round_number, repo_url, db=None): """Handle task creation request.""" try: - db = get_db() + if db is None: + db = get_db() # Fallback for direct calls client = setup_client("anthropic") workflow = RepoSummarizerWorkflow( @@ -35,11 +35,9 @@ def handle_task_creation(task_id, round_number, repo_url): ) db.add(submission) db.commit() - return jsonify({"success": True, "result": result}) + return {"success": True, "result": result} else: - return jsonify( - {"success": False, "result": result.get("error", "No result")} - ) + return {"success": False, "result": result.get("error", "No result")} except Exception as e: logger.error(f"Repo summarizer failed: {str(e)}") raise diff --git a/worker/src/orcaSettings.ts b/worker/src/orcaSettings.ts index ca05629..24405c9 100644 --- a/worker/src/orcaSettings.ts +++ b/worker/src/orcaSettings.ts @@ -2,7 +2,7 @@ import { TASK_ID, namespaceWrapper } from "@_koii/namespace-wrapper"; import "dotenv/config"; import os from "os"; -const imageUrl = "docker.io/labrocadabro/prometheus-summarizer:0.2"; +const imageUrl = "docker.io/labrocadabro/prometheus-summarizer:0.4"; function getHostIP() { const interfaces = os.networkInterfaces(); diff --git a/worker/src/task/1-task.ts b/worker/src/task/1-task.ts index 8547dc8..742664d 100644 --- a/worker/src/task/1-task.ts +++ b/worker/src/task/1-task.ts @@ -8,6 +8,7 @@ import { checkGitHub } from "../utils/githubCheck"; import { LogLevel } from "@_koii/namespace-wrapper/dist/types"; import { actionMessage } from "../utils/constant"; import { errorMessage } from "../utils/constant"; +import { v4 as uuidv4 } from "uuid"; dotenv.config(); export async function task(roundNumber: number): Promise { @@ -128,11 +129,22 @@ export async function task(roundNumber: number): Promise { } const requiredWorkResponseData = await requiredWorkResponse.json(); console.log("[TASK] requiredWorkResponseData: ", requiredWorkResponseData); + const uuid = uuidv4(); + await namespaceWrapper.storeSet(`uuid-${roundNumber}`, uuid); + + const podcallPayload = { + taskId: TASK_ID, + roundNumber, + uuid, + }; + + const podCallSignature = await namespaceWrapper.payloadSigning(podcallPayload, stakingKeypair.secretKey); const jsonBody = { - taskId: TASK_ID, - round_number: String(roundNumber), + task_id: TASK_ID, + round_number: roundNumber, repo_url: `https://github.com/${requiredWorkResponseData.data.repo_owner}/${requiredWorkResponseData.data.repo_name}`, + podcall_signature: podCallSignature, }; console.log("[TASK] jsonBody: ", jsonBody); try { @@ -144,70 +156,12 @@ export async function task(roundNumber: number): Promise { body: JSON.stringify(jsonBody), }); console.log("[TASK] repoSummaryResponse: ", repoSummaryResponse); - console.log("[TASK] repoSummaryResponse.data.result.data ", repoSummaryResponse.data.result.data); - const payload = { - taskId: TASK_ID, - action: "add-todo-pr", - roundNumber: roundNumber, - prUrl: repoSummaryResponse.data.result.data.pr_url, - stakingKey: stakingKey, - }; - console.log("[TASK] Signing payload: ", payload); - if (repoSummaryResponse.status === 200) { - try { - const signature = await namespaceWrapper.payloadSigning(payload, stakingKeypair.secretKey); - console.log("[TASK] signature: ", signature); - const addPrToSummarizerTodoResponse = await fetch(`${middleServerUrl}/summarizer/worker/add-todo-pr`, { - method: "POST", - headers: { - "Content-Type": "application/json", - }, - body: JSON.stringify({ signature: signature, stakingKey: stakingKey }), - }); - console.log("[TASK] addPrToSummarizerTodoResponse: ", addPrToSummarizerTodoResponse); - } catch (error) { - await namespaceWrapper.storeSet(`result-${roundNumber}`, status.ISSUE_FAILED_TO_ADD_PR_TO_SUMMARIZER_TODO); - console.error("[TASK] Error adding PR to summarizer todo:", error); - } - await namespaceWrapper.storeSet(`result-${roundNumber}`, status.ISSUE_SUCCESSFULLY_SUMMARIZED); - } else { - // post this summary response to slack` to notify the team - // THE HOOK IS ALREADY DISABLED - // try{ - // const slackResponse = await fetch('https://hooks.slack.com/services/', { - // method: "POST", - // headers: { - // "Content-Type": "application/json", - // }, - // body: JSON.stringify({ - // text: `[TASK] Error summarizing issue:\nStatus: ${repoSummaryResponse.status}\nData: ${JSON.stringify(repoSummaryResponse.data, null, 2)}` - // }), - // }); - // console.log("[TASK] slackResponse: ", slackResponse); - // }catch(error){ - // console.error("[TASK] Error posting to slack:", error); - // } - - await namespaceWrapper.storeSet(`result-${roundNumber}`, status.ISSUE_FAILED_TO_BE_SUMMARIZED); + if (repoSummaryResponse.status !== 200) { + await namespaceWrapper.storeSet(`result-${roundNumber}`, status.ISSUE_SUMMARIZATION_FAILED); } } catch (error) { - await namespaceWrapper.storeSet(`result-${roundNumber}`, status.ISSUE_FAILED_TO_BE_SUMMARIZED); - - // try{ - // const slackResponse = await fetch('https://hooks.slack.com/services', { - // method: "POST", - // headers: { - // "Content-Type": "application/json", - // }, - // body: JSON.stringify({ - // text: `[TASK] Error summarizing issue:\n ${JSON.stringify(error)}` - // }), - // }); - // console.log("[TASK] slackResponse: ", slackResponse); - // }catch(error){ - // console.error("[TASK] Error posting to slack:", error); - // } - console.error("[TASK] EXECUTE TASK ERROR:", JSON.stringify(error)); + await namespaceWrapper.storeSet(`result-${roundNumber}`, status.ISSUE_SUMMARIZATION_FAILED); + console.error("[TASK] EXECUTE TASK ERROR:", error); } } catch (error) { await namespaceWrapper.storeSet(`result-${roundNumber}`, status.UNKNOWN_ERROR); diff --git a/worker/src/task/5-routes.ts b/worker/src/task/5-routes.ts index e408841..2437767 100644 --- a/worker/src/task/5-routes.ts +++ b/worker/src/task/5-routes.ts @@ -1,9 +1,10 @@ -import { namespaceWrapper, app } from "@_koii/task-manager/namespace-wrapper"; +import { namespaceWrapper, app, TASK_ID } from "@_koii/task-manager/namespace-wrapper"; import { getLeaderNode } from "../utils/leader"; import { task } from "./1-task"; import { submission } from "./2-submission"; import { audit } from "./3-audit"; import { taskRunner } from "@_koii/task-manager"; +import { middleServerUrl, status } from "../utils/constant"; /** * @@ -53,4 +54,95 @@ export async function routes() { const submitDistributionResult = await taskRunner.submitDistributionList(Number(roundNumber)); res.status(200).json({ result: submitDistributionResult }); }); + + app.post("/add-todo-pr", async (req, res) => { + const signature = req.body.signature; + const prUrl = req.body.prUrl; + const roundNumber = Number(req.body.roundNumber); + const success = req.body.success; + const message = req.body.message; + console.log("[TASK] req.body", req.body); + try { + if (success) { + await namespaceWrapper.storeSet(`result-${roundNumber}`, status.ISSUE_SUMMARIZATION_SUCCEEDED); + } else { + await namespaceWrapper.storeSet(`result-${roundNumber}`, status.ISSUE_SUMMARIZATION_FAILED); + console.error("[TASK] Error summarizing repository:", message); + return; + } + const uuid = await namespaceWrapper.storeGet(`uuid-${roundNumber}`); + console.log("[TASK] uuid: ", uuid); + if (!uuid) { + throw new Error("No uuid found"); + } + + const currentRound = await namespaceWrapper.getRound(); + + if (roundNumber !== currentRound) { + throw new Error(`Invalid round number: ${roundNumber}. Current round: ${currentRound}.`); + } + + const publicKey = await namespaceWrapper.getMainAccountPubkey(); + const stakingKeypair = await namespaceWrapper.getSubmitterAccount(); + if (!stakingKeypair) { + throw new Error("No staking key found"); + } + const stakingKey = stakingKeypair.publicKey.toBase58(); + const secretKey = stakingKeypair.secretKey; + + if (!publicKey) { + throw new Error("No public key found"); + } + + const payload = await namespaceWrapper.verifySignature(signature, stakingKey); + if (!payload) { + throw new Error("Invalid signature"); + } + console.log("[TASK] payload: ", payload); + const data = payload.data; + if (!data) { + throw new Error("No signature data found"); + } + const jsonData = JSON.parse(data); + if (jsonData.taskId !== TASK_ID) { + throw new Error(`Invalid task ID from signature: ${jsonData.taskId}. Actual task ID: ${TASK_ID}`); + } + if (jsonData.roundNumber !== currentRound) { + throw new Error( + `Invalid round number from signature: ${jsonData.roundNumber}. Current round: ${currentRound}.`, + ); + } + if (jsonData.uuid !== uuid) { + throw new Error(`Invalid uuid from signature: ${jsonData.uuid}. Actual uuid: ${uuid}`); + } + const middleServerPayload = { + taskId: jsonData.taskId, + roundNumber, + prUrl, + stakingKey, + publicKey, + action: "add-todo-pr", + }; + const middleServerSignature = await namespaceWrapper.payloadSigning(middleServerPayload, secretKey); + const middleServerResponse = await fetch(`${middleServerUrl}/summarizer/worker/add-todo-pr`, { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify({ signature: middleServerSignature, stakingKey: stakingKey }), + }); + + console.log("[TASK] Add PR Response: ", middleServerResponse); + + if (middleServerResponse.status !== 200) { + throw new Error(`Posting to middle server failed: ${middleServerResponse.statusText}`); + } + await namespaceWrapper.storeSet(`result-${roundNumber}`, status.SAVING_TODO_PR_SUCCEEDED); + res.status(200).json({ result: "Successfully saved PR" }); + } catch (error) { + console.error("[TASK] Error adding PR to summarizer todo:", error); + await namespaceWrapper.storeSet(`result-${roundNumber}`, status.SAVING_TODO_PR_FAILED); + res.status(400).json({ error: "Failed to save PR" }); + } + }); } diff --git a/worker/src/utils/constant.ts b/worker/src/utils/constant.ts index 7d08d5b..7698a76 100644 --- a/worker/src/utils/constant.ts +++ b/worker/src/utils/constant.ts @@ -1,22 +1,22 @@ import dotenv from "dotenv"; dotenv.config(); - export const status = { - ISSUE_FAILED_TO_BE_SUMMARIZED: "Issue failed to be summarized", - ISSUE_SUCCESSFULLY_SUMMARIZED: "Issue successfully summarized", + ISSUE_SUMMARIZATION_FAILED: "Issue summarization failed", + ISSUE_SUMMARIZATION_SUCCEEDED: "Issue successfully summarized", NO_ISSUES_PENDING_TO_BE_SUMMARIZED: "No issues pending to be summarized", ROUND_LESS_THAN_OR_EQUAL_TO_1: "Round <= 1", NO_ORCA_CLIENT: "No orca client", - NO_CHOSEN_AS_ISSUE_SUMMARIZER: "No chosen as issue summarizer", + NOT_SELECTED_AS_SUMMARIZER: "Not selected as summarizer", UNKNOWN_ERROR: "Unknown error", STAR_ISSUE_FAILED: "Star issue failed", GITHUB_CHECK_FAILED: "GitHub check failed", ANTHROPIC_API_KEY_INVALID: "Anthropic API key invalid", ANTHROPIC_API_KEY_NO_CREDIT: "Anthropic API key has no credit", NO_DATA_FOR_THIS_ROUND: "No data for this round", - ISSUE_FAILED_TO_ADD_PR_TO_SUMMARIZER_TODO: "Issue failed to add PR to summarizer todo", -} + SAVING_TODO_PR_FAILED: "Summarizer todo PR not saved", + SAVING_TODO_PR_SUCCEEDED: "Summarizer todo PR saved", +}; export const errorMessage = { ISSUE_FAILED_TO_BE_SUMMARIZED: "We couldn't summarize this issue. Please try again later.", @@ -32,7 +32,7 @@ export const errorMessage = { ANTHROPIC_API_KEY_NO_CREDIT: "Your Anthropic API key has no remaining credits.", NO_DATA_FOR_THIS_ROUND: "There is no data available for this round.", ISSUE_FAILED_TO_ADD_PR_TO_SUMMARIZER_TODO: "We couldn't add the PR to the summarizer todo list.", -} +}; export const actionMessage = { ISSUE_FAILED_TO_BE_SUMMARIZED: "We couldn't summarize this issue. Please try again later.", @@ -44,14 +44,17 @@ export const actionMessage = { UNKNOWN_ERROR: "An unexpected error occurred. Please try again later.", STAR_ISSUE_FAILED: "We couldn't star the issue. Please try again later.", GITHUB_CHECK_FAILED: "Please go to the env variable page to update your GitHub Key.", - ANTHROPIC_API_KEY_INVALID: "Please follow the guide under task description page to set up your Anthropic API key correctly.", + ANTHROPIC_API_KEY_INVALID: + "Please follow the guide under task description page to set up your Anthropic API key correctly.", ANTHROPIC_API_KEY_NO_CREDIT: "Please add credits to continue.", NO_DATA_FOR_THIS_ROUND: "There is no data available for this round.", - ISSUE_FAILED_TO_ADD_PR_TO_SUMMARIZER_TODO: "We couldn't add the PR to the summarizer todo list. Please try again later.", -} + ISSUE_FAILED_TO_ADD_PR_TO_SUMMARIZER_TODO: + "We couldn't add the PR to the summarizer todo list. Please try again later.", +}; /*********************THE CONSTANTS THAT PROD/TEST ARE DIFFERENT *********************/ -export const defaultBountyMarkdownFile = "https://raw.githubusercontent.com/koii-network/prometheus-swarm-bounties/master/README.md" +export const defaultBountyMarkdownFile = + "https://raw.githubusercontent.com/koii-network/prometheus-swarm-bounties/master/README.md"; -export const customReward = 400*10**9 // This should be in ROE! +export const customReward = 400 * 10 ** 9; // This should be in ROE! -export const middleServerUrl = "https://ooww84kco0s0cs808w8cg804.dev.koii.network" \ No newline at end of file +export const middleServerUrl = "https://ik8kcow8ksw8gwgoo0ggosko.dev.koii.network";