421 lines
20 KiB
Python
Raw Permalink Normal View History

"""
Generate synthetic data for Device Management Lambda
"""
import datetime
import random
import uuid
from decimal import Decimal
import boto3
import os
import json
# Configure DynamoDB connection
# The script always talks to AWS DynamoDB in one fixed region (us-west-2).
aws_region = 'us-west-2'
# Shared DynamoDB service resource. It is built once, at import time, and
# every create_* helper in this module obtains its Table handle from it.
dynamodb = boto3.resource('dynamodb', region_name=aws_region)
# Names of the DynamoDB tables the create_* helpers write to
DEVICES_TABLE = 'Devices'
DEVICE_SETTINGS_TABLE = 'DeviceSettings'
WIFI_NETWORKS_TABLE = 'WifiNetworks'
USERS_TABLE = 'Users'
USER_ACTIVITIES_TABLE = 'UserActivities'
# Helper function to convert datetime to ISO format string
def datetime_to_iso(dt):
    """Return ``dt.isoformat()`` for a ``datetime.datetime``.

    Any other value (strings, ``None``, plain dates, ...) is handed back
    untouched, so the function is safe to call on already-converted data.
    """
    return dt.isoformat() if isinstance(dt, datetime.datetime) else dt
# Device CRUD operations
def create_device(device_data):
    """Store a device record in the Devices table and return it.

    Args:
        device_data: dict describing the device. It is modified in place:
            a ``device_id`` of the form ``DG-XXXXXXXX`` is added when the
            key is absent, and a truthy ``last_connected`` value is run
            through ``datetime_to_iso``.

    Returns:
        The same ``device_data`` dict that was written with ``put_item``.
    """
    # Only mint an id when the caller did not provide one.
    if 'device_id' not in device_data:
        device_data['device_id'] = f"DG-{uuid.uuid4().hex[:8].upper()}"

    # Datetime objects are stored as ISO 8601 strings.
    last_connected = device_data.get('last_connected')
    if last_connected:
        device_data['last_connected'] = datetime_to_iso(last_connected)

    dynamodb.Table(DEVICES_TABLE).put_item(Item=device_data)
    return device_data
# Device Settings CRUD operations
def create_device_setting(device_id, setting_key, setting_value, last_updated=None):
    """Write one device setting to the DeviceSettings table and return the item.

    Args:
        device_id: id of the device the setting belongs to.
        setting_key: name of the setting.
        setting_value: value to store for the setting.
        last_updated: modification time; when falsy, the current UTC time
            from ``datetime.datetime.utcnow()`` is used instead.

    Returns:
        The dict that was passed to ``put_item``.
    """
    # NOTE(review): utcnow() returns a naive datetime and is deprecated as of
    # Python 3.12 - consider a timezone-aware replacement.
    stamp = last_updated or datetime.datetime.utcnow()
    record = {
        'device_id': device_id,
        'setting_key': setting_key,
        'setting_value': setting_value,
        'last_updated': datetime_to_iso(stamp),
    }
    dynamodb.Table(DEVICE_SETTINGS_TABLE).put_item(Item=record)
    return record
# WiFi Network CRUD operations
def create_wifi_network(network_data):
    """Store a WiFi network record in the WifiNetworks table and return it.

    Args:
        network_data: dict describing the network; must contain
            ``device_id``. It is modified in place: a ``network_id`` of the
            form ``wifi_xxxxxxxx`` is added when absent, ``last_updated`` is
            converted to an ISO string (defaulting to the current UTC time
            when missing or falsy), and a non-``None`` ``signal_strength``
            is converted to ``Decimal``.

    Returns:
        The same ``network_data`` dict that was written with ``put_item``.

    Raises:
        ValueError: if ``device_id`` is missing from ``network_data``.
    """
    # Guard clause: a network cannot be stored without its owning device.
    if 'device_id' not in network_data:
        raise ValueError("device_id is required")

    if 'network_id' not in network_data:
        network_data['network_id'] = f"wifi_{uuid.uuid4().hex[:8]}"

    # A missing or falsy timestamp falls back to "now" (naive UTC).
    # NOTE(review): utcnow() is deprecated as of Python 3.12.
    stamp = network_data.get('last_updated') or datetime.datetime.utcnow()
    network_data['last_updated'] = datetime_to_iso(stamp)

    # Going through str() keeps the float's printed digits when building the
    # Decimal that is stored for the signal strength.
    strength = network_data.get('signal_strength')
    if strength is not None:
        network_data['signal_strength'] = Decimal(str(strength))

    dynamodb.Table(WIFI_NETWORKS_TABLE).put_item(Item=network_data)
    return network_data
