mirror of
https://github.com/awslabs/amazon-bedrock-agentcore-samples.git
synced 2025-09-08 20:50:46 +00:00
* updated README.md file with bearer token generation
* updated README.md file with bearer token generation - removed client id and secret credentials
* removed hardcoded domain
* added agent runtime, frontend, observability and agentcore identity
* update README.md file to reflect frontend testing
421 lines
20 KiB
Python
421 lines
20 KiB
Python
"""
|
|
Generate synthetic data for Device Management Lambda
|
|
"""
|
|
import datetime
|
|
import random
|
|
import uuid
|
|
from decimal import Decimal
|
|
import boto3
|
|
import os
|
|
import json
|
|
|
|
# DynamoDB connection settings.
# The region is pinned on purpose: this script always talks to AWS DynamoDB
# in us-west-2.
aws_region = 'us-west-2'

# Single DynamoDB resource handle, created once at import time.
dynamodb = boto3.resource('dynamodb', region_name=aws_region)

# Names of the DynamoDB tables used by this module.
DEVICES_TABLE = 'Devices'
DEVICE_SETTINGS_TABLE = 'DeviceSettings'
WIFI_NETWORKS_TABLE = 'WifiNetworks'
USERS_TABLE = 'Users'
USER_ACTIVITIES_TABLE = 'UserActivities'
|
|
|
|
# Helper function to convert datetime to ISO format string
|
|
def datetime_to_iso(dt):
    """Return ``dt.isoformat()`` for datetime instances.

    Any value that is not a ``datetime.datetime`` is returned unchanged.
    """
    return dt.isoformat() if isinstance(dt, datetime.datetime) else dt
|
|
|
|
# Device CRUD operations
|
|
def create_device(device_data):
    """Write a device item to the Devices table.

    ``device_data`` is modified in place before the write:

    * when it has no ``device_id`` key, one is generated as ``DG-`` followed
      by the first eight hex digits of a random UUID, upper-cased;
    * a truthy ``last_connected`` value is passed through
      ``datetime_to_iso`` so datetimes are stored as ISO strings.

    Returns the same (mutated) ``device_data`` dict.
    """
    if 'device_id' not in device_data:
        device_data['device_id'] = f"DG-{uuid.uuid4().hex[:8].upper()}"

    last_connected = device_data.get('last_connected')
    if last_connected:
        device_data['last_connected'] = datetime_to_iso(last_connected)

    dynamodb.Table(DEVICES_TABLE).put_item(Item=device_data)
    return device_data
|
|
|
|
# Device Settings CRUD operations
|
|
def create_device_setting(device_id, setting_key, setting_value, last_updated=None):
    """Create or update a device setting in the DeviceSettings table.

    Args:
        device_id: Identifier of the device the setting belongs to.
        setting_key: Name of the setting.
        setting_value: Value stored for the setting.
        last_updated: Optional timestamp. Datetimes are converted to ISO
            strings; any falsy value is replaced by the current UTC time.

    Returns:
        The item dict that was written with ``put_item``.
    """
    if not last_updated:
        # Naive UTC "now": the same value datetime.utcnow() produced, so the
        # stored string format is unchanged. utcnow() itself is deprecated
        # since Python 3.12.
        last_updated = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)

    item = {
        'device_id': device_id,
        'setting_key': setting_key,
        'setting_value': setting_value,
        'last_updated': datetime_to_iso(last_updated),
    }

    dynamodb.Table(DEVICE_SETTINGS_TABLE).put_item(Item=item)
    return item
