mirror of
https://gitcode.com/huyuchengus/gitcode_knowledge.git
synced 2025-08-13 03:13:27 +00:00
1 line
14 KiB
Markdown
1 line
14 KiB
Markdown
<p>想做一个简单的数据库表的统计工具。</p> <br><p>用 AI,感觉整个人脑子都不好用了。</p> <br><p></p> <br><pre><code>import pyodbc<br>import pandas as pd<br>import matplotlib.pyplot as plt<br>import seaborn as sns<br>from datetime import datetime<br>import warnings<br>warnings.filterwarnings('ignore')<br><br>class MSSQLTableAnalyzer:<br> def __init__(self, server, database, username=None, password=None, trusted_connection=True):<br> """<br> Initialize MSSQL connection<br> <br> Args:<br> server: SQL Server name or IP<br> database: Database name<br> username: Username (optional if using Windows Authentication)<br> password: Password (optional if using Windows Authentication)<br> trusted_connection: Use Windows Authentication (default: True)<br> """<br> self.server = server<br> self.database = database<br> self.username = username<br> self.password = password<br> self.trusted_connection = trusted_connection<br> self.connection = None<br> <br> def connect(self):<br> """Establish connection to MSSQL database"""<br> try:<br> if self.trusted_connection:<br> # Windows Authentication<br> connection_string = f"""<br> DRIVER={<!-- -->{ODBC Driver 17 for SQL Server}};<br> SERVER={self.server};<br> DATABASE={self.database};<br> Trusted_Connection=yes;<br> """<br> else:<br> # SQL Server Authentication<br> connection_string = f"""<br> DRIVER={<!-- -->{ODBC Driver 17 for SQL Server}};<br> SERVER={self.server};<br> DATABASE={self.database};<br> UID={self.username};<br> PWD={self.password};<br> """<br> <br> self.connection = pyodbc.connect(connection_string)<br> print(f"✅ Successfully connected to {self.database} on {self.server}")<br> return True<br> <br> except Exception as e:<br> print(f"❌ Connection failed: {str(e)}")<br> return False<br> <br> def get_table_counts(self):<br> """Query all tables and their row counts"""<br> if not self.connection:<br> print("❌ No database connection established")<br> return None<br> <br> try:<br> # Query to get all user tables and their row counts<br> query = """<br> SELECT <br> 
t.TABLE_SCHEMA as [Schema],<br> t.TABLE_NAME as [Table_Name],<br> p.rows as [Row_Count]<br> FROM <br> INFORMATION_SCHEMA.TABLES t<br> INNER JOIN <br> sys.tables st ON st.name = t.TABLE_NAME<br> INNER JOIN <br> sys.partitions p ON st.object_id = p.object_id<br> WHERE <br> t.TABLE_TYPE = 'BASE TABLE'<br> AND p.index_id < 2<br> ORDER BY <br> p.rows DESC, t.TABLE_SCHEMA, t.TABLE_NAME<br> """<br> <br> df = pd.read_sql_query(query, self.connection)<br> print(f"📊 Found {len(df)} tables in database")<br> return df<br> <br> except Exception as e:<br> print(f"❌ Query failed: {str(e)}")<br> return None<br> <br> def create_bar_chart(self, df, max_tables=20, chart_type='horizontal'):<br> """<br> Create bar chart for table row counts<br> <br> Args:<br> df: DataFrame with table information<br> max_tables: Maximum number of tables to display<br> chart_type: 'horizontal' or 'vertical'<br> """<br> if df is None or df.empty:<br> print("❌ No data to display")<br> return<br> <br> # Prepare data for visualization<br> df_display = df.head(max_tables).copy()<br> df_display['Full_Name'] = df_display['Schema'] + '.' 
+ df_display['Table_Name']<br> <br> # Set up the plot style<br> plt.style.use('seaborn-v0_8')<br> fig, ax = plt.subplots(figsize=(12, 8))<br> <br> # Create color palette<br> colors = plt.cm.viridis(range(len(df_display)))<br> <br> if chart_type == 'horizontal':<br> bars = ax.barh(df_display['Full_Name'], df_display['Row_Count'], color=colors)<br> ax.set_xlabel('Number of Rows', fontsize=12, fontweight='bold')<br> ax.set_ylabel('Table Name', fontsize=12, fontweight='bold')<br> <br> # Add value labels on bars<br> for i, bar in enumerate(bars):<br> width = bar.get_width()<br> ax.text(width + max(df_display['Row_Count']) * 0.01, <br> bar.get_y() + bar.get_height()/2, <br> f'{int(width):,}', <br> ha='left', va='center', fontsize=10)<br> else:<br> bars = ax.bar(range(len(df_display)), df_display['Row_Count'], color=colors)<br> ax.set_xlabel('Table Name', fontsize=12, fontweight='bold')<br> ax.set_ylabel('Number of Rows', fontsize=12, fontweight='bold')<br> ax.set_xticks(range(len(df_display)))<br> ax.set_xticklabels(df_display['Full_Name'], rotation=45, ha='right')<br> <br> # Add value labels on bars<br> for i, bar in enumerate(bars):<br> height = bar.get_height()<br> ax.text(bar.get_x() + bar.get_width()/2, height + max(df_display['Row_Count']) * 0.01,<br> f'{int(height):,}', <br> ha='center', va='bottom', fontsize=10)<br> <br> # Customize the chart<br> ax.set_title(f'Database Table Row Counts - {self.database}\n'<br> f'Top {min(max_tables, len(df))} Tables by Row Count', <br> fontsize=14, fontweight='bold', pad=20)<br> <br> # Format y-axis to show numbers with commas<br> ax.yaxis.set_major_formatter(plt.FuncFormatter(lambda x, p: f'{int(x):,}'))<br> if chart_type == 'vertical':<br> ax.yaxis.set_major_formatter(plt.FuncFormatter(lambda x, p: f'{int(x):,}'))<br> else:<br> ax.xaxis.set_major_formatter(plt.FuncFormatter(lambda x, p: f'{int(x):,}'))<br> <br> # Add grid for better readability<br> ax.grid(True, alpha=0.3, axis='x' if chart_type == 'horizontal' else 'y')<br> 
<br> # Add timestamp<br> timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")<br> plt.figtext(0.99, 0.01, f'Generated: {timestamp}', <br> ha='right', va='bottom', fontsize=8, alpha=0.7)<br> <br> plt.tight_layout()<br> plt.show()<br> <br> # Print summary statistics<br> total_rows = df['Row_Count'].sum()<br> avg_rows = df['Row_Count'].mean()<br> print(f"\n📈 Summary Statistics:")<br> print(f" Total Tables: {len(df):,}")<br> print(f" Total Rows: {total_rows:,}")<br> print(f" Average Rows per Table: {avg_rows:,.0f}")<br> print(f" Largest Table: {df.iloc[0]['Schema']}.{df.iloc[0]['Table_Name']} ({df.iloc[0]['Row_Count']:,} rows)")<br> <br> def get_schema_summary(self, df):<br> """Create summary by schema"""<br> if df is None or df.empty:<br> return None<br> <br> schema_summary = df.groupby('Schema').agg({<br> 'Table_Name': 'count',<br> 'Row_Count': ['sum', 'mean', 'max']<br> }).round(0)<br> <br> schema_summary.columns = ['Table_Count', 'Total_Rows', 'Avg_Rows', 'Max_Rows']<br> schema_summary = schema_summary.sort_values('Total_Rows', ascending=False)<br> <br> return schema_summary<br> <br> def create_schema_chart(self, df):<br> """Create bar chart grouped by schema"""<br> schema_summary = self.get_schema_summary(df)<br> if schema_summary is None:<br> return<br> <br> fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))<br> <br> # Chart 1: Total rows by schema<br> colors1 = plt.cm.Set3(range(len(schema_summary)))<br> bars1 = ax1.bar(schema_summary.index, schema_summary['Total_Rows'], color=colors1)<br> ax1.set_title('Total Rows by Schema', fontsize=12, fontweight='bold')<br> ax1.set_xlabel('Schema', fontsize=10)<br> ax1.set_ylabel('Total Rows', fontsize=10)<br> ax1.tick_params(axis='x', rotation=45)<br> <br> # Add value labels<br> for bar in bars1:<br> height = bar.get_height()<br> ax1.text(bar.get_x() + bar.get_width()/2, height,<br> f'{int(height):,}', ha='center', va='bottom', fontsize=9)<br> <br> # Chart 2: Table count by schema<br> colors2 = 
plt.cm.Set2(range(len(schema_summary)))<br> bars2 = ax2.bar(schema_summary.index, schema_summary['Table_Count'], color=colors2)<br> ax2.set_title('Table Count by Schema', fontsize=12, fontweight='bold')<br> ax2.set_xlabel('Schema', fontsize=10)<br> ax2.set_ylabel('Number of Tables', fontsize=10)<br> ax2.tick_params(axis='x', rotation=45)<br> <br> # Add value labels<br> for bar in bars2:<br> height = bar.get_height()<br> ax2.text(bar.get_x() + bar.get_width()/2, height,<br> f'{int(height)}', ha='center', va='bottom', fontsize=9)<br> <br> plt.tight_layout()<br> plt.show()<br> <br> print("\n📊 Schema Summary:")<br> print(schema_summary.to_string())<br> <br> def close_connection(self):<br> """Close database connection"""<br> if self.connection:<br> self.connection.close()<br> print("🔐 Database connection closed")<br><br>def main():<br> """Main execution function"""<br> # Database connection parameters<br> # Modify these parameters according to your environment<br> SERVER = "localhost" # or your server name/IP<br> DATABASE = "YourDatabaseName" # replace with your database name<br> <br> # Initialize analyzer<br> analyzer = MSSQLTableAnalyzer(<br> server=SERVER,<br> database=DATABASE,<br> trusted_connection=True # Set to False if using SQL Server Authentication<br> )<br> <br> # Connect to database<br> if not analyzer.connect():<br> return<br> <br> try:<br> # Get table counts<br> print("🔍 Querying table information...")<br> df = analyzer.get_table_counts()<br> <br> if df is not None and not df.empty:<br> # Create main bar chart<br> print("📊 Creating bar chart...")<br> analyzer.create_bar_chart(df, max_tables=15, chart_type='horizontal')<br> <br> # Create schema summary chart<br> print("📈 Creating schema summary...")<br> analyzer.create_schema_chart(df)<br> <br> # Export to CSV (optional)<br> csv_filename = f"table_counts_{analyzer.database}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"<br> df.to_csv(csv_filename, index=False)<br> print(f"💾 Data exported to: 
{csv_filename}")<br> <br> else:<br> print("❌ No table data retrieved")<br> <br> except Exception as e:<br> print(f"❌ Error during execution: {str(e)}")<br> <br> finally:<br> # Always close the connection<br> analyzer.close_connection()<br><br>if __name__ == "__main__":<br> # Example usage<br> print("🚀 MSSQL Table Count Analyzer")<br> print("=" * 40)<br> <br> # You can also use the analyzer directly:<br> # analyzer = MSSQLTableAnalyzer("your_server", "your_database")<br> # if analyzer.connect():<br> # df = analyzer.get_table_counts()<br> # analyzer.create_bar_chart(df)<br> # analyzer.close_connection()<br> <br> main()<br></code></pre> <br><p></p> <br><p></p> <br><p class="img-center"><a href="https://cdn.isharkfly.com/com-isharkfly-www/discourse-uploads/original/3X/4/a/4a8210305150bbe76d01edc7e611bcbde1fb19b2.png" rel="nofollow"><img alt="2025-06-11_08-01-35" height="354" src="https://i-blog.csdnimg.cn/img_convert/8a970fd2ba57e7aa604b6e1fcc75bef2.png" width="690" /></a></p> <br><p></p> <br><p></p> <br><p>虽然离要求还是有点距离,但要用的几个库和数据库连接都没啥问题。</p> <br><p></p> <br><p></p> <br><p class="img-center"><a href="https://cdn.isharkfly.com/com-isharkfly-www/discourse-uploads/original/3X/5/e/5e0ed27cda86dbb650f6805160da8fbebb5a8a0d.jpeg" rel="nofollow"><img alt="2025-06-11_07-59-25" height="499" src="https://i-blog.csdnimg.cn/img_convert/af9cff9271df41c5317735c8049fc3df.jpeg" width="689" /></a></p> <br><p></p> <br><p>生成的图表也还能看。</p> <br><p></p> <br><p><a class="has-card" href="https://www.isharkfly.com/t/ai/17423" rel="nofollow" title="使用 AI,代码自己都少写了不少 - Python - iSharkFly"><span class="link-card-box" contenteditable="false"><span class="link-title">使用 AI,代码自己都少写了不少 - Python - iSharkFly</span><span class="link-desc">想做一个简单的数据库表的统计工具。 用 AI,感觉整个人脑子都不好用了。 import pyodbc import pandas as pd import matplotlib.pyplot as plt import seaborn as sns from datetime import datetime import warnings warnings.filterwarnings('ignore') class MSSQLT…</span><span class="link-link"><img class="link-link-icon" 
src="https://csdnimg.cn/release/blog_editor_html/release2.3.9/ckeditor/plugins/CsdnLink/icons/icon-default.png?t=P4F5" />https://www.isharkfly.com/t/ai/17423</span></span></a></p> |