""" Generate synthetic data for Device Management Lambda """ import datetime import random import uuid from decimal import Decimal import boto3 import os import json # Configure DynamoDB connection # Always use AWS DynamoDB in us-west-2 aws_region = 'us-west-2' # Initialize DynamoDB resource dynamodb = boto3.resource('dynamodb', region_name=aws_region) # Define table names DEVICES_TABLE = 'Devices' DEVICE_SETTINGS_TABLE = 'DeviceSettings' WIFI_NETWORKS_TABLE = 'WifiNetworks' USERS_TABLE = 'Users' USER_ACTIVITIES_TABLE = 'UserActivities' # Helper function to convert datetime to ISO format string def datetime_to_iso(dt): if isinstance(dt, datetime.datetime): return dt.isoformat() return dt # Device CRUD operations def create_device(device_data): """Create a new device""" table = dynamodb.Table(DEVICES_TABLE) # Ensure device_id exists if 'device_id' not in device_data: device_data['device_id'] = f"DG-{uuid.uuid4().hex[:8].upper()}" # Convert datetime objects to ISO format strings if 'last_connected' in device_data and device_data['last_connected']: device_data['last_connected'] = datetime_to_iso(device_data['last_connected']) response = table.put_item(Item=device_data) return device_data # Device Settings CRUD operations def create_device_setting(device_id, setting_key, setting_value, last_updated=None): """Create or update a device setting""" table = dynamodb.Table(DEVICE_SETTINGS_TABLE) if not last_updated: last_updated = datetime.datetime.utcnow() item = { 'device_id': device_id, 'setting_key': setting_key, 'setting_value': setting_value, 'last_updated': datetime_to_iso(last_updated) } response = table.put_item(Item=item) return item # WiFi Network CRUD operations def create_wifi_network(network_data): """Create a new WiFi network""" table = dynamodb.Table(WIFI_NETWORKS_TABLE) # Ensure required fields exist if 'device_id' not in network_data: raise ValueError("device_id is required") if 'network_id' not in network_data: network_data['network_id'] = f"wifi_{uuid.uuid4().hex[:8]}" # Convert datetime objects to ISO format strings if 'last_updated' in network_data and network_data['last_updated']: network_data['last_updated'] = datetime_to_iso(network_data['last_updated']) else: network_data['last_updated'] = datetime_to_iso(datetime.datetime.utcnow()) # Convert float to Decimal for DynamoDB if 'signal_strength' in network_data and network_data['signal_strength'] is not None: network_data['signal_strength'] = Decimal(str(network_data['signal_strength'])) response = table.put_item(Item=network_data) return network_data # User CRUD operations def create_user(user_data): """Create a new user""" table = dynamodb.Table(USERS_TABLE) # Ensure user_id exists if 'user_id' not in user_data: user_data['user_id'] = f"USR{uuid.uuid4().hex[:8].upper()}" # Convert datetime objects to ISO format strings if 'created_at' in user_data and user_data['created_at']: user_data['created_at'] = datetime_to_iso(user_data['created_at']) else: user_data['created_at'] = datetime_to_iso(datetime.datetime.utcnow()) if 'last_login' in user_data and user_data['last_login']: user_data['last_login'] = datetime_to_iso(user_data['last_login']) response = table.put_item(Item=user_data) return user_data # User Activity CRUD operations def create_user_activity(user_id, activity_type, description=None, ip_address=None, timestamp=None): """Create a new user activity""" table = dynamodb.Table(USER_ACTIVITIES_TABLE) if not timestamp: timestamp = datetime.datetime.utcnow() timestamp_str = datetime_to_iso(timestamp) item = { 'user_id': user_id, 'timestamp': timestamp_str, 'activity_type': activity_type, 'description': description, 'ip_address': ip_address } response = table.put_item(Item=item) return item def generate_synthetic_data(): """Generate synthetic data for testing with DynamoDB""" # Import and initialize database from dynamodb_models import init_db init_db() # Generate devices (at least 25) devices = [] for i in range(1, 26): device_id = f"DG-{100000+i}" device_data = { 'device_id': device_id, 'name': f"Device Router {i}", 'model': random.choice(["TransPort WR31", "TransPort WR44", "IX20", "EX15", "IX15", "EX12", "WR54", "WR64"]), 'firmware_version': f"5.{random.randint(1, 9)}.{random.randint(1, 20)}", 'connection_status': random.choice(["Connected", "Disconnected", "Dormant", "Maintenance", "Updating"]), 'ip_address': f"192.168.{random.randint(1, 5)}.{random.randint(2, 254)}", 'mac_address': f"00:40:9D:{random.randint(10, 99)}:{random.randint(10, 99)}:{random.randint(10, 99)}", 'last_connected': datetime.datetime.now() - datetime.timedelta(hours=random.randint(0, 72)) } device = create_device(device_data) devices.append(device) # Generate device settings (multiple per device) setting_keys = [ "cellular.apn", "cellular.auth_type", "cellular.sim_pin", "network.hostname", "network.ip_mode", "network.dns_primary", "system.contact", "system.location", "system.description", "firewall.enabled", "firewall.default_policy", "wifi.channel", "wifi.country", "wifi.regulatory_domain", "system.timezone", "system.ntp_server", "system.log_level", "network.ipv6_enabled", "network.mtu", "network.gateway", "security.ssh_enabled", "security.https_enabled", "security.password_complexity", "cellular.band_selection", "cellular.roaming_allowed" ] device_settings_count = 0 for device in devices: # Ensure each device has at least one setting for each key for key in setting_keys: if "cellular.apn" in key: value = random.choice(["internet", "broadband", "iot.secure", "m2m.provider.com", "wireless.data"]) elif "auth_type" in key: value = random.choice(["none", "pap", "chap", "auto"]) elif "sim_pin" in key: value = str(random.randint(1000, 9999)) elif "hostname" in key: value = f"device-router-{device['device_id'].lower()}" elif "ip_mode" in key: value = random.choice(["dhcp", "static", "auto"]) elif "dns" in key: value = random.choice(["8.8.8.8", "8.8.4.4", "1.1.1.1", "9.9.9.9", f"192.168.{random.randint(1,5)}.1"]) elif "contact" in key: value = f"admin-{random.randint(1, 10)}@example.com" elif "location" in key: value = random.choice(["Server Room", "Office", "Remote Site", "Branch Office", "Warehouse", "Data Center", "Manufacturing Floor", "Retail Store", "Distribution Center", "Field Site"]) elif "description" in key: value = f"Device Router for {random.choice(['Primary', 'Backup', 'IoT', 'Monitoring', 'Security', 'Guest', 'VPN', 'Emergency'])} connectivity" elif "firewall.enabled" in key: value = random.choice(["true", "false"]) elif "default_policy" in key: value = random.choice(["accept", "drop", "reject"]) elif "wifi.channel" in key: value = str(random.choice([1, 6, 11, 36, 40, 44, 48, 149, 153, 157, 161])) elif "wifi.country" in key: value = random.choice(["US", "CA", "GB", "DE", "FR", "JP", "AU", "NZ", "SG", "IN"]) elif "regulatory_domain" in key: value = random.choice(["FCC", "ETSI", "TELEC", "KCC", "SRRC"]) elif "timezone" in key: value = random.choice(["UTC", "America/New_York", "Europe/London", "Asia/Tokyo", "Australia/Sydney", "Pacific/Auckland"]) elif "ntp_server" in key: value = random.choice(["pool.ntp.org", "time.google.com", "time.windows.com", "time.apple.com", "ntp.ubuntu.com"]) elif "log_level" in key: value = random.choice(["debug", "info", "warning", "error", "critical"]) elif "ipv6_enabled" in key: value = random.choice(["true", "false"]) elif "mtu" in key: value = str(random.choice([1500, 1492, 1480, 1450, 1400])) elif "gateway" in key: value = f"192.168.{random.randint(1,5)}.1" elif "ssh_enabled" in key: value = random.choice(["true", "false"]) elif "https_enabled" in key: value = random.choice(["true", "false"]) elif "password_complexity" in key: value = random.choice(["low", "medium", "high", "extreme"]) elif "band_selection" in key: value = random.choice(["auto", "4g_only", "5g_only", "4g_preferred", "5g_preferred"]) elif "roaming_allowed" in key: value = random.choice(["true", "false"]) else: value = f"value-{random.randint(1, 100)}" last_updated = datetime.datetime.now() - datetime.timedelta(days=random.randint(0, 30)) create_device_setting(device['device_id'], key, value, last_updated) device_settings_count += 1 # Generate WiFi networks (at least 25 total) wifi_count = 0 for device in devices: # Ensure we have at least 25 WiFi networks total networks_per_device = max(1, 25 // len(devices)) if wifi_count < 25 and device['device_id'] == devices[-1]['device_id']: # Add remaining networks to the last device to ensure we have at least 25 networks_per_device = max(networks_per_device, 25 - wifi_count) for i in range(1, networks_per_device + 1): network_data = { 'device_id': device['device_id'], 'network_id': f"wifi_{i}", 'ssid': f"Device-Net-{device['device_id']}-{i}", 'security_type': random.choice(["wpa2-psk", "wpa3-psk", "open", "wpa-psk", "wep", "enterprise"]), 'enabled': random.choice([True, True, False]), # More likely to be enabled 'channel': random.choice([1, 6, 11, 36, 40, 44, 48, 149, 153, 157, 161]), 'signal_strength': random.uniform(-90.0, -30.0), 'last_updated': datetime.datetime.now() - datetime.timedelta(days=random.randint(0, 30)) } create_wifi_network(network_data) wifi_count += 1 # Generate users (at least 25) users = [] roles = ["admin", "operator", "viewer", "device_admin", "support", "manager", "auditor", "guest", "technician", "developer"] domains = ["example.com", "company.net", "device-test.org", "enterprise.co", "tech-corp.io"] first_names = ["John", "Jane", "Robert", "Lisa", "Michael", "Sarah", "David", "Emma", "Chris", "Olivia", "William", "Sophia", "James", "Ava", "Benjamin", "Mia", "Daniel", "Charlotte", "Matthew", "Amelia", "Joseph", "Harper", "Andrew", "Evelyn", "Joshua", "Abigail", "Nicholas", "Emily", "Ryan", "Elizabeth"] last_names = ["Smith", "Johnson", "Williams", "Brown", "Jones", "Miller", "Davis", "Garcia", "Wilson", "Taylor", "Anderson", "Thomas", "Jackson", "White", "Harris", "Martin", "Thompson", "Moore", "Young", "Allen", "King", "Wright", "Scott", "Green", "Baker", "Adams", "Nelson", "Hill", "Ramirez", "Campbell"] # Create a set to track used email addresses used_emails = set() for i in range(1, 26): # Keep generating until we get a unique email while True: first_name = random.choice(first_names) last_name = random.choice(last_names) domain = random.choice(domains) email = f"{first_name.lower()}.{last_name.lower()}{i}@{domain}" if email not in used_emails: used_emails.add(email) break user_data = { 'user_id': f"USR{100000+i}", 'username': f"{first_name.lower()}.{last_name.lower()}{i}", 'email': email, 'first_name': first_name, 'last_name': last_name, 'role': random.choice(roles), 'created_at': datetime.datetime.now() - datetime.timedelta(days=random.randint(30, 365)), 'last_login': datetime.datetime.now() - datetime.timedelta(hours=random.randint(1, 240)) } user = create_user(user_data) users.append(user) # Generate user activities (at least 25 per user) activity_types = [ "login", "logout", "device_config_change", "firmware_update", "user_added", "user_removed", "report_generated", "alert_acknowledged", "device_reboot", "password_change", "vpn_connected", "vpn_disconnected", "file_upload", "file_download", "settings_export", "settings_import", "group_created", "group_modified", "permission_changed", "api_key_created", "api_key_revoked", "schedule_created", "schedule_modified", "backup_created", "restore_performed", "certificate_installed", "certificate_expired", "security_scan", "remote_access_enabled", "remote_access_disabled" ] user_activities_count = 0 for user in users: # Generate at least 25 activities per user for _ in range(25): activity_type = random.choice(activity_types) if activity_type == "login": description = f"User logged in from {random.choice(['web interface', 'mobile app', 'API', 'CLI', 'desktop application'])}" elif activity_type == "logout": description = "User logged out" elif activity_type == "device_config_change": device = random.choice(devices) setting = random.choice(setting_keys) description = f"Changed {setting} on device {device['name']}" elif activity_type == "firmware_update": device = random.choice(devices) old_version = f"5.{random.randint(1, 5)}.{random.randint(1, 10)}" new_version = f"5.{random.randint(6, 9)}.{random.randint(1, 20)}" description = f"Updated firmware on {device['name']} from {old_version} to {new_version}" elif activity_type == "user_added": description = f"Added new user {random.choice(first_names).lower()}.{random.choice(last_names).lower()}" elif activity_type == "user_removed": description = f"Removed user {random.choice(first_names).lower()}.{random.choice(last_names).lower()}" elif activity_type == "report_generated": report_type = random.choice(["usage", "security", "connectivity", "performance", "compliance", "audit", "inventory"]) description = f"Generated {report_type} report" elif activity_type == "alert_acknowledged": alert_id = f"ALT-{random.randint(10000, 99999)}" description = f"Acknowledged alert {alert_id}" elif activity_type == "device_reboot": device = random.choice(devices) description = f"Initiated reboot of device {device['name']}" elif activity_type == "password_change": description = "Changed account password" elif activity_type == "vpn_connected": description = f"Connected to VPN from IP {random.randint(1, 255)}.{random.randint(1, 255)}.{random.randint(1, 255)}.{random.randint(1, 255)}" elif activity_type == "vpn_disconnected": description = "Disconnected from VPN" elif activity_type == "file_upload": file_type = random.choice(["configuration", "firmware", "certificate", "log", "backup"]) description = f"Uploaded {file_type} file" elif activity_type == "file_download": file_type = random.choice(["configuration", "report", "log", "backup", "template"]) description = f"Downloaded {file_type} file" elif activity_type == "settings_export": device = random.choice(devices) description = f"Exported settings from device {device['name']}" elif activity_type == "settings_import": device = random.choice(devices) description = f"Imported settings to device {device['name']}" elif activity_type == "group_created": description = f"Created device group '{random.choice(['Production', 'Testing', 'Development', 'Backup', 'Remote', 'Office', 'Warehouse'])}'" elif activity_type == "group_modified": description = f"Modified device group '{random.choice(['Production', 'Testing', 'Development', 'Backup', 'Remote', 'Office', 'Warehouse'])}'" elif activity_type == "permission_changed": target_user = random.choice(users) description = f"Changed permissions for user {target_user['username']}" elif activity_type == "api_key_created": description = "Created new API key" elif activity_type == "api_key_revoked": description = "Revoked API key" elif activity_type == "schedule_created": task = random.choice(["backup", "reboot", "firmware update", "report generation", "maintenance"]) description = f"Created schedule for {task}" elif activity_type == "schedule_modified": task = random.choice(["backup", "reboot", "firmware update", "report generation", "maintenance"]) description = f"Modified schedule for {task}" elif activity_type == "backup_created": device = random.choice(devices) description = f"Created backup of device {device['name']}" elif activity_type == "restore_performed": device = random.choice(devices) description = f"Restored configuration to device {device['name']}" elif activity_type == "certificate_installed": device = random.choice(devices) cert_type = random.choice(["SSL", "SSH", "CA", "client"]) description = f"Installed {cert_type} certificate on device {device['name']}" elif activity_type == "certificate_expired": device = random.choice(devices) cert_type = random.choice(["SSL", "SSH", "CA", "client"]) description = f"{cert_type} certificate expired on device {device['name']}" elif activity_type == "security_scan": device = random.choice(devices) description = f"Performed security scan on device {device['name']}" elif activity_type == "remote_access_enabled": device = random.choice(devices) description = f"Enabled remote access for device {device['name']}" elif activity_type == "remote_access_disabled": device = random.choice(devices) description = f"Disabled remote access for device {device['name']}" else: description = f"Performed {activity_type}" # Generate a timestamp within the last 30 days timestamp = datetime.datetime.now() - datetime.timedelta( days=random.randint(0, 30), hours=random.randint(0, 23), minutes=random.randint(0, 59) ) ip_address = f"{random.randint(10, 203)}.{random.randint(0, 255)}.{random.randint(0, 255)}.{random.randint(1, 254)}" create_user_activity(user['user_id'], activity_type, description, ip_address, timestamp) user_activities_count += 1 print(f"Synthetic data generated successfully!") print(f"- {len(devices)} devices created") print(f"- {len(users)} users created") print(f"- {wifi_count} WiFi networks created") print(f"- {device_settings_count} device settings created") print(f"- {user_activities_count} user activities created") if __name__ == "__main__": generate_synthetic_data()