data_cleaning_to_db.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. import os
  2. import re
  3. from datetime import datetime
  4. from pathlib import Path
  5. from crossborder.utils import base_mysql
  6. from crossborder.quanguo.parse_commodity_country_detail_excel import parse_commodity_country_detail
  7. from crossborder.quanguo.parse_commodity_table_excel import parse_commodity_table_excel
  8. from crossborder.quanguo.parse_country_table_excel import parse_country_table_excel
  9. from crossborder.quanguo.parse_month_excel import parse_month_table_excel
  10. from crossborder.quanguo.parse_region_table_excel import parse_region_table_excel
  11. from crossborder.quanguo.parse_year_excel import parse_year_table_excel
  12. from crossborder.utils.base_mysql import provinces
  13. from crossborder.utils.constants import DOWNLOAD_DIR
  14. from crossborder.utils.log import get_logger
  15. log = get_logger(__name__)
  16. def perform_data_cleanup_and_import(current_year):
  17. """
  18. 数据清洗与入库主函数
  19. :param current_year: 当前年份,用于定位数据目录
  20. """
  21. # 构建当前年度数据目录路径
  22. year_data_dir = DOWNLOAD_DIR / "total" / str(current_year)
  23. log.info(f"\n数据清洗入库中...")
  24. try:
  25. # 获取所有月份子目录(如 01月、02月)
  26. month_dirs = [
  27. d for d in os.listdir(year_data_dir)
  28. if re.match(r'^\d{2}$', d)
  29. ]
  30. if not month_dirs:
  31. log.warning(f"{year_data_dir} 下未找到任何月份目录,跳过数据清洗")
  32. return
  33. # 按文件夹名排序,取最新月份
  34. latest_month = sorted(month_dirs, reverse=True)[0]
  35. latest_month_path = Path(year_data_dir) / latest_month
  36. log.info(f"发现最新月份目录: {latest_month_path}")
  37. # 遍历该目录下的所有 Excel 文件并处理
  38. for file in os.listdir(latest_month_path):
  39. full_path = latest_month_path / file
  40. if not file.endswith(('.xls', '.xlsx')):
  41. continue
  42. if '(1)' in file and '年度表' in file:
  43. log.info(f"处理年度汇总表: {file}")
  44. parse_year_table_excel(full_path)
  45. elif '(1)' in file and '月度表' in file:
  46. log.info(f"处理月度汇总表: {file}")
  47. parse_month_table_excel(full_path)
  48. elif '(2)' in file:
  49. log.info(f"处理国别(地区)贸易表: {file}")
  50. parse_country_table_excel(full_path)
  51. elif '(4)' in file:
  52. log.info(f"处理类章贸易表: {file}")
  53. parse_commodity_table_excel(full_path)
  54. elif '(8)' in file:
  55. log.info(f"处理收发货人所在地表: {file}")
  56. parse_region_table_excel(full_path)
  57. elif '(15)' in file:
  58. log.info(f"处理对部分国家(地区)出口类章金额表: {full_path}")
  59. parse_commodity_country_detail(full_path, "export")
  60. elif '(16)' in file:
  61. log.info(f"处理自部分国家(地区)进口类章金额表: {full_path}")
  62. parse_commodity_country_detail(full_path, "import")
  63. else:
  64. log.warning(f"未知类型文件,跳过: {full_path}")
  65. log.info("数据清洗与入库完成!")
  66. except Exception as e:
  67. log.error(f"数据清洗失败: {str(e)}")
  68. raise
  69. finally:
  70. log.info("更新省市同比数据!")
  71. base_mysql.update_shandong_yoy("河南省")
  72. base_mysql.update_shandong_yoy_origin("山东省")
  73. def main():
  74. """
  75. 按年份倒序处理(如:2025 -> 2024 -> 2023),每个月份也按倒序处理,
  76. 解析所有'收发货人所在地表'文件。
  77. """
  78. # 当前年份开始,倒序到2023
  79. for year in range(datetime.now().year, 2022, -1): # 2025 -> 2024 -> 2023
  80. year_data_dir = DOWNLOAD_DIR / "total" / str(year)
  81. if not year_data_dir.exists():
  82. log.warning(f"{year_data_dir} 目录不存在,跳过该年份")
  83. continue
  84. log.info(f"\n开始处理 {year} 年的收发货人所在地表...")
  85. try:
  86. # 获取所有月份子目录(如 01月、02月)
  87. month_dirs = [
  88. d for d in os.listdir(year_data_dir)
  89. if re.match(r'^\d{2}$', d)
  90. ]
  91. if not month_dirs:
  92. log.warning(f"{year_data_dir} 下未找到任何月份目录,跳过该年份")
  93. continue
  94. # 按月份倒序排序(12月优先)
  95. sorted_months = sorted(month_dirs, reverse=True)
  96. for month in sorted_months:
  97. month_path = Path(year_data_dir) / month
  98. log.info(f"正在处理月份目录: {month_path}")
  99. # 遍历该月份目录下的所有 Excel 文件
  100. for file in os.listdir(month_path):
  101. full_path = month_path / file
  102. if not file.endswith(('.xls', '.xlsx')):
  103. continue
  104. # if '(1)' in file and '年度表' in file:
  105. # log.info(f"处理年度汇总表: {file}")
  106. # parse_year_table_excel(full_path)
  107. #
  108. # elif '(1)' in file and '月度表' in file:
  109. # log.info(f"处理月度汇总表: {file}")
  110. # parse_month_table_excel(full_path)
  111. #
  112. # elif '(2)' in file:
  113. # log.info(f"处理国别(地区)贸易表: {file}")
  114. # parse_country_table_excel(full_path)
  115. #
  116. # elif '(4)' in file:
  117. # log.info(f"处理类章贸易表: {file}")
  118. # parse_commodity_table_excel(full_path)
  119. if '(8)' in file:
  120. log.info(f"处理收发货人所在地表: {file}")
  121. parse_region_table_excel(full_path)
  122. # elif '(15)' in file:
  123. # log.info(f"处理对部分国家(地区)出口类章金额表: {full_path}")
  124. # parse_commodity_country_detail(full_path, "export")
  125. #
  126. # elif '(16)' in file:
  127. # log.info(f"处理自部分国家(地区)进口类章金额表: {full_path}")
  128. # parse_commodity_country_detail(full_path, "import")
  129. #
  130. # else:
  131. # log.warning(f"未知类型文件,跳过: {full_path}")
  132. log.info(f"{year} 年的数据处理完成!")
  133. except Exception as e:
  134. log.error(f"{year} 年数据处理失败: {str(e)}")
  135. log.info("更新海关总署省份同比数据!")
  136. for province in provinces:
  137. base_mysql.update_shandong_yoy_origin(province)
  138. log.info("数据更新完成!")
  139. if __name__ == "__main__":
  140. main()
  141. # perform_data_cleanup_and_import(2025)