from mcp.server.fastmcp import FastMCP, Context from contextlib import asynccontextmanager from collections.abc import AsyncIterator from dataclasses import dataclass from dotenv import load_dotenv import asyncio import json import os import requests import base64 from typing import List, Dict, Any, Optional import re import uuid import shutil import subprocess import pymysql import pymysql.cursors import requests # Add these for debugging import sys import logging # Configure logging # logging.basicConfig(level=logging.DEBUG, # format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', # stream=sys.stdout) logger = logging.getLogger("mcp-vm-manager") # logger.debug("Starting MCP VM Manager Server") load_dotenv() # logger.debug(f"Environment loaded. HOST={os.getenv('HOST')}, PORT={os.getenv('PORT')}, TRANSPORT={os.getenv('TRANSPORT')}") # Create a dataclass for our application context @dataclass class MCPServerContext: """Context for the VM management MCP server.""" base_url: str access_token: Optional[str] = None # Database configuration DB_CONFIG = { 'host': '10.80.7.120', 'user': 'root', 'password': 'cctvamerica', 'port': 3306, 'charset': 'utf8mb4', 'cursorclass': pymysql.cursors.DictCursor } # Create a function to get a database connection def get_db_connection(): """Get a connection to the MySQL database.""" try: connection = pymysql.connect( **DB_CONFIG ) return connection except Exception as e: logger.error(f"Database connection error: {str(e)}") raise @asynccontextmanager async def mcp_server_lifespan(server: FastMCP) -> AsyncIterator[MCPServerContext]: """ Manages the MCP server lifecycle. Args: server: The FastMCP server instance Yields: MCPServerContext: The context containing the base URL and token """ base_url = os.getenv("MCP_BASE_URL", "http://localhost:8080") # Read access token from environment variables that are passed from client's mcp.json access_token = os.getenv("ACCESS_TOKEN") logger.debug(f"Server lifespan starting. Using base_url={base_url}") if access_token: logger.debug("Access token found in environment variables") else: logger.debug("No access token found in environment variables. User will need to set it using set_access_token tool") # Add a small delay to ensure the server is fully initialized before accepting requests logger.debug("Adding initialization delay...") await asyncio.sleep(1.0) logger.debug("Initialization delay complete, yielding context") try: yield MCPServerContext(base_url=base_url, access_token=access_token) finally: # No explicit cleanup needed logger.debug("Server lifespan ending") pass # Initialize FastMCP server mcp = FastMCP( "mcp-vm-manager", description="MCP server for virtual machine management", lifespan=mcp_server_lifespan, host=os.getenv("HOST", "0.0.0.0"), port=os.getenv("PORT", "8050") ) @mcp.tool() async def get_product_list(ctx: Context) -> str: """Get a list of available VM products. This tool retrieves all available VM products that can be created, including their specifications (CPU, RAM, storage, etc.) and pricing. Args: ctx: The MCP server provided context which includes the base URL Returns: A JSON formatted list of all available products """ try: base_url = ctx.request_context.lifespan_context.base_url response = requests.get(f"{base_url}/product/list") if response.status_code == 200: products = response.json() return json.dumps(products, indent=2) else: return f"Error retrieving product list: HTTP {response.status_code}" except Exception as e: return f"Error retrieving product list: {str(e)}" @mcp.tool() async def create_vm(ctx: Context, sku: str, name: str) -> str: """Create a new virtual machine. This tool creates a new VM instance based on the specified SKU and name. Args: ctx: The MCP server provided context which includes the base URL and token sku: The SKU of the VM product to create name: The name to assign to the new VM Returns: A success message with the VM UUID or an error message """ try: base_url = ctx.request_context.lifespan_context.base_url access_token = ctx.request_context.lifespan_context.access_token if not access_token: return "Error: Not logged in. Please login first to get an access token." headers = { "Content-Type": "application/json", "Access-Token": access_token } data = { "sku": sku, "name": name } response = requests.post(f"{base_url}/vm/create", headers=headers, json=data) if response.status_code == 200: result = response.json() return f"Successfully created VM with UUID: {result.get('vm_uuid')}" else: return f"Error creating VM: HTTP {response.status_code} - {response.text}" except Exception as e: return f"Error creating VM: {str(e)}" @mcp.tool() async def get_vm_list(ctx: Context) -> str: """Get a list of all virtual machines under the current user. This tool retrieves all VMs associated with the current user, including their status, IP addresses, and other details. Args: ctx: The MCP server provided context which includes the base URL and token Returns: A JSON formatted list of all VMs belonging to the user """ try: base_url = ctx.request_context.lifespan_context.base_url access_token = ctx.request_context.lifespan_context.access_token if not access_token: return "Error: Not logged in. Please login first to get an access token." headers = { "Access-Token": access_token } response = requests.post(f"{base_url}/vm/list", headers=headers) if response.status_code == 200: vms = response.json() return json.dumps(vms, indent=2) else: return f"Error retrieving VM list: HTTP {response.status_code} - {response.text}" except Exception as e: return f"Error retrieving VM list: {str(e)}" @mcp.tool() async def set_access_token(ctx: Context, token: str) -> str: """Set the access token for authenticated API calls. This tool stores the access token needed for authenticated API calls. Args: ctx: The MCP server provided context token: The access token obtained from login Returns: A confirmation message """ try: ctx.request_context.lifespan_context.access_token = token return "Access token successfully set" except Exception as e: return f"Error setting access token: {str(e)}" @mcp.tool() async def get_vm_count(ctx: Context) -> str: """Get the number of VMs for the current user. This tool returns a simple count of how many VMs are associated with the current user. Args: ctx: The MCP server provided context which includes the base URL and token Returns: A message with the number of VMs """ logger.debug("get_vm_count tool called") try: base_url = ctx.request_context.lifespan_context.base_url access_token = ctx.request_context.lifespan_context.access_token if not access_token: return "Error: Not logged in. Please login first to get an access token." headers = { "Access-Token": access_token } logger.debug(f"Making request to {base_url}/vm/list") response = requests.post(f"{base_url}/vm/list", headers=headers) if response.status_code == 200: vms = response.json() vm_count = len(vms) logger.debug(f"Found {vm_count} VMs") return f"You have {vm_count} VMs." else: error = f"Error retrieving VM list: HTTP {response.status_code} - {response.text}" logger.error(error) return error except Exception as e: error = f"Error retrieving VM count: {str(e)}" logger.exception(error) return error @mcp.tool() async def check_domain_availability(ctx: Context, domain_name: str) -> str: """Check if a domain name is available. This tool checks whether a specified subdomain under smartzliving.ai is available. You need to ask the user to provide the subdomain name. Args: ctx: The MCP server provided context domain_name: The subdomain name to check Returns: A message indicating whether the domain is available """ logger.debug(f"Checking availability for domain: {domain_name}") try: # Validate domain name if not re.match(r'^[a-z][a-z0-9\-]*$', domain_name): return "Error: Domain name must start with a letter and contain only lowercase letters, numbers, and hyphens." # Check if domain exists in database connection = get_db_connection() with connection.cursor() as cursor: cursor.execute("USE mcp_domains") cursor.execute("SELECT * FROM domains WHERE domain_name = %s", (domain_name,)) result = cursor.fetchone() connection.close() if result: return f"The domain {domain_name}.smartzliving.ai is already in use." else: return f"The domain {domain_name}.smartzliving.ai is available." except Exception as e: error_msg = f"Error checking domain availability: {str(e)}" logger.exception(error_msg) return error_msg @mcp.tool() async def deploy_repository_site(ctx: Context, repository_url: str, domain_name: str, username: str, token: str) -> str: """Deploy a website from a Git repository to a custom domain. Args: ctx: The MCP server provided context repository_url: The Git repository URL to clone (GitHub, GitLab, or self-hosted Git) domain_name: The subdomain name always ask for it, the full domain name will be domain_name.smartzliving.ai username: username for private repositories token: token for private repositories Returns: A success message or an error message """ logger.debug(f"Received deploy request for repository: {repository_url} to domain: {domain_name}") try: # Validate domain name (alphanumeric and hyphens only, must start with a letter) if not re.match(r'^[a-z][a-z0-9\-]*$', domain_name): return "Error: Domain name must start with a letter and contain only lowercase letters, numbers, and hyphens." # Check if domain is already in use connection = get_db_connection() try: with connection.cursor() as cursor: cursor.execute("USE mcp_domains") cursor.execute("SELECT * FROM domains WHERE domain_name = %s", (domain_name,)) existing_domain = cursor.fetchone() if existing_domain: # Domain exists, check if it's for the same repository if existing_domain['github_url'] == repository_url: logger.info(f"Domain {domain_name} already exists for this repository. Redeploying...") else: return f"Error: The domain {domain_name}.smartzliving.ai is already in use with a different repository." except Exception as db_error: logger.error(f"Database error checking domain: {str(db_error)}") return f"Error checking domain availability: {str(db_error)}" finally: connection.close() # Check if credentials are needed (for private repositories) # Modify the URL to include credentials if provided clone_url = repository_url # Generate a unique ID for this deployment deployment_id = str(uuid.uuid4()) # Define paths domain_path = f"/home/work/deploy/{domain_name}" code_path = f"/home/work/code/{deployment_id}" # Create directories os.makedirs(domain_path, exist_ok=True) os.makedirs(code_path, exist_ok=True) logger.debug(f"Created directories: {domain_path} and {code_path}") # Clone repository logger.debug(f"Cloning repository: {repository_url}") # Don't log the clone_url as it might contain credentials if clone_url.startswith("https://"): clone_url = clone_url.split("https://")[1] if token and username: username = username.replace("@", "%40") clone_url = f"https://{username}:{token}@{clone_url}" print(clone_url) print("full clone command is:") print(f"su work -c 'git clone {clone_url} {code_path}'") # Clone the repository result = subprocess.run( [f"su work -c 'cd {code_path} && git clone {clone_url}'"], capture_output=True, text=True ) if result.returncode != 0: error_msg = f"Failed to clone repository: {result.stderr}" logger.error(error_msg) # Clean up failed deployment shutil.rmtree(code_path, ignore_errors=True) return error_msg # Install dependencies and build logger.debug("Installing dependencies and building project") build_commands = [ f"cd {code_path}", "npm install", "npm run build" ] result = subprocess.run( [build_commands], capture_output=True, text=True ) if result.returncode != 0: error_msg = f"Failed to build project: {result.stderr}" logger.error(error_msg) # Clean up failed deployment shutil.rmtree(code_path, ignore_errors=True) return error_msg # Check if build directory exists build_dir = os.path.join(code_path, "build") if not os.path.exists(build_dir): build_dir = os.path.join(code_path, "dist") # Try alternative build folder name if not os.path.exists(build_dir): error_msg = "Build directory not found. Make sure the project has a standard build setup." logger.error(error_msg) # Clean up failed deployment shutil.rmtree(code_path, ignore_errors=True) return error_msg # Clear existing domain files for item in os.listdir(domain_path): item_path = os.path.join(domain_path, item) if os.path.isfile(item_path): os.unlink(item_path) else: shutil.rmtree(item_path) # Copy build files to domain directory for item in os.listdir(build_dir): source = os.path.join(build_dir, item) destination = os.path.join(domain_path, item) if os.path.isdir(source): shutil.copytree(source, destination) else: shutil.copy2(source, destination) # Clean up code directory shutil.rmtree(code_path, ignore_errors=True) url = "https://api.cloudflare.com/client/v4/zones/6183d711284073733db288ac2de24360/dns_records" if domain_name.startswith("https://"): domain_name = domain_name.split("https://")[1] if domain_name.startswith("http://"): domain_name = domain_name.split("http://")[1] payload = json.dumps({ "comment": f"joe mcp {domain_name} test", "content": "50.226.99.215", "name": f"{domain_name}.smartzliving.ai", "proxied": True, "ttl": 3600, "type": "A" }) headers = { 'X-Auth-Email': 'penneysun@meshare.cn', 'Content-Type': 'application/json', 'Authorization': 'Bearer dM2Pml3CCOoMkJT82CX6tvnNGtIkIf4EN5Ymui08' } response = requests.request("POST", url, headers=headers, data=payload) if response.status_code != 200: logger.error(f"Error setting up Cloudflare DNS: {response.text}") return f"Error setting up Cloudflare DNS: {response.text}" # Update the database connection = get_db_connection() try: with connection.cursor() as cursor: cursor.execute("USE mcp_domains") if existing_domain: # Update the existing entry cursor.execute( "UPDATE domains SET github_url = %s WHERE domain_name = %s", (repository_url, domain_name) ) else: # Insert a new entry cursor.execute( "INSERT INTO domains (domain_name, github_url) VALUES (%s, %s)", (domain_name, repository_url) ) connection.commit() except Exception as db_error: logger.error(f"Database error updating domain record: {str(db_error)}") finally: connection.close() success_msg = f"Site successfully deployed to https://{domain_name}.smartzliving.ai" logger.info(success_msg) # Send a log message notification to the client ctx.request_context.session.send_log_message( level="info", data=success_msg ) return success_msg except Exception as e: import traceback traceback.print_exc() error_msg = f"Error deploying site: {str(e)}" logger.exception(error_msg) # Send an error log message notification to the client ctx.request_context.session.send_log_message( level="error", data=f"Deployment failed: {str(e)}" ) return error_msg async def main(): # Run the MCP server with sse transport await mcp.run_sse_async() if __name__ == "__main__": asyncio.run(main())