import re import xlrd OUTPUT_SQL = "../downloads/trade_monthly04.sql" # 输出文件名 def is_2025_data(date_str): """判断是否为2025年数据[1,5](@ref)""" try: return date_str.startswith('2025.') except: return False def convert_unit(value): """亿元转万元,处理空值和异常[1,4](@ref)""" try: return round(float(value) * 10000, 4) if value not in ['-', ''] else None except: return None def parse_ratio(value): """处理百分比数据[5](@ref)""" return f"'{value}'" if value not in ['-', ''] else 'NULL' def generate_sql(data_group): """动态生成SQL""" # 基础数据行 base_row = data_group[0] year_month = base_row[1].replace('.', '-') # 月度数据转换 monthly_total = convert_unit(base_row[2]) monthly_import = convert_unit(base_row[4]) # 第5列是进口 monthly_export = convert_unit(base_row[3]) # 第4列是出口 trade_balance = convert_unit(base_row[5]) ytd_total = convert_unit(base_row[6]) ytd_import = convert_unit(base_row[8]) ytd_export = convert_unit(base_row[7]) ytd_balance = convert_unit(base_row[9]) # 初始化同比数据 yoy_data = {'import_export': 'NULL', 'import': 'NULL', 'export': 'NULL'} # 处理2025年三行数据 if len(data_group) == 3: # 同比数据在第2行的3-5列 yoy_row = data_group[1] yoy_data = { 'import_export': parse_ratio(yoy_row[2]), # 第4列 'import': parse_ratio(yoy_row[4]), # 第5列 'export': parse_ratio(yoy_row[3]) # 第6列 } return f""" INSERT INTO `t_yujin_crossborder_monthly_summary` (`year_month`, `monthly_total`, `monthly_import`, `monthly_export`, `trade_balance`, `ytd_total`, `ytd_import`, `ytd_export`, `ytd_trade_balance`, `yoy_import_export`, `yoy_import`, `yoy_export`, `create_time`) VALUES ( '{year_month}', {monthly_total or 'NULL'}, {monthly_import or 'NULL'}, {monthly_export or 'NULL'}, {trade_balance or 'NULL'}, {ytd_total or 'NULL'}, {ytd_import or 'NULL'}, {ytd_export or 'NULL'}, {ytd_balance or 'NULL'}, {yoy_data['import_export']}, {yoy_data['import']}, {yoy_data['export']}, NOW() ); """ def main(): workbook = xlrd.open_workbook('D:/Downloads/2025051809220574256.xls') sheet = workbook.sheet_by_index(0) with open(OUTPUT_SQL, 'w', encoding='utf-8') as f: pass row_idx = 5 # 数据起始行 while row_idx < sheet.nrows: try: # 获取基础行 base_row = sheet.row_values(row_idx) date_cell = str(base_row[1]) if not re.match(r"202[0-9]\.\d{2}", date_cell): # 跳过无效行 row_idx += 1 continue # 动态读取数据组 if is_2025_data(date_cell): data_group = [ base_row, sheet.row_values(row_idx + 1), sheet.row_values(row_idx + 2) ] step = 3 else: data_group = [base_row] step = 1 sql = generate_sql(data_group) with open(OUTPUT_SQL, "a", encoding="utf-8") as f: f.write(sql + "\n") print(f"成功生成 {len(sql)} 条SQL语句,保存至:{OUTPUT_SQL}") print(sql) row_idx += step except IndexError: print(f"行{row_idx}数据不完整,已跳过") row_idx += 1 except Exception as e: print(f"处理行{row_idx}出错:{str(e)}") row_idx += 1 if __name__ == "__main__": main()