|
|
|
|
# WiFi Network CRUD operations
|
|
def create_wifi_network(network_data):
    """Write a WiFi network item to the WifiNetworks table.

    ``network_data`` is modified in place before the write:

    * a missing ``network_id`` is generated as ``wifi_`` plus the first eight
      hex digits of a random UUID;
    * ``last_updated`` is converted to an ISO string, defaulting to the
      current UTC time when absent or falsy;
    * a non-``None`` ``signal_strength`` is converted to ``Decimal`` via its
      string form (see the comment at the conversion).

    Args:
        network_data: Dict describing the network; must contain ``device_id``.

    Returns:
        The same (mutated) ``network_data`` dict.

    Raises:
        ValueError: If ``device_id`` is missing from ``network_data``.
    """
    if 'device_id' not in network_data:
        raise ValueError("device_id is required")

    if 'network_id' not in network_data:
        network_data['network_id'] = f"wifi_{uuid.uuid4().hex[:8]}"

    # Naive UTC "now" replaces the deprecated datetime.utcnow() while keeping
    # the stored string format identical; only evaluated when needed.
    last_updated = network_data.get('last_updated') or (
        datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    )
    network_data['last_updated'] = datetime_to_iso(last_updated)

    # DynamoDB (through boto3) does not accept Python floats; go through str()
    # so the Decimal carries the printed value rather than binary noise.
    signal_strength = network_data.get('signal_strength')
    if signal_strength is not None:
        network_data['signal_strength'] = Decimal(str(signal_strength))

    dynamodb.Table(WIFI_NETWORKS_TABLE).put_item(Item=network_data)
    return network_data
|
|
|
|
# User CRUD operations
|
|
def create_user(user_data):
    """Write a user item to the Users table.

    ``user_data`` is modified in place before the write:

    * a missing ``user_id`` is generated as ``USR`` plus the first eight hex
      digits of a random UUID, upper-cased;
    * ``created_at`` is converted to an ISO string, defaulting to the current
      UTC time when absent or falsy;
    * a truthy ``last_login`` is converted to an ISO string.

    Args:
        user_data: Dict describing the user.

    Returns:
        The same (mutated) ``user_data`` dict.
    """
    if 'user_id' not in user_data:
        user_data['user_id'] = f"USR{uuid.uuid4().hex[:8].upper()}"

    # Naive UTC "now" replaces the deprecated datetime.utcnow() while keeping
    # the stored string format identical; only evaluated when needed.
    created_at = user_data.get('created_at') or (
        datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    )
    user_data['created_at'] = datetime_to_iso(created_at)

    last_login = user_data.get('last_login')
    if last_login:
        user_data['last_login'] = datetime_to_iso(last_login)

    dynamodb.Table(USERS_TABLE).put_item(Item=user_data)
    return user_data
|
|
|
|
# User Activity CRUD operations
|
|
def create_user_activity(user_id, activity_type, description=None, ip_address=None, timestamp=None):
    """Write a user activity item to the UserActivities table.

    Args:
        user_id: Identifier of the user who performed the activity.
        activity_type: Short activity label (e.g. ``"login"``).
        description: Optional free-text description; stored as given,
            including ``None``.
        ip_address: Optional source IP address; stored as given, including
            ``None``.
        timestamp: Optional time of the activity. Datetimes are converted to
            ISO strings; any falsy value is replaced by the current UTC time.

    Returns:
        The item dict that was written with ``put_item``.
    """
    if not timestamp:
        # Naive UTC "now": the same value datetime.utcnow() produced, so the
        # stored string format is unchanged. utcnow() itself is deprecated
        # since Python 3.12.
        timestamp = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)

    item = {
        'user_id': user_id,
        'timestamp': datetime_to_iso(timestamp),
        'activity_type': activity_type,
        'description': description,
        'ip_address': ip_address,
    }

    dynamodb.Table(USER_ACTIVITIES_TABLE).put_item(Item=item)
    return item
|
|
|
|
# Minimum number of devices, WiFi networks and users to create, and the
# number of activities generated per user.
_MIN_RECORDS = 25

_DEVICE_MODELS = ["TransPort WR31", "TransPort WR44", "IX20", "EX15", "IX15", "EX12", "WR54", "WR64"]
_CONNECTION_STATUSES = ["Connected", "Disconnected", "Dormant", "Maintenance", "Updating"]
_WIFI_CHANNELS = [1, 6, 11, 36, 40, 44, 48, 149, 153, 157, 161]
_WIFI_SECURITY_TYPES = ["wpa2-psk", "wpa3-psk", "open", "wpa-psk", "wep", "enterprise"]
_TRUE_FALSE = ["true", "false"]

