#!/usr/bin/env python3 """ MCP server for Checkov Kubernetes security scanner. """ import asyncio import json import logging import subprocess import sys import tempfile import os from typing import Any, Dict, List import mcp.server.stdio import mcp.types as types from mcp.server import NotificationOptions, Server from mcp.server.models import InitializationOptions # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Create server instance server = Server("checkov-mcp") @server.list_tools() async def handle_list_tools() -> List[types.Tool]: """ List available tools. """ return [ types.Tool( name="scan_kubernetes_manifests", description="Scan Kubernetes manifests for security issues using Checkov", inputSchema={ "type": "object", "properties": { "manifest_content": { "type": "string", "description": "The content of the Kubernetes manifest(s) to scan" } }, "required": ["manifest_content"] } ) ] @server.call_tool() async def handle_call_tool( name: str, arguments: Dict[str, Any] | None ) -> List[types.TextContent | types.ImageContent | types.EmbeddedResource]: """ Handle tool calls. """ if name != "scan_kubernetes_manifests": raise ValueError(f"Unknown tool: {name}") if not arguments: raise ValueError("Missing arguments") manifest_content = arguments.get("manifest_content") if not manifest_content: raise ValueError("Missing manifest_content argument") try: # Create a temporary file to hold the manifest content with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as temp_file: temp_file.write(manifest_content) temp_file_path = temp_file.name try: # Run checkov on the manifest file process = await asyncio.create_subprocess_exec( "checkov", "-f", temp_file_path, "--quiet", # Reduce verbosity "--output", "json", # Get JSON output for easier parsing stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) stdout, stderr = await process.communicate() if process.returncode not in [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25]: # Checkov returns various codes # Some non-zero codes are expected (findings, etc.) pass result = stdout.decode() if stderr: result += "\nSTDERR:\n" + stderr.decode() # If checkov is not found, we'll get an error from the subprocess if not result.strip() and process.returncode == 127: # command not found typically returns 127 result = "Error: Checkov command not found. Please install checkov." return [ types.TextContent( type="text", text=result ) ] finally: # Clean up the temporary file os.unlink(temp_file_path) except FileNotFoundError: logger.error("Checkov command not found. Please ensure checkov is installed and in PATH.") return [ types.TextContent( type="text", text="Error: Checkov command not found. Please install checkov." ) ] except Exception as e: logger.exception("Error running checkov") return [ types.TextContent( type="text", text=f"Error running checkov: {str(e)}" ) ] async def main(): """ Run the MCP server. """ # Run the server using stdio async with mcp.server.stdio.stdio_server() as (read_stream, write_stream): await server.run( read_stream, write_stream, InitializationOptions( server_name="checkov-mcp", server_version="0.1.0", capabilities=server.get_capabilities( notification_options=NotificationOptions(), experimental_capabilities={}, ), ), ) if __name__ == "__main__": asyncio.run(main())