use background task to avoid timeout
This commit is contained in:
@ -20,7 +20,7 @@ RUN git config --global --add safe.directory /app
|
||||
# Copy the rest of your application code into the container
|
||||
COPY . .
|
||||
|
||||
ENV MIDDLE_SERVER_URL=https://builder247.api.koii.network
|
||||
ENV MIDDLE_SERVER_URL=https://ik8kcow8ksw8gwgoo0ggosko.dev.koii.network
|
||||
|
||||
# Configure logging and output
|
||||
ENV PYTHONUNBUFFERED=1
|
||||
@ -40,9 +40,9 @@ CMD ["gunicorn", \
|
||||
"--capture-output", \
|
||||
"--enable-stdio-inheritance", \
|
||||
"--logger-class=gunicorn.glogging.Logger", \
|
||||
"--timeout", "600", \
|
||||
"--graceful-timeout", "600", \
|
||||
"--keep-alive", "5", \
|
||||
"--timeout", "1800", \
|
||||
"--graceful-timeout", "1800", \
|
||||
"--keep-alive", "15", \
|
||||
"-w", "1", \
|
||||
"-b", "0.0.0.0:8080", \
|
||||
"main:app"]
|
||||
|
@ -15,6 +15,5 @@ base58>=2.1.0
|
||||
tenacity>=9.0.0
|
||||
sqlmodel>=0.0.22
|
||||
openai>=0.28.0
|
||||
colorama>=0.4.6
|
||||
prometheus-swarm>=0.1.5
|
||||
colorama>=0.4.67prometheus-swarm>=0.1.7
|
||||
prometheus-test>=0.1.2
|
||||
|
@ -1,8 +0,0 @@
|
||||
from setuptools import setup, find_packages
|
||||
|
||||
setup(
|
||||
name="task-flow",
|
||||
version="0.1",
|
||||
packages=find_packages(include=["src", "src.*"]),
|
||||
python_requires=">=3.6",
|
||||
)
|
@ -1,7 +1,41 @@
|
||||
import os
|
||||
import requests
|
||||
from flask import Blueprint, jsonify, request
|
||||
from src.server.services import repo_summary_service
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from prometheus_swarm.database import get_db
|
||||
from src.server.services.repo_summary_service import logger
|
||||
|
||||
bp = Blueprint("task", __name__)
|
||||
executor = ThreadPoolExecutor(max_workers=2)
|
||||
|
||||
|
||||
def post_pr_url(agent_result, task_id, signature, round_number):
|
||||
try:
|
||||
result = agent_result.result() # Get the result from the future
|
||||
logger.info(f"Result: {result}")
|
||||
result_data = result.get("result", {})
|
||||
logger.info(f"Result data: {result_data}")
|
||||
# Make a POST request with the result
|
||||
response = requests.post(
|
||||
f"http://host.docker.internal:30017/task/{task_id}/add-todo-pr",
|
||||
json={
|
||||
"prUrl": result_data.get("data", {}).get("pr_url"),
|
||||
"signature": signature,
|
||||
"roundNumber": round_number,
|
||||
"success": result.get("success", False),
|
||||
"message": result_data.get("error", ""),
|
||||
},
|
||||
)
|
||||
response.raise_for_status() # Raise an error for bad responses
|
||||
except Exception as e:
|
||||
# Handle exceptions (e.g., log the error)
|
||||
logger.error(f"Failed to send result: {e}")
|
||||
logger.error(f"Exception type: {type(e)}")
|
||||
if hasattr(e, "__traceback__"):
|
||||
import traceback
|
||||
|
||||
logger.error(f"Traceback: {''.join(traceback.format_tb(e.__traceback__))}")
|
||||
|
||||
|
||||
@bp.post("/worker-task/<round_number>")
|
||||
@ -10,18 +44,37 @@ def start_task(round_number):
|
||||
logger.info(f"Task started for round: {round_number}")
|
||||
|
||||
data = request.get_json()
|
||||
task_id = data["task_id"]
|
||||
podcall_signature = data["podcall_signature"]
|
||||
repo_url = data["repo_url"]
|
||||
logger.info(f"Task data: {data}")
|
||||
required_fields = ["taskId", "round_number", "repo_url"]
|
||||
required_fields = ["task_id", "round_number", "repo_url", "podcall_signature"]
|
||||
if any(data.get(field) is None for field in required_fields):
|
||||
return jsonify({"error": "Missing data"}), 401
|
||||
|
||||
result = repo_summary_service.handle_task_creation(
|
||||
task_id=data["taskId"],
|
||||
round_number=int(round_number),
|
||||
repo_url=data["repo_url"],
|
||||
)
|
||||
# Get db instance in the main thread where we have app context
|
||||
db = get_db()
|
||||
|
||||
return result
|
||||
if os.getenv("TEST_MODE") == "true":
|
||||
result = repo_summary_service.handle_task_creation(
|
||||
task_id=task_id,
|
||||
round_number=int(round_number),
|
||||
repo_url=repo_url,
|
||||
db=db, # Pass db instance
|
||||
)
|
||||
return jsonify(result)
|
||||
else:
|
||||
agent_result = executor.submit(
|
||||
repo_summary_service.handle_task_creation,
|
||||
task_id=task_id,
|
||||
round_number=round_number,
|
||||
repo_url=repo_url,
|
||||
db=db, # Pass db instance
|
||||
)
|
||||
agent_result.add_done_callback(
|
||||
lambda future: post_pr_url(future, task_id, podcall_signature, round_number)
|
||||
)
|
||||
return jsonify({"status": "Task is being processed"}), 200
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
@ -1,6 +1,5 @@
|
||||
"""Task service module."""
|
||||
|
||||
from flask import jsonify
|
||||
from prometheus_swarm.database import get_db
|
||||
from prometheus_swarm.clients import setup_client
|
||||
from src.workflows.repoSummarizer.workflow import RepoSummarizerWorkflow
|
||||
@ -12,10 +11,11 @@ from src.database.models import Submission
|
||||
load_dotenv()
|
||||
|
||||
|
||||
def handle_task_creation(task_id, round_number, repo_url):
|
||||
def handle_task_creation(task_id, round_number, repo_url, db=None):
|
||||
"""Handle task creation request."""
|
||||
try:
|
||||
db = get_db()
|
||||
if db is None:
|
||||
db = get_db() # Fallback for direct calls
|
||||
client = setup_client("anthropic")
|
||||
|
||||
workflow = RepoSummarizerWorkflow(
|
||||
@ -35,11 +35,9 @@ def handle_task_creation(task_id, round_number, repo_url):
|
||||
)
|
||||
db.add(submission)
|
||||
db.commit()
|
||||
return jsonify({"success": True, "result": result})
|
||||
return {"success": True, "result": result}
|
||||
else:
|
||||
return jsonify(
|
||||
{"success": False, "result": result.get("error", "No result")}
|
||||
)
|
||||
return {"success": False, "result": result.get("error", "No result")}
|
||||
except Exception as e:
|
||||
logger.error(f"Repo summarizer failed: {str(e)}")
|
||||
raise
|
||||
|
@ -2,7 +2,7 @@ import { TASK_ID, namespaceWrapper } from "@_koii/namespace-wrapper";
|
||||
import "dotenv/config";
|
||||
import os from "os";
|
||||
|
||||
const imageUrl = "docker.io/labrocadabro/prometheus-summarizer:0.2";
|
||||
const imageUrl = "docker.io/labrocadabro/prometheus-summarizer:0.4";
|
||||
|
||||
function getHostIP() {
|
||||
const interfaces = os.networkInterfaces();
|
||||
|
@ -8,6 +8,7 @@ import { checkGitHub } from "../utils/githubCheck";
|
||||
import { LogLevel } from "@_koii/namespace-wrapper/dist/types";
|
||||
import { actionMessage } from "../utils/constant";
|
||||
import { errorMessage } from "../utils/constant";
|
||||
import { v4 as uuidv4 } from "uuid";
|
||||
dotenv.config();
|
||||
|
||||
export async function task(roundNumber: number): Promise<void> {
|
||||
@ -128,11 +129,22 @@ export async function task(roundNumber: number): Promise<void> {
|
||||
}
|
||||
const requiredWorkResponseData = await requiredWorkResponse.json();
|
||||
console.log("[TASK] requiredWorkResponseData: ", requiredWorkResponseData);
|
||||
const uuid = uuidv4();
|
||||
await namespaceWrapper.storeSet(`uuid-${roundNumber}`, uuid);
|
||||
|
||||
const podcallPayload = {
|
||||
taskId: TASK_ID,
|
||||
roundNumber,
|
||||
uuid,
|
||||
};
|
||||
|
||||
const podCallSignature = await namespaceWrapper.payloadSigning(podcallPayload, stakingKeypair.secretKey);
|
||||
|
||||
const jsonBody = {
|
||||
taskId: TASK_ID,
|
||||
round_number: String(roundNumber),
|
||||
task_id: TASK_ID,
|
||||
round_number: roundNumber,
|
||||
repo_url: `https://github.com/${requiredWorkResponseData.data.repo_owner}/${requiredWorkResponseData.data.repo_name}`,
|
||||
podcall_signature: podCallSignature,
|
||||
};
|
||||
console.log("[TASK] jsonBody: ", jsonBody);
|
||||
try {
|
||||
@ -144,70 +156,12 @@ export async function task(roundNumber: number): Promise<void> {
|
||||
body: JSON.stringify(jsonBody),
|
||||
});
|
||||
console.log("[TASK] repoSummaryResponse: ", repoSummaryResponse);
|
||||
console.log("[TASK] repoSummaryResponse.data.result.data ", repoSummaryResponse.data.result.data);
|
||||
const payload = {
|
||||
taskId: TASK_ID,
|
||||
action: "add-todo-pr",
|
||||
roundNumber: roundNumber,
|
||||
prUrl: repoSummaryResponse.data.result.data.pr_url,
|
||||
stakingKey: stakingKey,
|
||||
};
|
||||
console.log("[TASK] Signing payload: ", payload);
|
||||
if (repoSummaryResponse.status === 200) {
|
||||
try {
|
||||
const signature = await namespaceWrapper.payloadSigning(payload, stakingKeypair.secretKey);
|
||||
console.log("[TASK] signature: ", signature);
|
||||
const addPrToSummarizerTodoResponse = await fetch(`${middleServerUrl}/summarizer/worker/add-todo-pr`, {
|
||||
method: "POST",
|
||||
headers: {
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
body: JSON.stringify({ signature: signature, stakingKey: stakingKey }),
|
||||
});
|
||||
console.log("[TASK] addPrToSummarizerTodoResponse: ", addPrToSummarizerTodoResponse);
|
||||
} catch (error) {
|
||||
await namespaceWrapper.storeSet(`result-${roundNumber}`, status.ISSUE_FAILED_TO_ADD_PR_TO_SUMMARIZER_TODO);
|
||||
console.error("[TASK] Error adding PR to summarizer todo:", error);
|
||||
}
|
||||
await namespaceWrapper.storeSet(`result-${roundNumber}`, status.ISSUE_SUCCESSFULLY_SUMMARIZED);
|
||||
} else {
|
||||
// post this summary response to slack` to notify the team
|
||||
// THE HOOK IS ALREADY DISABLED
|
||||
// try{
|
||||
// const slackResponse = await fetch('https://hooks.slack.com/services/', {
|
||||
// method: "POST",
|
||||
// headers: {
|
||||
// "Content-Type": "application/json",
|
||||
// },
|
||||
// body: JSON.stringify({
|
||||
// text: `[TASK] Error summarizing issue:\nStatus: ${repoSummaryResponse.status}\nData: ${JSON.stringify(repoSummaryResponse.data, null, 2)}`
|
||||
// }),
|
||||
// });
|
||||
// console.log("[TASK] slackResponse: ", slackResponse);
|
||||
// }catch(error){
|
||||
// console.error("[TASK] Error posting to slack:", error);
|
||||
// }
|
||||
|
||||
await namespaceWrapper.storeSet(`result-${roundNumber}`, status.ISSUE_FAILED_TO_BE_SUMMARIZED);
|
||||
if (repoSummaryResponse.status !== 200) {
|
||||
await namespaceWrapper.storeSet(`result-${roundNumber}`, status.ISSUE_SUMMARIZATION_FAILED);
|
||||
}
|
||||
} catch (error) {
|
||||
await namespaceWrapper.storeSet(`result-${roundNumber}`, status.ISSUE_FAILED_TO_BE_SUMMARIZED);
|
||||
|
||||
// try{
|
||||
// const slackResponse = await fetch('https://hooks.slack.com/services', {
|
||||
// method: "POST",
|
||||
// headers: {
|
||||
// "Content-Type": "application/json",
|
||||
// },
|
||||
// body: JSON.stringify({
|
||||
// text: `[TASK] Error summarizing issue:\n ${JSON.stringify(error)}`
|
||||
// }),
|
||||
// });
|
||||
// console.log("[TASK] slackResponse: ", slackResponse);
|
||||
// }catch(error){
|
||||
// console.error("[TASK] Error posting to slack:", error);
|
||||
// }
|
||||
console.error("[TASK] EXECUTE TASK ERROR:", JSON.stringify(error));
|
||||
await namespaceWrapper.storeSet(`result-${roundNumber}`, status.ISSUE_SUMMARIZATION_FAILED);
|
||||
console.error("[TASK] EXECUTE TASK ERROR:", error);
|
||||
}
|
||||
} catch (error) {
|
||||
await namespaceWrapper.storeSet(`result-${roundNumber}`, status.UNKNOWN_ERROR);
|
||||
|
@ -1,9 +1,10 @@
|
||||
import { namespaceWrapper, app } from "@_koii/task-manager/namespace-wrapper";
|
||||
import { namespaceWrapper, app, TASK_ID } from "@_koii/task-manager/namespace-wrapper";
|
||||
import { getLeaderNode } from "../utils/leader";
|
||||
import { task } from "./1-task";
|
||||
import { submission } from "./2-submission";
|
||||
import { audit } from "./3-audit";
|
||||
import { taskRunner } from "@_koii/task-manager";
|
||||
import { middleServerUrl, status } from "../utils/constant";
|
||||
|
||||
/**
|
||||
*
|
||||
@ -53,4 +54,95 @@ export async function routes() {
|
||||
const submitDistributionResult = await taskRunner.submitDistributionList(Number(roundNumber));
|
||||
res.status(200).json({ result: submitDistributionResult });
|
||||
});
|
||||
|
||||
app.post("/add-todo-pr", async (req, res) => {
|
||||
const signature = req.body.signature;
|
||||
const prUrl = req.body.prUrl;
|
||||
const roundNumber = Number(req.body.roundNumber);
|
||||
const success = req.body.success;
|
||||
const message = req.body.message;
|
||||
console.log("[TASK] req.body", req.body);
|
||||
try {
|
||||
if (success) {
|
||||
await namespaceWrapper.storeSet(`result-${roundNumber}`, status.ISSUE_SUMMARIZATION_SUCCEEDED);
|
||||
} else {
|
||||
await namespaceWrapper.storeSet(`result-${roundNumber}`, status.ISSUE_SUMMARIZATION_FAILED);
|
||||
console.error("[TASK] Error summarizing repository:", message);
|
||||
return;
|
||||
}
|
||||
const uuid = await namespaceWrapper.storeGet(`uuid-${roundNumber}`);
|
||||
console.log("[TASK] uuid: ", uuid);
|
||||
if (!uuid) {
|
||||
throw new Error("No uuid found");
|
||||
}
|
||||
|
||||
const currentRound = await namespaceWrapper.getRound();
|
||||
|
||||
if (roundNumber !== currentRound) {
|
||||
throw new Error(`Invalid round number: ${roundNumber}. Current round: ${currentRound}.`);
|
||||
}
|
||||
|
||||
const publicKey = await namespaceWrapper.getMainAccountPubkey();
|
||||
const stakingKeypair = await namespaceWrapper.getSubmitterAccount();
|
||||
if (!stakingKeypair) {
|
||||
throw new Error("No staking key found");
|
||||
}
|
||||
const stakingKey = stakingKeypair.publicKey.toBase58();
|
||||
const secretKey = stakingKeypair.secretKey;
|
||||
|
||||
if (!publicKey) {
|
||||
throw new Error("No public key found");
|
||||
}
|
||||
|
||||
const payload = await namespaceWrapper.verifySignature(signature, stakingKey);
|
||||
if (!payload) {
|
||||
throw new Error("Invalid signature");
|
||||
}
|
||||
console.log("[TASK] payload: ", payload);
|
||||
const data = payload.data;
|
||||
if (!data) {
|
||||
throw new Error("No signature data found");
|
||||
}
|
||||
const jsonData = JSON.parse(data);
|
||||
if (jsonData.taskId !== TASK_ID) {
|
||||
throw new Error(`Invalid task ID from signature: ${jsonData.taskId}. Actual task ID: ${TASK_ID}`);
|
||||
}
|
||||
if (jsonData.roundNumber !== currentRound) {
|
||||
throw new Error(
|
||||
`Invalid round number from signature: ${jsonData.roundNumber}. Current round: ${currentRound}.`,
|
||||
);
|
||||
}
|
||||
if (jsonData.uuid !== uuid) {
|
||||
throw new Error(`Invalid uuid from signature: ${jsonData.uuid}. Actual uuid: ${uuid}`);
|
||||
}
|
||||
const middleServerPayload = {
|
||||
taskId: jsonData.taskId,
|
||||
roundNumber,
|
||||
prUrl,
|
||||
stakingKey,
|
||||
publicKey,
|
||||
action: "add-todo-pr",
|
||||
};
|
||||
const middleServerSignature = await namespaceWrapper.payloadSigning(middleServerPayload, secretKey);
|
||||
const middleServerResponse = await fetch(`${middleServerUrl}/summarizer/worker/add-todo-pr`, {
|
||||
method: "POST",
|
||||
headers: {
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
body: JSON.stringify({ signature: middleServerSignature, stakingKey: stakingKey }),
|
||||
});
|
||||
|
||||
console.log("[TASK] Add PR Response: ", middleServerResponse);
|
||||
|
||||
if (middleServerResponse.status !== 200) {
|
||||
throw new Error(`Posting to middle server failed: ${middleServerResponse.statusText}`);
|
||||
}
|
||||
await namespaceWrapper.storeSet(`result-${roundNumber}`, status.SAVING_TODO_PR_SUCCEEDED);
|
||||
res.status(200).json({ result: "Successfully saved PR" });
|
||||
} catch (error) {
|
||||
console.error("[TASK] Error adding PR to summarizer todo:", error);
|
||||
await namespaceWrapper.storeSet(`result-${roundNumber}`, status.SAVING_TODO_PR_FAILED);
|
||||
res.status(400).json({ error: "Failed to save PR" });
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -1,22 +1,22 @@
|
||||
import dotenv from "dotenv";
|
||||
dotenv.config();
|
||||
|
||||
|
||||
export const status = {
|
||||
ISSUE_FAILED_TO_BE_SUMMARIZED: "Issue failed to be summarized",
|
||||
ISSUE_SUCCESSFULLY_SUMMARIZED: "Issue successfully summarized",
|
||||
ISSUE_SUMMARIZATION_FAILED: "Issue summarization failed",
|
||||
ISSUE_SUMMARIZATION_SUCCEEDED: "Issue successfully summarized",
|
||||
NO_ISSUES_PENDING_TO_BE_SUMMARIZED: "No issues pending to be summarized",
|
||||
ROUND_LESS_THAN_OR_EQUAL_TO_1: "Round <= 1",
|
||||
NO_ORCA_CLIENT: "No orca client",
|
||||
NO_CHOSEN_AS_ISSUE_SUMMARIZER: "No chosen as issue summarizer",
|
||||
NOT_SELECTED_AS_SUMMARIZER: "Not selected as summarizer",
|
||||
UNKNOWN_ERROR: "Unknown error",
|
||||
STAR_ISSUE_FAILED: "Star issue failed",
|
||||
GITHUB_CHECK_FAILED: "GitHub check failed",
|
||||
ANTHROPIC_API_KEY_INVALID: "Anthropic API key invalid",
|
||||
ANTHROPIC_API_KEY_NO_CREDIT: "Anthropic API key has no credit",
|
||||
NO_DATA_FOR_THIS_ROUND: "No data for this round",
|
||||
ISSUE_FAILED_TO_ADD_PR_TO_SUMMARIZER_TODO: "Issue failed to add PR to summarizer todo",
|
||||
}
|
||||
SAVING_TODO_PR_FAILED: "Summarizer todo PR not saved",
|
||||
SAVING_TODO_PR_SUCCEEDED: "Summarizer todo PR saved",
|
||||
};
|
||||
|
||||
export const errorMessage = {
|
||||
ISSUE_FAILED_TO_BE_SUMMARIZED: "We couldn't summarize this issue. Please try again later.",
|
||||
@ -32,7 +32,7 @@ export const errorMessage = {
|
||||
ANTHROPIC_API_KEY_NO_CREDIT: "Your Anthropic API key has no remaining credits.",
|
||||
NO_DATA_FOR_THIS_ROUND: "There is no data available for this round.",
|
||||
ISSUE_FAILED_TO_ADD_PR_TO_SUMMARIZER_TODO: "We couldn't add the PR to the summarizer todo list.",
|
||||
}
|
||||
};
|
||||
|
||||
export const actionMessage = {
|
||||
ISSUE_FAILED_TO_BE_SUMMARIZED: "We couldn't summarize this issue. Please try again later.",
|
||||
@ -44,14 +44,17 @@ export const actionMessage = {
|
||||
UNKNOWN_ERROR: "An unexpected error occurred. Please try again later.",
|
||||
STAR_ISSUE_FAILED: "We couldn't star the issue. Please try again later.",
|
||||
GITHUB_CHECK_FAILED: "Please go to the env variable page to update your GitHub Key.",
|
||||
ANTHROPIC_API_KEY_INVALID: "Please follow the guide under task description page to set up your Anthropic API key correctly.",
|
||||
ANTHROPIC_API_KEY_INVALID:
|
||||
"Please follow the guide under task description page to set up your Anthropic API key correctly.",
|
||||
ANTHROPIC_API_KEY_NO_CREDIT: "Please add credits to continue.",
|
||||
NO_DATA_FOR_THIS_ROUND: "There is no data available for this round.",
|
||||
ISSUE_FAILED_TO_ADD_PR_TO_SUMMARIZER_TODO: "We couldn't add the PR to the summarizer todo list. Please try again later.",
|
||||
}
|
||||
ISSUE_FAILED_TO_ADD_PR_TO_SUMMARIZER_TODO:
|
||||
"We couldn't add the PR to the summarizer todo list. Please try again later.",
|
||||
};
|
||||
/*********************THE CONSTANTS THAT PROD/TEST ARE DIFFERENT *********************/
|
||||
export const defaultBountyMarkdownFile = "https://raw.githubusercontent.com/koii-network/prometheus-swarm-bounties/master/README.md"
|
||||
export const defaultBountyMarkdownFile =
|
||||
"https://raw.githubusercontent.com/koii-network/prometheus-swarm-bounties/master/README.md";
|
||||
|
||||
export const customReward = 400*10**9 // This should be in ROE!
|
||||
export const customReward = 400 * 10 ** 9; // This should be in ROE!
|
||||
|
||||
export const middleServerUrl = "https://ooww84kco0s0cs808w8cg804.dev.koii.network"
|
||||
export const middleServerUrl = "https://ik8kcow8ksw8gwgoo0ggosko.dev.koii.network";
|
||||
|
Reference in New Issue
Block a user