_SETTING_KEYS = [
    "cellular.apn", "cellular.auth_type", "cellular.sim_pin",
    "network.hostname", "network.ip_mode", "network.dns_primary",
    "system.contact", "system.location", "system.description",
    "firewall.enabled", "firewall.default_policy",
    "wifi.channel", "wifi.country", "wifi.regulatory_domain",
    "system.timezone", "system.ntp_server", "system.log_level",
    "network.ipv6_enabled", "network.mtu", "network.gateway",
    "security.ssh_enabled", "security.https_enabled", "security.password_complexity",
    "cellular.band_selection", "cellular.roaming_allowed",
]

_USER_ROLES = ["admin", "operator", "viewer", "device_admin", "support", "manager", "auditor", "guest", "technician", "developer"]
_EMAIL_DOMAINS = ["example.com", "company.net", "device-test.org", "enterprise.co", "tech-corp.io"]

_FIRST_NAMES = [
    "John", "Jane", "Robert", "Lisa", "Michael", "Sarah", "David", "Emma", "Chris", "Olivia",
    "William", "Sophia", "James", "Ava", "Benjamin", "Mia", "Daniel", "Charlotte", "Matthew", "Amelia",
    "Joseph", "Harper", "Andrew", "Evelyn", "Joshua", "Abigail", "Nicholas", "Emily", "Ryan", "Elizabeth",
]

_LAST_NAMES = [
    "Smith", "Johnson", "Williams", "Brown", "Jones", "Miller", "Davis", "Garcia", "Wilson", "Taylor",
    "Anderson", "Thomas", "Jackson", "White", "Harris", "Martin", "Thompson", "Moore", "Young", "Allen",
    "King", "Wright", "Scott", "Green", "Baker", "Adams", "Nelson", "Hill", "Ramirez", "Campbell",
]

_ACTIVITY_TYPES = [
    "login", "logout", "device_config_change", "firmware_update",
    "user_added", "user_removed", "report_generated", "alert_acknowledged",
    "device_reboot", "password_change", "vpn_connected", "vpn_disconnected",
    "file_upload", "file_download", "settings_export", "settings_import",
    "group_created", "group_modified", "permission_changed", "api_key_created",
    "api_key_revoked", "schedule_created", "schedule_modified", "backup_created",
    "restore_performed", "certificate_installed", "certificate_expired", "security_scan",
    "remote_access_enabled", "remote_access_disabled",
]

_DEVICE_GROUPS = ['Production', 'Testing', 'Development', 'Backup', 'Remote', 'Office', 'Warehouse']
_SCHEDULE_TASKS = ["backup", "reboot", "firmware update", "report generation", "maintenance"]
_CERT_TYPES = ["SSL", "SSH", "CA", "client"]

# Activities whose description never varies.
_FIXED_DESCRIPTIONS = {
    "logout": "User logged out",
    "password_change": "Changed account password",
    "vpn_disconnected": "Disconnected from VPN",
    "api_key_created": "Created new API key",
    "api_key_revoked": "Revoked API key",
}

# Activities whose description only needs the name of a randomly picked device.
_DEVICE_DESCRIPTIONS = {
    "device_reboot": "Initiated reboot of device {name}",
    "settings_export": "Exported settings from device {name}",
    "settings_import": "Imported settings to device {name}",
    "backup_created": "Created backup of device {name}",
    "restore_performed": "Restored configuration to device {name}",
    "security_scan": "Performed security scan on device {name}",
    "remote_access_enabled": "Enabled remote access for device {name}",
    "remote_access_disabled": "Disabled remote access for device {name}",
}


