server.py 18.4 KB
Newer Older
Joe Chen's avatar
Joe Chen committed
1
2
3
4
5
6
7
8
9
from mcp.server.fastmcp import FastMCP, Context
from contextlib import asynccontextmanager
from collections.abc import AsyncIterator
from dataclasses import dataclass
from dotenv import load_dotenv
import asyncio
import json
import os
import requests
Joe Chen's avatar
ss    
Joe Chen committed
10
import base64
Joe Chen's avatar
Joe Chen committed
11
from typing import List, Dict, Any, Optional
Joe Chen's avatar
ss    
Joe Chen committed
12
13
14
15
import re
import uuid
import shutil
import subprocess
Joe Chen's avatar
ss    
Joe Chen committed
16
17
18
import pymysql
import pymysql.cursors
import requests
Joe Chen's avatar
Joe Chen committed
19
20
21
22
23
# Add these for debugging
import sys
import logging

# Configure logging
Joe Chen's avatar
ss    
Joe Chen committed
24
25
26
# logging.basicConfig(level=logging.DEBUG, 
#                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
#                    stream=sys.stdout)
Joe Chen's avatar
ss    
Joe Chen committed
27
logger = logging.getLogger("mcp-vm-manager")
Joe Chen's avatar
Joe Chen committed
28

Joe Chen's avatar
ss    
Joe Chen committed
29
# logger.debug("Starting MCP VM Manager Server")
Joe Chen's avatar
Joe Chen committed
30
31

load_dotenv()
Joe Chen's avatar
ss    
Joe Chen committed
32
# logger.debug(f"Environment loaded. HOST={os.getenv('HOST')}, PORT={os.getenv('PORT')}, TRANSPORT={os.getenv('TRANSPORT')}")
Joe Chen's avatar
Joe Chen committed
33
34
35
36
37
38
39
40

# Create a dataclass for our application context
@dataclass
class MCPServerContext:
    """Context for the VM management MCP server."""
    base_url: str
    access_token: Optional[str] = None

Joe Chen's avatar
ss    
Joe Chen committed
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# Database configuration
DB_CONFIG = {
    'host': '10.80.7.120',
    'user': 'root',
    'password': 'cctvamerica',
    'port': 3306,
    'charset': 'utf8mb4',
    'cursorclass': pymysql.cursors.DictCursor
}

# Create a function to get a database connection
def get_db_connection():
    """Get a connection to the MySQL database."""
    try:
        connection = pymysql.connect(
            **DB_CONFIG
        )
        return connection
    except Exception as e:
        logger.error(f"Database connection error: {str(e)}")
        raise



Joe Chen's avatar
Joe Chen committed
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
@asynccontextmanager
async def mcp_server_lifespan(server: FastMCP) -> AsyncIterator[MCPServerContext]:
    """
    Manages the MCP server lifecycle.
    
    Args:
        server: The FastMCP server instance
        
    Yields:
        MCPServerContext: The context containing the base URL and token
    """
    base_url = os.getenv("MCP_BASE_URL", "http://localhost:8080")
    # Read access token from environment variables that are passed from client's mcp.json
    access_token = os.getenv("ACCESS_TOKEN")
    logger.debug(f"Server lifespan starting. Using base_url={base_url}")
    if access_token:
        logger.debug("Access token found in environment variables")
    else:
        logger.debug("No access token found in environment variables. User will need to set it using set_access_token tool")
    
Joe Chen's avatar
ss    
Joe Chen committed
85
    
Joe Chen's avatar
Joe Chen committed
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
    # Add a small delay to ensure the server is fully initialized before accepting requests
    logger.debug("Adding initialization delay...")
    await asyncio.sleep(1.0)
    logger.debug("Initialization delay complete, yielding context")
    
    try:
        yield MCPServerContext(base_url=base_url, access_token=access_token)
    finally:
        # No explicit cleanup needed
        logger.debug("Server lifespan ending")
        pass

# Initialize FastMCP server
mcp = FastMCP(
    "mcp-vm-manager",
    description="MCP server for virtual machine management",
    lifespan=mcp_server_lifespan,
    host=os.getenv("HOST", "0.0.0.0"),
    port=os.getenv("PORT", "8050")
)        

