diff --git a/worker/orca-agent/requirements.txt b/worker/orca-agent/requirements.txt index a74b126..132c25c 100644 --- a/worker/orca-agent/requirements.txt +++ b/worker/orca-agent/requirements.txt @@ -15,5 +15,6 @@ base58>=2.1.0 tenacity>=9.0.0 sqlmodel>=0.0.22 openai>=0.28.0 -colorama>=0.4.67prometheus-swarm>=0.1.7 -prometheus-test>=0.1.2 +colorama>=0.4.67 +prometheus-swarm>=0.1.7 +prometheus-test>=0.1.4 diff --git a/worker/orca-agent/tests/stages/update_audit.py b/worker/orca-agent/tests/stages/update_audit.py index c9fe3bd..1be9bdd 100644 --- a/worker/orca-agent/tests/stages/update_audit.py +++ b/worker/orca-agent/tests/stages/update_audit.py @@ -7,14 +7,14 @@ def prepare(runner, worker, role: str): """Prepare data for worker task""" return { - "taskId": runner.config.task_id, - "round": runner.current_round, + "taskId": runner.get("task_id"), + "round": runner.get("current_round"), } def execute(runner, worker, data): """Execute worker task step""" - url = f"{runner.config.middle_server_url}/summarizer/worker/update-audit-result" + url = f"{runner.get('middle_server_url')}/summarizer/worker/update-audit-result" response = requests.post( url, json=data, diff --git a/worker/orca-agent/tests/stages/worker_audit.py b/worker/orca-agent/tests/stages/worker_audit.py index be56f09..8667879 100644 --- a/worker/orca-agent/tests/stages/worker_audit.py +++ b/worker/orca-agent/tests/stages/worker_audit.py @@ -7,30 +7,22 @@ import requests def prepare(runner, worker, target_name): """Prepare data for worker audit""" - round_state = runner.state["rounds"].get(str(runner.current_round), {}) - pr_urls = round_state.get("pr_urls", {}) - - if target_name not in pr_urls: - # Return None to indicate this step should be skipped - print( - f"✓ No PR URL found for {target_name}, skipping {worker.name} audit - continuing" - ) + pr_url = runner.get(f"pr_urls.{target_name}") + if pr_url is None: + print(f"✓ No pr_urls.{target_name} found - continuing") return None - # Get submission data from state - submission_data = round_state.get("submission_data", {}).get(target_name) - if not submission_data: - # Return None to indicate this step should be skipped - print( - f"✓ No submission data found for {target_name}, skipping {worker.name} audit - continuing" - ) + submission_data = runner.get(f"submission_data.{target_name}") + if submission_data is None: + print(f"✓ No submission_data.{target_name} found - continuing") return None + # Note: Commented out code is preserved as it may be needed in future # Create auditor payload which is used to generate the signature # auditor_payload = { - # "taskId": runner.config.task_id, - # "roundNumber": runner.current_round, - # "prUrl": pr_urls[target_name], + # "taskId": runner.get("task_id"), + # "roundNumber": runner.get("current_round"), + # "prUrl": pr_url, # "stakingKey": worker.staking_public_key, # "pubKey": worker.public_key, # } @@ -38,9 +30,9 @@ def prepare(runner, worker, target_name): # Structure the payload according to what the server expects # return { # "submission": { - # "taskId": runner.config.task_id, - # "roundNumber": runner.current_round, - # "prUrl": pr_urls[target_name], + # "taskId": runner.get("task_id"), + # "roundNumber": runner.get("current_round"), + # "prUrl": pr_url, # "githubUsername": submission_data.get("githubUsername"), # "repoOwner": submission_data.get("repoOwner"), # "repoName": submission_data.get("repoName"), @@ -52,7 +44,7 @@ def prepare(runner, worker, target_name): # "submitterSignature": submission_data.get("signature"), # "submitterStakingKey": submission_data.get("stakingKey"), # "submitterPubKey": submission_data.get("pubKey"), - # "prUrl": pr_urls[target_name], + # "prUrl": pr_url, # "repoOwner": submission_data.get("repoOwner"), # "repoName": submission_data.get("repoName"), # "githubUsername": worker.env.get("GITHUB_USERNAME"), @@ -75,7 +67,7 @@ def execute(runner, worker, data): "message": "Skipped due to missing PR URL or submission data", } - url = f"{worker.url}/worker-audit/{runner.current_round}" + url = f"{worker.url}/worker-audit/{runner.get('current_round')}" response = requests.post(url, json=data) result = response.json() diff --git a/worker/orca-agent/tests/stages/worker_check.py b/worker/orca-agent/tests/stages/worker_check.py index 86fab89..fb52307 100644 --- a/worker/orca-agent/tests/stages/worker_check.py +++ b/worker/orca-agent/tests/stages/worker_check.py @@ -5,16 +5,16 @@ import requests def prepare(runner, worker): """Prepare data for worker task""" - # Create fetch-todo payload for stakingSignature and publicSignature - round_state = runner.state["rounds"].get(str(runner.current_round), {}) - if not round_state.get("pr_urls"): - print(f"✓ No PR URLs found for {worker.name} - continuing") - return + pr_url = runner.get(f"pr_urls.{worker.name}") + if pr_url is None: + print(f"✓ No pr_urls.{worker.name} found - continuing") + return None + return { "stakingKey": worker.staking_public_key, - "roundNumber": runner.current_round, + "roundNumber": runner.get("current_round"), "githubUsername": worker.env.get("GITHUB_USERNAME"), - "prUrl": round_state.get("pr_urls", {}).get(worker.name), + "prUrl": pr_url, } @@ -22,7 +22,7 @@ def execute(runner, worker, data): """Execute worker task step""" if not data: return {"success": True, "message": "No PR URL found"} - url = f"{runner.config.middle_server_url}/summarizer/worker/check-todo" + url = f"{runner.get('middle_server_url')}/summarizer/worker/check-todo" response = requests.post( url, json=data, diff --git a/worker/orca-agent/tests/stages/worker_fetch.py b/worker/orca-agent/tests/stages/worker_fetch.py index 64b4530..69548ad 100644 --- a/worker/orca-agent/tests/stages/worker_fetch.py +++ b/worker/orca-agent/tests/stages/worker_fetch.py @@ -8,8 +8,8 @@ def prepare(runner, worker): """Prepare data for worker task""" # Create fetch-todo payload for stakingSignature and publicSignature payload = { - "taskId": runner.config.task_id, - "roundNumber": runner.current_round, + "taskId": runner.get("task_id"), + "roundNumber": runner.get("current_round"), "action": "fetch-todo", "githubUsername": worker.env.get("GITHUB_USERNAME"), "stakingKey": worker.staking_public_key, @@ -17,8 +17,8 @@ def prepare(runner, worker): } return { - "taskId": runner.config.task_id, - "roundNumber": runner.current_round, + "taskId": runner.get("task_id"), + "roundNumber": runner.get("current_round"), "stakingKey": worker.staking_public_key, "pubKey": worker.public_key, "stakingSignature": create_signature(worker.staking_signing_key, payload), @@ -28,7 +28,7 @@ def prepare(runner, worker): def execute(runner, worker, data): """Execute worker task step""" - url = f"{runner.config.middle_server_url}/summarizer/worker/fetch-todo" + url = f"{runner.get('middle_server_url')}/summarizer/worker/fetch-todo" response = requests.post( url, json={"signature": data["stakingSignature"], "stakingKey": data["stakingKey"]}, @@ -45,10 +45,7 @@ def execute(runner, worker, data): response.raise_for_status() if result.get("success"): - round_key = str(runner.current_round) - round_state = runner.state["rounds"].setdefault(round_key, {}) - round_state["repo_url"] = ( - f"https://github.com/{result['data']['repo_owner']}/{result['data']['repo_name']}" - ) + repo_url = f"https://github.com/{result['data']['repo_owner']}/{result['data']['repo_name']}" + runner.set("repo_url", repo_url, scope="round") return result diff --git a/worker/orca-agent/tests/stages/worker_pr.py b/worker/orca-agent/tests/stages/worker_pr.py index 50f9ca9..eee8010 100644 --- a/worker/orca-agent/tests/stages/worker_pr.py +++ b/worker/orca-agent/tests/stages/worker_pr.py @@ -3,17 +3,16 @@ from prometheus_test.utils import create_signature def prepare(runner, worker): - round_state = runner.state["rounds"].get(str(runner.current_round), {}) - - if worker.name not in round_state.get("pr_urls", {}): - print(f"✓ No PR URL found for {worker.name} - continuing") + pr_url = runner.get(f"pr_urls.{worker.name}") + if pr_url is None: + print(f"✓ No pr_urls.{worker.name} found - continuing") return None payload = { - "taskId": runner.config.task_id, + "taskId": runner.get("task_id"), "action": "add-todo-pr", - "roundNumber": runner.current_round, - "prUrl": round_state["pr_urls"][worker.name], + "roundNumber": runner.get("current_round"), + "prUrl": pr_url, "stakingKey": worker.staking_public_key, "pubKey": worker.public_key, } @@ -29,7 +28,7 @@ def execute(runner, worker, data): if data is None: return {"success": True, "message": "Skipped due to missing PR URL"} - url = f"{runner.config.middle_server_url}/summarizer/worker/add-todo-pr" + url = f"{runner.get('middle_server_url')}/summarizer/worker/add-todo-pr" response = requests.post( url, json={"signature": data["signature"], "stakingKey": data["stakingKey"]}, diff --git a/worker/orca-agent/tests/stages/worker_submission.py b/worker/orca-agent/tests/stages/worker_submission.py index 39b0930..7ffcbd0 100644 --- a/worker/orca-agent/tests/stages/worker_submission.py +++ b/worker/orca-agent/tests/stages/worker_submission.py @@ -6,25 +6,21 @@ from prometheus_test.utils import create_signature def prepare(runner, worker): """Prepare data for worker submission""" - # Get the current round's state - round_state = runner.state.get("rounds", {}).get(str(runner.current_round), {}) - pr_urls = round_state.get("pr_urls", {}) - - if worker.name not in pr_urls: - # Return None to indicate this step should be skipped - print(f"✓ No PR URL found for {worker.name} - continuing") + pr_url = runner.get(f"pr_urls.{worker.name}") + if pr_url is None: + print(f"✓ No pr_urls.{worker.name} found - continuing") return None # Get submission data from worker - url = f"{worker.url}/submission/{runner.current_round}" + url = f"{worker.url}/submission/{runner.get('current_round')}" response = requests.get(url) response.raise_for_status() submission_data = response.json() # Create signature for the submission submitter_payload = { - "taskId": runner.config.task_id, - "roundNumber": runner.current_round, + "taskId": runner.get("task_id"), + "roundNumber": runner.get("current_round"), "stakingKey": worker.staking_public_key, "pubKey": worker.public_key, "action": "audit", @@ -46,15 +42,7 @@ def execute(runner, worker, data): return {"success": True, "message": "Skipped due to missing PR URL"} # Store submission data in state - round_key = str(runner.current_round) - round_state = runner.state["rounds"].setdefault(round_key, {}) - - # Initialize submission_data if not exists - if "submission_data" not in round_state: - round_state["submission_data"] = {} - - # Store or update submission data - round_state["submission_data"][worker.name] = data + runner.set(f"submission_data.{worker.name}", data, scope="round") # Return success result return {"success": True, "data": data} diff --git a/worker/orca-agent/tests/stages/worker_task.py b/worker/orca-agent/tests/stages/worker_task.py index c8c9ca6..4464039 100644 --- a/worker/orca-agent/tests/stages/worker_task.py +++ b/worker/orca-agent/tests/stages/worker_task.py @@ -1,18 +1,34 @@ """Stage for executing worker tasks.""" import requests +from uuid import uuid4 +from prometheus_test.utils import create_signature def prepare(runner, worker): """Prepare data for worker task""" - round_state = runner.state["rounds"].get(str(runner.current_round), {}) - if not round_state.get("repo_url"): - print(f"✓ No repo url found for {worker.name} - continuing") - return + repo_url = runner.get("repo_url") + if repo_url is None: + print("✓ No repo_url found - continuing") + return None + + # Generate UUID for this round + uuid = str(uuid4()) + runner.set(f"uuid.{worker.name}", uuid, scope="round") + + # Create podcall payload and signature + podcall_payload = { + "taskId": runner.get("task_id"), + "roundNumber": runner.get("current_round"), + "uuid": uuid, + } + podcall_signature = create_signature(worker.staking_signing_key, podcall_payload) + return { - "taskId": runner.config.task_id, - "round_number": str(runner.current_round), - "repo_url": round_state["repo_url"], + "task_id": runner.get("task_id"), + "round_number": str(runner.get("current_round")), + "repo_url": repo_url, + "podcall_signature": podcall_signature, } @@ -20,7 +36,7 @@ def execute(runner, worker, data): """Execute worker task step""" if not data: return {"success": True, "message": "No repo url found"} - url = f"{worker.url}/worker-task/{runner.current_round}" + url = f"{worker.url}/worker-task/{runner.get('current_round')}" response = requests.post(url, json=data) result = response.json() @@ -32,12 +48,9 @@ def execute(runner, worker, data): return {"success": True, "message": result.get("message")} if result.get("success") and "pr_url" in result["result"]["data"]: - round_key = str(runner.current_round) - round_state = runner.state["rounds"].setdefault(round_key, {}) - - # Initialize pr_urls if not exists - if "pr_urls" not in round_state: - round_state["pr_urls"] = {} - round_state["pr_urls"][worker.name] = result["result"]["data"]["pr_url"] + # Store PR URL in state + runner.set( + f"pr_urls.{worker.name}", result["result"]["data"]["pr_url"], scope="round" + ) return result