def _random_setting_value(key, device_id):
    """Return a random string value suited to the setting named ``key``."""
    generators = {
        "cellular.apn": lambda: random.choice(["internet", "broadband", "iot.secure", "m2m.provider.com", "wireless.data"]),
        "cellular.auth_type": lambda: random.choice(["none", "pap", "chap", "auto"]),
        "cellular.sim_pin": lambda: str(random.randint(1000, 9999)),
        "network.hostname": lambda: f"device-router-{device_id.lower()}",
        "network.ip_mode": lambda: random.choice(["dhcp", "static", "auto"]),
        "network.dns_primary": lambda: random.choice(["8.8.8.8", "8.8.4.4", "1.1.1.1", "9.9.9.9", f"192.168.{random.randint(1,5)}.1"]),
        "system.contact": lambda: f"admin-{random.randint(1, 10)}@example.com",
        "system.location": lambda: random.choice(["Server Room", "Office", "Remote Site", "Branch Office", "Warehouse",
                                                  "Data Center", "Manufacturing Floor", "Retail Store", "Distribution Center", "Field Site"]),
        "system.description": lambda: f"Device Router for {random.choice(['Primary', 'Backup', 'IoT', 'Monitoring', 'Security', 'Guest', 'VPN', 'Emergency'])} connectivity",
        "firewall.enabled": lambda: random.choice(_TRUE_FALSE),
        "firewall.default_policy": lambda: random.choice(["accept", "drop", "reject"]),
        "wifi.channel": lambda: str(random.choice(_WIFI_CHANNELS)),
        "wifi.country": lambda: random.choice(["US", "CA", "GB", "DE", "FR", "JP", "AU", "NZ", "SG", "IN"]),
        "wifi.regulatory_domain": lambda: random.choice(["FCC", "ETSI", "TELEC", "KCC", "SRRC"]),
        "system.timezone": lambda: random.choice(["UTC", "America/New_York", "Europe/London", "Asia/Tokyo", "Australia/Sydney", "Pacific/Auckland"]),
        "system.ntp_server": lambda: random.choice(["pool.ntp.org", "time.google.com", "time.windows.com", "time.apple.com", "ntp.ubuntu.com"]),
        "system.log_level": lambda: random.choice(["debug", "info", "warning", "error", "critical"]),
        "network.ipv6_enabled": lambda: random.choice(_TRUE_FALSE),
        "network.mtu": lambda: str(random.choice([1500, 1492, 1480, 1450, 1400])),
        "network.gateway": lambda: f"192.168.{random.randint(1,5)}.1",
        "security.ssh_enabled": lambda: random.choice(_TRUE_FALSE),
        "security.https_enabled": lambda: random.choice(_TRUE_FALSE),
        "security.password_complexity": lambda: random.choice(["low", "medium", "high", "extreme"]),
        "cellular.band_selection": lambda: random.choice(["auto", "4g_only", "5g_only", "4g_preferred", "5g_preferred"]),
        "cellular.roaming_allowed": lambda: random.choice(_TRUE_FALSE),
    }
    generator = generators.get(key)
    if generator is None:
        # Fallback for a key without a dedicated generator.
        return f"value-{random.randint(1, 100)}"
    return generator()


def _generate_devices(count):
    """Create ``count`` devices with ids DG-100001 onward and return them."""
    devices = []
    for i in range(1, count + 1):
        device_data = {
            'device_id': f"DG-{100000+i}",
            'name': f"Device Router {i}",
            'model': random.choice(_DEVICE_MODELS),
            'firmware_version': f"5.{random.randint(1, 9)}.{random.randint(1, 20)}",
            'connection_status': random.choice(_CONNECTION_STATUSES),
            'ip_address': f"192.168.{random.randint(1, 5)}.{random.randint(2, 254)}",
            'mac_address': f"00:40:9D:{random.randint(10, 99)}:{random.randint(10, 99)}:{random.randint(10, 99)}",
            'last_connected': datetime.datetime.now() - datetime.timedelta(hours=random.randint(0, 72)),
        }
        devices.append(create_device(device_data))
    return devices