@mcp.tool()
async def get_product_list(ctx: Context) -> str:
    """Get a list of available VM products.
    
    This tool retrieves all available VM products that can be created,
    including their specifications (CPU, RAM, storage, etc.) and pricing.
    
    Args:
        ctx: The MCP server provided context which includes the base URL
    
    Returns:
        A JSON formatted list of all available products
    """
    try:
        base_url = ctx.request_context.lifespan_context.base_url
        response = requests.get(f"{base_url}/product/list")
        
        if response.status_code == 200:
            products = response.json()
            return json.dumps(products, indent=2)
        else:
            return f"Error retrieving product list: HTTP {response.status_code}"
    except Exception as e:
        return f"Error retrieving product list: {str(e)}"

@mcp.tool()
async def create_vm(ctx: Context, sku: str, name: str) -> str:
    """Create a new virtual machine.
    
    This tool creates a new VM instance based on the specified SKU and name.
    
    Args:
        ctx: The MCP server provided context which includes the base URL and token
        sku: The SKU of the VM product to create
        name: The name to assign to the new VM
    
    Returns:
        A success message with the VM UUID or an error message
    """
    try:
        base_url = ctx.request_context.lifespan_context.base_url
        access_token = ctx.request_context.lifespan_context.access_token
        
        if not access_token:
            return "Error: Not logged in. Please login first to get an access token."
        
        headers = {
            "Content-Type": "application/json",
            "Access-Token": access_token
        }
        
        data = {
            "sku": sku,
            "name": name
        }
        
        response = requests.post(f"{base_url}/vm/create", headers=headers, json=data)
        
        if response.status_code == 200:
            result = response.json()
            return f"Successfully created VM with UUID: {result.get('vm_uuid')}"
        else:
            return f"Error creating VM: HTTP {response.status_code} - {response.text}"
    except Exception as e:
        return f"Error creating VM: {str(e)}"

@mcp.tool()
async def get_vm_list(ctx: Context) -> str:
    """Get a list of all virtual machines under the current user.
    
    This tool retrieves all VMs associated with the current user,
    including their status, IP addresses, and other details.
    
    Args:
        ctx: The MCP server provided context which includes the base URL and token
    
    Returns:
        A JSON formatted list of all VMs belonging to the user
    """
    try:
        base_url = ctx.request_context.lifespan_context.base_url
        access_token = ctx.request_context.lifespan_context.access_token
        
        if not access_token:
            return "Error: Not logged in. Please login first to get an access token."
        
        headers = {
            "Access-Token": access_token
        }
        
        response = requests.post(f"{base_url}/vm/list", headers=headers)
        
        if response.status_code == 200:
            vms = response.json()
            return json.dumps(vms, indent=2)
        else:
            return f"Error retrieving VM list: HTTP {response.status_code} - {response.text}"
    except Exception as e:
        return f"Error retrieving VM list: {str(e)}"

@mcp.tool()
async def set_access_token(ctx: Context, token: str) -> str:
    """Set the access token for authenticated API calls.
    
    This tool stores the access token needed for authenticated API calls.
    
    Args:
        ctx: The MCP server provided context
        token: The access token obtained from login
    
    Returns:
        A confirmation message
    """
    try:
        ctx.request_context.lifespan_context.access_token = token
Joe Chen's avatar
ss    
Joe Chen committed
222

Joe Chen's avatar
Joe Chen committed
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
        return "Access token successfully set"
    except Exception as e:
        return f"Error setting access token: {str(e)}"

@mcp.tool()
async def get_vm_count(ctx: Context) -> str:
    """Get the number of VMs for the current user.
    
    This tool returns a simple count of how many VMs are associated with the current user.
    
    Args:
        ctx: The MCP server provided context which includes the base URL and token
    
    Returns:
        A message with the number of VMs
    """
    logger.debug("get_vm_count tool called")
    try:
        base_url = ctx.request_context.lifespan_context.base_url
        access_token = ctx.request_context.lifespan_context.access_token
        
        if not access_token:
            return "Error: Not logged in. Please login first to get an access token."
        
        headers = {
            "Access-Token": access_token
        }
        
        logger.debug(f"Making request to {base_url}/vm/list")
        response = requests.post(f"{base_url}/vm/list", headers=headers)
        
        if response.status_code == 200:
            vms = response.json()
            vm_count = len(vms)
            logger.debug(f"Found {vm_count} VMs")
            return f"You have {vm_count} VMs."
        else:
            error = f"Error retrieving VM list: HTTP {response.status_code} - {response.text}"
            logger.error(error)
            return error
    except Exception as e:
        error = f"Error retrieving VM count: {str(e)}"
        logger.exception(error)
        return error

