import asyncio
from payments_py import Payments, PaymentOptions
from payments_py.a2a.agent_card import build_payment_agent_card
from a2a.server.agent_execution import AgentExecutor
from a2a.types import Task, TaskResult, Message
# Initialize payments
payments = Payments.get_instance(
PaymentOptions(nvm_api_key="nvm:your-key", environment="sandbox")
)
# Define your agent executor
class CodeReviewExecutor(AgentExecutor):
"""Agent that performs code reviews."""
async def execute(self, task: Task, context: dict) -> TaskResult:
# Extract code from task
messages = task.get("messages", [])
code = ""
for msg in messages:
if msg.get("role") == "user":
parts = msg.get("parts", [])
for part in parts:
if part.get("type") == "text":
code = part.get("text", "")
break
# Perform code review (your logic here)
review_result = await self.review_code(code)
return TaskResult(
status="completed",
messages=[
Message(
role="assistant",
parts=[{"type": "text", "text": review_result}]
)
]
)
async def review_code(self, code: str) -> str:
# Your code review logic
return f"Code review completed. Found 3 suggestions for improvement."
# Build agent card
agent_card = build_payment_agent_card(
name="Code Review Agent",
description="AI-powered code review service",
agent_id="did:nvm:code-review-agent",
plan_id="code-review-plan-id",
url="https://localhost:8080",
version="1.0.0"
)
# Start server
async def main():
executor = CodeReviewExecutor()
result = payments.a2a["start"](
agent_card=agent_card,
executor=executor,
port=8080,
expose_agent_card=True,
async_execution=True
)
print(f"A2A Server started on port 8080")
print(f"Agent card: http://localhost:8080/.well-known/agent.json")
# Run server
await result.server.serve()
asyncio.run(main())