# User CRUD operations
def create_user(user_data):
    """Store a user record in the Users table and return it.

    Args:
        user_data: dict describing the user. It is modified in place:
            a ``user_id`` of the form ``USRXXXXXXXX`` is added when absent,
            ``created_at`` is converted to an ISO string (defaulting to the
            current UTC time when missing or falsy), and a truthy
            ``last_login`` is converted to an ISO string.

    Returns:
        The same ``user_data`` dict that was written with ``put_item``.
    """
    if 'user_id' not in user_data:
        user_data['user_id'] = f"USR{uuid.uuid4().hex[:8].upper()}"

    # created_at is always populated; "now" (naive UTC) is the fallback.
    # NOTE(review): utcnow() is deprecated as of Python 3.12.
    created = user_data.get('created_at') or datetime.datetime.utcnow()
    user_data['created_at'] = datetime_to_iso(created)

    # last_login stays as-is (absent / falsy) when the user never logged in.
    last_login = user_data.get('last_login')
    if last_login:
        user_data['last_login'] = datetime_to_iso(last_login)

    dynamodb.Table(USERS_TABLE).put_item(Item=user_data)
    return user_data
# User Activity CRUD operations
def create_user_activity(user_id, activity_type, description=None, ip_address=None, timestamp=None):
    """Write one activity entry to the UserActivities table and return it.

    Args:
        user_id: id of the user who performed the activity.
        activity_type: short activity name (e.g. ``"login"``).
        description: optional free-text description; stored as given.
        ip_address: optional source IP address; stored as given.
        timestamp: when the activity happened; when falsy, the current UTC
            time from ``datetime.datetime.utcnow()`` is used.

    Returns:
        The dict that was passed to ``put_item``; its ``timestamp`` is the
        ISO-formatted string.
    """
    # NOTE(review): utcnow() returns a naive datetime and is deprecated as of
    # Python 3.12 - consider a timezone-aware replacement.
    moment = timestamp if timestamp else datetime.datetime.utcnow()
    entry = {
        'user_id': user_id,
        'timestamp': datetime_to_iso(moment),
        'activity_type': activity_type,
        'description': description,
        'ip_address': ip_address,
    }
    dynamodb.Table(USER_ACTIVITIES_TABLE).put_item(Item=entry)
    return entry
# ---------------------------------------------------------------------------
# Static pools the synthetic-data generator samples from
# ---------------------------------------------------------------------------

# WiFi channels used both for the "wifi.channel" setting and for networks.
_WIFI_CHANNELS = [1, 6, 11, 36, 40, 44, 48, 149, 153, 157, 161]

# Every generated device receives one value for each of these keys.
_SETTING_KEYS = [
    "cellular.apn", "cellular.auth_type", "cellular.sim_pin",
    "network.hostname", "network.ip_mode", "network.dns_primary",
    "system.contact", "system.location", "system.description",
    "firewall.enabled", "firewall.default_policy",
    "wifi.channel", "wifi.country", "wifi.regulatory_domain",
    "system.timezone", "system.ntp_server", "system.log_level",
    "network.ipv6_enabled", "network.mtu", "network.gateway",
    "security.ssh_enabled", "security.https_enabled", "security.password_complexity",
    "cellular.band_selection", "cellular.roaming_allowed",
]

