146 lines
4.4 KiB
Python
146 lines
4.4 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
MCP server for Checkov Kubernetes security scanner.
|
|
"""
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
import os
|
|
from typing import Any, Dict, List
|
|
|
|
import mcp.server.stdio
|
|
import mcp.types as types
|
|
from mcp.server import NotificationOptions, Server
|
|
from mcp.server.models import InitializationOptions
|
|
|
|
# Module-wide logging setup: INFO and above, via the root logger's default handler.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# The single MCP server object; the tool handlers below register on it.
server = Server("checkov-mcp")
|
|
|
|
|
|
@server.list_tools()
async def handle_list_tools() -> List[types.Tool]:
    """
    Advertise the tools this server exposes.

    Returns:
        A one-element list describing ``scan_kubernetes_manifests``, whose
        only (and required) input is the ``manifest_content`` string.
    """
    # JSON-schema fragment for the single accepted argument.
    manifest_property = {
        "type": "string",
        "description": "The content of the Kubernetes manifest(s) to scan",
    }
    input_schema = {
        "type": "object",
        "properties": {"manifest_content": manifest_property},
        "required": ["manifest_content"],
    }
    scan_tool = types.Tool(
        name="scan_kubernetes_manifests",
        description="Scan Kubernetes manifests for security issues using Checkov",
        inputSchema=input_schema,
    )
    return [scan_tool]
|
|
|
|
|
|
def _text_content(text: str) -> types.TextContent:
    """Wrap *text* in the ``TextContent`` item returned to the MCP client."""
    return types.TextContent(type="text", text=text)


@server.call_tool()
async def handle_call_tool(
    name: str, arguments: Dict[str, Any] | None
) -> List[types.TextContent | types.ImageContent | types.EmbeddedResource]:
    """
    Handle tool calls.

    Writes the supplied manifest to a temporary ``.yaml`` file, runs
    ``checkov -f <file> --quiet --output json`` on it and returns whatever
    checkov printed.

    Args:
        name: Name of the requested tool; only ``scan_kubernetes_manifests``
            is supported.
        arguments: Tool arguments; must contain a non-empty
            ``manifest_content`` entry.

    Returns:
        A one-element list holding a text item. On success it is checkov's
        stdout, followed by a ``STDERR:`` section when checkov wrote to
        stderr. If checkov is not installed or the scan fails, the text is
        an error message instead.

    Raises:
        ValueError: If the tool name is unknown, or ``arguments`` /
            ``manifest_content`` is missing or empty.
    """
    if name != "scan_kubernetes_manifests":
        raise ValueError(f"Unknown tool: {name}")

    if not arguments:
        raise ValueError("Missing arguments")

    manifest_content = arguments.get("manifest_content")
    if not manifest_content:
        raise ValueError("Missing manifest_content argument")

    # Stays None until the temp file exists, so cleanup knows whether there
    # is anything to remove.
    temp_file_path = None
    try:
        # delete=False: the file must outlive the `with` block so checkov can
        # read it; it is removed in `finally` below. UTF-8 is forced so that
        # non-ASCII manifests do not depend on the platform default encoding.
        with tempfile.NamedTemporaryFile(
            mode="w", suffix=".yaml", delete=False, encoding="utf-8"
        ) as temp_file:
            # Record the path before writing so a failed write still gets
            # cleaned up.
            temp_file_path = temp_file.name
            temp_file.write(manifest_content)

        try:
            process = await asyncio.create_subprocess_exec(
                "checkov",
                "-f", temp_file_path,
                "--quiet",  # Reduce verbosity
                "--output", "json",  # Get JSON output for easier parsing
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
        except FileNotFoundError:
            # Scoped to the spawn only, so a missing temp file is never
            # reported as "checkov not installed".
            logger.error("Checkov command not found. Please ensure checkov is installed and in PATH.")
            return [_text_content("Error: Checkov command not found. Please install checkov.")]

        stdout, stderr = await process.communicate()

        # The exit status is deliberately not treated as a failure: checkov
        # exits non-zero when checks fail, and that output is exactly what
        # the caller wants to see.
        # errors="replace" keeps the report even if it contains stray bytes.
        result = stdout.decode(errors="replace")
        if stderr:
            result += "\nSTDERR:\n" + stderr.decode(errors="replace")

        # A wrapper script that cannot find the real binary typically exits
        # with 127 and prints nothing.
        if not result.strip() and process.returncode == 127:
            result = "Error: Checkov command not found. Please install checkov."

        return [_text_content(result)]
    except Exception as e:
        logger.exception("Error running checkov")
        return [_text_content(f"Error running checkov: {str(e)}")]
    finally:
        # Best-effort cleanup of the temporary file; a failure here must not
        # replace the scan result.
        if temp_file_path is not None:
            try:
                os.unlink(temp_file_path)
            except OSError:
                logger.warning("Could not remove temporary file %s", temp_file_path)
|
|
|
|
|
|
async def main():
    """
    Run the MCP server.

    Opens the stdio transport and serves on it until ``server.run`` returns.
    """
    async with mcp.server.stdio.stdio_server() as streams:
        read_stream, write_stream = streams

        # Assemble the initialization payload step by step.
        capabilities = server.get_capabilities(
            notification_options=NotificationOptions(),
            experimental_capabilities={},
        )
        init_options = InitializationOptions(
            server_name="checkov-mcp",
            server_version="0.1.0",
            capabilities=capabilities,
        )

        await server.run(read_stream, write_stream, init_options)
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: run main() on a fresh event loop.
    asyncio.run(main())