Joe Chen's avatar
ss    
Joe Chen committed
268

Joe Chen's avatar
ss    
Joe Chen committed
269
@mcp.tool()
Joe Chen's avatar
ss    
Joe Chen committed
270
271
async def check_domain_availability(ctx: Context, domain_name: str) -> str:
    """Check if a domain name is available.
Joe Chen's avatar
ss    
Joe Chen committed
272
    
Joe Chen's avatar
ss    
Joe Chen committed
273
    This tool checks whether a specified subdomain under smartzliving.ai is available. You need to ask the user to provide the subdomain name.
Joe Chen's avatar
ss    
Joe Chen committed
274
275
276
    
    Args:
        ctx: The MCP server provided context
Joe Chen's avatar
ss    
Joe Chen committed
277
        domain_name: The subdomain name to check
Joe Chen's avatar
ss    
Joe Chen committed
278
279
    
    Returns:
Joe Chen's avatar
ss    
Joe Chen committed
280
        A message indicating whether the domain is available
Joe Chen's avatar
ss    
Joe Chen committed
281
    """
Joe Chen's avatar
ss    
Joe Chen committed
282
    logger.debug(f"Checking availability for domain: {domain_name}")
Joe Chen's avatar
ss    
Joe Chen committed
283
284
    
    try:
Joe Chen's avatar
ss    
Joe Chen committed
285
286
287
        # Validate domain name
        if not re.match(r'^[a-z][a-z0-9\-]*$', domain_name):
            return "Error: Domain name must start with a letter and contain only lowercase letters, numbers, and hyphens."
Joe Chen's avatar
ss    
Joe Chen committed
288
        
Joe Chen's avatar
ss    
Joe Chen committed
289
290
291
292
293
294
295
        # Check if domain exists in database
        connection = get_db_connection()
        with connection.cursor() as cursor:
            cursor.execute("USE mcp_domains")
            cursor.execute("SELECT * FROM domains WHERE domain_name = %s", (domain_name,))
            result = cursor.fetchone()
        connection.close()
Joe Chen's avatar
ss    
Joe Chen committed
296
        
Joe Chen's avatar
ss    
Joe Chen committed
297
298
        if result:
            return f"The domain {domain_name}.smartzliving.ai is already in use."
Joe Chen's avatar
ss    
Joe Chen committed
299
        else:
Joe Chen's avatar
ss    
Joe Chen committed
300
            return f"The domain {domain_name}.smartzliving.ai is available."
Joe Chen's avatar
ss    
Joe Chen committed
301
    except Exception as e:
Joe Chen's avatar
ss    
Joe Chen committed
302
303
304
305
        error_msg = f"Error checking domain availability: {str(e)}"
        logger.exception(error_msg)
        return error_msg

Joe Chen's avatar
ss    
Joe Chen committed
306
@mcp.tool()
Joe Chen's avatar
ss    
Joe Chen committed
307
async def deploy_repository_site(ctx: Context, repository_url: str, domain_name: str, username: str, token: str) -> str:
Joe Chen's avatar
ss    
Joe Chen committed
308
    """Deploy a website from a Git repository to a custom domain.
Joe Chen's avatar
ss    
Joe Chen committed
309
310
311
312
    
    
    Args:
        ctx: The MCP server provided context
Joe Chen's avatar
ss    
Joe Chen committed
313
        repository_url: The Git repository URL to clone (GitHub, GitLab, or self-hosted Git)
Joe Chen's avatar
ss    
Joe Chen committed
314
315
316
        domain_name: The subdomain name always ask for it, the full domain name will be domain_name.smartzliving.ai
        username: username for private repositories
        token: token for private repositories
Joe Chen's avatar
ss    
Joe Chen committed
317
318
319
320
    
    Returns:
        A success message or an error message
    """
Joe Chen's avatar
ss    
Joe Chen committed
321
    logger.debug(f"Received deploy request for repository: {repository_url} to domain: {domain_name}")
Joe Chen's avatar
ss    
Joe Chen committed
322
323
324
325
326
327
328
    
    try:
        # Validate domain name (alphanumeric and hyphens only, must start with a letter)
        if not re.match(r'^[a-z][a-z0-9\-]*$', domain_name):
            return "Error: Domain name must start with a letter and contain only lowercase letters, numbers, and hyphens."
        
        