_ROLES = ["admin", "operator", "viewer", "device_admin", "support", "manager",
          "auditor", "guest", "technician", "developer"]
_EMAIL_DOMAINS = ["example.com", "company.net", "device-test.org", "enterprise.co", "tech-corp.io"]
_FIRST_NAMES = ["John", "Jane", "Robert", "Lisa", "Michael", "Sarah", "David", "Emma", "Chris", "Olivia",
                "William", "Sophia", "James", "Ava", "Benjamin", "Mia", "Daniel", "Charlotte", "Matthew", "Amelia",
                "Joseph", "Harper", "Andrew", "Evelyn", "Joshua", "Abigail", "Nicholas", "Emily", "Ryan", "Elizabeth"]
_LAST_NAMES = ["Smith", "Johnson", "Williams", "Brown", "Jones", "Miller", "Davis", "Garcia", "Wilson", "Taylor",
               "Anderson", "Thomas", "Jackson", "White", "Harris", "Martin", "Thompson", "Moore", "Young", "Allen",
               "King", "Wright", "Scott", "Green", "Baker", "Adams", "Nelson", "Hill", "Ramirez", "Campbell"]

_ACTIVITY_TYPES = [
    "login", "logout", "device_config_change", "firmware_update",
    "user_added", "user_removed", "report_generated", "alert_acknowledged",
    "device_reboot", "password_change", "vpn_connected", "vpn_disconnected",
    "file_upload", "file_download", "settings_export", "settings_import",
    "group_created", "group_modified", "permission_changed", "api_key_created",
    "api_key_revoked", "schedule_created", "schedule_modified", "backup_created",
    "restore_performed", "certificate_installed", "certificate_expired", "security_scan",
    "remote_access_enabled", "remote_access_disabled",
]

# Activities whose description never varies.
_FIXED_ACTIVITY_DESCRIPTIONS = {
    "logout": "User logged out",
    "password_change": "Changed account password",
    "vpn_disconnected": "Disconnected from VPN",
    "api_key_created": "Created new API key",
    "api_key_revoked": "Revoked API key",
}

# Activities whose description only needs the name of a random device.
_DEVICE_ACTIVITY_TEMPLATES = {
    "device_reboot": "Initiated reboot of device {name}",
    "settings_export": "Exported settings from device {name}",
    "settings_import": "Imported settings to device {name}",
    "backup_created": "Created backup of device {name}",
    "restore_performed": "Restored configuration to device {name}",
    "security_scan": "Performed security scan on device {name}",
    "remote_access_enabled": "Enabled remote access for device {name}",
    "remote_access_disabled": "Disabled remote access for device {name}",
}

_GROUP_NAMES = ['Production', 'Testing', 'Development', 'Backup', 'Remote', 'Office', 'Warehouse']
_SCHEDULE_TASKS = ["backup", "reboot", "firmware update", "report generation", "maintenance"]
_CERT_TYPES = ["SSL", "SSH", "CA", "client"]


def _pick(*options):
    """Return a value factory that ignores the device id and picks one option."""
    return lambda device_id: random.choice(options)


