pr_reviewer/mcp_servers/checkov_mcp.py
2026-05-08 23:46:17 +10:00

146 lines
4.4 KiB
Python

#!/usr/bin/env python3
"""
MCP server for Checkov Kubernetes security scanner.
"""
import asyncio
import json
import logging
import subprocess
import sys
import tempfile
import os
from typing import Any, Dict, List
import mcp.server.stdio
import mcp.types as types
from mcp.server import NotificationOptions, Server
from mcp.server.models import InitializationOptions
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Create server instance
server = Server("checkov-mcp")
@server.list_tools()
async def handle_list_tools() -> List[types.Tool]:
"""
List available tools.
"""
return [
types.Tool(
name="scan_kubernetes_manifests",
description="Scan Kubernetes manifests for security issues using Checkov",
inputSchema={
"type": "object",
"properties": {
"manifest_content": {
"type": "string",
"description": "The content of the Kubernetes manifest(s) to scan"
}
},
"required": ["manifest_content"]
}
)
]
@server.call_tool()
async def handle_call_tool(
name: str, arguments: Dict[str, Any] | None
) -> List[types.TextContent | types.ImageContent | types.EmbeddedResource]:
"""
Handle tool calls.
"""
if name != "scan_kubernetes_manifests":
raise ValueError(f"Unknown tool: {name}")
if not arguments:
raise ValueError("Missing arguments")
manifest_content = arguments.get("manifest_content")
if not manifest_content:
raise ValueError("Missing manifest_content argument")
try:
# Create a temporary file to hold the manifest content
with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as temp_file:
temp_file.write(manifest_content)
temp_file_path = temp_file.name
try:
# Run checkov on the manifest file
process = await asyncio.create_subprocess_exec(
"checkov",
"-f", temp_file_path,
"--quiet", # Reduce verbosity
"--output", "json", # Get JSON output for easier parsing
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
stdout, stderr = await process.communicate()
if process.returncode not in [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25]: # Checkov returns various codes
# Some non-zero codes are expected (findings, etc.)
pass
result = stdout.decode()
if stderr:
result += "\nSTDERR:\n" + stderr.decode()
# If checkov is not found, we'll get an error from the subprocess
if not result.strip() and process.returncode == 127: # command not found typically returns 127
result = "Error: Checkov command not found. Please install checkov."
return [
types.TextContent(
type="text",
text=result
)
]
finally:
# Clean up the temporary file
os.unlink(temp_file_path)
except FileNotFoundError:
logger.error("Checkov command not found. Please ensure checkov is installed and in PATH.")
return [
types.TextContent(
type="text",
text="Error: Checkov command not found. Please install checkov."
)
]
except Exception as e:
logger.exception("Error running checkov")
return [
types.TextContent(
type="text",
text=f"Error running checkov: {str(e)}"
)
]
async def main():
"""
Run the MCP server.
"""
# Run the server using stdio
async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,
write_stream,
InitializationOptions(
server_name="checkov-mcp",
server_version="0.1.0",
capabilities=server.get_capabilities(
notification_options=NotificationOptions(),
experimental_capabilities={},
),
),
)
if __name__ == "__main__":
asyncio.run(main())