|
@@ -37,14 +37,17 @@ def escape_value(value):
|
|
|
return f"'{escaped}'"
|
|
|
|
|
|
|
|
|
-def generate_insert_sql(table_name, row):
|
|
|
+def generate_insert_sql(table_name, row, skip_fields=None):
|
|
|
"""
|
|
|
生成单行INSERT语句
|
|
|
"""
|
|
|
+ skip_fields = skip_fields or []
|
|
|
columns = []
|
|
|
values = []
|
|
|
|
|
|
for col_name, value in row.items():
|
|
|
+ if col_name in skip_fields:
|
|
|
+ continue # 跳过指定字段
|
|
|
columns.append(f"`{col_name}`")
|
|
|
values.append(escape_value(value))
|
|
|
|
|
@@ -53,7 +56,7 @@ def generate_insert_sql(table_name, row):
|
|
|
return f"INSERT INTO `{table_name}` ({cols_str}) VALUES ({vals_str});"
|
|
|
|
|
|
|
|
|
-def export_table(table_name, sql_query, export_file):
|
|
|
+def export_table(table_name, sql_query, export_file, skip_fields=None):
|
|
|
"""
|
|
|
导出表数据到SQL文件
|
|
|
"""
|
|
@@ -65,10 +68,11 @@ def export_table(table_name, sql_query, export_file):
|
|
|
cursor.execute(sql_query)
|
|
|
column_names = [col[0] for col in cursor.description]
|
|
|
|
|
|
- with open(export_file, 'w', encoding='utf-8') as f:
|
|
|
- batch_size = 1000
|
|
|
- count = 0
|
|
|
+ batch_size = 1000
|
|
|
+ count = 0
|
|
|
+ buffer = []
|
|
|
|
|
|
+ with open(export_file, 'w', encoding='utf-8') as f:
|
|
|
while True:
|
|
|
rows = cursor.fetchmany(batch_size)
|
|
|
if not rows:
|
|
@@ -76,12 +80,23 @@ def export_table(table_name, sql_query, export_file):
|
|
|
|
|
|
for row in rows:
|
|
|
row_dict = dict(zip(column_names, row))
|
|
|
- f.write(generate_insert_sql(table_name, row_dict) + '\n')
|
|
|
+ buffer.append(generate_insert_sql(table_name, row_dict, skip_fields) + '\n')
|
|
|
count += 1
|
|
|
|
|
|
+ # 批量写入
|
|
|
+ if len(buffer) >= 100:
|
|
|
+ f.writelines(buffer)
|
|
|
+ buffer.clear()
|
|
|
+
|
|
|
log.info(f"已导出 {count} 行数据到 {table_name}")
|
|
|
|
|
|
+ # 写入剩余内容
|
|
|
+ if buffer:
|
|
|
+ f.writelines(buffer)
|
|
|
+
|
|
|
log.info(f"表 {table_name} 导出完成,共 {count} 行数据")
|
|
|
+ except Exception as e:
|
|
|
+ log.error(f"导出表 {table_name} 时发生错误: {str(e)}")
|
|
|
finally:
|
|
|
connection.close()
|
|
|
|
|
@@ -124,6 +139,8 @@ def export_tables():
|
|
|
't_yujin_crossborder_prov_region_trade'
|
|
|
]
|
|
|
|
|
|
+ skip_fields = {'id'}
|
|
|
+
|
|
|
# 导出按年份月份的表(最新月数据)
|
|
|
for table in month_tables:
|
|
|
try:
|
|
@@ -131,7 +148,7 @@ def export_tables():
|
|
|
if latest_month:
|
|
|
export_file = EXPORT_DIR / f"{table}_{latest_month}.sql"
|
|
|
sql_query = f"SELECT * FROM `{table}` WHERE `year_month` = '{latest_month}'"
|
|
|
- export_table(table, sql_query, export_file)
|
|
|
+ export_table(table, sql_query, export_file, skip_fields)
|
|
|
else:
|
|
|
log.warning(f"表 {table} 未找到数据,跳过导出")
|
|
|
except Exception as e:
|
|
@@ -141,7 +158,7 @@ def export_tables():
|
|
|
for table in full_tables:
|
|
|
try:
|
|
|
export_file = EXPORT_DIR / f"{table}_{latest_month}.sql"
|
|
|
- export_table(table, f"SELECT * FROM `{table}`", export_file)
|
|
|
+ export_table(table, f"SELECT * FROM `{table}`", export_file, skip_fields)
|
|
|
except Exception as e:
|
|
|
log.error(f"导出表 {table} 时出错: {str(e)}")
|
|
|
|
|
@@ -152,7 +169,7 @@ def export_tables():
|
|
|
if latest_month:
|
|
|
export_file = EXPORT_DIR / f"{table}_{latest_month}.sql"
|
|
|
sql_query = f"SELECT * FROM `{table}` WHERE `crossborder_year_month` = '{latest_month}'"
|
|
|
- export_table(table, sql_query, export_file)
|
|
|
+ export_table(table, sql_query, export_file, skip_fields)
|
|
|
else:
|
|
|
log.warning(f"表 {table} 未找到数据,跳过导出")
|
|
|
except Exception as e:
|