123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178 |
- import os
- import re
- from datetime import datetime
- from pathlib import Path
- from crossborder.utils import base_mysql
- from crossborder.quanguo.parse_commodity_country_detail_excel import parse_commodity_country_detail
- from crossborder.quanguo.parse_commodity_table_excel import parse_commodity_table_excel
- from crossborder.quanguo.parse_country_table_excel import parse_country_table_excel
- from crossborder.quanguo.parse_month_excel import parse_month_table_excel
- from crossborder.quanguo.parse_region_table_excel import parse_region_table_excel
- from crossborder.quanguo.parse_year_excel import parse_year_table_excel
- from crossborder.utils.base_mysql import provinces
- from crossborder.utils.constants import DOWNLOAD_DIR
- from crossborder.utils.log import get_logger
- log = get_logger(__name__)
- def perform_data_cleanup_and_import(current_year):
- """
- 数据清洗与入库主函数
- :param current_year: 当前年份,用于定位数据目录
- """
- # 构建当前年度数据目录路径
- year_data_dir = DOWNLOAD_DIR / "total" / str(current_year)
- log.info(f"\n数据清洗入库中...")
- try:
- # 获取所有月份子目录(如 01月、02月)
- month_dirs = [
- d for d in os.listdir(year_data_dir)
- if re.match(r'^\d{2}$', d)
- ]
- if not month_dirs:
- log.warning(f"{year_data_dir} 下未找到任何月份目录,跳过数据清洗")
- return
- # 按文件夹名排序,取最新月份
- latest_month = sorted(month_dirs, reverse=True)[0]
- latest_month_path = Path(year_data_dir) / latest_month
- log.info(f"发现最新月份目录: {latest_month_path}")
- # 遍历该目录下的所有 Excel 文件并处理
- for file in os.listdir(latest_month_path):
- full_path = latest_month_path / file
- if not file.endswith(('.xls', '.xlsx')):
- continue
- if '(1)' in file and '年度表' in file:
- log.info(f"处理年度汇总表: {file}")
- parse_year_table_excel(full_path)
- elif '(1)' in file and '月度表' in file:
- log.info(f"处理月度汇总表: {file}")
- parse_month_table_excel(full_path)
- elif '(2)' in file:
- log.info(f"处理国别(地区)贸易表: {file}")
- parse_country_table_excel(full_path)
- elif '(4)' in file:
- log.info(f"处理类章贸易表: {file}")
- parse_commodity_table_excel(full_path)
- elif '(8)' in file:
- log.info(f"处理收发货人所在地表: {file}")
- parse_region_table_excel(full_path)
- elif '(15)' in file:
- log.info(f"处理对部分国家(地区)出口类章金额表: {full_path}")
- parse_commodity_country_detail(full_path, "export")
- elif '(16)' in file:
- log.info(f"处理自部分国家(地区)进口类章金额表: {full_path}")
- parse_commodity_country_detail(full_path, "import")
- else:
- log.warning(f"未知类型文件,跳过: {full_path}")
- log.info("数据清洗与入库完成!")
- except Exception as e:
- log.error(f"数据清洗失败: {str(e)}")
- raise
- finally:
- log.info("更新省市同比数据!")
- base_mysql.update_shandong_yoy("河南省")
- base_mysql.update_shandong_yoy_origin("山东省")
- def main():
- """
- 按年份倒序处理(如:2025 -> 2024 -> 2023),每个月份也按倒序处理,
- 解析所有'收发货人所在地表'文件。
- """
- # 当前年份开始,倒序到2023
- for year in range(datetime.now().year, 2022, -1): # 2025 -> 2024 -> 2023
- year_data_dir = DOWNLOAD_DIR / "total" / str(year)
- if not year_data_dir.exists():
- log.warning(f"{year_data_dir} 目录不存在,跳过该年份")
- continue
- log.info(f"\n开始处理 {year} 年的收发货人所在地表...")
- try:
- # 获取所有月份子目录(如 01月、02月)
- month_dirs = [
- d for d in os.listdir(year_data_dir)
- if re.match(r'^\d{2}$', d)
- ]
- if not month_dirs:
- log.warning(f"{year_data_dir} 下未找到任何月份目录,跳过该年份")
- continue
- # 按月份倒序排序(12月优先)
- sorted_months = sorted(month_dirs, reverse=True)
- for month in sorted_months:
- month_path = Path(year_data_dir) / month
- log.info(f"正在处理月份目录: {month_path}")
- # 遍历该月份目录下的所有 Excel 文件
- for file in os.listdir(month_path):
- full_path = month_path / file
- if not file.endswith(('.xls', '.xlsx')):
- continue
- # if '(1)' in file and '年度表' in file:
- # log.info(f"处理年度汇总表: {file}")
- # parse_year_table_excel(full_path)
- #
- # elif '(1)' in file and '月度表' in file:
- # log.info(f"处理月度汇总表: {file}")
- # parse_month_table_excel(full_path)
- #
- # elif '(2)' in file:
- # log.info(f"处理国别(地区)贸易表: {file}")
- # parse_country_table_excel(full_path)
- #
- # elif '(4)' in file:
- # log.info(f"处理类章贸易表: {file}")
- # parse_commodity_table_excel(full_path)
- if '(8)' in file:
- log.info(f"处理收发货人所在地表: {file}")
- parse_region_table_excel(full_path)
- # elif '(15)' in file:
- # log.info(f"处理对部分国家(地区)出口类章金额表: {full_path}")
- # parse_commodity_country_detail(full_path, "export")
- #
- # elif '(16)' in file:
- # log.info(f"处理自部分国家(地区)进口类章金额表: {full_path}")
- # parse_commodity_country_detail(full_path, "import")
- #
- # else:
- # log.warning(f"未知类型文件,跳过: {full_path}")
- log.info(f"{year} 年的数据处理完成!")
- except Exception as e:
- log.error(f"{year} 年数据处理失败: {str(e)}")
- log.info("更新海关总署省份同比数据!")
- for province in provinces:
- base_mysql.update_shandong_yoy_origin(province)
- log.info("数据更新完成!")
- if __name__ == "__main__":
- main()
- # perform_data_cleanup_and_import(2025)
|