|
@@ -0,0 +1,164 @@
|
|
|
+import datetime
|
|
|
+import os
|
|
|
+from decimal import Decimal
|
|
|
+
|
|
|
+from sqlalchemy import text
|
|
|
+
|
|
|
+from crossborder.utils.constants import DOWNLOAD_DIR
|
|
|
+from crossborder.utils.db_helper import DBHelper
|
|
|
+from crossborder.utils.log import get_logger
|
|
|
+
|
|
|
+log = get_logger(__name__)
|
|
|
+
|
|
|
+# 定义导出目录
|
|
|
+EXPORT_DIR = DOWNLOAD_DIR / "export"
|
|
|
+os.makedirs(EXPORT_DIR, exist_ok=True)
|
|
|
+
|
|
|
+
|
|
|
+def escape_value(value):
|
|
|
+ """
|
|
|
+ 安全转义SQL值,处理各种数据类型
|
|
|
+ """
|
|
|
+ if value is None:
|
|
|
+ return "NULL"
|
|
|
+ elif isinstance(value, (int, float)):
|
|
|
+ return str(value)
|
|
|
+ elif isinstance(value, Decimal):
|
|
|
+ return format(value, 'f')
|
|
|
+ elif isinstance(value, datetime.date):
|
|
|
+ return f"'{value.strftime('%Y-%m-%d')}'"
|
|
|
+ elif isinstance(value, datetime.datetime):
|
|
|
+ return f"'{value.strftime('%Y-%m-%d %H:%M:%S')}'"
|
|
|
+ elif isinstance(value, bytes):
|
|
|
+ return f"0x{value.hex()}" # 二进制类型转十六进制
|
|
|
+ else:
|
|
|
+ # 转义特殊字符
|
|
|
+ escaped = str(value).replace("\\", "\\\\").replace("'", "''")
|
|
|
+ return f"'{escaped}'"
|
|
|
+
|
|
|
+
|
|
|
+def generate_insert_sql(table_name, row):
|
|
|
+ """
|
|
|
+ 生成单行INSERT语句
|
|
|
+ """
|
|
|
+ columns = []
|
|
|
+ values = []
|
|
|
+
|
|
|
+ for col_name, value in row.items():
|
|
|
+ columns.append(f"`{col_name}`")
|
|
|
+ values.append(escape_value(value))
|
|
|
+
|
|
|
+ cols_str = ", ".join(columns)
|
|
|
+ vals_str = ", ".join(values)
|
|
|
+ return f"INSERT INTO `{table_name}` ({cols_str}) VALUES ({vals_str});"
|
|
|
+
|
|
|
+
|
|
|
+def export_table(table_name, sql_query, export_file):
|
|
|
+ """
|
|
|
+ 导出表数据到SQL文件
|
|
|
+ """
|
|
|
+ db_helper = DBHelper()
|
|
|
+ connection = db_helper.engine.raw_connection()
|
|
|
+
|
|
|
+ try:
|
|
|
+ with connection.cursor() as cursor:
|
|
|
+ cursor.execute(sql_query)
|
|
|
+ column_names = [col[0] for col in cursor.description]
|
|
|
+
|
|
|
+ with open(export_file, 'w', encoding='utf-8') as f:
|
|
|
+ batch_size = 1000
|
|
|
+ count = 0
|
|
|
+
|
|
|
+ while True:
|
|
|
+ rows = cursor.fetchmany(batch_size)
|
|
|
+ if not rows:
|
|
|
+ break
|
|
|
+
|
|
|
+ for row in rows:
|
|
|
+ row_dict = dict(zip(column_names, row))
|
|
|
+ f.write(generate_insert_sql(table_name, row_dict) + '\n')
|
|
|
+ count += 1
|
|
|
+
|
|
|
+ log.info(f"已导出 {count} 行数据到 {table_name}")
|
|
|
+
|
|
|
+ log.info(f"表 {table_name} 导出完成,共 {count} 行数据")
|
|
|
+ finally:
|
|
|
+ connection.close()
|
|
|
+
|
|
|
+
|
|
|
+def get_latest_month(engine, table_name, month_column):
|
|
|
+ """
|
|
|
+ 获取表中最新月份
|
|
|
+ """
|
|
|
+ query = text(f"SELECT MAX(`{month_column}`) FROM `{table_name}`")
|
|
|
+ with engine.connect() as conn:
|
|
|
+ result = conn.execute(query).scalar()
|
|
|
+ return result
|
|
|
+
|
|
|
+
|
|
|
+def export_tables():
|
|
|
+ """
|
|
|
+ 按规则导出所有表
|
|
|
+ """
|
|
|
+ db_helper = DBHelper()
|
|
|
+ engine = db_helper.engine
|
|
|
+
|
|
|
+ # 按年份月份的表组
|
|
|
+ month_tables = [
|
|
|
+ 't_yujin_crossborder_commodity_country',
|
|
|
+ 't_yujin_crossborder_commodity_trade',
|
|
|
+ 't_yujin_crossborder_country_trade',
|
|
|
+ 't_yujin_crossborder_region_trade'
|
|
|
+ ]
|
|
|
+
|
|
|
+ # 需全量导出的表
|
|
|
+ full_tables = [
|
|
|
+ 't_yujin_crossborder_yearly_summary',
|
|
|
+ 't_yujin_crossborder_monthly_summary'
|
|
|
+ ]
|
|
|
+
|
|
|
+ # 省份交叉表
|
|
|
+ prov_tables = [
|
|
|
+ 't_yujin_crossborder_prov_commodity_trade',
|
|
|
+ 't_yujin_crossborder_prov_country_trade',
|
|
|
+ 't_yujin_crossborder_prov_region_trade'
|
|
|
+ ]
|
|
|
+
|
|
|
+ # 导出按年份月份的表(最新月数据)
|
|
|
+ for table in month_tables:
|
|
|
+ try:
|
|
|
+ latest_month = get_latest_month(engine, table, 'year_month')
|
|
|
+ if latest_month:
|
|
|
+ export_file = EXPORT_DIR / f"{table}_{latest_month}.sql"
|
|
|
+ sql_query = f"SELECT * FROM `{table}` WHERE `year_month` = '{latest_month}'"
|
|
|
+ export_table(table, sql_query, export_file)
|
|
|
+ else:
|
|
|
+ log.warning(f"表 {table} 未找到数据,跳过导出")
|
|
|
+ except Exception as e:
|
|
|
+ log.error(f"导出表 {table} 时出错: {str(e)}")
|
|
|
+
|
|
|
+ # 导出全量表
|
|
|
+ for table in full_tables:
|
|
|
+ try:
|
|
|
+ export_file = EXPORT_DIR / f"{table}_{latest_month}.sql"
|
|
|
+ export_table(table, f"SELECT * FROM `{table}`", export_file)
|
|
|
+ except Exception as e:
|
|
|
+ log.error(f"导出表 {table} 时出错: {str(e)}")
|
|
|
+
|
|
|
+ # 导出省份表(最新月数据)
|
|
|
+ for table in prov_tables:
|
|
|
+ try:
|
|
|
+ latest_month = get_latest_month(engine, table, 'crossborder_year_month')
|
|
|
+ if latest_month:
|
|
|
+ export_file = EXPORT_DIR / f"{table}_{latest_month}.sql"
|
|
|
+ sql_query = f"SELECT * FROM `{table}` WHERE `crossborder_year_month` = '{latest_month}'"
|
|
|
+ export_table(table, sql_query, export_file)
|
|
|
+ else:
|
|
|
+ log.warning(f"表 {table} 未找到数据,跳过导出")
|
|
|
+ except Exception as e:
|
|
|
+ log.error(f"导出表 {table} 时出错: {str(e)}")
|
|
|
+
|
|
|
+
|
|
|
+if __name__ == "__main__":
|
|
|
+ export_tables()
|
|
|
+ log.info(f"所有表导出完成,文件保存在: {EXPORT_DIR}")
|