""" AWS Lambda function for Device Management MCP Tools Implements all MCP server tools in a single Lambda function """ import json import os import datetime import uuid import logging import boto3 from decimal import Decimal from boto3.dynamodb.conditions import Key, Attr # Configure logging logger = logging.getLogger() logger.setLevel(logging.INFO) # Helper function to convert datetime to ISO format string def datetime_to_iso(dt): if isinstance(dt, datetime.datetime): return dt.isoformat() return dt # Helper function to handle decimal serialization for DynamoDB class DecimalEncoder(json.JSONEncoder): def default(self, o): if isinstance(o, Decimal): return float(o) return super(DecimalEncoder, self).default(o) def json_dumps(obj): return json.dumps(obj, cls=DecimalEncoder) # Initialize DynamoDB resource def get_dynamodb_resource(): """Get DynamoDB resource based on environment""" # Always use AWS DynamoDB in us-west-2 aws_region = 'us-west-2' return boto3.resource('dynamodb', region_name=aws_region) # Define table names DEVICES_TABLE = 'Devices' DEVICE_SETTINGS_TABLE = 'DeviceSettings' WIFI_NETWORKS_TABLE = 'WifiNetworks' USERS_TABLE = 'Users' USER_ACTIVITIES_TABLE = 'UserActivities' # Device operations def get_device(device_id): """Get a device by ID""" dynamodb = get_dynamodb_resource() table = dynamodb.Table(DEVICES_TABLE) response = table.get_item(Key={'device_id': device_id}) return response.get('Item') def list_devices(limit=100): """List all devices""" dynamodb = get_dynamodb_resource() table = dynamodb.Table(DEVICES_TABLE) response = table.scan(Limit=limit) return response.get('Items', []) # Device Settings operations def get_device_setting(device_id, setting_key): """Get a specific device setting""" dynamodb = get_dynamodb_resource() table = dynamodb.Table(DEVICE_SETTINGS_TABLE) response = table.get_item(Key={ 'device_id': device_id, 'setting_key': setting_key }) return response.get('Item') def list_device_settings(device_id): """List all settings for a device""" dynamodb = get_dynamodb_resource() table = dynamodb.Table(DEVICE_SETTINGS_TABLE) response = table.query( KeyConditionExpression=Key('device_id').eq(device_id) ) return response.get('Items', []) # WiFi Network operations def list_wifi_networks(device_id): """List all WiFi networks for a device""" dynamodb = get_dynamodb_resource() table = dynamodb.Table(WIFI_NETWORKS_TABLE) response = table.query( KeyConditionExpression=Key('device_id').eq(device_id) ) return response.get('Items', []) def update_wifi_network(device_id, network_id, update_data): """Update a WiFi network""" dynamodb = get_dynamodb_resource() table = dynamodb.Table(WIFI_NETWORKS_TABLE) # Convert datetime objects to ISO format strings if 'last_updated' in update_data and update_data['last_updated']: update_data['last_updated'] = datetime_to_iso(update_data['last_updated']) else: update_data['last_updated'] = datetime_to_iso(datetime.datetime.utcnow()) # Convert float to Decimal for DynamoDB if 'signal_strength' in update_data and update_data['signal_strength'] is not None: update_data['signal_strength'] = Decimal(str(update_data['signal_strength'])) # Build update expression update_expression = "SET " expression_attribute_values = {} expression_attribute_names = {} for key, value in update_data.items(): if key not in ['device_id', 'network_id']: # Skip the primary keys update_expression += f"#{key} = :{key}, " expression_attribute_values[f":{key}"] = value expression_attribute_names[f"#{key}"] = key # Remove trailing comma and space update_expression = update_expression[:-2] response = table.update_item( Key={'device_id': device_id, 'network_id': network_id}, UpdateExpression=update_expression, ExpressionAttributeValues=expression_attribute_values, ExpressionAttributeNames=expression_attribute_names, ReturnValues="ALL_NEW" ) return response.get('Attributes') def update_wifi_ssid(device_id, network_id, ssid): """Update the SSID of a WiFi network""" return update_wifi_network(device_id, network_id, {'ssid': ssid}) def update_wifi_security(device_id, network_id, security_type): """Update the security type of a WiFi network""" return update_wifi_network(device_id, network_id, {'security_type': security_type}) # User operations def list_users(limit=100): """List all users""" dynamodb = get_dynamodb_resource() table = dynamodb.Table(USERS_TABLE) response = table.scan(Limit=limit) return response.get('Items', []) # User Activity operations def query_user_activity(start_date, end_date, user_id=None, activity_type=None, limit=100): """Query user activities within a time period""" dynamodb = get_dynamodb_resource() table = dynamodb.Table(USER_ACTIVITIES_TABLE) # Convert datetime objects to ISO format strings if isinstance(start_date, datetime.datetime): start_date = datetime_to_iso(start_date) if isinstance(end_date, datetime.datetime): end_date = datetime_to_iso(end_date) if user_id: # Query by user_id and time range key_condition = Key('user_id').eq(user_id) & Key('timestamp').between(start_date, end_date) filter_expression = None if activity_type: filter_expression = Attr('activity_type').eq(activity_type) if filter_expression: response = table.query( KeyConditionExpression=key_condition, FilterExpression=filter_expression, Limit=limit ) else: response = table.query( KeyConditionExpression=key_condition, Limit=limit ) elif activity_type: # Query by activity_type and time range using GSI response = table.query( IndexName='ActivityTypeIndex', KeyConditionExpression=Key('activity_type').eq(activity_type) & Key('timestamp').between(start_date, end_date), Limit=limit ) else: # Scan with time range filter response = table.scan( FilterExpression=Attr('timestamp').between(start_date, end_date), Limit=limit ) return response.get('Items', []) # MCP Tool implementations def tool_get_device_settings(device_id): """ Get the settings of a device from the Device API Args: device_id: The ID of the device to get settings for Returns: Device settings information """ try: device = get_device(device_id) if not device: return {"error": f"Device not found: {device_id}"} settings = list_device_settings(device_id) result = { "device_id": device["device_id"], "device_name": device["name"], "model": device["model"], "firmware_version": device["firmware_version"], "connection_status": device["connection_status"], "settings": {} } for setting in settings: result["settings"][setting["setting_key"]] = setting["setting_value"] return result except Exception as e: logger.error(f"Error in get_device_settings: {str(e)}") return {"error": str(e)} def tool_list_devices(limit=25): """ List devices in the Device Remote Management system Args: limit: Maximum number of devices to return (default: 25) Returns: List of devices with their details """ try: devices = list_devices(limit) return devices except Exception as e: logger.error(f"Error in list_devices: {str(e)}") return {"error": str(e)} def tool_list_wifi_networks(device_id): """ List all WiFi networks for a specific device Args: device_id: The ID of the device to get WiFi networks for Returns: List of WiFi networks with their details """ try: device = get_device(device_id) if not device: return {"error": f"Device not found: {device_id}"} networks = list_wifi_networks(device_id) return { "device_id": device["device_id"], "device_name": device["name"], "wifi_networks": networks } except Exception as e: logger.error(f"Error in list_wifi_networks: {str(e)}") return {"error": str(e)} def tool_list_users(limit=100): """ List users within an account from the Device API Args: limit: Maximum number of users to return (default: 100) Returns: List of users """ try: users = list_users(limit) return users except Exception as e: logger.error(f"Error in list_users: {str(e)}") return {"error": str(e)} def tool_query_user_activity(start_date, end_date, user_id=None, activity_type=None, limit=100): """ Query user activity within a time period Args: start_date: Start date in ISO format (YYYY-MM-DDTHH:MM:SS) end_date: End date in ISO format (YYYY-MM-DDTHH:MM:SS) user_id: Optional user ID to filter activities activity_type: Optional activity type to filter limit: Maximum number of activities to return (default: 100) Returns: List of user activities """ try: activities = query_user_activity(start_date, end_date, user_id, activity_type, limit) return activities except Exception as e: logger.error(f"Error in query_user_activity: {str(e)}") return {"error": str(e)} def tool_update_wifi_ssid(device_id, network_id, ssid): """ Update the SSID of a Wi-Fi network on a device Args: device_id: The ID of the device network_id: The ID of the Wi-Fi network ssid: The new SSID for the Wi-Fi network Returns: Result of the SSID update operation """ try: # Validate SSID length (1-32 characters) if len(ssid) < 1 or len(ssid) > 32: return {"error": "SSID must be between 1 and 32 characters"} result = update_wifi_ssid(device_id, network_id, ssid) return result except Exception as e: logger.error(f"Error in update_wifi_ssid: {str(e)}") return {"error": str(e)} def tool_update_wifi_security(device_id, network_id, security_type): """ Update the security type of a Wi-Fi network on a device Args: device_id: The ID of the device network_id: The ID of the Wi-Fi network security_type: The new security type for the Wi-Fi network (wpa2-psk, wpa3-psk, open, wpa-psk, wep, enterprise) Returns: Result of the security type update operation """ try: # Validate security type valid_security_types = ["wpa2-psk", "wpa3-psk", "open", "wpa-psk", "wep", "enterprise"] if security_type not in valid_security_types: return {"error": f"Invalid security type. Must be one of: {', '.join(valid_security_types)}"} result = update_wifi_security(device_id, network_id, security_type) return result except Exception as e: logger.error(f"Error in update_wifi_security: {str(e)}") return {"error": str(e)} # Lambda handler def lambda_handler(event, context): """ AWS Lambda handler function Args: event: Lambda event data context: Lambda context Returns: Lambda response """ try: # Parse the incoming request logger.info(f"Received event: {json.dumps(event)}") # Extract the tool name from the event tool_name = event['action_name'] result = None # Call the appropriate function based on tool_name if tool_name == 'get_device_settings': device_id = event['device_id'] result = tool_get_device_settings(device_id) elif tool_name == 'list_devices': limit = event.get('limit', 25) result = tool_list_devices(limit) elif tool_name == 'list_wifi_networks': device_id = event['device_id'] result = tool_list_wifi_networks(device_id) elif tool_name == 'list_users': limit = event.get('limit', 100) result = tool_list_users(limit) elif tool_name == 'query_user_activity': start_date = event['start_date'] end_date = event['end_date'] user_id = event.get('user_id') activity_type = event.get('activity_type') limit = event.get('limit', 50) result = tool_query_user_activity(start_date, end_date, user_id, activity_type, limit) elif tool_name == 'update_wifi_ssid': device_id = event['device_id'] network_id = event['network_id'] ssid = event['ssid'] result = tool_update_wifi_ssid(device_id, network_id, ssid) elif tool_name == 'update_wifi_security': device_id = event['device_id'] network_id = event['network_id'] security_type = event['security_type'] result = tool_update_wifi_security(device_id, network_id, security_type) else: available_tools = [ 'get_device_settings', 'list_devices', 'list_wifi_networks', 'list_users', 'query_user_activity', 'update_wifi_ssid', 'update_wifi_security' ] return { 'statusCode': 400, 'body': json.dumps({ 'error': f"Unknown tool: {tool_name}", 'available_tools': available_tools }) } # Return the result return { 'statusCode': 200, 'body': json_dumps(result) } except Exception as e: logger.error(f"Error processing request: {str(e)}") return { 'statusCode': 500, 'body': json.dumps({ 'error': f"Internal server error: {str(e)}" }) }