Joe Chen's avatar
ss    
Joe Chen committed
329
330
331
332
333
334
335
336
337
338
        # Check if domain is already in use
        connection = get_db_connection()
        try:
            with connection.cursor() as cursor:
                cursor.execute("USE mcp_domains")
                cursor.execute("SELECT * FROM domains WHERE domain_name = %s", (domain_name,))
                existing_domain = cursor.fetchone()
                
                if existing_domain:
                    # Domain exists, check if it's for the same repository
Joe Chen's avatar
ss    
Joe Chen committed
339
                    if existing_domain['github_url'] == repository_url:
Joe Chen's avatar
ss    
Joe Chen committed
340
341
342
343
344
345
346
347
348
                        logger.info(f"Domain {domain_name} already exists for this repository. Redeploying...")
                    else:
                        return f"Error: The domain {domain_name}.smartzliving.ai is already in use with a different repository."
        except Exception as db_error:
            logger.error(f"Database error checking domain: {str(db_error)}")
            return f"Error checking domain availability: {str(db_error)}"
        finally:
            connection.close()
        
Joe Chen's avatar
ss    
Joe Chen committed
349
350
        # Check if credentials are needed (for private repositories)
        # Modify the URL to include credentials if provided
Joe Chen's avatar
ss    
Joe Chen committed
351
        clone_url = repository_url
Joe Chen's avatar
ss    
Joe Chen committed
352

Joe Chen's avatar
ss    
Joe Chen committed
353
354
355
356
357
        
        # Generate a unique ID for this deployment
        deployment_id = str(uuid.uuid4())
        
        # Define paths
Joe Chen's avatar
ss    
Joe Chen committed
358
359
        domain_path = f"/home/work/deploy/{domain_name}"
        code_path = f"/home/work/code/{deployment_id}"
Joe Chen's avatar
ss    
Joe Chen committed
360
361
362
363
364
365
366
367
        
        # Create directories
        os.makedirs(domain_path, exist_ok=True)
        os.makedirs(code_path, exist_ok=True)
        
        logger.debug(f"Created directories: {domain_path} and {code_path}")
        
        # Clone repository
Joe Chen's avatar
ss    
Joe Chen committed
368
        logger.debug(f"Cloning repository: {repository_url}")
Joe Chen's avatar
ss    
Joe Chen committed
369
370
        # Don't log the clone_url as it might contain credentials
        
Joe Chen's avatar
ss    
Joe Chen committed
371
372
373
374
375

        if clone_url.startswith("https://"):
            clone_url = clone_url.split("https://")[1]

        if token and username:
Joe Chen's avatar
ss    
Joe Chen committed
376
            username = username.replace("@", "%40")
Joe Chen's avatar
ss    
Joe Chen committed
377
378
            clone_url = f"https://{username}:{token}@{clone_url}"

Joe Chen's avatar
ss    
Joe Chen committed
379
380
381
382
383
        print(clone_url)

        print("full clone command is:")
        print(f"su work -c 'git clone {clone_url} {code_path}'")

Joe Chen's avatar
ss    
Joe Chen committed
384
385
        # Clone the repository
        result = subprocess.run(
Joe Chen's avatar
ss    
Joe Chen committed
386
            [f"su work -c 'cd {code_path} && git clone {clone_url}'"],
Joe Chen's avatar
ss    
Joe Chen committed
387
388
389
            capture_output=True,
            text=True
        )
Joe Chen's avatar
ss    
Joe Chen committed
390
391


