"""Crew definition for automated code review, optionally using Semgrep MCP tools."""

import logging
import os
from typing import Any, Dict, List, Optional

from crewai import Agent, Crew, Task
from crewai.project import CrewBase, agent, crew, task
from crewai_tools import MCPServerAdapter
from mcp import StdioServerParameters

from pr_reviewer.llm import get_llm

logger = logging.getLogger(__name__)


@CrewBase
class CodeReviewCrew:
    """Code Review Crew for conducting code quality reviews.

    The crew consists of a single ``code_reviewer`` agent running a single
    ``code_review_task``. If a Semgrep MCP server can be started over stdio,
    its tools are attached to the agent; otherwise the agent runs without
    tools. Call :meth:`close` when finished to stop the MCP server process.
    """

    agents_config = "config/agents.yaml"
    tasks_config = "config/tasks.yaml"

    def __init__(self):
        """Create the LLM and the stdio parameters for the Semgrep server."""
        self.llm = get_llm()

        # Lazily populated by _semgrep_tools(); the adapter reference is kept
        # so the server it started can be stopped later via close().
        self._semgrep_adapter: Optional[MCPServerAdapter] = None
        self._semgrep_tools_cache: Optional[List[Any]] = None

        # NOTE(review): these are Semgrep scan flags; confirm this command
        # actually speaks MCP over stdio (a dedicated MCP entry point may be
        # required).
        self.semgrep_server_params = StdioServerParameters(
            command="semgrep",
            args=[
                "--metrics=off",
                "--json",
                "--stdin-display-name",
                "scanned_code",
                "--",
            ],
            env={
                **os.environ,
                # Ensure both keys exist (empty string when unset).
                # NOTE(review): "SEMGRAPH_*" looks like a typo for Semgrep's
                # "SEMGREP_*" variables -- confirm the intended names before
                # changing; left as-is to avoid breaking existing deployments.
                "SEMGRAPH_APP_URL": os.getenv("SEMGRAPH_APP_URL", ""),
                "SEMGRAPH_API_TOKEN": os.getenv("SEMGRAPH_API_TOKEN", ""),
            },
        )

    def _semgrep_tools(self) -> List[Any]:
        """Return the Semgrep MCP tools, starting the adapter on first use.

        The result is cached so the server is started at most once per
        instance. Any failure is logged and yields an empty list, so the
        review can still run without MCP tools (best-effort).
        """
        if self._semgrep_tools_cache is not None:
            return self._semgrep_tools_cache

        loaded: List[Any] = []
        try:
            adapter = MCPServerAdapter(self.semgrep_server_params)
            # Keep the reference first so close() can stop the server even
            # if reading the tools below fails.
            self._semgrep_adapter = adapter
            loaded = list(getattr(adapter, "tools", None) or [])
        except Exception as e:
            logger.warning("MCP adapter not available: %s", e)

        self._semgrep_tools_cache = loaded
        return loaded

    def close(self) -> None:
        """Stop the Semgrep MCP server if one was started; safe to call twice."""
        adapter, self._semgrep_adapter = self._semgrep_adapter, None
        self._semgrep_tools_cache = None
        if adapter is None:
            return
        try:
            adapter.stop()
        except Exception as e:
            logger.warning("Failed to stop MCP adapter: %s", e)

    @agent
    def code_reviewer(self) -> Agent:
        """Senior Software Engineer agent for code review."""
        return Agent(
            config=self.agents_config["code_reviewer"],
            llm=self.llm,
            # Tools are attached to the agent; Crew itself takes no tools.
            tools=self._semgrep_tools(),
            verbose=True,
        )

    @task
    def code_review_task(self) -> Task:
        """Task for conducting code review."""
        return Task(
            config=self.tasks_config["code_review_task"],
        )

    @crew
    def crew(self) -> Crew:
        """Create the Code Review crew."""
        return Crew(
            agents=[self.code_reviewer()],
            tasks=[self.code_review_task()],
            process="sequential",
            verbose=True,
        )