def _generate_device_settings(devices):
    """Create one setting per key in ``_SETTING_KEYS`` for every device; return the count."""
    created = 0
    for device in devices:
        for key in _SETTING_KEYS:
            value = _random_setting_value(key, device['device_id'])
            last_updated = datetime.datetime.now() - datetime.timedelta(days=random.randint(0, 30))
            create_device_setting(device['device_id'], key, value, last_updated)
            created += 1
    return created


def _generate_wifi_networks(devices, minimum):
    """Create WiFi networks spread over ``devices`` (at least ``minimum`` in total); return the count."""
    if not devices:
        return 0

    # Both values are loop-invariant, so compute them once.
    base_per_device = max(1, minimum // len(devices))
    last_device_id = devices[-1]['device_id']

    created = 0
    for device in devices:
        per_device = base_per_device
        if created < minimum and device['device_id'] == last_device_id:
            # The last device absorbs whatever is still missing to reach the minimum.
            per_device = max(per_device, minimum - created)

        for i in range(1, per_device + 1):
            create_wifi_network({
                'device_id': device['device_id'],
                'network_id': f"wifi_{i}",
                'ssid': f"Device-Net-{device['device_id']}-{i}",
                'security_type': random.choice(_WIFI_SECURITY_TYPES),
                'enabled': random.choice([True, True, False]),  # More likely to be enabled
                'channel': random.choice(_WIFI_CHANNELS),
                'signal_strength': random.uniform(-90.0, -30.0),
                'last_updated': datetime.datetime.now() - datetime.timedelta(days=random.randint(0, 30)),
            })
            created += 1
    return created


def _generate_users(count):
    """Create ``count`` users with ids USR100001 onward and return them."""
    users = []
    for i in range(1, count + 1):
        first_name = random.choice(_FIRST_NAMES)
        last_name = random.choice(_LAST_NAMES)
        domain = random.choice(_EMAIL_DOMAINS)
        # The running index is part of the username, so usernames and emails
        # are unique without any retry loop.
        username = f"{first_name.lower()}.{last_name.lower()}{i}"

        user_data = {
            'user_id': f"USR{100000+i}",
            'username': username,
            'email': f"{username}@{domain}",
            'first_name': first_name,
            'last_name': last_name,
            'role': random.choice(_USER_ROLES),
            'created_at': datetime.datetime.now() - datetime.timedelta(days=random.randint(30, 365)),
            'last_login': datetime.datetime.now() - datetime.timedelta(hours=random.randint(1, 240)),
        }
        users.append(create_user(user_data))
    return users


def _describe_activity(activity_type, devices, users):
    """Return a random human-readable description for ``activity_type``."""
    if activity_type in _FIXED_DESCRIPTIONS:
        return _FIXED_DESCRIPTIONS[activity_type]

    if activity_type in _DEVICE_DESCRIPTIONS:
        device = random.choice(devices)
        return _DEVICE_DESCRIPTIONS[activity_type].format(name=device['name'])

    if activity_type == "login":
        return f"User logged in from {random.choice(['web interface', 'mobile app', 'API', 'CLI', 'desktop application'])}"
    if activity_type == "device_config_change":
        device = random.choice(devices)
        setting = random.choice(_SETTING_KEYS)
        return f"Changed {setting} on device {device['name']}"
    if activity_type == "firmware_update":
        device = random.choice(devices)
        old_version = f"5.{random.randint(1, 5)}.{random.randint(1, 10)}"
        new_version = f"5.{random.randint(6, 9)}.{random.randint(1, 20)}"
        return f"Updated firmware on {device['name']} from {old_version} to {new_version}"
    if activity_type == "user_added":
        return f"Added new user {random.choice(_FIRST_NAMES).lower()}.{random.choice(_LAST_NAMES).lower()}"
    if activity_type == "user_removed":
        return f"Removed user {random.choice(_FIRST_NAMES).lower()}.{random.choice(_LAST_NAMES).lower()}"
    if activity_type == "report_generated":
        report_type = random.choice(["usage", "security", "connectivity", "performance", "compliance", "audit", "inventory"])
        return f"Generated {report_type} report"
    if activity_type == "alert_acknowledged":
        alert_id = f"ALT-{random.randint(10000, 99999)}"
        return f"Acknowledged alert {alert_id}"
    if activity_type == "vpn_connected":
        return f"Connected to VPN from IP {random.randint(1, 255)}.{random.randint(1, 255)}.{random.randint(1, 255)}.{random.randint(1, 255)}"
    if activity_type == "file_upload":
        file_type = random.choice(["configuration", "firmware", "certificate", "log", "backup"])
        return f"Uploaded {file_type} file"
    if activity_type == "file_download":
        file_type = random.choice(["configuration", "report", "log", "backup", "template"])
        return f"Downloaded {file_type} file"
    if activity_type == "group_created":
        return f"Created device group '{random.choice(_DEVICE_GROUPS)}'"
    if activity_type == "group_modified":
        return f"Modified device group '{random.choice(_DEVICE_GROUPS)}'"
    if activity_type == "permission_changed":
        target_user = random.choice(users)
        return f"Changed permissions for user {target_user['username']}"
    if activity_type == "schedule_created":
        task = random.choice(_SCHEDULE_TASKS)
        return f"Created schedule for {task}"
    if activity_type == "schedule_modified":
        task = random.choice(_SCHEDULE_TASKS)
        return f"Modified schedule for {task}"
    if activity_type == "certificate_installed":
        device = random.choice(devices)
        cert_type = random.choice(_CERT_TYPES)
        return f"Installed {cert_type} certificate on device {device['name']}"
    if activity_type == "certificate_expired":
        device = random.choice(devices)
        cert_type = random.choice(_CERT_TYPES)
        return f"{cert_type} certificate expired on device {device['name']}"

    return f"Performed {activity_type}"


def _generate_user_activities(users, devices, per_user):
    """Create ``per_user`` random activities for every user; return the count."""
    created = 0
    for user in users:
        for _ in range(per_user):
            activity_type = random.choice(_ACTIVITY_TYPES)
            description = _describe_activity(activity_type, devices, users)

            # Timestamp somewhere within roughly the last 30 days.
            timestamp = datetime.datetime.now() - datetime.timedelta(
                days=random.randint(0, 30),
                hours=random.randint(0, 23),
                minutes=random.randint(0, 59),
            )
            ip_address = f"{random.randint(10, 203)}.{random.randint(0, 255)}.{random.randint(0, 255)}.{random.randint(1, 254)}"

            create_user_activity(user['user_id'], activity_type, description, ip_address, timestamp)
            created += 1
    return created


def generate_synthetic_data():
    """Generate synthetic data for testing with DynamoDB.

    Calls ``init_db()`` from ``dynamodb_models`` first (presumably this
    prepares the tables; its implementation is not part of this file), then
    writes, in this order:

    * 25 devices,
    * one setting per known setting key for every device,
    * at least 25 WiFi networks spread over the devices,
    * 25 users,
    * 25 activities per user,

    and finally prints a summary of how many records of each kind were
    created. Takes no arguments and returns ``None``.
    """
    # Imported here rather than at module level, as in the original script.
    from dynamodb_models import init_db
    init_db()

    devices = _generate_devices(_MIN_RECORDS)
    device_settings_count = _generate_device_settings(devices)
    wifi_count = _generate_wifi_networks(devices, _MIN_RECORDS)
    users = _generate_users(_MIN_RECORDS)
    user_activities_count = _generate_user_activities(users, devices, _MIN_RECORDS)

    print("Synthetic data generated successfully!")
    print(f"- {len(devices)} devices created")
    print(f"- {len(users)} users created")
    print(f"- {wifi_count} WiFi networks created")
    print(f"- {device_settings_count} device settings created")
    print(f"- {user_activities_count} user activities created")
|
|
|
|
if __name__ == "__main__":
    # Populate the tables only when run as a script, never on import.
    generate_synthetic_data()
|