
import json
import boto3
import psycopg2
import os
from botocore.exceptions import ClientError
def get_secret(secret_name):
"""Get secret from AWS Secrets Manager """
#secret_name = os.environ['SECRET_NAME']
region_name = os.environ['REGION']
session = boto3.session.Session()
client = session.client(
service_name='secretsmanager',
region_name=region_name
)
try:
secret_value = client.get_secret_value(SecretId=secret_name)
secret = json.loads(secret_value['SecretString'])
return secret
except ClientError as e:
raise Exception(f"Failed to get secret: {str(e)}")
def execute_slow_query(secret_name, min_exec_time):
"""Execute multiple performance-related queries"""
queries = {
"slow_queries": """
SELECT
CASE
WHEN rolname IS NULL THEN 'unknown'
ELSE rolname
END as username,
CASE
WHEN datname IS NULL THEN 'unknown'
ELSE datname
END as database,
query,
calls,
total_exec_time/1000 as total_time_sec,
min_exec_time/1000 as min_time_sec,
max_exec_time/1000 as max_time_sec,
mean_exec_time/1000 as avg_time_sec,
stddev_exec_time/1000 as stddev_time_sec,
rows
FROM pg_stat_statements s
LEFT JOIN pg_roles r ON r.oid = s.userid
LEFT JOIN pg_database d ON d.oid = s.dbid
ORDER BY total_exec_time DESC
LIMIT 10;
""",
"high_io_queries": """
SELECT
CASE
WHEN rolname IS NULL THEN 'unknown'
ELSE rolname
END as username,
CASE
WHEN datname IS NULL THEN 'unknown'
ELSE datname
END as database,
query,
shared_blks_hit,
shared_blks_read,
shared_blks_dirtied,
shared_blks_written,
local_blks_hit,
local_blks_read,
temp_blks_read,
temp_blks_written
FROM pg_stat_statements s
LEFT JOIN pg_roles r ON r.oid = s.userid
LEFT JOIN pg_database d ON d.oid = s.dbid
ORDER BY shared_blks_read + shared_blks_written DESC
LIMIT 5;
""",
"high_temp_queries": """
SELECT
CASE
WHEN rolname IS NULL THEN 'unknown'
ELSE rolname
END as username,
CASE
WHEN datname IS NULL THEN 'unknown'
ELSE datname
END as database,
query,
temp_blks_read,
temp_blks_written
FROM pg_stat_statements s
LEFT JOIN pg_roles r ON r.oid = s.userid
LEFT JOIN pg_database d ON d.oid = s.dbid
ORDER BY temp_blks_written + temp_blks_read DESC
LIMIT 5;
""",
"blocking_queries": """
SELECT
blocked.pid AS blocked_pid,
blocked.usename AS blocked_user,
blocking.pid AS blocking_pid,
blocking.usename AS blocking_user,
blocked.query AS blocked_query,
blocking.query AS blocking_query
FROM pg_stat_activity blocked
JOIN pg_stat_activity blocking ON blocking.pid = ANY(pg_blocking_pids(blocked.pid))
WHERE NOT blocked.pid = blocking.pid
            LIMIT 3;
"""
}
print("Connecting to the database...")
conn = connect_to_db(secret_name)
print("Connected to the database.")
    try:
        # Ensure the pg_stat_statements extension exists (the library must also be
        # loaded via shared_preload_libraries / the cluster parameter group).
        with conn.cursor() as cur:
            cur.execute("CREATE EXTENSION IF NOT EXISTS pg_stat_statements;")
        conn.commit()
        # Execute each query and collect results as a list of row dicts
        results = {}
        for query_name, query in queries.items():
            with conn.cursor() as cur:
                cur.execute(query)
                columns = [desc[0] for desc in cur.description]
                rows = cur.fetchall()
                results[query_name] = [dict(zip(columns, row)) for row in rows]
return results
except Exception as e:
raise Exception(f"Failed to retrieve slow queries: {str(e)}")
finally:
if conn:
conn.close()
def format_results_for_slow_query(results):
"""Format results in a human-readable string"""
output = "Database Performance Analysis Report\n\n"
# Format slow queries
output += "=== TOP 20 SLOW QUERIES ===\n"
if results.get("slow_queries"):
for idx, query in enumerate(results["slow_queries"], 1):
output += f"\nQuery #{idx}:\n"
output += f"• Username: {query['username']}\n"
output += f"• Database: {query['database']}\n"
output += f"• Calls: {query['calls']}\n"
output += f"• Total Time: {round(query['total_time_sec'], 2)} sec\n"
output += f"• Avg Time: {round(query['avg_time_sec'], 2)} sec\n"
output += f"• Min Time: {round(query['min_time_sec'], 2)} sec\n"
output += f"• Max Time: {round(query['max_time_sec'], 2)} sec\n"
output += f"• Rows: {query['rows']}\n"
output += f"• Query: {query['query']}\n"
else:
output += "No slow queries found.\n"
# Format high IO queries
output += "\n=== TOP 10 HIGH I/O QUERIES ===\n"
if results.get("high_io_queries"):
for idx, query in enumerate(results["high_io_queries"], 1):
output += f"\nQuery #{idx}:\n"
output += f"• Username: {query['username']}\n"
output += f"• Database: {query['database']}\n"
output += f"• Shared Blocks Hit: {query['shared_blks_hit']}\n"
output += f"• Shared Blocks Read: {query['shared_blks_read']}\n"
output += f"• Shared Blocks Written: {query['shared_blks_written']}\n"
output += f"• Temp Blocks Read: {query['temp_blks_read']}\n"
output += f"• Temp Blocks Written: {query['temp_blks_written']}\n"
output += f"• Query: {query['query']}\n"
else:
output += "No high I/O queries found.\n"
# Format high temp usage queries
output += "\n=== TOP 10 HIGH TEMP USAGE QUERIES ===\n"
if results.get("high_temp_queries"):
for idx, query in enumerate(results["high_temp_queries"], 1):
output += f"\nQuery #{idx}:\n"
output += f"• Username: {query['username']}\n"
output += f"• Database: {query['database']}\n"
output += f"• Temp Blocks Read: {query['temp_blks_read']}\n"
output += f"• Temp Blocks Written: {query['temp_blks_written']}\n"
output += f"• Query: {query['query']}\n"
else:
output += "No high temp usage queries found.\n"
# Format blocking queries
output += "\n=== BLOCKING QUERIES ===\n"
if results.get("blocking_queries"):
for idx, query in enumerate(results["blocking_queries"], 1):
output += f"\nBlocking Situation #{idx}:\n"
output += f"• Blocked PID: {query['blocked_pid']}\n"
output += f"• Blocked User: {query['blocked_user']}\n"
output += f"• Blocked Query: {query['blocked_query']}\n"
output += f"• Blocking PID: {query['blocking_pid']}\n"
output += f"• Blocking User: {query['blocking_user']}\n"
output += f"• Blocking Query: {query['blocking_query']}\n"
else:
output += "No blocking queries found.\n"
return output
def execute_connect_issues(secret_name, min_exec_time):
"""Execute connection management related queries"""
queries = {
"current_connections": """
SELECT
datname as database,
usename as username,
application_name,
client_addr,
backend_start,
state,
wait_event_type,
wait_event,
query
FROM pg_stat_activity
WHERE state IS NOT NULL
ORDER BY backend_start DESC;
""",
"connection_stats": """
SELECT
datname as database,
numbackends as current_connections,
xact_commit as commits,
xact_rollback as rollbacks,
blks_read,
blks_hit,
tup_returned,
tup_fetched,
tup_inserted,
tup_updated,
tup_deleted
FROM pg_stat_database
WHERE datname IS NOT NULL;
""",
"idle_connections": """
SELECT
datname as database,
usename as username,
application_name,
client_addr,
backend_start,
state,
state_change,
query
FROM pg_stat_activity
WHERE state = 'idle'
ORDER BY backend_start DESC;
""",
"locked_queries": """
SELECT DISTINCT ON (pid)
pid,
usename as username,
datname as database,
mode,
CASE locktype
WHEN 'relation' THEN rel.relname
WHEN 'virtualxid' THEN 'virtual transaction'
WHEN 'transactionid' THEN 'transaction'
WHEN 'tuple' THEN 'tuple'
ELSE locktype
END as lock_type,
application_name,
state,
query,
age(now(), query_start) as query_duration
FROM pg_stat_activity sa
JOIN pg_locks locks ON sa.pid = locks.pid
LEFT JOIN pg_class rel ON rel.oid = locks.relation
WHERE NOT granted
ORDER BY pid, query_start;
"""
}
conn = connect_to_db(secret_name)
try:
# First, ensure pg_stat_statements is installed
with conn.cursor() as cur:
cur.execute("""
CREATE EXTENSION IF NOT EXISTS pg_stat_statements;
""")
conn.commit()
# Execute the main query
results = {}
# Execute each query and collect results
for query_name, query in queries.items():
try:
with conn.cursor() as cur:
cur.execute(query)
columns = [desc[0] for desc in cur.description]
rows = cur.fetchall()
results[query_name] = [dict(zip(columns, row)) for row in rows]
except Exception as e:
print(f"Error executing {query_name}: {str(e)}")
results[query_name] = []
return results
except Exception as e:
raise Exception(f"Failed to retrieve connection metrics: {str(e)}")
finally:
if conn:
conn.close()
def format_results_for_conn_issues(results):
"""Format connection management results in a human-readable string"""
output = "Database Connection Management Analysis Report\n\n"
# Format current connections
output += "=== CURRENT CONNECTIONS ===\n"
if results.get("current_connections"):
for idx, conn in enumerate(results["current_connections"], 1):
output += f"\nConnection #{idx}:\n"
output += f"• Database: {conn['database']}\n"
output += f"• Username: {conn['username']}\n"
output += f"• Application: {conn['application_name']}\n"
output += f"• Client Address: {conn['client_addr']}\n"
output += f"• State: {conn['state']}\n"
output += f"• Wait Event Type: {conn['wait_event_type']}\n"
output += f"• Wait Event: {conn['wait_event']}\n"
output += f"• Current Query: {conn['query']}\n"
else:
output += "No current connections found.\n"
# Format connection stats
output += "\n=== DATABASE CONNECTION STATISTICS ===\n"
if results.get("connection_stats"):
for idx, stat in enumerate(results["connection_stats"], 1):
output += f"\nDatabase: {stat['database']}\n"
output += f"• Current Connections: {stat['current_connections']}\n"
output += f"• Commits: {stat['commits']}\n"
output += f"• Rollbacks: {stat['rollbacks']}\n"
output += f"• Blocks Read: {stat['blks_read']}\n"
output += f"• Blocks Hit: {stat['blks_hit']}\n"
output += f"• Tuples Returned: {stat['tup_returned']}\n"
output += f"• Tuples Fetched: {stat['tup_fetched']}\n"
output += f"• Tuples Inserted: {stat['tup_inserted']}\n"
output += f"• Tuples Updated: {stat['tup_updated']}\n"
output += f"• Tuples Deleted: {stat['tup_deleted']}\n"
else:
output += "No connection statistics available.\n"
# Format idle connections
output += "\n=== IDLE CONNECTIONS ===\n"
if results.get("idle_connections"):
for idx, idle in enumerate(results["idle_connections"], 1):
output += f"\nIdle Connection #{idx}:\n"
output += f"• Database: {idle['database']}\n"
output += f"• Username: {idle['username']}\n"
output += f"• Application: {idle['application_name']}\n"
output += f"• Client Address: {idle['client_addr']}\n"
output += f"• Backend Start: {idle['backend_start']}\n"
output += f"• State Change: {idle['state_change']}\n"
output += f"• Last Query: {idle['query']}\n"
else:
output += "No idle connections found.\n"
# Format locked queries
output += "\n=== LOCKED QUERIES ===\n"
if results.get("locked_queries"):
for idx, lock in enumerate(results["locked_queries"], 1):
output += f"\nLocked Query #{idx}:\n"
output += f"• PID: {lock['pid']}\n"
output += f"• Username: {lock['username']}\n"
output += f"• Database: {lock['database']}\n"
output += f"• Lock Type: {lock['lock_type']}\n"
output += f"• Lock Mode: {lock['mode']}\n"
output += f"• Application: {lock['application_name']}\n"
output += f"• State: {lock['state']}\n"
output += f"• Query Duration: {lock['query_duration']}\n"
output += f"• Query: {lock['query']}\n"
else:
output += "No locked queries found.\n"
return output
def execute_index_analysis(secret_name):
"""Execute index-related analysis queries"""
queries = {
"unused_indexes": """
SELECT s.schemaname,
s.relname as table_name,
s.indexrelname as index_name,
s.idx_scan,
pg_size_pretty(pg_relation_size(s.indexrelid::regclass)) as index_size,
pg_relation_size(s.indexrelid) as index_size_bytes
FROM pg_stat_user_indexes s
JOIN pg_index i ON s.indexrelid = i.indexrelid
WHERE s.idx_scan = 0 AND NOT i.indisprimary
ORDER BY pg_relation_size(s.indexrelid) DESC;
""",
"missing_indexes": """
SELECT schemaname,
relname as table_name,
seq_scan,
seq_tup_read,
idx_scan,
idx_tup_fetch,
pg_size_pretty(pg_relation_size(relid)) as table_size,
                   ROUND(seq_scan::numeric / (seq_scan + COALESCE(idx_scan, 0) + 1), 2) as seq_scan_ratio
FROM pg_stat_user_tables
WHERE seq_scan > 0
ORDER BY seq_tup_read DESC;
""",
"index_efficiency": """
SELECT s.relname as table_name,
i.indexrelname as index_name,
i.idx_scan as times_used,
pg_size_pretty(pg_relation_size(i.indexrelid::regclass)) as index_size,
                   ROUND(i.idx_scan::numeric / NULLIF(pg_relation_size(i.indexrelid), 0), 6) as scans_per_byte
FROM pg_stat_user_tables s
JOIN pg_stat_user_indexes i ON s.relid = i.relid
WHERE i.idx_scan > 0
ORDER BY i.idx_scan::float / NULLIF(pg_relation_size(i.indexrelid), 0)::float ASC
LIMIT 20;
"""
}
conn = connect_to_db(secret_name)
try:
# First, ensure pg_stat_statements is installed
with conn.cursor() as cur:
cur.execute("""
CREATE EXTENSION IF NOT EXISTS pg_stat_statements;
""")
conn.commit()
# Execute the main query
results = {}
# Execute each query and collect results
for query_name, query in queries.items():
try:
with conn.cursor() as cur:
cur.execute(query)
columns = [desc[0] for desc in cur.description]
rows = cur.fetchall()
results[query_name] = [dict(zip(columns, row)) for row in rows]
except Exception as e:
print(f"Error executing {query_name}: {str(e)}")
results[query_name] = []
return results
except Exception as e:
raise Exception(f"Failed to retrieve index metrics: {str(e)}")
finally:
if conn:
conn.close()
def format_results_for_index_analysis(results):
"""Format index analysis results in a human-readable string"""
output = "Database Index Analysis Report\n\n"
# Format unused indexes
output += "=== UNUSED INDEXES ===\n"
if results.get("unused_indexes"):
for idx, index in enumerate(results["unused_indexes"], 1):
output += f"\nUnused Index #{idx}:\n"
output += f"• Schema: {index['schemaname']}\n"
output += f"• Table: {index['table_name']}\n"
output += f"• Index: {index['index_name']}\n"
output += f"• Scan Count: {index['idx_scan']}\n"
output += f"• Index Size: {index['index_size']}\n"
output += "\nRecommendation: Consider removing these unused indexes to reduce maintenance overhead and storage space.\n"
else:
output += "No unused indexes found.\n"
# Format missing indexes
output += "\n=== POTENTIAL MISSING INDEXES (High Sequential Scans) ===\n"
if results.get("missing_indexes"):
for idx, table in enumerate(results["missing_indexes"], 1):
output += f"\nTable #{idx}:\n"
output += f"• Schema: {table['schemaname']}\n"
output += f"• Table: {table['table_name']}\n"
output += f"• Sequential Scans: {table['seq_scan']}\n"
output += f"• Sequential Tuples Read: {table['seq_tup_read']}\n"
output += f"• Index Scans: {table['idx_scan']}\n"
output += f"• Index Tuples Fetched: {table['idx_tup_fetch']}\n"
output += f"• Table Size: {table['table_size']}\n"
output += f"• Sequential Scan Ratio: {table['seq_scan_ratio']}\n"
output += "\nRecommendation: Tables with high sequential scan ratios might benefit from additional indexes.\n"
else:
output += "No tables with significant sequential scans found.\n"
# Format index efficiency
output += "\n=== INDEX USAGE EFFICIENCY ===\n"
if results.get("index_efficiency"):
for idx, index in enumerate(results["index_efficiency"], 1):
output += f"\nIndex #{idx}:\n"
output += f"• Table: {index['table_name']}\n"
output += f"• Index: {index['index_name']}\n"
output += f"• Times Used: {index['times_used']}\n"
output += f"• Index Size: {index['index_size']}\n"
output += f"• Scans per Byte: {index['scans_per_byte']}\n"
output += "\nRecommendation: Indexes with very low scans per byte might be candidates for removal or restructuring.\n"
else:
output += "No index usage statistics found.\n"
return output
def execute_autovacuum_analysis(secret_name):
"""Execute autovacuum-related analysis queries"""
queries = {
"tables_needing_vacuum": """
SELECT relname as table_name,
n_dead_tup as dead_tuples,
n_live_tup as live_tuples,
(n_dead_tup::float / NULLIF(n_live_tup + n_dead_tup, 0) * 100)::numeric(10,2) as dead_percentage,
last_vacuum,
last_autovacuum,
last_analyze,
last_autoanalyze
FROM pg_stat_user_tables
WHERE n_dead_tup > 0
ORDER BY dead_percentage DESC;
""",
"autovacuum_activity": """
SELECT pid,
datname,
usename,
query,
state,
wait_event_type,
wait_event,
age(now(), xact_start) as xact_age,
age(now(), query_start) as query_age
            FROM pg_stat_activity
            WHERE query LIKE '%autovacuum%'
              AND pid != pg_backend_pid()
              AND state != 'idle';
""",
"table_bloat": """
SELECT schemaname,
relname,
n_live_tup,
n_dead_tup,
                   pg_size_pretty(pg_total_relation_size(relid)) as total_size
FROM pg_stat_user_tables
ORDER BY n_dead_tup DESC
LIMIT 20;
""",
"wraparound_status": """
SELECT datname,
age(datfrozenxid) as xid_age,
current_setting('autovacuum_freeze_max_age')::int as max_age,
round(100 * age(datfrozenxid)::float /
current_setting('autovacuum_freeze_max_age')::int) as percent_towards_wraparound
FROM pg_database
ORDER BY age(datfrozenxid) DESC;
"""
}
conn = connect_to_db(secret_name)
try:
# First, ensure pg_stat_statements is installed
with conn.cursor() as cur:
cur.execute("""
CREATE EXTENSION IF NOT EXISTS pg_stat_statements;
""")
conn.commit()
# Execute the main query
results = {}
# Execute each query and collect results
for query_name, query in queries.items():
try:
with conn.cursor() as cur:
cur.execute(query)
columns = [desc[0] for desc in cur.description]
rows = cur.fetchall()
results[query_name] = [dict(zip(columns, row)) for row in rows]
except Exception as e:
print(f"Error executing {query_name}: {str(e)}")
results[query_name] = []
return results
except Exception as e:
raise Exception(f"Failed to retrieve autovacuum metrics: {str(e)}")
finally:
if conn:
conn.close()
def format_results_for_autovacuum_analysis(results):
"""Format autovacuum analysis results in a human-readable string"""
output = "Database Autovacuum Analysis Report\n\n"
# Format tables needing vacuum
output += "=== TABLES NEEDING VACUUM ===\n"
if results.get("tables_needing_vacuum"):
for idx, table in enumerate(results["tables_needing_vacuum"], 1):
output += f"\nTable #{idx}:\n"
output += f"• Table Name: {table['table_name']}\n"
output += f"• Dead Tuples: {table['dead_tuples']}\n"
output += f"• Live Tuples: {table['live_tuples']}\n"
output += f"• Dead Percentage: {table['dead_percentage']}%\n"
output += f"• Last Vacuum: {table['last_vacuum'] or 'Never'}\n"
output += f"• Last Autovacuum: {table['last_autovacuum'] or 'Never'}\n"
output += f"• Last Analyze: {table['last_analyze'] or 'Never'}\n"
output += f"• Last Autoanalyze: {table['last_autoanalyze'] or 'Never'}\n"
output += "\nRecommendation: Consider running VACUUM on tables with high dead tuple percentages.\n"
else:
output += "No tables with dead tuples found.\n"
# Format current autovacuum activity
output += "\n=== CURRENT AUTOVACUUM ACTIVITY ===\n"
if results.get("autovacuum_activity"):
for idx, activity in enumerate(results["autovacuum_activity"], 1):
output += f"\nAutovacuum Process #{idx}:\n"
output += f"• PID: {activity['pid']}\n"
output += f"• Database: {activity['datname']}\n"
output += f"• User: {activity['usename']}\n"
output += f"• State: {activity['state']}\n"
output += f"• Wait Event Type: {activity['wait_event_type']}\n"
output += f"• Wait Event: {activity['wait_event']}\n"
output += f"• Transaction Age: {activity['xact_age']}\n"
output += f"• Query Age: {activity['query_age']}\n"
output += f"• Query: {activity['query']}\n"
else:
output += "No active autovacuum processes found.\n"
# Format table bloat information
output += "\n=== TABLE BLOAT INFORMATION ===\n"
if results.get("table_bloat"):
for idx, bloat in enumerate(results["table_bloat"], 1):
output += f"\nTable #{idx}:\n"
output += f"• Schema: {bloat['schemaname']}\n"
output += f"• Table: {bloat['relname']}\n"
output += f"• Live Tuples: {bloat['n_live_tup']}\n"
output += f"• Dead Tuples: {bloat['n_dead_tup']}\n"
output += f"• Total Size: {bloat['total_size']}\n"
else:
output += "No table bloat information available.\n"
# Format transaction wraparound status
output += "\n=== TRANSACTION WRAPAROUND STATUS ===\n"
if results.get("wraparound_status"):
for idx, status in enumerate(results["wraparound_status"], 1):
output += f"\nDatabase: {status['datname']}\n"
output += f"• XID Age: {status['xid_age']}\n"
output += f"• Max Age: {status['max_age']}\n"
output += f"• Percent Towards Wraparound: {status['percent_towards_wraparound']}%\n"
# Add warning if approaching wraparound
if status['percent_towards_wraparound'] > 75:
output += "⚠️ WARNING: Database is approaching transaction wraparound limit!\n"
else:
output += "No wraparound status information available.\n"
return output
def execute_io_analysis(secret_name):
"""Execute I/O-related analysis queries"""
queries = {
"buffer_usage": """
SELECT relname as table_name,
heap_blks_read,
heap_blks_hit,
CASE WHEN heap_blks_read + heap_blks_hit > 0
THEN (heap_blks_hit::float / (heap_blks_read + heap_blks_hit) * 100)::numeric(10,2)
ELSE 0
END as hit_percentage
FROM pg_statio_user_tables
ORDER BY heap_blks_read DESC;
""",
"checkpoint_activity": """
SELECT checkpoints_timed,
checkpoints_req,
checkpoint_write_time,
checkpoint_sync_time,
buffers_checkpoint,
buffers_clean,
buffers_backend,
buffers_backend_fsync,
buffers_alloc,
stats_reset
FROM pg_stat_bgwriter;
""",
"io_statistics": """
SELECT s.relname as table_name,
pg_size_pretty(pg_relation_size(s.relid)) as table_size,
io.heap_blks_read,
io.heap_blks_hit,
io.idx_blks_read,
io.idx_blks_hit,
io.toast_blks_read,
io.toast_blks_hit,
io.tidx_blks_read,
io.tidx_blks_hit
FROM pg_statio_user_tables io
JOIN pg_stat_user_tables s ON io.relid = s.relid
ORDER BY (io.heap_blks_read + io.idx_blks_read +
io.toast_blks_read + io.tidx_blks_read) DESC
LIMIT 20;
"""
}
conn = connect_to_db(secret_name)
try:
# First, ensure pg_stat_statements is installed
with conn.cursor() as cur:
cur.execute("""
CREATE EXTENSION IF NOT EXISTS pg_stat_statements;
""")
conn.commit()
results = {}
# Execute each query and collect results
for query_name, query in queries.items():
try:
with conn.cursor() as cur:
cur.execute(query)
columns = [desc[0] for desc in cur.description]
rows = cur.fetchall()
results[query_name] = [dict(zip(columns, row)) for row in rows]
except Exception as e:
print(f"Error executing {query_name}: {str(e)}")
results[query_name] = []
return results
except Exception as e:
raise Exception(f"Failed to retrieve I/O metrics: {str(e)}")
finally:
if conn:
conn.close()
def format_results_for_io_analysis(results):
"""Format I/O analysis results in a human-readable string"""
output = "Database I/O Analysis Report\n\n"
# Format buffer usage
output += "=== BUFFER USAGE BY TABLE ===\n"
if results.get("buffer_usage"):
for idx, table in enumerate(results["buffer_usage"], 1):
output += f"\nTable #{idx}:\n"
output += f"• Table Name: {table['table_name']}\n"
output += f"• Blocks Read from Disk: {table['heap_blks_read']}\n"
output += f"• Blocks Hit in Buffer: {table['heap_blks_hit']}\n"
output += f"• Buffer Hit Percentage: {table['hit_percentage']}%\n"
# Add recommendations based on hit percentage
if table['hit_percentage'] < 90:
output += "⚠️ Warning: Low buffer hit ratio. Consider increasing shared_buffers.\n"
else:
output += "No buffer usage statistics available.\n"
# Format checkpoint activity
output += "\n=== CHECKPOINT ACTIVITY ===\n"
if results.get("checkpoint_activity") and results["checkpoint_activity"]:
checkpoint = results["checkpoint_activity"][0] # Should only be one row
output += f"• Scheduled Checkpoints: {checkpoint['checkpoints_timed']}\n"
output += f"• Requested Checkpoints: {checkpoint['checkpoints_req']}\n"
output += f"• Checkpoint Write Time: {checkpoint['checkpoint_write_time']} ms\n"
output += f"• Checkpoint Sync Time: {checkpoint['checkpoint_sync_time']} ms\n"
output += f"• Buffers Written During Checkpoints: {checkpoint['buffers_checkpoint']}\n"
output += f"• Buffers Written by Background Writer: {checkpoint['buffers_clean']}\n"
output += f"• Buffers Written by Backend Processes: {checkpoint['buffers_backend']}\n"
output += f"• Backend fsync Calls: {checkpoint['buffers_backend_fsync']}\n"
output += f"• Buffers Allocated: {checkpoint['buffers_alloc']}\n"
output += f"• Statistics Reset Time: {checkpoint['stats_reset']}\n"
# Add recommendations based on checkpoint activity
if checkpoint['checkpoints_req'] > checkpoint['checkpoints_timed']:
output += "\n⚠️ Warning: High number of requested checkpoints. Consider increasing checkpoint_timeout or max_wal_size.\n"
else:
output += "No checkpoint activity information available.\n"
# Format I/O statistics
output += "\n=== DETAILED I/O STATISTICS (TOP 20 TABLES) ===\n"
if results.get("io_statistics"):
for idx, stat in enumerate(results["io_statistics"], 1):
output += f"\nTable #{idx}:\n"
output += f"• Table Name: {stat['table_name']}\n"
output += f"• Table Size: {stat['table_size']}\n"
output += f"• Heap Blocks Read: {stat['heap_blks_read']}\n"
output += f"• Heap Blocks Hit: {stat['heap_blks_hit']}\n"
output += f"• Index Blocks Read: {stat['idx_blks_read']}\n"
output += f"• Index Blocks Hit: {stat['idx_blks_hit']}\n"
output += f"• Toast Blocks Read: {stat['toast_blks_read']}\n"
output += f"• Toast Blocks Hit: {stat['toast_blks_hit']}\n"
output += f"• Toast Index Blocks Read: {stat['tidx_blks_read']}\n"
output += f"• Toast Index Blocks Hit: {stat['tidx_blks_hit']}\n"
# Calculate and show hit ratios
total_reads = stat['heap_blks_read'] + stat['idx_blks_read']
total_hits = stat['heap_blks_hit'] + stat['idx_blks_hit']
if total_reads + total_hits > 0:
hit_ratio = (total_hits / (total_reads + total_hits)) * 100
output += f"• Overall Buffer Hit Ratio: {hit_ratio:.2f}%\n"
if hit_ratio < 90:
output += "⚠️ Warning: Low buffer hit ratio for this table.\n"
else:
output += "No I/O statistics available.\n"
return output
def execute_replication_analysis(secret_name):
"""Execute replication-related analysis queries"""
queries = {
"aurora_replica_status": """
SELECT server_id,
EXTRACT(EPOCH FROM (now() - last_update_timestamp)) AS lag_seconds,
durable_lsn,
highest_lsn_rcvd,
current_read_lsn,
last_update_timestamp
            FROM aurora_replica_status();
""",
"replication_slots": """
SELECT slot_name,
slot_type,
active,
confirmed_flush_lsn,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) as lag_size
FROM pg_replication_slots;
""",
"replication_connections": """
SELECT pid,
usesysid,
usename,
application_name,
client_addr,
client_hostname,
client_port,
backend_start,
state,
sent_lsn,
write_lsn,
flush_lsn,
replay_lsn,
pg_wal_lsn_diff(sent_lsn, replay_lsn) as lag_bytes
FROM pg_stat_replication;
"""
}
conn = connect_to_db(secret_name)
try:
# First, ensure pg_stat_statements is installed
with conn.cursor() as cur:
cur.execute("""
CREATE EXTENSION IF NOT EXISTS pg_stat_statements;
""")
conn.commit()
results = {}
# Execute each query and collect results
for query_name, query in queries.items():
try:
with conn.cursor() as cur:
cur.execute(query)
columns = [desc[0] for desc in cur.description]
rows = cur.fetchall()
results[query_name] = [dict(zip(columns, row)) for row in rows]
except Exception as e:
print(f"Error executing {query_name}: {str(e)}")
results[query_name] = []
return results
except Exception as e:
raise Exception(f"Failed to retrieve replication metrics: {str(e)}")
finally:
if conn:
conn.close()
def format_results_for_replication_analysis(results):
"""Format replication analysis results in a human-readable string"""
output = "Database Replication Analysis Report\n\n"
# Format Aurora replica status
output += "=== AURORA REPLICA STATUS ===\n"
if results.get("aurora_replica_status"):
for idx, replica in enumerate(results["aurora_replica_status"], 1):
output += f"\nReplica #{idx}:\n"
output += f"• Server ID: {replica['server_id']}\n"
output += f"• Replication Lag: {round(float(replica['lag_seconds']), 2)} seconds\n"
output += f"• Durable LSN: {replica['durable_lsn']}\n"
output += f"• Highest Received LSN: {replica['highest_lsn_rcvd']}\n"
output += f"• Current Read LSN: {replica['current_read_lsn']}\n"
output += f"• Last Update: {replica['last_update_timestamp']}\n"
# Add warnings for high lag
if float(replica['lag_seconds']) > 30:
output += "⚠️ Warning: High replication lag detected!\n"
else:
output += "No Aurora replica status information available.\n"
# Format replication slots
output += "\n=== REPLICATION SLOTS ===\n"
if results.get("replication_slots"):
for idx, slot in enumerate(results["replication_slots"], 1):
output += f"\nSlot #{idx}:\n"
output += f"• Slot Name: {slot['slot_name']}\n"
output += f"• Slot Type: {slot['slot_type']}\n"
output += f"• Active: {slot['active']}\n"
output += f"• Confirmed Flush LSN: {slot['confirmed_flush_lsn']}\n"
output += f"• Lag Size: {slot['lag_size']}\n"
# Add warnings for inactive slots
if not slot['active']:
output += "⚠️ Warning: Inactive replication slot detected!\n"
else:
output += "No replication slots found.\n"
# Format replication connections
output += "\n=== REPLICATION CONNECTIONS ===\n"
if results.get("replication_connections"):
for idx, conn in enumerate(results["replication_connections"], 1):
output += f"\nConnection #{idx}:\n"
output += f"• PID: {conn['pid']}\n"
output += f"• Username: {conn['usename']}\n"
output += f"• Application: {conn['application_name']}\n"
output += f"• Client Address: {conn['client_addr']}\n"
output += f"• Client Hostname: {conn['client_hostname']}\n"
output += f"• Client Port: {conn['client_port']}\n"
output += f"• Backend Start: {conn['backend_start']}\n"
output += f"• State: {conn['state']}\n"
output += f"• Sent LSN: {conn['sent_lsn']}\n"
output += f"• Write LSN: {conn['write_lsn']}\n"
output += f"• Flush LSN: {conn['flush_lsn']}\n"
output += f"• Replay LSN: {conn['replay_lsn']}\n"
output += f"• Lag Size: {conn['lag_bytes']} bytes\n"
# Add warnings for large lag
if conn['lag_bytes'] > 100000000: # 100MB
output += "⚠️ Warning: Large replication lag detected!\n"
else:
output += "No replication connections found.\n"
return output
def execute_system_health(secret_name):
"""Execute system health-related analysis queries"""
queries = {
"database_statistics": """
SELECT datname,
numbackends,
xact_commit,
xact_rollback,
blks_read,
blks_hit,
tup_returned,
tup_fetched,
tup_inserted,
tup_updated,
tup_deleted,
conflicts,
temp_files,
temp_bytes,
deadlocks,
blk_read_time,
blk_write_time,
stats_reset
FROM pg_stat_database
WHERE datname = current_database();
""",
"lock_contention": """
SELECT locktype,
CASE
WHEN relation IS NOT NULL THEN relation::regclass::text
ELSE 'NULL'
END as relation,
mode,
transactionid as tid,
virtualtransaction as vtid,
pid,
granted
FROM pg_locks
ORDER BY relation;
""",
"long_running_transactions": """
SELECT pid,
usename,
datname,
age(now(), xact_start) as xact_age,
state,
query
FROM pg_stat_activity
WHERE state != 'idle'
AND xact_start < now() - interval '5 minutes'
ORDER BY xact_start;
"""
}
conn = connect_to_db(secret_name)
try:
# First, ensure pg_stat_statements is installed
with conn.cursor() as cur:
cur.execute("""
CREATE EXTENSION IF NOT EXISTS pg_stat_statements;
""")
conn.commit()
results = {}
# Execute each query and collect results
for query_name, query in queries.items():
try:
with conn.cursor() as cur:
cur.execute(query)
columns = [desc[0] for desc in cur.description]
rows = cur.fetchall()
results[query_name] = [dict(zip(columns, row)) for row in rows]
except Exception as e:
print(f"Error executing {query_name}: {str(e)}")
results[query_name] = []
return results
except Exception as e:
raise Exception(f"Failed to retrieve system health metrics: {str(e)}")
finally:
if conn:
conn.close()
def format_results_for_system_health(results):
"""Format system health analysis results in a human-readable string"""
output = "Database System Health Report\n\n"
# Format database statistics
output += "=== DATABASE STATISTICS ===\n"
if results.get("database_statistics"):
for stat in results["database_statistics"]:
output += f"Database: {stat['datname']}\n"
output += f"• Active Connections: {stat['numbackends']}\n"
output += f"• Transactions Committed: {stat['xact_commit']}\n"
output += f"• Transactions Rolled Back: {stat['xact_rollback']}\n"
output += f"• Blocks Read: {stat['blks_read']}\n"
output += f"• Blocks Hit (Cache): {stat['blks_hit']}\n"
# Calculate cache hit ratio
total_blocks = stat['blks_read'] + stat['blks_hit']
if total_blocks > 0:
cache_hit_ratio = (stat['blks_hit'] / total_blocks) * 100
output += f"• Cache Hit Ratio: {cache_hit_ratio:.2f}%\n"
if cache_hit_ratio < 90:
output += "⚠️ Warning: Low cache hit ratio. Consider increasing shared_buffers.\n"
output += f"• Tuples Returned: {stat['tup_returned']}\n"
output += f"• Tuples Fetched: {stat['tup_fetched']}\n"
output += f"• Tuples Inserted: {stat['tup_inserted']}\n"
output += f"• Tuples Updated: {stat['tup_updated']}\n"
output += f"• Tuples Deleted: {stat['tup_deleted']}\n"
output += f"• Conflicts: {stat['conflicts']}\n"
output += f"• Temporary Files Created: {stat['temp_files']}\n"
output += f"• Temporary Bytes Written: {stat['temp_bytes']}\n"
output += f"• Deadlocks: {stat['deadlocks']}\n"
output += f"• Block Read Time: {stat['blk_read_time']} ms\n"
output += f"• Block Write Time: {stat['blk_write_time']} ms\n"
output += f"• Statistics Reset: {stat['stats_reset']}\n"
# Add warnings for concerning metrics
if stat['deadlocks'] > 0:
output += "⚠️ Warning: Deadlocks detected!\n"
if stat['conflicts'] > 0:
output += "⚠️ Warning: Conflicts detected!\n"
if stat['temp_files'] > 1000:
output += "⚠️ Warning: High number of temporary files created!\n"
else:
output += "No database statistics available.\n"
# Format lock contention
output += "\n=== LOCK CONTENTION ===\n"
if results.get("lock_contention"):
lock_groups = {}
for lock in results["lock_contention"]:
relation = lock['relation']
if relation not in lock_groups:
lock_groups[relation] = []
lock_groups[relation].append(lock)
for relation, locks in lock_groups.items():
output += f"\nRelation: {relation}\n"
for idx, lock in enumerate(locks, 1):
output += f"Lock #{idx}:\n"
output += f"• Type: {lock['locktype']}\n"
output += f"• Mode: {lock['mode']}\n"
output += f"• Transaction ID: {lock['tid']}\n"
output += f"• Virtual Transaction ID: {lock['vtid']}\n"
output += f"• PID: {lock['pid']}\n"
output += f"• Granted: {lock['granted']}\n"
if not lock['granted']:
output += "⚠️ Warning: Lock waiting to be granted!\n"
else:
output += "No lock contention found.\n"
# Format long-running transactions
output += "\n=== LONG-RUNNING TRANSACTIONS (> 5 minutes) ===\n"
if results.get("long_running_transactions"):
for idx, txn in enumerate(results["long_running_transactions"], 1):
output += f"\nTransaction #{idx}:\n"
output += f"• PID: {txn['pid']}\n"
output += f"• Username: {txn['usename']}\n"
output += f"• Database: {txn['datname']}\n"
output += f"• Age: {txn['xact_age']}\n"
output += f"• State: {txn['state']}\n"
output += f"• Query: {txn['query']}\n"
            # Warn for transactions older than one hour (xact_age is returned as a timedelta)
            if txn['xact_age'] is not None and txn['xact_age'].total_seconds() > 3600:
                output += "⚠️ Warning: Transaction running for an extended period!\n"
else:
output += "No long-running transactions found.\n"
return output
def connect_to_db(secret_name):
    """Establish a database connection using credentials from Secrets Manager."""
    secret = get_secret(secret_name)
    # Do not log the secret itself; it contains the database password.
    try:
        conn = psycopg2.connect(
host=secret['host'],
database=secret['dbname'],
user=secret['username'],
password=secret['password'],
port=secret['port']
)
return conn
except Exception as e:
raise Exception(f"Failed to connect to the database: {str(e)}")
def get_env_secret(environment):
    """Retrieve the secret name for the specified environment from Parameter Store."""
    ssm_client = boto3.client('ssm')
    if environment in ('prod', 'dev'):
        parameter_name = f'/AuroraOpsGPT/{environment}'
        try:
            response = ssm_client.get_parameter(Name=parameter_name)
            return response['Parameter']['Value']
        except ssm_client.exceptions.ParameterNotFound:
            error_message = f"Parameter not found: {parameter_name}"
            print(error_message)
            raise Exception(error_message)
        except Exception as e:
            raise Exception(f"Failed to get secret name from Parameter Store: {str(e)}")
    else:
        raise ValueError(f"Unknown environment: {environment}")
def lambda_handler(event, context):
try:
print(event)
environment = event['environment']
action_type = event['action_type']
print("Environment: {}".format(environment))
secret_name = get_env_secret(environment)
min_exec_time = 1000
        # Route to the requested analysis based on action_type
if action_type == 'slow_query':
print("Executing slow query scripts")
results = execute_slow_query(secret_name, min_exec_time)
# Format results for Bedrock Agent
formatted_output = format_results_for_slow_query(results)
print(formatted_output)
elif action_type == 'connection_management_issues':
print("Executing connection_management_issues")
results = execute_connect_issues(secret_name, min_exec_time)
# Format results for Bedrock Agent
formatted_output = format_results_for_conn_issues(results)
elif action_type == 'index_analysis':
print("Executing index_analysis")
results = execute_index_analysis(secret_name)
formatted_output = format_results_for_index_analysis(results)
elif action_type == 'autovacuum_analysis':
print("Executing autovacuum_analysis")
results = execute_autovacuum_analysis(secret_name)
formatted_output = format_results_for_autovacuum_analysis(results)
elif action_type == 'io_analysis':
print("Executing io_analysis")
results = execute_io_analysis(secret_name)
formatted_output = format_results_for_io_analysis(results)
elif action_type == 'replication_analysis':
print("Executing replication_analysis")
results = execute_replication_analysis(secret_name)
formatted_output = format_results_for_replication_analysis(results)
elif action_type == 'system_health':
print("Executing system_health")
results = execute_system_health(secret_name)
formatted_output = format_results_for_system_health(results)
        else:
            return {
                "functionResponse": {
                    "content": f"Error: Unknown action_type: {action_type}"
                }
            }
response_body = {
'TEXT': {
'body': formatted_output
}
}
function_response = {
'functionResponse': {
'responseBody': response_body
}
}
return function_response
    except Exception as e:
        return {
            "functionResponse": {
                "content": f"Error running database analysis: {str(e)}"
            }
        }
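# Minimal local-invocation sketch (an assumption for testing outside Lambda):
# it requires AWS credentials, the REGION environment variable, and network access
# to the Aurora cluster. The event shape mirrors what lambda_handler() reads above.
if __name__ == "__main__":
    sample_event = {
        "environment": "dev",        # 'prod' or 'dev'
        "action_type": "slow_query"  # or index_analysis, io_analysis, system_health, ...
    }
    print(lambda_handler(sample_event, None))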