# Ordered (substring, factory) rules: the first substring found in a setting
# key decides how its value is produced. Order matters because matching is by
# substring, exactly like an if/elif chain evaluated top to bottom.
_SETTING_VALUE_RULES = (
    ("cellular.apn", _pick("internet", "broadband", "iot.secure", "m2m.provider.com", "wireless.data")),
    ("auth_type", _pick("none", "pap", "chap", "auto")),
    ("sim_pin", lambda device_id: str(random.randint(1000, 9999))),
    ("hostname", lambda device_id: f"device-router-{device_id.lower()}"),
    ("ip_mode", _pick("dhcp", "static", "auto")),
    ("dns", lambda device_id: random.choice(
        ["8.8.8.8", "8.8.4.4", "1.1.1.1", "9.9.9.9", f"192.168.{random.randint(1, 5)}.1"])),
    ("contact", lambda device_id: f"admin-{random.randint(1, 10)}@example.com"),
    ("location", _pick("Server Room", "Office", "Remote Site", "Branch Office", "Warehouse",
                       "Data Center", "Manufacturing Floor", "Retail Store", "Distribution Center",
                       "Field Site")),
    ("description", lambda device_id: (
        f"Device Router for "
        f"{random.choice(['Primary', 'Backup', 'IoT', 'Monitoring', 'Security', 'Guest', 'VPN', 'Emergency'])}"
        f" connectivity")),
    ("firewall.enabled", _pick("true", "false")),
    ("default_policy", _pick("accept", "drop", "reject")),
    ("wifi.channel", lambda device_id: str(random.choice(_WIFI_CHANNELS))),
    ("wifi.country", _pick("US", "CA", "GB", "DE", "FR", "JP", "AU", "NZ", "SG", "IN")),
    ("regulatory_domain", _pick("FCC", "ETSI", "TELEC", "KCC", "SRRC")),
    ("timezone", _pick("UTC", "America/New_York", "Europe/London", "Asia/Tokyo",
                       "Australia/Sydney", "Pacific/Auckland")),
    ("ntp_server", _pick("pool.ntp.org", "time.google.com", "time.windows.com",
                         "time.apple.com", "ntp.ubuntu.com")),
    ("log_level", _pick("debug", "info", "warning", "error", "critical")),
    ("ipv6_enabled", _pick("true", "false")),
    ("mtu", lambda device_id: str(random.choice([1500, 1492, 1480, 1450, 1400]))),
    ("gateway", lambda device_id: f"192.168.{random.randint(1, 5)}.1"),
    ("ssh_enabled", _pick("true", "false")),
    ("https_enabled", _pick("true", "false")),
    ("password_complexity", _pick("low", "medium", "high", "extreme")),
    ("band_selection", _pick("auto", "4g_only", "5g_only", "4g_preferred", "5g_preferred")),
    ("roaming_allowed", _pick("true", "false")),
)


def _random_setting_value(key, device_id):
    """Return a plausible random string value for the setting named ``key``."""
    for fragment, make_value in _SETTING_VALUE_RULES:
        if fragment in key:
            return make_value(device_id)
    # Unknown keys still get a value so that every key is written.
    return f"value-{random.randint(1, 100)}"


def _generate_devices(count=25):
    """Create ``count`` devices (ids DG-100001 ...) and return the stored records."""
    devices = []
    for i in range(1, count + 1):
        device_data = {
            'device_id': f"DG-{100000 + i}",
            'name': f"Device Router {i}",
            'model': random.choice(["TransPort WR31", "TransPort WR44", "IX20", "EX15",
                                    "IX15", "EX12", "WR54", "WR64"]),
            'firmware_version': f"5.{random.randint(1, 9)}.{random.randint(1, 20)}",
            'connection_status': random.choice(["Connected", "Disconnected", "Dormant",
                                                "Maintenance", "Updating"]),
            'ip_address': f"192.168.{random.randint(1, 5)}.{random.randint(2, 254)}",
            'mac_address': f"00:40:9D:{random.randint(10, 99)}:{random.randint(10, 99)}:{random.randint(10, 99)}",
            # Last seen at some point within the past three days.
            'last_connected': datetime.datetime.now() - datetime.timedelta(hours=random.randint(0, 72)),
        }
        devices.append(create_device(device_data))
    return devices


def _generate_device_settings(devices):
    """Write one setting per key in ``_SETTING_KEYS`` for each device; return the count."""
    created = 0
    for device in devices:
        device_id = device['device_id']
        for key in _SETTING_KEYS:
            value = _random_setting_value(key, device_id)
            last_updated = datetime.datetime.now() - datetime.timedelta(days=random.randint(0, 30))
            create_device_setting(device_id, key, value, last_updated)
            created += 1
    return created


