|
@@ -0,0 +1,122 @@
|
|
|
+import time
|
|
|
+
|
|
|
+import pymysql
|
|
|
+from pymysql.cursors import DictCursor
|
|
|
+
|
|
|
+from crossborder.utils.base_mysql import get_decrypted_password, DB_CONFIG
|
|
|
+from crossborder.utils.log import get_logger
|
|
|
+
|
|
|
+log = get_logger(__name__)
|
|
|
+
|
|
|
+# 获取数据库配置并解密密码
|
|
|
+db_config = DB_CONFIG.copy()
|
|
|
+db_config['password'] = get_decrypted_password()
|
|
|
+
|
|
|
+# 定义查询语句和对应的目标表名
|
|
|
+queries = [
|
|
|
+ {
|
|
|
+ "query": "SELECT * FROM t_yujin_crossborder_prov_commodity_category",
|
|
|
+ "table_name": "t_yujin_crossborder_prov_commodity_category"
|
|
|
+ },
|
|
|
+ {
|
|
|
+ "query": "SELECT * FROM t_yujin_crossborder_prov_commodity_trade",
|
|
|
+ "table_name": "t_yujin_crossborder_prov_commodity_trade"
|
|
|
+ },
|
|
|
+ {
|
|
|
+ "query": "SELECT * FROM t_yujin_crossborder_prov_country_trade",
|
|
|
+ "table_name": "t_yujin_crossborder_prov_country_trade"
|
|
|
+ },
|
|
|
+ {
|
|
|
+ "query": "SELECT * FROM t_yujin_crossborder_prov_region_trade",
|
|
|
+ "table_name": "t_yujin_crossborder_prov_region_trade"
|
|
|
+ }
|
|
|
+]
|
|
|
+
|
|
|
+PAGE_SIZE = 1000 # 每页读取的记录数
|
|
|
+MAX_RETRIES = 3 # 最大重试次数
|
|
|
+RETRY_DELAY = 5 # 重试间隔时间(秒)
|
|
|
+
|
|
|
+def escape_value(value):
|
|
|
+ """处理字段值,字符串类型添加引号并转义"""
|
|
|
+ if isinstance(value, str):
|
|
|
+ # 转义字符串内的单引号,并用单引号包裹整个字符串
|
|
|
+ return "'" + value.replace("'", "''") + "'"
|
|
|
+ else:
|
|
|
+ return str(value)
|
|
|
+
|
|
|
+def execute_query_and_save_to_sql(query_info):
|
|
|
+ table_name = query_info["table_name"]
|
|
|
+ base_query = query_info["query"]
|
|
|
+ file_name = f"{table_name}.sql"
|
|
|
+
|
|
|
+ log.info(f"开始处理 {table_name}")
|
|
|
+
|
|
|
+ try:
|
|
|
+ connection = pymysql.connect(
|
|
|
+ host=db_config['host'],
|
|
|
+ user=db_config['user'],
|
|
|
+ password=db_config['password'],
|
|
|
+ database=db_config['database'],
|
|
|
+ port=db_config['port'],
|
|
|
+ charset=db_config['charset'],
|
|
|
+ cursorclass=DictCursor,
|
|
|
+ connect_timeout=30
|
|
|
+ )
|
|
|
+ except pymysql.err.OperationalError as e:
|
|
|
+ log.error(f"数据库连接失败: {e}")
|
|
|
+ raise
|
|
|
+
|
|
|
+ page = 0
|
|
|
+ total_records = 0
|
|
|
+
|
|
|
+ with connection.cursor() as cursor:
|
|
|
+ while True:
|
|
|
+ offset = page * PAGE_SIZE
|
|
|
+ sql = f"{base_query} LIMIT {offset}, {PAGE_SIZE}"
|
|
|
+
|
|
|
+ for attempt in range(MAX_RETRIES):
|
|
|
+ try:
|
|
|
+ log.debug(f"正在执行分页查询: {sql}")
|
|
|
+ cursor.execute(sql)
|
|
|
+ results = cursor.fetchall()
|
|
|
+
|
|
|
+ if not results:
|
|
|
+ log.info(f"{table_name} 数据导出完成,共导出 {total_records} 条记录")
|
|
|
+ return
|
|
|
+
|
|
|
+ # 构建插入语句并写入文件
|
|
|
+ columns = [col for col in results[0].keys() if col != 'id']
|
|
|
+ insert_template = f"INSERT INTO {table_name} ({', '.join(columns)}) VALUES\n"
|
|
|
+
|
|
|
+ values = []
|
|
|
+ for row in results:
|
|
|
+ value_line = "(" + ", ".join(
|
|
|
+ escape_value(value) for key, value in row.items() if key != 'id'
|
|
|
+ ) + ")"
|
|
|
+ values.append(value_line)
|
|
|
+
|
|
|
+ full_insert = insert_template + ",\n".join(values) + ";\n"
|
|
|
+
|
|
|
+ # 追加写入文件
|
|
|
+ with open(file_name, 'a', encoding='utf-8') as f:
|
|
|
+ f.write(full_insert)
|
|
|
+
|
|
|
+ total_records += len(results)
|
|
|
+ log.info(f"{table_name} 已写入 {len(results)} 条记录,累计 {total_records} 条")
|
|
|
+
|
|
|
+ page += 1
|
|
|
+ break # 成功执行就跳出重试循环
|
|
|
+
|
|
|
+ except (pymysql.err.OperationalError, pymysql.err.InternalError) as e:
|
|
|
+ if attempt < MAX_RETRIES - 1:
|
|
|
+ log.warning(f"数据库查询失败,{RETRY_DELAY}秒后重试... 错误: {e}")
|
|
|
+ time.sleep(RETRY_DELAY)
|
|
|
+ else:
|
|
|
+ log.error(f"{table_name} 查询失败,已达最大重试次数: {e}")
|
|
|
+ raise
|
|
|
+
|
|
|
+ connection.close()
|
|
|
+
|
|
|
+if __name__ == "__main__":
|
|
|
+ for query_info in queries:
|
|
|
+ execute_query_and_save_to_sql(query_info)
|