mirror of
https://github.com/awslabs/amazon-bedrock-agentcore-samples.git
synced 2025-09-08 20:50:46 +00:00
1592 lines
58 KiB
Python
1592 lines
58 KiB
Python
import json
|
|
import boto3
|
|
import psycopg2
|
|
import os
|
|
import re
|
|
import time
|
|
import logging
|
|
from datetime import datetime
|
|
from botocore.exceptions import ClientError
|
|
|
|
# Set up logging
|
|
logging.basicConfig(level=logging.INFO)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class QueryComplexityError(Exception):
|
|
"""Custom exception for query complexity violations"""
|
|
pass
|
|
|
|
class QueryLimitError(Exception):
|
|
"""Custom exception for query limit violations"""
|
|
pass
|
|
|
|
def analyze_query_complexity(query):
|
|
"""
|
|
Analyze query complexity and potential resource impact
|
|
|
|
Args:
|
|
query (str): SQL query to analyze
|
|
|
|
Returns:
|
|
dict: Complexity metrics
|
|
|
|
Raises:
|
|
QueryComplexityError: If query is too complex
|
|
"""
|
|
query_lower = query.lower()
|
|
complexity_score = 0
|
|
warnings = []
|
|
|
|
# Check for joins
|
|
join_count = sum(1 for join_type in ['join', 'inner join', 'left join', 'right join', 'full join']
|
|
if join_type in query_lower)
|
|
complexity_score += join_count * 2
|
|
if join_count > 3:
|
|
warnings.append(f"Query contains {join_count} joins - consider simplifying")
|
|
|
|
# Check for subqueries
|
|
subquery_count = query_lower.count('(select')
|
|
complexity_score += subquery_count * 3
|
|
if subquery_count > 2:
|
|
warnings.append(f"Query contains {subquery_count} subqueries - consider restructuring")
|
|
|
|
# Check for aggregations
|
|
agg_functions = ['count(', 'sum(', 'avg(', 'max(', 'min(']
|
|
agg_count = sum(query_lower.count(func) for func in agg_functions)
|
|
complexity_score += agg_count
|
|
|
|
# Check for window functions
|
|
if 'over(' in query_lower or 'partition by' in query_lower:
|
|
complexity_score += 3
|
|
warnings.append("Query uses window functions - monitor performance")
|
|
|
|
# Check for complex WHERE conditions
|
|
where_pos = query_lower.find('where')
|
|
if where_pos != -1:
|
|
where_clause = query_lower[where_pos:]
|
|
and_count = where_clause.count(' and ')
|
|
or_count = where_clause.count(' or ')
|
|
complexity_score += (and_count + or_count)
|
|
if (and_count + or_count) > 5:
|
|
warnings.append(f"Complex WHERE clause with {and_count + or_count} conditions")
|
|
|
|
return {
|
|
'complexity_score': complexity_score,
|
|
'warnings': warnings,
|
|
'join_count': join_count,
|
|
'subquery_count': subquery_count,
|
|
'aggregation_count': agg_count
|
|
}
|
|
|
|
def validate_and_execute_queries(secret_name, query, max_rows=20,
|
|
max_statements=5, max_total_rows=1000,
|
|
max_complexity=15):
|
|
"""
|
|
Enhanced query validation and execution with additional controls
|
|
"""
|
|
response = {
|
|
'results': [],
|
|
'performance_metrics': None,
|
|
'warnings': [],
|
|
'optimization_suggestions': []
|
|
}
|
|
|
|
start_time = time.time()
|
|
conn = None
|
|
total_rows = 0
|
|
|
|
try:
|
|
# Validate and split queries
|
|
statements = validate_query(query)
|
|
|
|
# Check number of statements
|
|
if len(statements) > max_statements:
|
|
raise QueryLimitError(
|
|
f"Too many statements ({len(statements)}). Maximum allowed is {max_statements}"
|
|
)
|
|
|
|
# Connect to database
|
|
conn = connect_to_db(secret_name)
|
|
|
|
with conn.cursor() as cur:
|
|
# Set session parameters
|
|
cur.execute("SET TRANSACTION READ ONLY")
|
|
cur.execute("SET statement_timeout = '30s'")
|
|
cur.execute("SET idle_in_transaction_session_timeout = '60s'")
|
|
|
|
# Execute each statement
|
|
for stmt_index, stmt in enumerate(statements, 1):
|
|
# Analyze query complexity
|
|
complexity_metrics = analyze_query_complexity(stmt)
|
|
if complexity_metrics['complexity_score'] > max_complexity:
|
|
raise QueryComplexityError(
|
|
f"Statement {stmt_index} is too complex (score: {complexity_metrics['complexity_score']})"
|
|
)
|
|
|
|
# Add complexity warnings to response
|
|
response['warnings'].extend(
|
|
f"Statement {stmt_index}: {warning}"
|
|
for warning in complexity_metrics['warnings']
|
|
)
|
|
|
|
stmt_response = {
|
|
'columns': [],
|
|
'rows': [],
|
|
'truncated': False,
|
|
'message': '',
|
|
'row_count': 0,
|
|
'query': stmt,
|
|
'complexity_metrics': complexity_metrics
|
|
}
|
|
|
|
stmt_lower = stmt.lower().strip()
|
|
|
|
# Only add LIMIT for SELECT queries
|
|
if stmt_lower.startswith('select') and 'limit' not in stmt_lower:
|
|
remaining_rows = max_total_rows - total_rows
|
|
limit_rows = min(max_rows, remaining_rows)
|
|
stmt = f"{stmt} LIMIT {limit_rows + 1}"
|
|
|
|
# Execute with explain plan first for SELECT queries
|
|
if stmt_lower.startswith('select'):
|
|
#cur.execute(f"EXPLAIN (FORMAT JSON) {stmt}")
|
|
#explain_plan = cur.fetchone()[0]
|
|
|
|
# Analyze plan for potential issues
|
|
optimization_suggestions = analyze_query_performance(secret_name, stmt)
|
|
if optimization_suggestions:
|
|
response['optimization_suggestions'].extend(
|
|
f"Statement {stmt_index}: {suggestion}"
|
|
for suggestion in optimization_suggestions
|
|
)
|
|
|
|
# Execute actual query
|
|
cur.execute(stmt)
|
|
|
|
# Get column names
|
|
stmt_response['columns'] = [desc[0] for desc in cur.description]
|
|
|
|
# Fetch results
|
|
rows = cur.fetchall()
|
|
row_count = len(rows)
|
|
|
|
# Check total row limit
|
|
total_rows += row_count
|
|
if total_rows > max_total_rows:
|
|
stmt_response['truncated'] = True
|
|
excess_rows = total_rows - max_total_rows
|
|
rows = rows[:-excess_rows]
|
|
stmt_response['message'] = (
|
|
f"Results truncated. Maximum total rows ({max_total_rows}) reached"
|
|
)
|
|
total_rows = max_total_rows
|
|
|
|
# Check individual statement limit
|
|
elif row_count > max_rows:
|
|
stmt_response['truncated'] = True
|
|
rows = rows[:max_rows]
|
|
stmt_response['message'] = (
|
|
f"Results truncated to {max_rows} rows"
|
|
)
|
|
|
|
stmt_response['row_count'] = len(rows)
|
|
stmt_response['rows'] = [
|
|
dict(zip(stmt_response['columns'], row))
|
|
for row in rows
|
|
]
|
|
|
|
response['results'].append(stmt_response)
|
|
|
|
# Add overall performance metrics
|
|
total_time = time.time() - start_time
|
|
response['performance_metrics'] = {
|
|
'execution_time': total_time,
|
|
'statements_executed': len(statements),
|
|
'total_rows': total_rows,
|
|
'timestamp': datetime.utcnow().isoformat(),
|
|
'needs_analysis': total_time > 5,
|
|
'performance_message': (
|
|
f"Executed {len(statements)} statements in {total_time:.2f} seconds"
|
|
)
|
|
}
|
|
|
|
# Add performance recommendations if needed
|
|
if total_time > 5:
|
|
response['warnings'].append(
|
|
"Query execution time exceeded 5 seconds. Consider optimization."
|
|
)
|
|
|
|
return response
|
|
|
|
except (QueryComplexityError, QueryLimitError) as e:
|
|
error_msg = str(e)
|
|
logger.warning(error_msg)
|
|
raise ValueError(error_msg)
|
|
|
|
except psycopg2.Error as pe:
|
|
error_msg = f"Database error: {str(pe)}"
|
|
logger.error(error_msg)
|
|
raise Exception(error_msg)
|
|
|
|
except Exception as e:
|
|
error_msg = f"Unexpected error: {str(e)}"
|
|
logger.error(error_msg)
|
|
raise Exception(error_msg)
|
|
|
|
finally:
|
|
if conn:
|
|
conn.close()
|
|
|
|
def get_secret(secret_name):
|
|
"""Get secret from AWS Secrets Manager """
|
|
secret_name = secret_name
|
|
region_name = os.environ['REGION']
|
|
session = boto3.session.Session()
|
|
client = session.client(
|
|
service_name='secretsmanager',
|
|
region_name=region_name
|
|
)
|
|
|
|
try:
|
|
secret_value = client.get_secret_value(SecretId=secret_name)
|
|
secret = json.loads(secret_value['SecretString'])
|
|
return secret
|
|
except ClientError as e:
|
|
raise Exception(f"Failed to get secret: {str(e)}")
|
|
|
|
def get_env_secret(environment):
|
|
ssm_client = boto3.client('ssm')
|
|
"""Retrieve the secret name for the specified environment"""
|
|
if environment == 'prod':
|
|
try:
|
|
# Get the secret name from Parameter Store
|
|
response = ssm_client.get_parameter(
|
|
Name=f'/AuroraOps/{environment}'
|
|
)
|
|
print(response['Parameter']['Value'])
|
|
return response['Parameter']['Value']
|
|
except ssm_client.exceptions.ParameterNotFound:
|
|
error_message = f"Parameter not found: {parameter_name}"
|
|
print(error_message)
|
|
raise Exception(error_message)
|
|
elif environment == 'dev':
|
|
try:
|
|
# Get the secret name from Parameter Store
|
|
response = ssm_client.get_parameter(
|
|
Name=f'/AuroraOps/{environment}'
|
|
)
|
|
return response['Parameter']['Value']
|
|
except Exception as e:
|
|
raise Exception(f"Failed to get dev secret name from Parameter Store: {str(e)}")
|
|
else:
|
|
print("environement does not exist")
|
|
raise ValueError(f"Unknown environment: {environment}")
|
|
|
|
def connect_to_db(secret_name):
|
|
"""Establish database connection"""
|
|
cur_secret = secret_name
|
|
secret = get_secret(cur_secret)
|
|
try:
|
|
conn = psycopg2.connect(
|
|
host=secret['host'],
|
|
database=secret['dbname'],
|
|
user=secret['username'],
|
|
password=secret['password'],
|
|
port=secret['port']
|
|
)
|
|
return conn
|
|
except Exception as e:
|
|
raise Exception(f"Failed to connect to the database: {str(e)}")
|
|
|
|
# Define the queries dictionary for different object types
|
|
queries = {
|
|
'table': """
|
|
WITH RECURSIVE columns AS (
|
|
SELECT
|
|
t.schemaname,
|
|
t.tablename,
|
|
array_to_string(
|
|
array_agg(
|
|
' ' || quote_ident(a.attname) || ' ' ||
|
|
pg_catalog.format_type(a.atttypid, a.atttypmod) ||
|
|
CASE WHEN a.attnotnull THEN ' NOT NULL' ELSE '' END ||
|
|
CASE WHEN ad.adbin IS NOT NULL
|
|
THEN ' DEFAULT ' || pg_get_expr(ad.adbin, ad.adrelid)
|
|
ELSE ''
|
|
END
|
|
ORDER BY a.attnum
|
|
),
|
|
E',\n'
|
|
) as column_definitions
|
|
FROM pg_catalog.pg_tables t
|
|
JOIN pg_catalog.pg_class c
|
|
ON c.relname = t.tablename
|
|
AND c.relnamespace = (
|
|
SELECT oid
|
|
FROM pg_catalog.pg_namespace
|
|
WHERE nspname = t.schemaname
|
|
)
|
|
JOIN pg_catalog.pg_attribute a
|
|
ON a.attrelid = c.oid
|
|
AND a.attnum > 0
|
|
AND NOT a.attisdropped
|
|
LEFT JOIN pg_catalog.pg_attrdef ad
|
|
ON ad.adrelid = c.oid
|
|
AND ad.adnum = a.attnum
|
|
WHERE t.schemaname NOT IN ('pg_catalog', 'information_schema')
|
|
GROUP BY t.schemaname, t.tablename
|
|
)
|
|
SELECT
|
|
col.schemaname || '.' || col.tablename as object_name,
|
|
'TABLE' as object_type,
|
|
CASE
|
|
WHEN col.column_definitions IS NOT NULL THEN
|
|
format(
|
|
'CREATE TABLE %I.%I (\n%s\n);',
|
|
col.schemaname,
|
|
col.tablename,
|
|
col.column_definitions
|
|
)
|
|
ELSE 'ERROR: No columns found for this table'
|
|
END as definition,
|
|
obj_description(
|
|
(col.schemaname || '.' || col.tablename)::regclass,
|
|
'pg_class'
|
|
) as description
|
|
FROM columns col
|
|
WHERE col.tablename ILIKE %s AND col.schemaname = %s
|
|
""",
|
|
|
|
'view': """
|
|
SELECT
|
|
schemaname || '.' || viewname as object_name,
|
|
'VIEW' as object_type,
|
|
format(
|
|
'CREATE OR REPLACE VIEW %I.%I AS\n%s',
|
|
schemaname,
|
|
viewname,
|
|
pg_get_viewdef(format('%I.%I', schemaname, viewname)::regclass, true)
|
|
) as definition,
|
|
obj_description(
|
|
(schemaname || '.' || viewname)::regclass,
|
|
'pg_class'
|
|
) as description
|
|
FROM pg_catalog.pg_views
|
|
WHERE viewname ILIKE %s
|
|
AND schemaname = %s
|
|
AND schemaname NOT IN ('pg_catalog', 'information_schema')
|
|
""",
|
|
|
|
'function': """
|
|
SELECT
|
|
n.nspname || '.' || p.proname as object_name,
|
|
'FUNCTION' as object_type,
|
|
pg_get_functiondef(p.oid) as definition,
|
|
obj_description(p.oid, 'pg_proc') as description,
|
|
p.prorettype::regtype as return_type,
|
|
p.provolatile,
|
|
p.proparallel
|
|
FROM pg_catalog.pg_proc p
|
|
JOIN pg_catalog.pg_namespace n ON n.oid = p.pronamespace
|
|
WHERE p.proname ILIKE %s
|
|
AND n.nspname = %s
|
|
AND n.nspname NOT IN ('pg_catalog', 'information_schema')
|
|
AND p.prokind = 'f' -- 'f' for function
|
|
""",
|
|
|
|
'procedure': """
|
|
SELECT
|
|
n.nspname || '.' || p.proname as object_name,
|
|
'PROCEDURE' as object_type,
|
|
pg_get_functiondef(p.oid) as definition,
|
|
obj_description(p.oid, 'pg_proc') as description
|
|
FROM pg_catalog.pg_proc p
|
|
JOIN pg_catalog.pg_namespace n ON n.oid = p.pronamespace
|
|
WHERE p.proname ILIKE %s
|
|
AND n.nspname = %s
|
|
AND n.nspname NOT IN ('pg_catalog', 'information_schema')
|
|
AND p.prokind = 'p' -- 'p' for procedure
|
|
""",
|
|
|
|
'trigger': """
|
|
SELECT
|
|
n.nspname || '.' || t.tgname as object_name,
|
|
'TRIGGER' as object_type,
|
|
pg_get_triggerdef(t.oid, true) as definition,
|
|
obj_description(t.oid, 'pg_trigger') as description
|
|
FROM pg_catalog.pg_trigger t
|
|
JOIN pg_catalog.pg_class c ON t.tgrelid = c.oid
|
|
JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
|
|
WHERE t.tgname ILIKE %s
|
|
AND n.nspname = %s
|
|
AND n.nspname NOT IN ('pg_catalog', 'information_schema')
|
|
AND NOT t.tgisinternal
|
|
""",
|
|
|
|
'sequence': """
|
|
SELECT
|
|
n.nspname || '.' || c.relname as object_name,
|
|
'SEQUENCE' as object_type,
|
|
format(
|
|
'CREATE SEQUENCE %I.%I\n INCREMENT %s\n MINVALUE %s\n MAXVALUE %s\n START %s\n CACHE %s%s;',
|
|
n.nspname,
|
|
c.relname,
|
|
s.seqincrement,
|
|
s.seqmin,
|
|
s.seqmax,
|
|
s.seqstart,
|
|
s.seqcache,
|
|
CASE WHEN s.seqcycle THEN '\n CYCLE' ELSE '' END
|
|
) as definition,
|
|
obj_description(c.oid, 'pg_class') as description
|
|
FROM pg_catalog.pg_sequence s
|
|
JOIN pg_catalog.pg_class c ON s.seqrelid = c.oid
|
|
JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
|
|
WHERE c.relname ILIKE %s
|
|
AND n.nspname = %s
|
|
AND n.nspname NOT IN ('pg_catalog', 'information_schema')
|
|
""",
|
|
|
|
'index': """
|
|
SELECT
|
|
n.nspname || '.' || c.relname as object_name,
|
|
'INDEX' as object_type,
|
|
pg_get_indexdef(i.indexrelid) as definition,
|
|
obj_description(i.indexrelid, 'pg_class') as description
|
|
FROM pg_catalog.pg_index i
|
|
JOIN pg_catalog.pg_class c ON i.indexrelid = c.oid
|
|
JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
|
|
WHERE c.relname ILIKE %s
|
|
AND n.nspname = %s
|
|
AND n.nspname NOT IN ('pg_catalog', 'information_schema')
|
|
"""
|
|
}
|
|
|
|
def extract_database_object_ddl(secret_name, object_type, object_name=None, object_schema=None):
|
|
"""
|
|
Extract DDL and description for database objects
|
|
|
|
Args:
|
|
secret_name (str): The name of the secret containing database credentials
|
|
object_type (str): Type of database object ('table', 'view', 'function', 'procedure', etc.)
|
|
object_name (str, optional): Name of the object to search for
|
|
object_schema (str, optional): Schema name to filter objects
|
|
|
|
Returns:
|
|
list: List of dictionaries containing object information
|
|
str: Error message if no objects found
|
|
"""
|
|
try:
|
|
# Input validation
|
|
if not object_name or not object_schema:
|
|
raise ValueError("Both object_name and object_schema are required")
|
|
|
|
# Validate object_type
|
|
object_type_lower = object_type.lower()
|
|
if object_type_lower not in queries:
|
|
valid_types = ', '.join(queries.keys())
|
|
raise ValueError(f"Invalid object_type: {object_type}. Valid types are: {valid_types}")
|
|
|
|
# Connect to database
|
|
conn = connect_to_db(secret_name)
|
|
if not conn:
|
|
raise Exception("Failed to establish database connection")
|
|
|
|
results = []
|
|
with conn.cursor() as cur:
|
|
try:
|
|
# Debug information
|
|
print(f"\nExecuting query for {object_type_lower}")
|
|
print(f"Object name: {object_name}")
|
|
print(f"Schema: {object_schema}")
|
|
|
|
# Get query and execute
|
|
query = queries[object_type_lower]
|
|
params = [object_name, object_schema]
|
|
|
|
print("\nExecuting query with parameters:")
|
|
print(f"Query: {query}")
|
|
print(f"Parameters: {params}")
|
|
|
|
cur.execute(query, params)
|
|
|
|
# Fetch results
|
|
rows = cur.fetchall()
|
|
if not rows:
|
|
print("No rows returned from query")
|
|
return "No matching objects found"
|
|
|
|
# Get column names
|
|
columns = [desc[0] for desc in cur.description]
|
|
print(f"\nColumns returned: {columns}")
|
|
print(f"Number of rows: {len(rows)}")
|
|
|
|
# Process results
|
|
for row in rows:
|
|
if not row:
|
|
continue
|
|
|
|
# Create result dictionary
|
|
result = {}
|
|
for i, column in enumerate(columns):
|
|
result[column] = row[i] if i < len(row) else None
|
|
|
|
# Add explanation based on object type
|
|
if result.get('definition'):
|
|
if object_type_lower == 'table':
|
|
result['explanation'] = analyze_table_definition(result['definition'])
|
|
elif object_type_lower == 'view':
|
|
result['explanation'] = analyze_view_definition(result['definition'])
|
|
elif object_type_lower in ('function', 'procedure'):
|
|
result['explanation'] = analyze_routine_definition(result['definition'])
|
|
elif object_type_lower == 'trigger':
|
|
result['explanation'] = analyze_trigger_definition(result['definition'])
|
|
else:
|
|
result['explanation'] = f"DDL for {object_type_lower}"
|
|
|
|
results.append(result)
|
|
print(f"\nProcessed {object_type_lower}: {result.get('object_name', 'unknown')}")
|
|
|
|
except Exception as e:
|
|
error_msg = f"Error executing query: {str(e)}"
|
|
print(f"\nError details:")
|
|
print(f"Query: {query}")
|
|
print(f"Parameters: {params}")
|
|
print(f"Error message: {error_msg}")
|
|
raise
|
|
|
|
# Return results
|
|
if not results:
|
|
return "No matching objects found"
|
|
|
|
print(f"\nSuccessfully retrieved {len(results)} objects")
|
|
return results
|
|
|
|
except Exception as e:
|
|
error_msg = f"Failed to extract database object DDL: {str(e)}"
|
|
print(f"\nError: {error_msg}")
|
|
raise
|
|
|
|
finally:
|
|
if conn:
|
|
try:
|
|
conn.close()
|
|
print("\nDatabase connection closed")
|
|
except Exception as e:
|
|
print(f"\nError closing connection: {str(e)}")
|
|
|
|
|
|
def analyze_table_definition(definition):
|
|
"""Analyze table DDL and return explanatory notes"""
|
|
explanation = ["This table contains the following structure:"]
|
|
|
|
# Extract column definitions
|
|
columns = []
|
|
for line in definition.split('\n'):
|
|
if 'CREATE TABLE' not in line and '(' not in line and ')' not in line:
|
|
if line.strip():
|
|
columns.append(line.strip())
|
|
|
|
# Analyze columns
|
|
for column in columns:
|
|
if column.endswith(','):
|
|
column = column[:-1]
|
|
parts = column.split()
|
|
if len(parts) >= 2:
|
|
col_name = parts[0]
|
|
col_type = parts[1]
|
|
constraints = ' '.join(parts[2:])
|
|
explanation.append(f"- {col_name}: {col_type} {constraints}")
|
|
|
|
return '\n'.join(explanation)
|
|
|
|
def generate_object_explanation(obj_info):
|
|
"""
|
|
Generate a human-readable explanation of the database object
|
|
|
|
Args:
|
|
obj_info (dict): Dictionary containing object information
|
|
|
|
Returns:
|
|
str: Human-readable explanation of the object
|
|
"""
|
|
try:
|
|
definition = obj_info.get('definition', '')
|
|
obj_type = obj_info.get('object_type', '')
|
|
explanation = []
|
|
|
|
# Add object description if available
|
|
description = obj_info.get('description', '')
|
|
if description:
|
|
explanation.append(f"Description: {description}")
|
|
|
|
# Analyze based on object type
|
|
if obj_type == 'TABLE':
|
|
explanation.extend(analyze_table_definition(definition))
|
|
elif obj_type == 'VIEW':
|
|
explanation.extend(analyze_view_definition(definition))
|
|
elif obj_type in ('FUNCTION', 'PROCEDURE'):
|
|
explanation.extend(analyze_routine_definition(definition))
|
|
|
|
return '\n'.join(explanation) if explanation else "No explanation available"
|
|
|
|
except Exception as e:
|
|
print(f"Error generating explanation: {str(e)}")
|
|
return "Error generating explanation"
|
|
|
|
|
|
def analyze_view_definition(definition):
|
|
"""Analyze view DDL and return explanatory notes"""
|
|
explanation = ["This view represents the following:"]
|
|
|
|
# Clean up the definition
|
|
clean_def = definition.replace('\n', ' ').strip()
|
|
|
|
# Extract main components
|
|
if 'SELECT' in clean_def:
|
|
explanation.append("This view performs a SELECT operation with the following characteristics:")
|
|
|
|
# Analyze SELECT clause
|
|
if 'JOIN' in clean_def:
|
|
explanation.append("- Joins multiple tables")
|
|
if 'WHERE' in clean_def:
|
|
explanation.append("- Applies filtering conditions")
|
|
if 'GROUP BY' in clean_def:
|
|
explanation.append("- Aggregates data")
|
|
if 'HAVING' in clean_def:
|
|
explanation.append("- Applies post-aggregation filters")
|
|
if 'ORDER BY' in clean_def:
|
|
explanation.append("- Sorts the results")
|
|
if 'UNION' in clean_def:
|
|
explanation.append("- Combines multiple result sets")
|
|
if 'WITH' in clean_def:
|
|
explanation.append("- Uses Common Table Expressions (CTEs)")
|
|
|
|
return '\n'.join(explanation)
|
|
|
|
def analyze_routine_definition(definition):
|
|
"""Analyze function/procedure DDL and return explanatory notes"""
|
|
explanation = []
|
|
|
|
# Determine if it's a function or procedure
|
|
if 'FUNCTION' in definition:
|
|
explanation.append("This is a function that:")
|
|
else:
|
|
explanation.append("This is a procedure that:")
|
|
|
|
# Extract parameters
|
|
if '(' in definition:
|
|
param_section = definition[definition.find('(')+1:definition.find(')')]
|
|
params = param_section.split(',')
|
|
if params and params[0].strip():
|
|
explanation.append("\nParameters:")
|
|
for param in params:
|
|
explanation.append(f"- {param.strip()}")
|
|
|
|
# Identify return type for functions
|
|
if 'RETURNS' in definition:
|
|
return_type = definition[definition.find('RETURNS')+7:].split()[0]
|
|
explanation.append(f"\nReturns: {return_type}")
|
|
|
|
# Analyze body
|
|
if 'BEGIN' in definition:
|
|
explanation.append("\nLogic overview:")
|
|
if 'IF' in definition:
|
|
explanation.append("- Contains conditional logic")
|
|
if 'LOOP' in definition or 'WHILE' in definition:
|
|
explanation.append("- Contains loops")
|
|
if 'INSERT' in definition:
|
|
explanation.append("- Performs data insertion")
|
|
if 'UPDATE' in definition:
|
|
explanation.append("- Performs data updates")
|
|
if 'DELETE' in definition:
|
|
explanation.append("- Performs data deletion")
|
|
if 'SELECT' in definition:
|
|
explanation.append("- Retrieves data")
|
|
if 'EXCEPTION' in definition:
|
|
explanation.append("- Includes error handling")
|
|
|
|
return '\n'.join(explanation)
|
|
|
|
def analyze_trigger_definition(definition):
|
|
"""Analyze trigger DDL and return explanatory notes"""
|
|
explanation = ["This trigger:"]
|
|
|
|
if 'BEFORE' in definition:
|
|
explanation.append("- Executes BEFORE the event")
|
|
elif 'AFTER' in definition:
|
|
explanation.append("- Executes AFTER the event")
|
|
|
|
if 'INSERT' in definition:
|
|
explanation.append("- Fires on INSERT")
|
|
if 'UPDATE' in definition:
|
|
explanation.append("- Fires on UPDATE")
|
|
if 'DELETE' in definition:
|
|
explanation.append("- Fires on DELETE")
|
|
|
|
if 'FOR EACH ROW' in definition:
|
|
explanation.append("- Executes for each affected row")
|
|
elif 'FOR EACH STATEMENT' in definition:
|
|
explanation.append("- Executes once per statement")
|
|
|
|
return '\n'.join(explanation)
|
|
|
|
|
|
def clean_query_for_explain(query):
|
|
"""
|
|
Remove any existing EXPLAIN or EXPLAIN ANALYZE keywords from the query
|
|
|
|
Parameters:
|
|
- query: Original query string
|
|
Returns:
|
|
- Cleaned query string
|
|
"""
|
|
# Remove common EXPLAIN variants (case-insensitive)
|
|
patterns = [
|
|
r'^\s*EXPLAIN\s+ANALYZE\s+',
|
|
r'^\s*EXPLAIN\s+\(.*?\)\s+',
|
|
r'^\s*EXPLAIN\s+'
|
|
]
|
|
|
|
cleaned_query = query
|
|
for pattern in patterns:
|
|
cleaned_query = re.sub(pattern, '', cleaned_query, flags=re.IGNORECASE)
|
|
|
|
return cleaned_query.strip()
|
|
|
|
def analyze_query_performance(secret_name, query_or_object_name, parameters=None, object_type=None):
|
|
"""
|
|
Analyze query performance and provide optimization recommendations
|
|
|
|
Parameters:
|
|
- secret_name: Secret containing database credentials
|
|
- query_or_object_name: SQL query string or object name to analyze
|
|
- parameters: Optional. List of parameter values for parameterized queries
|
|
- object_type: Optional. If provided, will fetch definition from database object
|
|
"""
|
|
conn = connect_to_db(secret_name)
|
|
try:
|
|
with conn.cursor() as cur:
|
|
# If object_type is provided, fetch the query definition
|
|
if object_type:
|
|
query_to_analyze = get_object_definition(cur, query_or_object_name, object_type)
|
|
else:
|
|
query_to_analyze = query_or_object_name
|
|
|
|
# Clean the query before analysis
|
|
query_to_analyze = clean_query_for_explain(query_to_analyze)
|
|
|
|
# Check if the query contains parameter placeholders
|
|
has_parameters = any(f'${i}' in query_to_analyze for i in range(1, 21))
|
|
|
|
if has_parameters:
|
|
# Replace $n parameters with dummy placeholders
|
|
modified_query = query_to_analyze
|
|
param_count = 0
|
|
for i in range(1, 21):
|
|
if f'${i}' in modified_query:
|
|
param_count = max(param_count, i)
|
|
modified_query = modified_query.replace(f'${i}', 'NULL')
|
|
|
|
# Use GENERIC_PLAN for parameterized queries
|
|
cur.execute(f"EXPLAIN (GENERIC_PLAN, BUFFERS, FORMAT JSON) {modified_query}")
|
|
plan = cur.fetchone()[0]
|
|
|
|
cur.execute(f"EXPLAIN (FORMAT JSON) {modified_query}")
|
|
estimated_plan = cur.fetchone()[0]
|
|
|
|
# Pass True for is_generic_plan
|
|
analysis = analyze_execution_plan(plan[0], estimated_plan[0], True)
|
|
else:
|
|
# For non-parameterized queries, use ANALYZE
|
|
cur.execute(f"EXPLAIN (ANALYZE, BUFFERS, FORMAT JSON) {query_to_analyze}")
|
|
plan = cur.fetchone()[0]
|
|
|
|
cur.execute(f"EXPLAIN (FORMAT JSON) {query_to_analyze}")
|
|
estimated_plan = cur.fetchone()[0]
|
|
|
|
# Pass False for is_generic_plan
|
|
analysis = analyze_execution_plan(plan[0], estimated_plan[0], False)
|
|
|
|
return analysis
|
|
|
|
except Exception as e:
|
|
raise Exception(f"Failed to analyze query performance: {str(e)}")
|
|
finally:
|
|
if conn:
|
|
conn.close()
|
|
|
|
def analyze_execution_plan(actual_plan, estimated_plan, is_generic_plan):
|
|
"""
|
|
Analyze execution plan and provide detailed explanations and recommendations
|
|
"""
|
|
analysis = {
|
|
'summary': [],
|
|
'issues': [],
|
|
'recommendations': [],
|
|
'performance_stats': {}
|
|
}
|
|
|
|
analysis['plan_type'] = 'Generic Plan' if is_generic_plan else 'Analyzed Plan'
|
|
|
|
# Extract key performance metrics
|
|
total_cost = actual_plan['Plan'].get('Total Cost')
|
|
rows = actual_plan['Plan'].get('Plan Rows') # Use Plan Rows for generic plans
|
|
estimated_rows = estimated_plan['Plan'].get('Plan Rows')
|
|
|
|
# Initialize performance stats
|
|
performance_stats = {
|
|
'total_cost': total_cost,
|
|
'estimated_rows': estimated_rows,
|
|
'plan_rows': rows
|
|
}
|
|
|
|
# Add actual execution metrics only for analyzed plans
|
|
if not is_generic_plan:
|
|
actual_time = actual_plan['Plan'].get('Actual Total Time')
|
|
actual_rows = actual_plan['Plan'].get('Actual Rows')
|
|
performance_stats.update({
|
|
'execution_time_ms': actual_time,
|
|
'actual_rows': actual_rows
|
|
})
|
|
|
|
analysis['performance_stats'] = performance_stats
|
|
|
|
# Analyze node types and operations
|
|
analyze_plan_node(actual_plan['Plan'], analysis, is_generic_plan)
|
|
|
|
# Check for common issues
|
|
identify_performance_issues(actual_plan['Plan'], estimated_plan['Plan'], analysis, is_generic_plan)
|
|
|
|
# Generate recommendations
|
|
generate_recommendations(analysis)
|
|
|
|
return analysis
|
|
|
|
def analyze_plan_node(node, analysis, is_generic_plan):
|
|
"""
|
|
Recursively analyze each node in the execution plan
|
|
"""
|
|
# Analyze current node
|
|
node_type = node['Node Type']
|
|
|
|
# Check for expensive operations with appropriate metrics based on plan type
|
|
if node_type == 'Seq Scan':
|
|
analysis['issues'].append({
|
|
'type': 'sequential_scan',
|
|
'description': f"Sequential scan detected on table {node.get('Relation Name')}",
|
|
'severity': 'high'
|
|
})
|
|
|
|
elif node_type == 'Nested Loop':
|
|
if is_generic_plan:
|
|
if node.get('Plan Rows', 0) > 1000:
|
|
analysis['issues'].append({
|
|
'type': 'nested_loop_large_dataset',
|
|
'description': "Nested loop join planned for large dataset",
|
|
'severity': 'medium'
|
|
})
|
|
else:
|
|
if node.get('Actual Rows', 0) > 1000:
|
|
analysis['issues'].append({
|
|
'type': 'nested_loop_large_dataset',
|
|
'description': "Nested loop join performed on large dataset",
|
|
'severity': 'medium'
|
|
})
|
|
|
|
elif node_type == 'Hash Join':
|
|
rows_metric = node.get('Plan Rows' if is_generic_plan else 'Actual Rows', 0)
|
|
if node.get('Hash Cond') and rows_metric > 10000:
|
|
analysis['issues'].append({
|
|
'type': 'large_hash_join',
|
|
'description': "Large hash join operation detected",
|
|
'severity': 'medium'
|
|
})
|
|
|
|
# Check for filter conditions
|
|
if 'Filter' in node:
|
|
analyze_filter_condition(node['Filter'], analysis)
|
|
|
|
# Recursively analyze child nodes
|
|
for child in node.get('Plans', []):
|
|
analyze_plan_node(child, analysis, is_generic_plan)
|
|
|
|
def analyze_filter_condition(filter_condition, analysis):
|
|
"""
|
|
Analyze filter conditions for potential optimization opportunities
|
|
"""
|
|
filter_lower = filter_condition.lower()
|
|
|
|
# Check for function calls in WHERE clause
|
|
if '(' in filter_condition and ')' in filter_condition:
|
|
analysis['issues'].append({
|
|
'type': 'function_in_filter',
|
|
'description': "Function call in WHERE clause may prevent index usage",
|
|
'severity': 'medium'
|
|
})
|
|
|
|
# Check for LIKE operations
|
|
if ' like ' in filter_lower and filter_lower.startswith('%'):
|
|
analysis['issues'].append({
|
|
'type': 'leading_wildcard',
|
|
'description': "Leading wildcard in LIKE clause prevents index usage",
|
|
'severity': 'medium'
|
|
})
|
|
|
|
def identify_performance_issues(actual_node, estimated_node, analysis, is_generic_plan):
|
|
"""
|
|
Identify performance issues by comparing actual vs estimated plans
|
|
"""
|
|
if not is_generic_plan:
|
|
# Row estimation analysis only for actual execution plans
|
|
if estimated_node.get('Plan Rows', 0) > 0:
|
|
estimation_ratio = actual_node.get('Actual Rows', 0) / estimated_node['Plan Rows']
|
|
if estimation_ratio > 10 or estimation_ratio < 0.1:
|
|
analysis['issues'].append({
|
|
'type': 'poor_statistics',
|
|
'description': f"Statistics may be outdated - row estimation is off by factor of {estimation_ratio:.1f}",
|
|
'severity': 'high'
|
|
})
|
|
|
|
# Parallel execution analysis (applies to both plan types)
|
|
if actual_node.get('Workers Planned', 0) > 0 and actual_node.get('Workers Launched', 0) == 0:
|
|
analysis['issues'].append({
|
|
'type': 'parallel_execution_failed',
|
|
'description': "Parallel execution was planned but not executed",
|
|
'severity': 'medium'
|
|
})
|
|
|
|
def generate_recommendations(analysis):
|
|
"""
|
|
Generate specific recommendations based on identified issues
|
|
"""
|
|
for issue in analysis['issues']:
|
|
if issue['type'] == 'sequential_scan':
|
|
analysis['recommendations'].append({
|
|
'issue': 'Sequential Scan Detected',
|
|
'recommendation': """
|
|
Consider the following solutions:
|
|
1. Create an index on the commonly queried columns
|
|
2. Review WHERE clause conditions for index compatibility
|
|
3. Ensure statistics are up to date with ANALYZE
|
|
|
|
Example index creation:
|
|
CREATE INDEX idx_name ON table_name (column_name);
|
|
"""
|
|
})
|
|
|
|
elif issue['type'] == 'poor_statistics':
|
|
analysis['recommendations'].append({
|
|
'issue': 'Statistics Mismatch',
|
|
'recommendation': """
|
|
Update statistics for more accurate query planning:
|
|
1. Run ANALYZE on the affected tables
|
|
2. Consider increasing statistics target:
|
|
ALTER TABLE table_name ALTER COLUMN column_name SET STATISTICS 1000;
|
|
3. Review and possibly update auto_vacuum settings
|
|
"""
|
|
})
|
|
|
|
elif issue['type'] == 'function_in_filter':
|
|
analysis['recommendations'].append({
|
|
'issue': 'Function in WHERE Clause',
|
|
'recommendation': """
|
|
Optimize filter conditions:
|
|
1. Remove function calls from WHERE clause
|
|
2. Consider creating a computed column with an index
|
|
3. Rewrite the condition to use direct column comparisons
|
|
|
|
Example:
|
|
Instead of: WHERE UPPER(column) = 'VALUE'
|
|
Use: WHERE column = LOWER('VALUE')
|
|
"""
|
|
})
|
|
|
|
def format_analysis_output(analysis):
|
|
"""
|
|
Format the analysis results into human-readable text
|
|
"""
|
|
output = []
|
|
|
|
# Performance Statistics
|
|
output.append("Query Performance Summary:")
|
|
|
|
# Check if this is a generic plan
|
|
is_generic_plan = analysis.get('plan_type') == 'Generic Plan'
|
|
|
|
if is_generic_plan:
|
|
# For generic plans, show estimated metrics
|
|
output.append(f"- Plan Type: Generic Plan (Parameterized Query)")
|
|
output.append(f"- Estimated Total Cost: {analysis['performance_stats'].get('total_cost', 'N/A')}")
|
|
output.append(f"- Estimated Rows: {analysis['performance_stats'].get('estimated_rows', 'N/A')}")
|
|
output.append(f"- Plan Rows: {analysis['performance_stats'].get('plan_rows', 'N/A')}")
|
|
else:
|
|
# For actual execution plans, show actual metrics
|
|
output.append(f"- Plan Type: Analyzed Plan")
|
|
output.append(f"- Execution Time: {analysis['performance_stats'].get('execution_time_ms', 'N/A'):.2f} ms")
|
|
output.append(f"- Actual Rows: {analysis['performance_stats'].get('actual_rows', 'N/A')}")
|
|
output.append(f"- Estimated Rows: {analysis['performance_stats'].get('estimated_rows', 'N/A')}")
|
|
|
|
output.append("")
|
|
|
|
# Issues
|
|
if analysis['issues']:
|
|
output.append("Identified Issues:")
|
|
for issue in analysis['issues']:
|
|
output.append(f"- {issue['description']} (Severity: {issue['severity']})")
|
|
output.append("")
|
|
|
|
# Recommendations
|
|
if analysis['recommendations']:
|
|
output.append("Recommendations:")
|
|
for rec in analysis['recommendations']:
|
|
output.append(f"Problem: {rec['issue']}")
|
|
output.append(f"Solution: {rec['recommendation']}")
|
|
output.append("")
|
|
|
|
return "\n".join(output)
|
|
|
|
def monitor_query_performance(query, start_time, rows_returned):
|
|
"""
|
|
Monitor query performance and suggest analysis if needed
|
|
|
|
Args:
|
|
query (str): Executed SQL query
|
|
start_time (float): Query start timestamp
|
|
rows_returned (int): Number of rows returned
|
|
|
|
Returns:
|
|
dict: Performance metrics and analysis suggestion
|
|
"""
|
|
execution_time = time.time() - start_time
|
|
metrics = {
|
|
'query': query,
|
|
'execution_time': execution_time,
|
|
'rows_returned': rows_returned,
|
|
'timestamp': datetime.utcnow().isoformat(),
|
|
'needs_analysis': False,
|
|
'performance_message': ''
|
|
}
|
|
|
|
# Define performance thresholds
|
|
SLOW_QUERY_THRESHOLD = 5 # seconds
|
|
HIGH_ROWS_THRESHOLD = 10000
|
|
|
|
# Check for performance issues
|
|
performance_issues = []
|
|
|
|
if execution_time > SLOW_QUERY_THRESHOLD:
|
|
performance_issues.append(f"Query took {execution_time:.2f} seconds to execute")
|
|
metrics['needs_analysis'] = True
|
|
|
|
if rows_returned > HIGH_ROWS_THRESHOLD:
|
|
performance_issues.append(f"Query returned {rows_returned} rows")
|
|
metrics['needs_analysis'] = True
|
|
|
|
if metrics['needs_analysis']:
|
|
metrics['performance_message'] = (
|
|
"⚠️ Performance Warning:\n"
|
|
f"{'; '.join(performance_issues)}.\n"
|
|
"Would you like me to analyze this query for potential optimizations? "
|
|
"Reply with 'yes' to get performance recommendations."
|
|
)
|
|
logger.warning(f"Slow query detected: {query}")
|
|
else:
|
|
metrics['performance_message'] = f"Query executed successfully in {execution_time:.2f} seconds"
|
|
|
|
return metrics
|
|
|
|
def validate_query(query):
|
|
"""
|
|
Validate query for security concerns and split into statements
|
|
|
|
Args:
|
|
query (str): SQL query to validate
|
|
|
|
Returns:
|
|
list: List of validated statements
|
|
|
|
Raises:
|
|
ValueError: If query contains prohibited operations
|
|
"""
|
|
if not query or not isinstance(query, str):
|
|
raise ValueError("Query must be a non-empty string")
|
|
|
|
def is_within_quotes(text, position):
|
|
"""Check if a position in text is within quotes"""
|
|
single_quotes = False
|
|
double_quotes = False
|
|
for i in range(position):
|
|
if text[i] == "'" and not double_quotes:
|
|
single_quotes = not single_quotes
|
|
elif text[i] == '"' and not single_quotes:
|
|
double_quotes = not double_quotes
|
|
return single_quotes or double_quotes
|
|
|
|
def split_statements(query_text):
|
|
"""Split query into individual statements, respecting quotes and comments"""
|
|
statements = []
|
|
current_stmt = []
|
|
i = 0
|
|
comment_block = False
|
|
line_comment = False
|
|
|
|
while i < len(query_text):
|
|
char = query_text[i]
|
|
|
|
# Handle comment blocks
|
|
if query_text[i:i+2] == '/*' and not line_comment:
|
|
comment_block = True
|
|
current_stmt.append(char)
|
|
i += 1
|
|
elif query_text[i:i+2] == '*/' and comment_block:
|
|
comment_block = False
|
|
current_stmt.append(char)
|
|
i += 1
|
|
# Handle line comments
|
|
elif query_text[i:i+2] == '--' and not comment_block:
|
|
line_comment = True
|
|
current_stmt.append(char)
|
|
i += 1
|
|
elif char == '\n' and line_comment:
|
|
line_comment = False
|
|
current_stmt.append(char)
|
|
# Handle semicolons
|
|
elif char == ';' and not comment_block and not line_comment and not is_within_quotes(query_text, i):
|
|
current_stmt.append(char)
|
|
stmt = ''.join(current_stmt).strip()
|
|
if stmt:
|
|
statements.append(stmt)
|
|
current_stmt = []
|
|
else:
|
|
current_stmt.append(char)
|
|
i += 1
|
|
|
|
# Add the last statement if exists
|
|
last_stmt = ''.join(current_stmt).strip()
|
|
if last_stmt:
|
|
statements.append(last_stmt)
|
|
|
|
return [stmt for stmt in statements if stmt]
|
|
|
|
# Split into statements
|
|
statements = split_statements(query)
|
|
validated_statements = []
|
|
|
|
# Validate each statement
|
|
for stmt in statements:
|
|
stmt = stmt.strip()
|
|
if stmt.endswith(';'):
|
|
stmt = stmt[:-1]
|
|
|
|
stmt_lower = stmt.lower().strip()
|
|
|
|
# Get the command type
|
|
first_word = stmt_lower.split()[0] if stmt_lower.split() else ''
|
|
|
|
if first_word not in ['select', 'show']:
|
|
raise ValueError(f"Prohibited operation detected: {first_word}")
|
|
|
|
# For SELECT statements, check for dangerous operations
|
|
if first_word == 'select':
|
|
dangerous_operations = [
|
|
r'\binsert\b', r'\bupdate\b', r'\bdelete\b', r'\bdrop\b',
|
|
r'\btruncate\b', r'\balter\b', r'\bcreate\b', r'\bgrant\b',
|
|
r'\brevoke\b', r'\bexecute\b', r'\bcopy\b'
|
|
]
|
|
|
|
# Remove content within quotes for checking
|
|
query_for_check = ''
|
|
in_quote = False
|
|
quote_char = None
|
|
|
|
for char in stmt:
|
|
if char in ["'", '"'] and (not quote_char or char == quote_char):
|
|
if not in_quote:
|
|
quote_char = char
|
|
in_quote = True
|
|
else:
|
|
quote_char = None
|
|
in_quote = False
|
|
elif not in_quote:
|
|
query_for_check += char
|
|
|
|
# Check for dangerous operations
|
|
for operation in dangerous_operations:
|
|
if re.search(operation, query_for_check.lower()):
|
|
raise ValueError(f"Statement contains prohibited operation: {operation}")
|
|
|
|
validated_statements.append(stmt)
|
|
|
|
return validated_statements
|
|
|
|
def execute_read_query(secret_name, query, max_rows=20):
|
|
"""
|
|
Execute read-only queries safely and return results with monitoring
|
|
|
|
Args:
|
|
secret_name (str): Secret containing database credentials
|
|
query (str): SQL query to execute
|
|
max_rows (int): Maximum number of rows to return (only for SELECT queries)
|
|
|
|
Returns:
|
|
dict: Query results and metadata
|
|
"""
|
|
|
|
response = {
|
|
'results': [],
|
|
'performance_metrics': None,
|
|
'warnings': [],
|
|
'optimization_suggestions': []
|
|
}
|
|
|
|
start_time = time.time()
|
|
conn = None
|
|
|
|
try:
|
|
# Validate and split queries
|
|
statements = validate_query(query)
|
|
|
|
# Connect to database
|
|
conn = connect_to_db(secret_name)
|
|
|
|
with conn.cursor() as cur:
|
|
# Set session to read-only and timeout
|
|
cur.execute("SET TRANSACTION READ ONLY")
|
|
cur.execute("SET statement_timeout = '30s'")
|
|
|
|
# Execute each statement
|
|
for stmt_index, stmt in enumerate(statements, 1):
|
|
stmt_response = {
|
|
'columns': [],
|
|
'rows': [],
|
|
'truncated': False,
|
|
'message': '',
|
|
'row_count': 0,
|
|
'query': stmt
|
|
}
|
|
|
|
# Determine if it's a SELECT query
|
|
stmt_lower = stmt.lower().strip()
|
|
is_select_query = stmt_lower.lstrip('(').startswith('select')
|
|
|
|
# Prepare the final query
|
|
final_query = stmt
|
|
if is_select_query and 'limit' not in stmt_lower:
|
|
final_query = f"{stmt} LIMIT {max_rows + 1}"
|
|
|
|
# Execute query
|
|
try:
|
|
cur.execute(final_query)
|
|
except psycopg2.Error as pe:
|
|
logger.error(f"Error executing query: {final_query}")
|
|
logger.error(f"Error details: {str(pe)}")
|
|
raise
|
|
|
|
# Get column names
|
|
stmt_response['columns'] = [desc[0] for desc in cur.description]
|
|
|
|
# Fetch results
|
|
rows = cur.fetchall()
|
|
total_rows = len(rows)
|
|
|
|
# Handle row limiting only for SELECT queries
|
|
if is_select_query and total_rows > max_rows:
|
|
stmt_response['truncated'] = True
|
|
rows = rows[:max_rows]
|
|
stmt_response['message'] = (
|
|
f"Results truncated to {max_rows} rows for performance reasons. "
|
|
f"Total rows available: {total_rows}"
|
|
)
|
|
stmt_response['row_count'] = max_rows
|
|
else:
|
|
stmt_response['row_count'] = total_rows
|
|
|
|
# Convert rows to list of dictionaries
|
|
stmt_response['rows'] = [
|
|
dict(zip(stmt_response['columns'], row))
|
|
for row in rows
|
|
]
|
|
|
|
# Add performance monitoring only for SELECT queries
|
|
if is_select_query:
|
|
complexity_metrics = analyze_query_complexity(stmt)
|
|
stmt_response['complexity_metrics'] = complexity_metrics
|
|
|
|
# Add complexity warnings if any
|
|
if complexity_metrics['warnings']:
|
|
response['warnings'].extend(
|
|
f"Statement {stmt_index}: {warning}"
|
|
for warning in complexity_metrics['warnings']
|
|
)
|
|
|
|
# Store the original query in the response
|
|
stmt_response['query'] = f"{stmt};"
|
|
response['results'].append(stmt_response)
|
|
|
|
# Add overall performance metrics
|
|
total_time = time.time() - start_time
|
|
response['performance_metrics'] = {
|
|
'execution_time': total_time,
|
|
'statements_executed': len(statements),
|
|
'timestamp': datetime.utcnow().isoformat(),
|
|
'needs_analysis': total_time > 5,
|
|
'performance_message': (
|
|
f"Executed {len(statements)} statements in {total_time:.2f} seconds"
|
|
)
|
|
}
|
|
|
|
return response
|
|
|
|
except ValueError as ve:
|
|
error_msg = f"Query validation failed: {str(ve)}"
|
|
logger.error(error_msg)
|
|
raise ValueError(error_msg)
|
|
|
|
except psycopg2.Error as pe:
|
|
error_msg = f"Database error: {str(pe)}"
|
|
logger.error(error_msg)
|
|
raise Exception(error_msg)
|
|
|
|
except Exception as e:
|
|
error_msg = f"Unexpected error: {str(e)}"
|
|
logger.error(error_msg)
|
|
raise Exception(error_msg)
|
|
|
|
finally:
|
|
if conn:
|
|
conn.close()
|
|
|
|
def format_enhanced_results(results):
|
|
"""
|
|
Format results with enhanced information
|
|
"""
|
|
formatted_output = []
|
|
|
|
# Add performance summary
|
|
metrics = results['performance_metrics']
|
|
formatted_output.append("Query Execution Summary:")
|
|
formatted_output.append(f"- Total execution time: {metrics['execution_time']:.2f} seconds")
|
|
formatted_output.append(f"- Statements executed: {metrics['statements_executed']}")
|
|
formatted_output.append(f"- Total rows returned: {metrics['total_rows']}")
|
|
formatted_output.append("")
|
|
|
|
# Add warnings if any
|
|
if results['warnings']:
|
|
formatted_output.append("Warnings:")
|
|
for warning in results['warnings']:
|
|
formatted_output.append(f"- {warning}")
|
|
formatted_output.append("")
|
|
|
|
# Add optimization suggestions if any
|
|
if results['optimization_suggestions']:
|
|
formatted_output.append("Optimization Suggestions:")
|
|
for suggestion in results['optimization_suggestions']:
|
|
formatted_output.append(f"- {suggestion}")
|
|
formatted_output.append("")
|
|
|
|
# Format each statement's results
|
|
for i, result in enumerate(results['results'], 1):
|
|
formatted_output.append(f"Statement {i}:")
|
|
formatted_output.append(f"Query: {result['query']}")
|
|
|
|
# Add complexity metrics
|
|
complexity = result['complexity_metrics']
|
|
formatted_output.append("Complexity Analysis:")
|
|
formatted_output.append(f"- Score: {complexity['complexity_score']}")
|
|
formatted_output.append(f"- Joins: {complexity['join_count']}")
|
|
formatted_output.append(f"- Subqueries: {complexity['subquery_count']}")
|
|
formatted_output.append(f"- Aggregations: {complexity['aggregation_count']}")
|
|
|
|
if result['message']:
|
|
formatted_output.append(f"Note: {result['message']}")
|
|
|
|
if result['columns']:
|
|
# Calculate column widths
|
|
widths = {
|
|
col: max(len(str(col)),
|
|
max(len(str(row[col])) for row in result['rows']))
|
|
for col in result['columns']
|
|
}
|
|
|
|
# Add header
|
|
header = " | ".join(
|
|
str(col).ljust(widths[col])
|
|
for col in result['columns']
|
|
)
|
|
formatted_output.append(header)
|
|
formatted_output.append("-" * len(header))
|
|
|
|
# Add rows
|
|
for row in result['rows']:
|
|
formatted_output.append(" | ".join(
|
|
str(row[col]).ljust(widths[col])
|
|
for col in result['columns']
|
|
))
|
|
|
|
formatted_output.append(f"Rows returned: {result['row_count']}")
|
|
formatted_output.append("")
|
|
|
|
return "\n".join(formatted_output)
|
|
|
|
def format_query_results(results):
|
|
"""
|
|
Format query results for display
|
|
|
|
Args:
|
|
results (dict): Query execution results
|
|
|
|
Returns:
|
|
str: Formatted results string
|
|
"""
|
|
formatted_output = []
|
|
|
|
# Add performance message first
|
|
if results['performance_metrics'] and results['performance_metrics']['performance_message']:
|
|
formatted_output.append(results['performance_metrics']['performance_message'] + "\n")
|
|
|
|
# Add truncation message if applicable
|
|
if results['message']:
|
|
formatted_output.append(f"Note: {results['message']}\n")
|
|
|
|
# Add column headers
|
|
if results['columns']:
|
|
# Calculate column widths
|
|
widths = {}
|
|
for col in results['columns']:
|
|
widths[col] = len(str(col))
|
|
for row in results['rows']:
|
|
widths[col] = max(widths[col], len(str(row[col])))
|
|
|
|
# Create header
|
|
header = " | ".join(
|
|
str(col).ljust(widths[col])
|
|
for col in results['columns']
|
|
)
|
|
formatted_output.append(header)
|
|
|
|
# Add separator
|
|
separator = "-" * len(header)
|
|
formatted_output.append(separator)
|
|
|
|
# Add rows
|
|
for row in results['rows']:
|
|
formatted_row = " | ".join(
|
|
str(row[col]).ljust(widths[col])
|
|
for col in results['columns']
|
|
)
|
|
formatted_output.append(formatted_row)
|
|
|
|
# Add summary
|
|
formatted_output.append(f"\nTotal rows: {results['row_count']}")
|
|
|
|
return "\n".join(formatted_output)
|
|
|
|
def format_multi_query_results(results):
|
|
"""Format results from multiple statements"""
|
|
formatted_output = []
|
|
|
|
# Add performance summary
|
|
metrics = results['performance_metrics']
|
|
formatted_output.append(f"Query Execution Summary:")
|
|
formatted_output.append(f"- Total execution time: {metrics['execution_time']:.2f} seconds")
|
|
formatted_output.append(f"- Statements executed: {metrics['statements_executed']}")
|
|
formatted_output.append("")
|
|
|
|
# Format each statement's results
|
|
for i, result in enumerate(results['results'], 1):
|
|
formatted_output.append(f"Statement {i}: {result['query']}")
|
|
if result['message']:
|
|
formatted_output.append(f"Note: {result['message']}")
|
|
|
|
if result['columns']:
|
|
# Calculate column widths
|
|
widths = {
|
|
col: max(len(str(col)),
|
|
max(len(str(row[col])) for row in result['rows']))
|
|
for col in result['columns']
|
|
}
|
|
|
|
# Add header
|
|
header = " | ".join(
|
|
str(col).ljust(widths[col])
|
|
for col in result['columns']
|
|
)
|
|
formatted_output.append(header)
|
|
formatted_output.append("-" * len(header))
|
|
|
|
# Add rows
|
|
for row in result['rows']:
|
|
formatted_output.append(" | ".join(
|
|
str(row[col]).ljust(widths[col])
|
|
for col in result['columns']
|
|
))
|
|
|
|
formatted_output.append(f"Rows returned: {result['row_count']}")
|
|
formatted_output.append("")
|
|
|
|
return "\n".join(formatted_output)
|
|
|
|
def lambda_handler(event, context):
|
|
try:
|
|
environment = event['environment']
|
|
action_type = event['action_type']
|
|
secret_name = get_env_secret(environment)
|
|
min_exec_time = 1000
|
|
|
|
# Get explain plan for a query
|
|
if action_type == 'explain_query':
|
|
query = event['query']
|
|
print("Executing explain query scripts")
|
|
results = analyze_query_performance(secret_name, query)
|
|
formatted_results = format_analysis_output(results)
|
|
elif action_type == 'extract_ddl':
|
|
object_type = event['object_type']
|
|
object_name = event['object_name']
|
|
object_schema = event['object_schema']
|
|
print("Generating the DDL scripts for the object")
|
|
results = extract_database_object_ddl(secret_name, object_type=object_type, object_name=object_name, object_schema=object_schema)
|
|
# Convert results to string if it's not already
|
|
formatted_results = str(results) if results else "No results found"
|
|
elif action_type == 'execute_query':
|
|
query = event['query']
|
|
print("Executing read-only queries")
|
|
results = validate_and_execute_queries(
|
|
secret_name,
|
|
query,
|
|
max_rows=20,
|
|
max_statements=5,
|
|
max_total_rows=1000,
|
|
max_complexity=15
|
|
)
|
|
formatted_results = format_enhanced_results(results)
|
|
else:
|
|
print("I'm inside else condition")
|
|
return {
|
|
"functionResponse": {
|
|
"content": f"Error: Unknown function {function}"
|
|
}
|
|
}
|
|
|
|
# Format the response properly
|
|
response_body = {
|
|
'TEXT': {
|
|
'body': formatted_results
|
|
}
|
|
}
|
|
|
|
function_response = {
|
|
'functionResponse': {
|
|
'responseBody': response_body
|
|
}
|
|
}
|
|
|
|
return function_response
|
|
|
|
except Exception as e:
|
|
print(f"Error in lambda_handler: {str(e)}") # Add debugging
|
|
return {
|
|
"functionResponse": {
|
|
"content": f"Error inside the exception block: {str(e)}"
|
|
}
|
|
} |