def _generate_wifi_networks(devices, minimum_total=25):
    """Create WiFi networks for ``devices`` (at least ``minimum_total`` overall); return the count."""
    if not devices:
        # Nothing to attach networks to; also avoids dividing by zero below.
        return 0
    base_per_device = max(1, minimum_total // len(devices))
    last_index = len(devices) - 1
    created = 0
    for index, device in enumerate(devices):
        per_device = base_per_device
        if index == last_index and created < minimum_total:
            # The last device absorbs whatever is still missing from the total.
            per_device = max(per_device, minimum_total - created)
        for i in range(1, per_device + 1):
            create_wifi_network({
                'device_id': device['device_id'],
                'network_id': f"wifi_{i}",
                'ssid': f"Device-Net-{device['device_id']}-{i}",
                'security_type': random.choice(["wpa2-psk", "wpa3-psk", "open", "wpa-psk",
                                                "wep", "enterprise"]),
                'enabled': random.choice([True, True, False]),  # More likely to be enabled
                'channel': random.choice(_WIFI_CHANNELS),
                'signal_strength': random.uniform(-90.0, -30.0),
                'last_updated': datetime.datetime.now() - datetime.timedelta(days=random.randint(0, 30)),
            })
            created += 1
    return created


def _generate_users(count=25):
    """Create ``count`` users (ids USR100001 ...) and return the stored records."""
    users = []
    for i in range(1, count + 1):
        first_name = random.choice(_FIRST_NAMES)
        last_name = random.choice(_LAST_NAMES)
        domain = random.choice(_EMAIL_DOMAINS)
        # The running index makes username and email unique without a retry loop.
        username = f"{first_name.lower()}.{last_name.lower()}{i}"
        user_data = {
            'user_id': f"USR{100000 + i}",
            'username': username,
            'email': f"{username}@{domain}",
            'first_name': first_name,
            'last_name': last_name,
            'role': random.choice(_ROLES),
            'created_at': datetime.datetime.now() - datetime.timedelta(days=random.randint(30, 365)),
            'last_login': datetime.datetime.now() - datetime.timedelta(hours=random.randint(1, 240)),
        }
        users.append(create_user(user_data))
    return users


def _describe_activity(activity_type, devices, users):
    """Build a human-readable description for one activity of ``activity_type``."""
    if activity_type in _FIXED_ACTIVITY_DESCRIPTIONS:
        return _FIXED_ACTIVITY_DESCRIPTIONS[activity_type]
    if activity_type in _DEVICE_ACTIVITY_TEMPLATES:
        device = random.choice(devices)
        return _DEVICE_ACTIVITY_TEMPLATES[activity_type].format(name=device['name'])

    if activity_type == "login":
        return f"User logged in from {random.choice(['web interface', 'mobile app', 'API', 'CLI', 'desktop application'])}"
    if activity_type == "device_config_change":
        device = random.choice(devices)
        setting = random.choice(_SETTING_KEYS)
        return f"Changed {setting} on device {device['name']}"
    if activity_type == "firmware_update":
        device = random.choice(devices)
        # Old versions are drawn from 5.1-5.5 and new ones from 5.6-5.9.
        old_version = f"5.{random.randint(1, 5)}.{random.randint(1, 10)}"
        new_version = f"5.{random.randint(6, 9)}.{random.randint(1, 20)}"
        return f"Updated firmware on {device['name']} from {old_version} to {new_version}"
    if activity_type == "user_added":
        return f"Added new user {random.choice(_FIRST_NAMES).lower()}.{random.choice(_LAST_NAMES).lower()}"
    if activity_type == "user_removed":
        return f"Removed user {random.choice(_FIRST_NAMES).lower()}.{random.choice(_LAST_NAMES).lower()}"
    if activity_type == "report_generated":
        report_type = random.choice(["usage", "security", "connectivity", "performance",
                                     "compliance", "audit", "inventory"])
        return f"Generated {report_type} report"
    if activity_type == "alert_acknowledged":
        alert_id = f"ALT-{random.randint(10000, 99999)}"
        return f"Acknowledged alert {alert_id}"
    if activity_type == "vpn_connected":
        return f"Connected to VPN from IP {random.randint(1, 255)}.{random.randint(1, 255)}.{random.randint(1, 255)}.{random.randint(1, 255)}"
    if activity_type == "file_upload":
        file_type = random.choice(["configuration", "firmware", "certificate", "log", "backup"])
        return f"Uploaded {file_type} file"
    if activity_type == "file_download":
        file_type = random.choice(["configuration", "report", "log", "backup", "template"])
        return f"Downloaded {file_type} file"
    if activity_type == "group_created":
        return f"Created device group '{random.choice(_GROUP_NAMES)}'"
    if activity_type == "group_modified":
        return f"Modified device group '{random.choice(_GROUP_NAMES)}'"
    if activity_type == "permission_changed":
        target_user = random.choice(users)
        return f"Changed permissions for user {target_user['username']}"
    if activity_type == "schedule_created":
        return f"Created schedule for {random.choice(_SCHEDULE_TASKS)}"
    if activity_type == "schedule_modified":
        return f"Modified schedule for {random.choice(_SCHEDULE_TASKS)}"
    if activity_type == "certificate_installed":
        device = random.choice(devices)
        cert_type = random.choice(_CERT_TYPES)
        return f"Installed {cert_type} certificate on device {device['name']}"
    if activity_type == "certificate_expired":
        device = random.choice(devices)
        cert_type = random.choice(_CERT_TYPES)
        return f"{cert_type} certificate expired on device {device['name']}"
    return f"Performed {activity_type}"


def _generate_user_activities(users, devices, per_user=25):
    """Create ``per_user`` random activities for each user; return the total count."""
    created = 0
    for user in users:
        for _ in range(per_user):
            activity_type = random.choice(_ACTIVITY_TYPES)
            description = _describe_activity(activity_type, devices, users)
            # Timestamp somewhere within roughly the last 30 days.
            timestamp = datetime.datetime.now() - datetime.timedelta(
                days=random.randint(0, 30),
                hours=random.randint(0, 23),
                minutes=random.randint(0, 59),
            )
            ip_address = f"{random.randint(10, 203)}.{random.randint(0, 255)}.{random.randint(0, 255)}.{random.randint(1, 254)}"
            create_user_activity(user['user_id'], activity_type, description, ip_address, timestamp)
            created += 1
    return created


def generate_synthetic_data():
    """Generate synthetic data for testing with DynamoDB.

    Calls ``init_db()`` from ``dynamodb_models`` and then writes, through the
    ``create_*`` helpers of this module:

    * 25 devices,
    * one setting per known setting key for every device,
    * at least 25 WiFi networks spread over the devices,
    * 25 users,
    * 25 activities per user.

    A short summary of the created record counts is printed at the end.
    All values are drawn from the ``random`` module without seeding, so every
    run produces different data. Returns ``None``.
    """
    # Imported here rather than at module level, as in the original code.
    # NOTE(review): init_db presumably creates the tables - confirm in dynamodb_models.
    from dynamodb_models import init_db
    init_db()

    devices = _generate_devices()
    device_settings_count = _generate_device_settings(devices)
    wifi_count = _generate_wifi_networks(devices)
    users = _generate_users()
    user_activities_count = _generate_user_activities(users, devices)

    print("Synthetic data generated successfully!")
    print(f"- {len(devices)} devices created")
    print(f"- {len(users)} users created")
    print(f"- {wifi_count} WiFi networks created")
    print(f"- {device_settings_count} device settings created")
    print(f"- {user_activities_count} user activities created")
# Populate the tables only when this file is executed as a script, so that
# importing the module for its create_* helpers does not generate any data.
if __name__ == "__main__":
    generate_synthetic_data()