Joe Chen's avatar
ss    
Joe Chen committed
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
        
        if result.returncode != 0:
            error_msg = f"Failed to clone repository: {result.stderr}"
            logger.error(error_msg)
            # Clean up failed deployment
            shutil.rmtree(code_path, ignore_errors=True)
            return error_msg
        
        # Install dependencies and build
        logger.debug("Installing dependencies and building project")
        build_commands = [
            f"cd {code_path}",
            "npm install",
            "npm run build"
        ]
        
        result = subprocess.run(
Joe Chen's avatar
ss    
Joe Chen committed
409
            [build_commands],
Joe Chen's avatar
ss    
Joe Chen committed
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
            capture_output=True,
            text=True
        )
        
        if result.returncode != 0:
            error_msg = f"Failed to build project: {result.stderr}"
            logger.error(error_msg)
            # Clean up failed deployment
            shutil.rmtree(code_path, ignore_errors=True)
            return error_msg
        
        # Check if build directory exists
        build_dir = os.path.join(code_path, "build")
        if not os.path.exists(build_dir):
            build_dir = os.path.join(code_path, "dist")  # Try alternative build folder name
            
        if not os.path.exists(build_dir):
            error_msg = "Build directory not found. Make sure the project has a standard build setup."
            logger.error(error_msg)
            # Clean up failed deployment
            shutil.rmtree(code_path, ignore_errors=True)
            return error_msg
        
        # Clear existing domain files
        for item in os.listdir(domain_path):
            item_path = os.path.join(domain_path, item)
            if os.path.isfile(item_path):
                os.unlink(item_path)
            else:
                shutil.rmtree(item_path)
        
        # Copy build files to domain directory
        for item in os.listdir(build_dir):
            source = os.path.join(build_dir, item)
            destination = os.path.join(domain_path, item)
            if os.path.isdir(source):
                shutil.copytree(source, destination)
            else:
                shutil.copy2(source, destination)
        
        
        # Clean up code directory
        shutil.rmtree(code_path, ignore_errors=True)
Joe Chen's avatar
ss    
Joe Chen committed
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495



        url = "https://api.cloudflare.com/client/v4/zones/6183d711284073733db288ac2de24360/dns_records"

        if domain_name.startswith("https://"):
            domain_name = domain_name.split("https://")[1]

        if domain_name.startswith("http://"):
            domain_name = domain_name.split("http://")[1]
            
            

        payload = json.dumps({
            "comment": f"joe mcp {domain_name} test",
            "content": "50.226.99.215",
            "name": f"{domain_name}.smartzliving.ai",
            "proxied": True,
            "ttl": 3600,
            "type": "A"
        })
        headers = {
            'X-Auth-Email': 'penneysun@meshare.cn',
            'Content-Type': 'application/json',
            'Authorization': 'Bearer dM2Pml3CCOoMkJT82CX6tvnNGtIkIf4EN5Ymui08'
        }

        response = requests.request("POST", url, headers=headers, data=payload)

        if response.status_code != 200:
            logger.error(f"Error setting up Cloudflare DNS: {response.text}")
            return f"Error setting up Cloudflare DNS: {response.text}"

        
        # Update the database
        connection = get_db_connection()
        try:
            with connection.cursor() as cursor:
                cursor.execute("USE mcp_domains")
                if existing_domain:
                    # Update the existing entry
                    cursor.execute(
                        "UPDATE domains SET github_url = %s WHERE domain_name = %s",
Joe Chen's avatar
ss    
Joe Chen committed
496
                        (repository_url, domain_name)
Joe Chen's avatar
ss    
Joe Chen committed
497
498
499
500
501
                    )
                else:
                    # Insert a new entry
                    cursor.execute(
                        "INSERT INTO domains (domain_name, github_url) VALUES (%s, %s)",
Joe Chen's avatar
ss    
Joe Chen committed
502
                        (domain_name, repository_url)
Joe Chen's avatar
ss    
Joe Chen committed
503
504
505
506
507
508
                    )
            connection.commit()
        except Exception as db_error:
            logger.error(f"Database error updating domain record: {str(db_error)}")
        finally:
            connection.close()
Joe Chen's avatar
ss    
Joe Chen committed
509
510
511
512
513
514
515
516
517
518
519
520
        
        success_msg = f"Site successfully deployed to https://{domain_name}.smartzliving.ai"
        logger.info(success_msg)
        
        # Send a log message notification to the client
        ctx.request_context.session.send_log_message(
            level="info",
            data=success_msg
        )
        
        return success_msg
    except Exception as e:
Joe Chen's avatar
ss    
Joe Chen committed
521
522
        import traceback
        traceback.print_exc()
Joe Chen's avatar
ss    
Joe Chen committed
523
524
525
526
527
528
529
530
531
532
533
        error_msg = f"Error deploying site: {str(e)}"
        logger.exception(error_msg)
        
        # Send an error log message notification to the client
        ctx.request_context.session.send_log_message(
            level="error",
            data=f"Deployment failed: {str(e)}"
        )
        
        return error_msg

Joe Chen's avatar
ss    
Joe Chen committed
534

Joe Chen's avatar
Joe Chen committed
535
536
537
538
539
540
541
542
async def main():
    
        # Run the MCP server with sse transport
    await mcp.run_sse_async()


if __name__ == "__main__":
    asyncio.run(main())