Ver código fonte

Merge branch 'master' of http://42.192.203.166:3000/wyp/crossborder

01495251 3 dias atrás
pai
commit
907e43b2be

+ 65 - 45
anhui/crawl_gov_anhui_full.py

@@ -1,8 +1,9 @@
 import os
 import random
 import re
-import subprocess
+import sys
 import time
+from datetime import datetime, timedelta
 from pathlib import Path
 
 from faker import Faker
@@ -16,6 +17,7 @@ import gov_commodity_anhui_city
 import gov_commodity_anhui_country
 import gov_commodity_anhui_import_export
 from utils import base_country_code, base_mysql
+from utils.log import log
 
 download_dir = base_country_code.download_dir
 Path(download_dir).mkdir(parents=True, exist_ok=True)
@@ -49,7 +51,7 @@ def configure_stealth_options():
     opts.add_argument("--headless")
     return opts
 
-def find_target_links(driver):
+def find_target_links(driver, year_month):
     """点击列表页链接进入详情页下载文件"""
     WebDriverWait(driver, 30).until(
         EC.presence_of_element_located((By.ID, "conRight"))
@@ -70,11 +72,11 @@ def find_target_links(driver):
             # 新标签页打开链接
             driver.execute_script("window.open(arguments[0]);", link_url)
             driver.switch_to.window(driver.window_handles[-1])
-            print(f"正在处理详情页: {link_url}")
+            log.info(f"正在处理详情页: {link_url}")
 
             try:
                 # 在详情页下载文件
-                download_result = download_file_from_detail_page(driver)
+                download_result = download_file_from_detail_page(driver, year_month)
                 if download_result == 'stop':
                     return 'stop'
                 processed_urls.add(link_url)
@@ -87,9 +89,9 @@ def find_target_links(driver):
 
         return None
     except Exception as e:
-        print(f"下载时发生异常: {str(e)}")
+        log.info(f"下载时发生异常: {str(e)}")
 
-def download_file_from_detail_page(driver):
+def download_file_from_detail_page(driver, year_month):
     WebDriverWait(driver, 30).until(
         EC.presence_of_element_located((By.ID, "easysiteText"))
     )
@@ -97,17 +99,22 @@ def download_file_from_detail_page(driver):
     try:
         elements = driver.find_elements(By.XPATH, '//div[@id="easysiteText"]//a')
         if not elements:
-            print("详情页未找到目标文件链接")
+            log.info("详情页未找到目标文件链接")
             return None
 
         for download_btn in elements:
             file_name = download_btn.text.strip()
             if not file_name:
                 continue
-            if file_name.startswith('2022'):
-                return 'stop'
+            if year_month is None:
+                if file_name.startswith('2022'):
+                    return 'stop'
+            else:
+                if not file_name.startswith(year_month):
+                    log.info(f"非 {year_month} 文件: {file_name}, stop")
+                    return 'stop'
             if '美元' in file_name or '商品贸易方式' in file_name or '进出口总值' in file_name or '月度表' in file_name:
-                print(f'{file_name} 不需要此文件,跳过')
+                log.info(f'{file_name} 不需要此文件,跳过')
                 continue
 
             file_url = download_btn.get_attribute("href")
@@ -116,10 +123,10 @@ def download_file_from_detail_page(driver):
                 file_url = base_url + file_url
 
             if not file_url.lower().endswith(('.xls', '.xlsx')):
-                print(f"跳过非 Excel 文件: {file_url}")
+                log.info(f"跳过非 Excel 文件: {file_url}")
                 continue
 
-            print(f"正在下载: {file_name} → {file_url}")
+            log.info(f"正在下载: {file_name} → {file_url}")
 
             # 记录下载前的文件列表
             existing_files = set(f.name for f in Path(download_dir).glob('*'))
@@ -132,18 +139,18 @@ def download_file_from_detail_page(driver):
             year, start_month, month = extract_year_and_month(file_name)
             final_path = Path(download_dir) / year / month / f"{file_name}"
             if os.path.exists(final_path):
-                print(f"文件已存在:{file_name} 正在覆盖...")
+                log.info(f"文件已存在:{file_name} 正在覆盖...")
                 os.unlink(final_path)
 
             final_dir = Path(download_dir) / year / month
             final_dir.mkdir(parents=True, exist_ok=True)
-            print(f"√ 正在移动文件 {downloaded_file} 至 {final_path}")
+            log.info(f"√ 正在移动文件 {downloaded_file} 至 {final_path}")
             downloaded_file.rename(final_path)
-            print(f"√ 下载成功:{final_path} \n")
+            log.info(f"√ 下载成功:{final_path} \n")
 
         return None
     except Exception as e:
-        print(f"详情页处理异常: {str(e)}")
+        log.info(f"详情页处理异常: {str(e)}")
         return None
 
 def extract_year_and_month(file_name):
@@ -161,32 +168,44 @@ def extract_year_and_month(file_name):
     else:
         raise ValueError(f"无法从文件名中提取年份和月份:{file_name}")
 
-def extract_rar(rar_path, extract_to):
-    """备用解压函数(当 rarfile 失效时使用)"""
-    winrar_path = r"C:\Program Files\WinRAR\Rar.exe"  # 推荐使用 Rar.exe 而非 WinRAR.exe
-    cmd = [winrar_path, 'x', '-y', rar_path, str(extract_to)]
-
-    # 使用 CREATE_NO_WINDOW 防止弹出命令行窗口
-    creationflags = subprocess.CREATE_NO_WINDOW if os.name == 'nt' else 0
-
-    result = subprocess.run(
-        cmd,
-        stdout=subprocess.PIPE,
-        stderr=subprocess.PIPE,
-        creationflags=creationflags  # 关键点:隐藏窗口
-    )
-
-    if result.returncode == 0:
-        print(f"解压成功: {rar_path} → {extract_to}")
-        return True
-    else:
-        print(f"解压失败: {result.stderr.decode('gbk')}")
-        return False
-
-
-def crawl_with_selenium(url):
+def detect_latest_month(driver, url):
+    driver.get(url)
+    current_date = datetime.now()
+    for offset in range(0, 3):
+        # 按整月回退,避免 offset*30 天产生跨月漂移(如 3 月 1 日减 30 天会跳过 2 月)
+        months = current_date.year * 12 + current_date.month - 1 - offset
+        check_year, check_month = months // 12, months % 12 + 1
+
+        target_title = f"{check_year}年{check_month}月"
+        try:
+            WebDriverWait(driver, 10).until(
+                EC.presence_of_element_located((By.XPATH, f'//a[contains(@title, "{target_title}")]'))
+            )
+            log.info(f"已找到最新月份数据 {check_year}-{check_month}")
+            # 看是否已存表,已存则跳过;
+            count = base_mysql.get_code_exist(f'{check_year}-{check_month:02d}', "340000")
+            if count > 0:
+                log.info(f"已存在 {check_year}-{check_month} 数据,跳过")
+                continue
+            return f"{check_year}年{check_month}月"
+        except Exception:
+            log.info(f"未找到 {target_title}")
+            continue
+    log.info("三个月内未找到有效数据")
+    return None
+
+def crawl_with_selenium(url, mark):
     driver = webdriver.Firefox(options=configure_stealth_options())
 
+    year_month = None
+    if 'increment' == mark:
+        year_month = detect_latest_month(driver, url)
+        if year_month is None:
+            log.info("安徽省海关没有最新数据更新")
+            driver.quit()
+            sys.exit(0)
+        log.info(f"检测到最新有效数据:{year_month}")
+
     try:
         # 注入反检测脚本
         driver.execute_script("""
@@ -201,7 +220,7 @@ def crawl_with_selenium(url):
 
         while True:
             # 访问当前页
-            result = find_target_links(driver)
+            result = find_target_links(driver, year_month)
             if result == 'stop':
                 break
 
@@ -217,7 +236,7 @@ def crawl_with_selenium(url):
             # 获取下一页的URL
             next_page_url = next_page_btn.get_attribute("onclick")
             if not next_page_url:
-                print("已到达最后一页,停止爬取")
+                log.info("已到达最后一页,停止爬取")
                 break
             # 从onclick属性中提取URL
             next_page_url = re.search(r"'(.*?)'", next_page_url).group(1)
@@ -228,7 +247,7 @@ def crawl_with_selenium(url):
             # 访问下一页
             driver.get(next_page_url)
 
-            print(f"开始爬取 {next_page_url} 页面数据")
+            log.info(f"开始爬取 {next_page_url} 页面数据")
 
     finally:
         driver.quit()
@@ -274,7 +293,7 @@ def hierarchical_traversal(root_path):
     # 按年倒序
     for year_dir in sorted(year_dirs, key=lambda x: x.name, reverse=True):
         # 构造完整的路径:download/shandong/2025/03
-        print(f"\n年份:{year_dir.name} | 省份:jiangsu")
+        log.info(f"\n年份:{year_dir.name} | 省份:anhui")
 
         # 提取月份目录
         month_dirs = []
@@ -293,7 +312,8 @@ def hierarchical_traversal(root_path):
                 gov_commodity_anhui_city.process_folder(md['path'])
 
 if __name__ == "__main__":
-    crawl_with_selenium('http://hefei.customs.gov.cn/hefei_customs/zfxxgkzl59/3169584/479584/479585/index.html')
+    crawl_with_selenium('http://hefei.customs.gov.cn/hefei_customs/zfxxgkzl59/3169584/479584/479585/index.html', 'all')
+    # crawl_with_selenium('http://hefei.customs.gov.cn/hefei_customs/zfxxgkzl59/3169584/479584/479585/index.html', 'increment')
     print(f"安徽合肥海关全量数据下载任务完成")
     # 等待5s后执行
     time.sleep(5)

+ 32 - 29
anhui/gov_commodity_anhui_city.py

@@ -1,39 +1,40 @@
+import time
 from pathlib import Path
 
-import pandas
 import pandas as pd
 
 from utils import base_country_code, base_mysql
 from utils.base_country_code import format_sql_value
+from utils.log import log
 
 city_code_map = {
-    "安徽省合肥市": "3401",
-    "安徽省芜湖市": "3402",
-    "安徽省蚌埠市": "3403",
-    "安徽省淮南市": "3404",
-    "安徽省马鞍山市": "3405",
-    "安徽省淮北市": "3406",
-    "安徽省铜陵市": "3407",
-    "安徽省安庆市": "3408",
-    "安徽省黄山市": "3410",
-    "安徽省滁州市": "3411",
-    "安徽省阜阳市": "3412",
-    "安徽省宿州市": "3413",
-    "安徽省六安市": "3415",
-    "安徽省亳州市": "3416",
-    "安徽省池州市": "3417",
-    "安徽省宣城市": "3418"
+    "安徽省合肥市": "340100",
+    "安徽省芜湖市": "340200",
+    "安徽省蚌埠市": "340300",
+    "安徽省淮南市": "340400",
+    "安徽省马鞍山市": "340500",
+    "安徽省淮北市": "340600",
+    "安徽省铜陵市": "340700",
+    "安徽省安庆市": "340800",
+    "安徽省黄山市": "341000",
+    "安徽省滁州市": "341100",
+    "安徽省阜阳市": "341200",
+    "安徽省宿州市": "341300",
+    "安徽省六安市": "341500",
+    "安徽省亳州市": "341600",
+    "安徽省池州市": "341700",
+    "安徽省宣城市": "341800"
 }
 
 def get_df(path):
     global df,  df_type
     file_paths = list(Path(path).glob('*'))
     if not file_paths:
-        print("未找到任何文件")
+        log.info("未找到任何文件")
         return
     for file in file_paths:
         if "收发货人" in file.name:
-            print(f"处理多文件: {file.name}")
+            log.info(f"处理多文件: {file.name}")
             file_path = Path(path) / file
             df = pd.read_excel(file_path, header=None).iloc[5:]
             break
@@ -47,7 +48,7 @@ def process_folder(path):
     sql_arr = []
     res = get_df(path)
     if res is None:
-        print(f"{year_month} prov_region_trade 未找到包含 地市 sheet")
+        log.info(f"{year_month} prov_region_trade 未找到包含 地市 sheet")
         return
     df = res
     country_name_index = 0
@@ -57,7 +58,7 @@ def process_folder(path):
         city_name = str(row.values[country_name_index]).strip()
         city_code = city_code_map.get(city_name)
         if not city_code:
-            print(f"未找到省 '{city_name}' 对应市编码")
+            log.info(f"未找到省 '{city_name}' 对应市编码")
             continue
 
         monthly_export, monthly_import, monthly_total, yoy_export, yoy_import, yoy_import_export = value_row(row, month)
@@ -71,21 +72,23 @@ def process_folder(path):
             yoy_import_export, yoy_import, yoy_export = 0, 0, 0
             sql = (f"INSERT INTO t_yujin_crossborder_prov_region_trade "
                    f"(crossborder_year, crossborder_year_month, prov_code, prov_name, city_code, city_name, monthly_total, monthly_export, monthly_import,yoy_import_export, yoy_import, yoy_export, create_time) VALUES "
-                   f"('{year}', '{year_month_2}', '340000', '安徽省', '{city_code}', '{city_name}', {format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, '{yoy_import_export}', '{yoy_import}', '{yoy_export}', now());\n")
+                   f"('{year}', '{year_month_2}', '340000', '安徽省', '{city_code}', '{city_name}', {format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, '{yoy_import_export}', '{yoy_import}', '{yoy_export}', now())"
+                   f"ON DUPLICATE KEY UPDATE create_time = now() ;")
             sql_arr_copy.append(sql)
 
         # 组装 SQL 语句
         sql = (f"INSERT INTO t_yujin_crossborder_prov_region_trade "
                f"(crossborder_year, crossborder_year_month, prov_code, prov_name, city_code, city_name, monthly_total, monthly_export, monthly_import,yoy_import_export, yoy_import, yoy_export, create_time) VALUES "
-               f"('{year}', '{year_month}', '340000', '安徽省', '{city_code}', '{city_name}', {format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, '{yoy_import_export}', '{yoy_import}', '{yoy_export}', now());\n")
+               f"('{year}', '{year_month}', '340000', '安徽省', '{city_code}', '{city_name}', {format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, '{yoy_import_export}', '{yoy_import}', '{yoy_export}', now())"
+               f"ON DUPLICATE KEY UPDATE create_time = now() ;")
         sql_arr.append(sql)
 
-    print(f"√ {year_month} prov_region_trade 成功生成 SQL 文件 size {len(sql_arr)} ")
+    log.info(f"√ {year_month} prov_region_trade 成功生成 SQL 文件 size {len(sql_arr)} ")
     base_mysql.bulk_insert(sql_arr)
     if month == 2:
-        print(f"√ {year_month} prov_region_trade 成功生成 SQL 文件 size {len(sql_arr_copy)} ")
+        log.info(f"√ {year_month} prov_region_trade 成功生成 SQL 文件 size {len(sql_arr_copy)} ")
         base_mysql.bulk_insert(sql_arr_copy)
-    print(f"√ {year_month} prov_region_trade SQL 存表完成!")
+    log.info(f"√ {year_month} prov_region_trade SQL 存表完成!")
 
 
 def value_row(row, month):
@@ -110,7 +113,7 @@ def hierarchical_traversal(root_path):
     ]
 
     for year_dir in sorted(year_dirs, key=lambda x: x.name, reverse=True):
-        print(f"\n年份:{year_dir.name} | 省份:jiangsu")
+        log.info(f"\n年份:{year_dir.name} | 省份:anhui")
 
         month_dirs = []
         for item in year_dir.iterdir():
@@ -119,9 +122,9 @@ def hierarchical_traversal(root_path):
 
         if month_dirs:
             for md in sorted(month_dirs, key=lambda x: x["month"], reverse=True):
-                print(f"  月份:{md['month']:02d} | 路径:{md['path']}")
+                log.info(f"  月份:{md['month']:02d} | 路径:{md['path']}")
                 process_folder(md['path'])
 
 if __name__ == '__main__':
     hierarchical_traversal(base_country_code.download_dir)
-    print(f"安徽合肥海关城市所有文件处理完成!")
+    log.info(f"安徽合肥海关城市所有文件处理完成!")

+ 14 - 12
anhui/gov_commodity_anhui_country.py

@@ -4,6 +4,7 @@ import pandas as pd
 
 from utils import base_country_code, base_mysql
 from utils.base_country_code import format_sql_value
+from utils.log import log
 
 # 排除地区名单
 EXCLUDE_REGIONS = ["亚洲", "非洲", "欧洲", "拉丁美洲", "北美洲", "大洋洲", "南极洲",
@@ -13,11 +14,11 @@ EXCLUDE_REGIONS = ["亚洲", "非洲", "欧洲", "拉丁美洲", "北美洲", "
 def get_df(path):
     file_paths = list(Path(path).glob('*'))
     if not file_paths:
-        print("未找到任何文件")
+        log.info("未找到任何文件")
         return None
     for file in file_paths:
         if "国别" in file.name:
-            print(f"处理多文件: {file.name}")
+            log.info(f"处理多文件: {file.name}")
             file_path = Path(path) / file
             return pd.read_excel(file_path, header=None).iloc[6:]
     return None
@@ -31,7 +32,7 @@ def process_folder(path):
     # try:
     df = get_df(path)
     if df is None:
-        print("未找到任何文件")
+        log.info("未找到任何文件")
         return None
 
     country_name_index = 0
@@ -53,7 +54,7 @@ def process_folder(path):
         # 获取国家编码
         country_code = base_country_code.COUNTRY_CODE_MAPPING.get(country_name)
         if not country_code:
-            print(f"{year_month} 未找到国家 '{country_name}' 对应的编码")
+            log.info(f"{year_month} 未找到国家 '{country_name}' 对应的编码")
             continue
 
         # 提取数据并格式化
@@ -69,7 +70,8 @@ def process_folder(path):
             yoy_import_export, yoy_import, yoy_export = 0, 0, 0
             sql = (f"INSERT INTO t_yujin_crossborder_prov_country_trade "
                    f"(crossborder_year, crossborder_year_month, prov_code, prov_name, country_code, country_name, monthly_total, monthly_export, monthly_import,yoy_import_export, yoy_import, yoy_export, create_time) VALUES "
-                   f"('{year}', '{year_month_2}', '340000', '安徽省', '{country_code}', '{country_name}', {format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, '{yoy_import_export}', '{yoy_import}', '{yoy_export}', now());\n")
+                   f"('{year}', '{year_month_2}', '340000', '安徽省', '{country_code}', '{country_name}', {format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, '{yoy_import_export}', '{yoy_import}', '{yoy_export}', now())"
+                   f"ON DUPLICATE KEY UPDATE create_time = now() ;")
             sql_arr_copy.append(sql)
 
         # 构建 SQL
@@ -79,17 +81,17 @@ def process_folder(path):
             f"monthly_total, monthly_export, monthly_import, yoy_import_export, yoy_import, yoy_export, create_time) "
             f"VALUES ('{year}', '{year_month}', '340000', '安徽省', '{country_code}', '{country_name}', "
             f"{format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, '{yoy_import_export}', '{yoy_import}', "
-            f"'{yoy_export}', NOW());"
+            f"'{yoy_export}', NOW()) ON DUPLICATE KEY UPDATE create_time = now();"
         )
         sql_arr.append(sql)
 
-    print(f"√ {year_month} 成功生成 SQL 条数: {len(sql_arr)}")
+    log.info(f"√ {year_month} 成功生成 SQL 条数: {len(sql_arr)}")
     # 批量插入数据库
     base_mysql.bulk_insert(sql_arr)
     if month == 2:
-        print(f"√ {year_month} prov_country_trade 成功生成 SQL 文件 size {len(sql_arr_copy)} ")
+        log.info(f"√ {year_month} prov_country_trade 成功生成 SQL 文件 size {len(sql_arr_copy)} ")
         base_mysql.bulk_insert(sql_arr_copy)
-    print(f"√ {year_month} prov_country_trade SQL 存表完成!\n")
+    log.info(f"√ {year_month} prov_country_trade SQL 存表完成!\n")
 
 
 
@@ -119,7 +121,7 @@ def hierarchical_traversal(root_path):
     ]
 
     for year_dir in sorted(year_dirs, key=lambda x: x.name, reverse=True):
-        print(f"\n年份:{year_dir.name} | 省份:jiangsu")
+        log.info(f"\n年份:{year_dir.name} | 省份:anhui")
 
         month_dirs = []
         for item in year_dir.iterdir():
@@ -128,10 +130,10 @@ def hierarchical_traversal(root_path):
 
         if month_dirs:
             for md in sorted(month_dirs, key=lambda x: x["month"], reverse=True):
-                print(f"  月份:{md['month']:02d} | 路径:{md['path']}")
+                log.info(f"  月份:{md['month']:02d} | 路径:{md['path']}")
                 process_folder(md['path'])
 
 
 if __name__ == '__main__':
     hierarchical_traversal(base_country_code.download_dir)
-    print("安徽合肥海关国别所有文件处理完成!")
+    log.info("安徽合肥海关国别所有文件处理完成!")

+ 16 - 13
anhui/gov_commodity_anhui_import_export.py

@@ -6,6 +6,7 @@ import pandas as pd
 from utils import base_country_code, base_mysql
 
 from utils.base_country_code import format_sql_value
+from utils.log import log
 
 CUSTOM_COMMODITY_REPLACEMENTS = {
     '家具': '家具及其零件',
@@ -43,7 +44,7 @@ def clean_commodity_name(name, preserve_keywords=None):
 def process_folder(path):
     file_paths = list(Path(path).glob('*'))
     if not file_paths:
-        print("未找到任何文件")
+        log.info("未找到任何文件")
         return
     year, month = base_country_code.extract_year_month_from_path(path)
 
@@ -97,11 +98,11 @@ def save_to_database(import_df, export_df, year, month):
         for _, row in merged_df.iterrows():
             commodity_name = str(row['commodity'])
             if commodity_name == '肉类' or commodity_name == '其他' or commodity_name == '干鲜瓜果' or commodity_name == '钟表':
-                print(f'{commodity_name} 商品不存在,跳过')
+                log.info(f'{commodity_name} 商品不存在,跳过')
                 continue
             commodity_code, commodity_name_fix = base_mysql.get_commodity_id(commodity_name)
             if not commodity_code:
-                print(f"未找到商品名称 '{commodity_name}' 对应的 ID")
+                log.info(f"未找到商品名称 '{commodity_name}' 对应的 ID")
                 continue
             if not commodity_name_fix or commodity_name_fix in processed_commodities:
                 continue
@@ -121,27 +122,29 @@ def save_to_database(import_df, export_df, year, month):
                 monthly_total = round(monthly_import + monthly_export, 4)
                 sql = (f"INSERT INTO t_yujin_crossborder_prov_commodity_trade "
                        f"(crossborder_year, crossborder_year_month, prov_code, prov_name, commodity_code, commodity_name, monthly_total, monthly_export, monthly_import, create_time) VALUES "
-                       f"('{year}', '{year_month_2}', '340000', '安徽省', '{commodity_code}', '{commodity_name_fix}', {format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, now());")
+                       f"('{year}', '{year_month_2}', '340000', '安徽省', '{commodity_code}', '{commodity_name_fix}', {format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, now())"
+                       f"ON DUPLICATE KEY UPDATE create_time = now() ;")
                 sql_arr_copy.append(sql)
 
             sql = (f"INSERT INTO t_yujin_crossborder_prov_commodity_trade "
                    f"(crossborder_year, crossborder_year_month, prov_code, prov_name, commodity_code, commodity_name, monthly_total, monthly_export, monthly_import, create_time) VALUES "
-                   f"('{year}', '{year_month}', '340000', '安徽省', '{commodity_code}', '{commodity_name_fix}', {format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, now());")
+                   f"('{year}', '{year_month}', '340000', '安徽省', '{commodity_code}', '{commodity_name_fix}', {format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, now())"
+                   f"ON DUPLICATE KEY UPDATE create_time = now() ;")
             sql_arr.append(sql)
 
             processed_commodities.add(commodity_name_fix)
-            # print(f'{commodity_name} -> {commodity_name_fix}')
+            # log.info(f'{commodity_name} -> {commodity_name_fix}')
 
     except Exception as e:
-        print(f"{year_month} prov_commodity_trade 生成 SQL 文件时发生异常: {str(e)}")
+        log.info(f"{year_month} prov_commodity_trade 生成 SQL 文件时发生异常: {str(e)}")
 
-    print(f"√ {year_month} prov_commodity_trade 成功生成 SQL 文件 size {len(sql_arr)} ")
+    log.info(f"√ {year_month} prov_commodity_trade 成功生成 SQL 文件 size {len(sql_arr)} ")
     # 解析完后生成sql文件批量入库
     base_mysql.bulk_insert(sql_arr)
     if month == 2:
-        print(f"√ {year_month} prov_commodity_trade copy 成功生成 SQL 文件 size {len(sql_arr_copy)} ")
+        log.info(f"√ {year_month} prov_commodity_trade copy 成功生成 SQL 文件 size {len(sql_arr_copy)} ")
         base_mysql.bulk_insert(sql_arr_copy)
-    print(f"√ {year_month} prov_commodity_trade SQL 存表完成!\n")
+    log.info(f"√ {year_month} prov_commodity_trade SQL 存表完成!\n")
 
 def hierarchical_traversal(root_path):
     """分层遍历:省份->年份->月目录"""
@@ -155,7 +158,7 @@ def hierarchical_traversal(root_path):
     # 按年倒序
     for year_dir in sorted(year_dirs, key=lambda x: x.name, reverse=True):
         # 构造完整的路径:download/shandong/2025/03
-        print(f"\n年份:{year_dir.name} | 省份:jiangsu")
+        log.info(f"\n年份:{year_dir.name} | 省份:anhui")
 
         # 提取月份目录
         month_dirs = []
@@ -168,7 +171,7 @@ def hierarchical_traversal(root_path):
         # 按月倒序输出
         if month_dirs:
             for md in sorted(month_dirs, key=lambda x: x["month"], reverse=True):
-                print(f"  月份:{md['month']:02d} | 路径:{md['path']}")
+                log.info(f"  月份:{md['month']:02d} | 路径:{md['path']}")
                 process_folder(md['path'])
 
 if __name__ == '__main__':
@@ -176,4 +179,4 @@ if __name__ == '__main__':
 
     # root = Path(base_country_code.download_dir)/'2025'/'04'
     # process_folder(root)
-    print("安徽合肥海关类章所有文件处理完成!")
+    log.info("安徽合肥海关类章所有文件处理完成!")

+ 15 - 14
hebei/crawl_gov_hebei_full.py

@@ -15,6 +15,7 @@ from utils import base_country_code, base_mysql
 import gov_commodity_hebei_import_export
 import gov_commodity_hebei_country
 import gov_commodity_hebei_city
+from utils.log import log
 
 download_dir = base_country_code.download_dir
 Path(download_dir).mkdir(parents=True, exist_ok=True)
@@ -82,7 +83,7 @@ def find_target_links(driver):
 
     element_arr = driver.find_elements(By.XPATH, '//div[@class="list_con"]//ul[@class="easysite-list-modelone"]//a')
     if not element_arr:
-        print("未找到目标标题")
+        log.info("未找到目标标题")
         return None
     for elements in element_arr:
         file_name = elements.text.strip()
@@ -95,9 +96,9 @@ def find_target_links(driver):
             file_url = remove_prefix_from_url(file_url)
 
             if not file_url.lower().endswith(('.xls', '.xlsx')):
-                print(f"跳过非 Excel 文件: {file_url}")
+                log.info(f"跳过非 Excel 文件: {file_url}")
                 continue
-            print(f"正在下载: {file_name} → {file_url}")
+            log.info(f"正在下载: {file_name} → {file_url}")
             # 记录下载前的文件列表
             existing_files = set(f.name for f in Path(download_dir).glob('*'))
             # 随机点击延迟
@@ -107,21 +108,21 @@ def find_target_links(driver):
             try:
                 downloaded_file = wait_for_download_complete(existing_files=existing_files)
             except Exception as e:
-                print(f"下载失败: {str(e)}")
+                log.info(f"下载失败: {str(e)}")
                 continue
             year, start_month, month = extract_year_and_month(file_name)
             final_path = Path(download_dir) / year / month / f"{file_name}.xls"
             if os.path.exists(final_path):
-                print(f"文件已存在:{file_name} 正在覆盖...")
+                log.info(f"文件已存在:{file_name} 正在覆盖...")
                 os.unlink(final_path)
 
             final_dir = Path(download_dir) / year / month
             final_dir.mkdir(parents=True, exist_ok=True)
-            print(f"√ 正在移动文件 {downloaded_file} 至 {final_path}")
+            log.info(f"√ 正在移动文件 {downloaded_file} 至 {final_path}")
             downloaded_file.rename(final_path)
-            print(f"√ 下载成功:{final_path}")
+            log.info(f"√ 下载成功:{final_path}")
         else:
-            print(f'{file_name} 不需要此文件,跳过')
+            log.info(f'{file_name} 不需要此文件,跳过')
             continue
     return None
 
@@ -170,7 +171,7 @@ def crawl_with_selenium(url):
             # 获取下一页的URL
             next_page_url = next_page_btn.get_attribute("onclick")
             if not next_page_url:
-                print("已到达最后一页,停止爬取")
+                log.info("已到达最后一页,停止爬取")
                 break
             # 从onclick属性中提取URL
             next_page_url = re.search(r"'(.*?)'", next_page_url).group(1)
@@ -181,7 +182,7 @@ def crawl_with_selenium(url):
             # 访问下一页
             driver.get(next_page_url)
 
-            print(f"开始爬取 {next_page_url} 页面数据")
+            log.info(f"开始爬取 {next_page_url} 页面数据")
     finally:
         driver.quit()
 
@@ -227,7 +228,7 @@ def hierarchical_traversal(root_path):
     # 按年倒序
     for year_dir in sorted(year_dirs, key=lambda x: x.name, reverse=True):
         # 构造完整的路径:download/shandong/2025/03
-        print(f"\n年份:{year_dir.name} | 省份:jiangsu")
+        log.info(f"\n年份:{year_dir.name} | 省份:hebei")
 
         # 提取月份目录
         month_dirs = []
@@ -240,7 +241,7 @@ def hierarchical_traversal(root_path):
         # 按月倒序输出
         if month_dirs:
             for md in sorted(month_dirs, key=lambda x: x["month"], reverse=True):
-                print(f"  月份:{md['month']:02d} | 路径:{md['path']}")
+                log.info(f"  月份:{md['month']:02d} | 路径:{md['path']}")
                 gov_commodity_hebei_import_export.process_folder(md['path'])
                 gov_commodity_hebei_country.process_folder(md['path'])
                 gov_commodity_hebei_city.process_folder(md['path'])
@@ -251,8 +252,8 @@ if __name__ == "__main__":
     # 等待5s后执行
     time.sleep(5)
     hierarchical_traversal(base_country_code.download_dir)
-    print(f"河北石家庄海关全量数据下载任务完成")
+    log.info(f"河北石家庄海关全量数据下载任务完成")
     time.sleep(5)
     base_mysql.update_january_yoy('河北省')
     base_mysql.update_shandong_yoy('河北省')
-    print("河北石家庄海关城市同比sql处理完成")
+    log.info("河北石家庄海关城市同比sql处理完成")

+ 13 - 12
hebei/gov_commodity_hebei_city.py

@@ -5,6 +5,7 @@ import pandas as pd
 
 from utils import base_country_code, base_mysql
 from utils.base_country_code import format_sql_value
+from utils.log import log
 
 city_code_map = {
     "石家庄市": "130100",
@@ -23,11 +24,11 @@ city_code_map = {
 def get_df(path):
     file_paths = list(Path(path).glob('*'))
     if not file_paths:
-        print("未找到任何文件")
+        log.info("未找到任何文件")
         return None
     for file in file_paths:
         if "地市" in file.name:
-            print(f"处理多文件: {file.name}")
+            log.info(f"处理多文件: {file.name}")
             file_path = Path(path) / file
             return pd.read_excel(file_path, header=None).iloc[5:]
     return None
@@ -38,7 +39,7 @@ def process_folder(path):
 
     df = get_df(path)
     if df is None:
-        print("未找到任何文件")
+        log.info("未找到任何文件")
         return None
 
     if year == 2025 and month >= 3:
@@ -58,7 +59,7 @@ def process_folder(path):
 
         city_code = city_code_map.get(city_name)
         if not city_code:
-            print(f"未找到省 '{city_name}' 对应市编码")
+            log.info(f"未找到省 '{city_name}' 对应市编码")
             continue
 
         monthly_export, monthly_import, monthly_total = value_row(row, col_total_index, col_monthly_export_index, col_monthly_import_index)
@@ -70,22 +71,22 @@ def process_folder(path):
             monthly_total = round(float(monthly_total) / 2, 4)
             sql_1 = (f"INSERT INTO t_yujin_crossborder_prov_region_trade "
                    f"(crossborder_year, crossborder_year_month, prov_code, prov_name, city_code, city_name, monthly_total, monthly_export, monthly_import,yoy_import_export, yoy_import, yoy_export, create_time) VALUES "
-                   f"('2023', '2023-01', '130000', '河北省', '{city_code}', '{city_name}', {format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, '{yoy_import_export}', '{yoy_import}', '{yoy_export}', now());\n")
+                   f"('2023', '2023-01', '130000', '河北省', '{city_code}', '{city_name}', {format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, '{yoy_import_export}', '{yoy_import}', '{yoy_export}', now()) ON DUPLICATE KEY UPDATE create_time = now() ;\n")
             sql_arr_copy.append(sql_1)
 
         # 组装 SQL 语句
         sql = (f"INSERT INTO t_yujin_crossborder_prov_region_trade "
                f"(crossborder_year, crossborder_year_month, prov_code, prov_name, city_code, city_name, monthly_total, monthly_export, monthly_import,yoy_import_export, yoy_import, yoy_export, create_time) VALUES "
-               f"('{year}', '{year_month}', '130000', '河北省', '{city_code}', '{city_name}', {format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, '{yoy_import_export}', '{yoy_import}', '{yoy_export}', now());\n")
+               f"('{year}', '{year_month}', '130000', '河北省', '{city_code}', '{city_name}', {format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, '{yoy_import_export}', '{yoy_import}', '{yoy_export}', now()) ON DUPLICATE KEY UPDATE create_time = now() ;\n")
         sql_arr.append(sql)
 
-    print(f"√ {year_month} prov_region_trade 成功生成 SQL 文件 size {len(sql_arr)} ")
+    log.info(f"√ {year_month} prov_region_trade 成功生成 SQL 文件 size {len(sql_arr)} ")
     # 解析完后生成sql文件批量入库
     base_mysql.bulk_insert(sql_arr)
     if year_month == '2023-02':
-        print(f"√ {year_month} sql_arr_copy 成功生成 SQL 文件 size {len(sql_arr_copy)} ")
+        log.info(f"√ {year_month} sql_arr_copy 成功生成 SQL 文件 size {len(sql_arr_copy)} ")
         base_mysql.bulk_insert(sql_arr_copy)
-    print(f"√ {year_month} prov_region_trade SQL 存表完成!")
+    log.info(f"√ {year_month} prov_region_trade SQL 存表完成!")
 
 
 def value_row(row, col_total_index, col_monthly_export_index, col_monthly_import_index):
@@ -108,7 +109,7 @@ def hierarchical_traversal(root_path):
     ]
 
     for year_dir in sorted(year_dirs, key=lambda x: x.name, reverse=True):
-        print(f"\n年份:{year_dir.name} | 省份:jiangsu")
+        log.info(f"\n年份:{year_dir.name} | 省份:hebei")
 
         month_dirs = []
         for item in year_dir.iterdir():
@@ -117,9 +118,9 @@ def hierarchical_traversal(root_path):
 
         if month_dirs:
             for md in sorted(month_dirs, key=lambda x: x["month"], reverse=True):
-                print(f"  月份:{md['month']:02d} | 路径:{md['path']}")
+                log.info(f"  月份:{md['month']:02d} | 路径:{md['path']}")
                 process_folder(md['path'])
 
 if __name__ == '__main__':
     hierarchical_traversal(base_country_code.download_dir)
-    print(f"河北石家庄海关城市所有文件处理完成!")
+    log.info(f"河北石家庄海关城市所有文件处理完成!")

+ 13 - 12
hebei/gov_commodity_hebei_country.py

@@ -5,6 +5,7 @@ import pandas as pd
 
 from utils import base_country_code, base_mysql
 from utils.base_country_code import format_sql_value
+from utils.log import log
 
 EXCLUDE_REGIONS = ["亚洲", "非洲", "欧洲", "拉丁美洲", "北美洲", "大洋洲", "南极洲",
                    "东南亚国家联盟", "欧洲联盟", "亚太经济合作组织",
@@ -13,11 +14,11 @@ EXCLUDE_REGIONS = ["亚洲", "非洲", "欧洲", "拉丁美洲", "北美洲", "
 def get_df(path):
     file_paths = list(Path(path).glob('*'))
     if not file_paths:
-        print("未找到任何文件")
+        log.info("未找到任何文件")
         return None
     for file in file_paths:
         if "国" in file.name:
-            print(f"处理多文件: {file.name}")
+            log.info(f"处理多文件: {file.name}")
             file_path = Path(path) / file
             return pd.read_excel(file_path, header=None).iloc[6:]
     return None
@@ -28,7 +29,7 @@ def process_folder(path):
 
     df = get_df(path)
     if df is None:
-        print("未找到任何文件")
+        log.info("未找到任何文件")
         return None
 
     if year == 2025 and month >= 3:
@@ -55,7 +56,7 @@ def process_folder(path):
 
         country_code = base_country_code.COUNTRY_CODE_MAPPING.get(country_name)
         if not country_code:
-            print(f"{year_month} 未找到国家 '{country_name}' 对应国家的编码")
+            log.info(f"{year_month} 未找到国家 '{country_name}' 对应国家的编码")
             continue
 
         monthly_export, monthly_import, monthly_total = value_row(row, col_total_index, col_monthly_export_index, col_monthly_import_index)
@@ -67,21 +68,21 @@ def process_folder(path):
             monthly_total = round(float(monthly_total) / 2, 4)
             sql = (f"INSERT INTO t_yujin_crossborder_prov_country_trade "
                    f"(crossborder_year, crossborder_year_month, prov_code, prov_name, country_code, country_name, monthly_total, monthly_export, monthly_import,yoy_import_export, yoy_import, yoy_export, create_time) VALUES "
-                   f"('2023', '2023-01', '130000', '河北省', '{country_code}', '{country_name}', {format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, '{yoy_import_export}', '{yoy_import}', '{yoy_export}', now());\n")
+                   f"('2023', '2023-01', '130000', '河北省', '{country_code}', '{country_name}', {format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, '{yoy_import_export}', '{yoy_import}', '{yoy_export}', now()) ON DUPLICATE KEY UPDATE create_time = now();\n")
             sql_arr_copy.append(sql)
         # 组装 SQL 语句
         sql = (f"INSERT INTO t_yujin_crossborder_prov_country_trade "
                f"(crossborder_year, crossborder_year_month, prov_code, prov_name, country_code, country_name, monthly_total, monthly_export, monthly_import,yoy_import_export, yoy_import, yoy_export, create_time) VALUES "
-               f"('{year}', '{year_month}', '130000', '河北省', '{country_code}', '{country_name}', {format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, '{yoy_import_export}', '{yoy_import}', '{yoy_export}', now());\n")
+               f"('{year}', '{year_month}', '130000', '河北省', '{country_code}', '{country_name}', {format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, '{yoy_import_export}', '{yoy_import}', '{yoy_export}', now()) ON DUPLICATE KEY UPDATE create_time = now();\n")
         sql_arr.append(sql)
 
-    print(f"√ {year_month} prov_country_trade 成功生成 SQL 文件 size {len(sql_arr)} ")
+    log.info(f"√ {year_month} prov_country_trade 成功生成 SQL 文件 size {len(sql_arr)} ")
     # 解析完后生成sql文件批量入库
     base_mysql.bulk_insert(sql_arr)
     if year_month == '2023-02':
-        print(f"√ {year_month} prov_country_trade 成功生成 SQL 文件 size {len(sql_arr_copy)} ")
+        log.info(f"√ {year_month} prov_country_trade 成功生成 SQL 文件 size {len(sql_arr_copy)} ")
         base_mysql.bulk_insert(sql_arr_copy)
-    print(f"√ {year_month} prov_country_trade SQL 存表完成!")
+    log.info(f"√ {year_month} prov_country_trade SQL 存表完成!")
 
 def value_row(row,col_total_index, col_monthly_export_index, col_monthly_import_index):
     monthly_total = str(row.values[col_total_index]).strip()
@@ -103,7 +104,7 @@ def hierarchical_traversal(root_path):
     ]
 
     for year_dir in sorted(year_dirs, key=lambda x: x.name, reverse=True):
-        print(f"\n年份:{year_dir.name} | 省份:jiangsu")
+        log.info(f"\n年份:{year_dir.name} | 省份:hebei")
 
         month_dirs = []
         for item in year_dir.iterdir():
@@ -112,9 +113,9 @@ def hierarchical_traversal(root_path):
 
         if month_dirs:
             for md in sorted(month_dirs, key=lambda x: x["month"], reverse=True):
-                print(f"  月份:{md['month']:02d} | 路径:{md['path']}")
+                log.info(f"  月份:{md['month']:02d} | 路径:{md['path']}")
                 process_folder(md['path'])
 
 if __name__ == '__main__':
     hierarchical_traversal(base_country_code.download_dir)
-    print(f"河北石家庄海关国家的所有文件处理完成!")
+    log.info(f"河北石家庄海关国家的所有文件处理完成!")

+ 15 - 12
hebei/gov_commodity_hebei_import_export.py

@@ -2,6 +2,7 @@ from pathlib import Path
 
 import pandas as pd
 import re
+from utils.log import log
 
 from utils import base_country_code, base_mysql
 from utils.base_country_code import format_sql_value
@@ -47,7 +48,7 @@ def process_folder(path):
     value_index = 5 if year == 2025 and month >= 3 else 4
     res = df_data(path, name_index, value_index)
     if not res:
-        print(f"{path} 上月目录里文件未找到包含 主出、主进商品 sheet")
+        log.info(f"{path} 上月目录里文件未找到包含 主出、主进商品 sheet")
         return
     export_df, import_df = res
 
@@ -70,7 +71,7 @@ def save_to_database(merged_df, year, month):
             commodity_name = str(row['commodity']).strip()
             commodity_code,commodity_name_fix = base_mysql.get_commodity_id(commodity_name)
             if not commodity_code:
-                print(f"未找到商品名称 '{commodity_name}' 对应的 ID")
+                log.info(f"未找到商品名称 '{commodity_name}' 对应的 ID")
                 continue
             if not commodity_name_fix or commodity_name_fix in processed_commodities:
                 continue
@@ -89,32 +90,34 @@ def save_to_database(merged_df, year, month):
                 monthly_total = round(monthly_import + monthly_export, 4)
                 sql = (f"INSERT INTO t_yujin_crossborder_prov_commodity_trade "
                        f"(crossborder_year, crossborder_year_month, prov_code, prov_name, commodity_code, commodity_name, monthly_total, monthly_export, monthly_import, create_time) VALUES "
-                       f"('2023', '2023-01', '130000', '河北省', '{commodity_code}', '{commodity_name_fix}', {format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, now());")
+                       f"('2023', '2023-01', '130000', '河北省', '{commodity_code}', '{commodity_name_fix}', {format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, now())"
+                       f"ON DUPLICATE KEY UPDATE create_time = now() ;")
                 sql_arr_copy.append(sql)
 
             sql = (f"INSERT INTO t_yujin_crossborder_prov_commodity_trade "
                    f"(crossborder_year, crossborder_year_month, prov_code, prov_name, commodity_code, commodity_name, monthly_total, monthly_export, monthly_import, create_time) VALUES "
-                   f"('{year}', '{year_month}', '130000', '河北省', '{commodity_code}', '{commodity_name_fix}', {format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, now());")
+                   f"('{year}', '{year_month}', '130000', '河北省', '{commodity_code}', '{commodity_name_fix}', {format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, now())"
+                   f"ON DUPLICATE KEY UPDATE create_time = now() ;")
             sql_arr.append(sql)
 
             processed_commodities.add(commodity_name_fix)
 
     except Exception as e:
-        print(f"{year_month} prov_commodity_trade 生成 SQL 文件时发生异常: {str(e)}")
+        log.info(f"{year_month} prov_commodity_trade 生成 SQL 文件时发生异常: {str(e)}")
 
-    print(f"√ {year_month} prov_commodity_trade 成功生成 SQL 文件 size {len(sql_arr)} ")
+    log.info(f"√ {year_month} prov_commodity_trade 成功生成 SQL 文件 size {len(sql_arr)} ")
     # 解析完后生成sql文件批量入库
     base_mysql.bulk_insert(sql_arr)
     if year_month == '2023-02':
-        print(f"√ {year_month} prov_commodity_trade copy 成功生成 SQL 文件 size {len(sql_arr_copy)} ")
+        log.info(f"√ {year_month} prov_commodity_trade copy 成功生成 SQL 文件 size {len(sql_arr_copy)} ")
         base_mysql.bulk_insert(sql_arr_copy)
-    print(f"√ {year_month} prov_commodity_trade SQL 存表完成!")
+    log.info(f"√ {year_month} prov_commodity_trade SQL 存表完成!")
 
 
 def df_data(path, name_index, value_index):
     file_paths = list(Path(path).glob('*'))
     if not file_paths:
-        print("未找到任何文件")
+        log.info("未找到任何文件")
         return None
     import_df = pd.DataFrame()
     export_df = pd.DataFrame()
@@ -158,7 +161,7 @@ def hierarchical_traversal(root_path):
 
     # 按年倒序
     for year_dir in sorted(year_dirs, key=lambda x: x.name, reverse=True):
-        print(f"\n年份:{year_dir.name} | 省份:jiangsu")
+        log.info(f"\n年份:{year_dir.name} | 省份:hebei")
 
         # 提取月份目录
         month_dirs = []
@@ -171,7 +174,7 @@ def hierarchical_traversal(root_path):
         # 按月倒序输出
         if month_dirs:
             for md in sorted(month_dirs, key=lambda x: x["month"], reverse=True):
-                print(f"  月份:{md['month']:02d} | 路径:{md['path']}")
+                log.info(f"  月份:{md['month']:02d} | 路径:{md['path']}")
                 process_folder(md['path'])
 
 if __name__ == '__main__':
@@ -179,4 +182,4 @@ if __name__ == '__main__':
 
     # root = Path(base_country_code.download_dir)/'2023'/'02'
     # process_folder(root)
-    print(f"河北石家庄海关出入口商品所有文件处理完成!")
+    log.info(f"河北石家庄海关出入口商品所有文件处理完成!")

+ 28 - 29
jiangsu/crawl_gov_jiangsu_full.py

@@ -18,10 +18,10 @@ import gov_commodity_jiangsu_city
 import gov_commodity_jiangsu_import_export
 
 from utils import base_country_code, base_mysql
+from utils.log import log
 
-# 显式指定 unrar 路径(根据实际情况修改)
-rarfile.UNRAR_EXECUTABLE = r"C:\Program Files\WinRAR\UnRAR.exe"
-# rarfile.UNRAR_EXECUTABLE = "/usr/bin/unrar"  # Linux/macOS
+# rarfile.UNRAR_EXECUTABLE = r"C:\Program Files\WinRAR\UnRAR.exe"
+rarfile.UNRAR_EXECUTABLE = "unrar"
 download_dir = base_country_code.download_dir
 Path(download_dir).mkdir(parents=True, exist_ok=True)
 
@@ -76,7 +76,7 @@ def find_target_links(driver):
         for download_btn in elements:
             # 获取文件名(用于后续判断)
             file_name = download_btn.text.strip()
-            print(f"正在下载: {file_name}")
+            log.info(f"正在下载: {file_name}")
 
             # 记录下载前的文件列表
             existing_files = set(f.name for f in Path(download_dir).glob('*'))
@@ -88,7 +88,7 @@ def find_target_links(driver):
             # 等待文件下载完成
             rar_files = wait_for_download_complete(existing_files=existing_files)
             if not rar_files:
-                print("未找到新下载的 .rar 文件")
+                log.info("未找到新下载的 .rar 文件")
                 continue
 
             downloaded_file = rar_files[0]
@@ -98,20 +98,20 @@ def find_target_links(driver):
                     # 获取压缩包中的第一个 .xls 文件
                     xls_files = [f for f in rf.namelist() if f.endswith('.xls') or f.endswith('.xlsx')]
                     if not xls_files:
-                        print(f"压缩包 {downloaded_file.name} 中没有 .xls 文件")
+                        log.info(f"压缩包 {downloaded_file.name} 中没有 .xls 文件")
                         continue
 
                     for xls_file in xls_files:
                         if xls_file.startswith('2022'):
                             return 'stop'
                         if not xls_file or '美元值' in xls_file or '企业性质' in xls_file or '贸易方式' in xls_file or '按收发货所在地' in xls_file or '主要商品' in xls_file:
-                            print(f"检测到不需要的文件:{xls_file},跳过")
+                            log.info(f"检测到不需要的文件:{xls_file},跳过")
                             continue
                         # 解压到临时目录
                         temp_dir = Path(download_dir) / 'temp'
                         temp_dir.mkdir(parents=True, exist_ok=True)
                         if not extract_rar(downloaded_file, temp_dir):
-                            print(f"解压文件 {downloaded_file.name} 时发生错误")
+                            log.info(f"解压文件 {downloaded_file.name} 时发生错误")
                             continue
                         # 获取解压后的文件路径
                         match = re.search(r"(\d{4})年(\d{1,2})月", xls_file)
@@ -123,39 +123,40 @@ def find_target_links(driver):
                         extracted_file = temp_dir / xls_file
                         final_path = Path(download_dir) / year / month / extracted_file.name
                         if os.path.exists(final_path):
-                            print(f"文件已存在:{extracted_file.name} 正在覆盖...")
+                            log.info(f"文件已存在:{extracted_file.name} 正在覆盖...")
                             os.unlink(final_path)
 
                         final_dir = Path(download_dir) / year / month
                         final_dir.mkdir(parents=True, exist_ok=True)
-                        print(f"√ 正在移动文件 {extracted_file} 至 {final_path}")
+                        log.info(f"√ 正在移动文件 {extracted_file} 至 {final_path}")
                         try:
                             extracted_file.rename(final_path)
-                            print(f"√ 下载成功:{final_path}")
+                            log.info(f"√ 下载成功:{final_path}")
                         except Exception as e:
-                            print(f"文件移动失败: {str(e)}")
+                            log.info(f"文件移动失败: {str(e)}")
 
                     # 删除临时目录(无论是否为空)
                     try:
                         shutil.rmtree(temp_dir)  # 替换 os.rmdir(temp_dir)
                     except Exception as e:
-                        print(f"删除临时目录失败: {str(e)}")
+                        log.info(f"删除临时目录失败: {str(e)}")
 
                 # 删除 .rar 文件
-                print(f"删除 .rar 文件:{downloaded_file}")
+                log.info(f"删除 .rar 文件:{downloaded_file}")
                 os.unlink(downloaded_file)
             else:
-                print(f"文件 {downloaded_file.name} 不是 .rar 文件,请手动处理")
+                log.info(f"文件 {downloaded_file.name} 不是 .rar 文件,请手动处理")
             # 将已处理的文件名加入集合
             processed_files.add(file_name)
         return None
     except Exception as e:
-        print(f"下载时发生异常: {str(e)}")
+        log.info(f"下载时发生异常: {str(e)}")
 
 def extract_rar(rar_path, extract_to):
     """备用解压函数(当 rarfile 失效时使用)"""
-    winrar_path = r"C:\Program Files\WinRAR\Rar.exe"  # 推荐使用 Rar.exe 而非 WinRAR.exe
-    cmd = [winrar_path, 'x', '-y', rar_path, str(extract_to)]
+    # winrar_path = r"C:\Program Files\WinRAR\Rar.exe"  # 推荐使用 Rar.exe 而非 WinRAR.exe
+    # cmd = [winrar_path, 'x', '-y', rar_path, str(extract_to)]
+    cmd = ["unrar", 'x', '-y', rar_path, str(extract_to)]
 
     # 使用 CREATE_NO_WINDOW 防止弹出命令行窗口
     creationflags = subprocess.CREATE_NO_WINDOW if os.name == 'nt' else 0
@@ -168,10 +169,10 @@ def extract_rar(rar_path, extract_to):
     )
 
     if result.returncode == 0:
-        print(f"解压成功: {rar_path} → {extract_to}")
+        log.info(f"解压成功: {rar_path} → {extract_to}")
         return True
     else:
-        print(f"解压失败: {result.stderr.decode('gbk')}")
+        log.info(f"解压失败: {result.stderr.decode('gbk')}")
         return False
 
 
@@ -208,7 +209,7 @@ def crawl_with_selenium(url):
             # 获取下一页的URL
             next_page_url = next_page_btn.get_attribute("onclick")
             if not next_page_url:
-                print("已到达最后一页,停止爬取")
+                log.info("已到达最后一页,停止爬取")
                 break
             # 从onclick属性中提取URL
             next_page_url = re.search(r"'(.*?)'", next_page_url).group(1)
@@ -219,7 +220,7 @@ def crawl_with_selenium(url):
             # 访问下一页
             driver.get(next_page_url)
 
-            print(f"开始爬取 {next_page_url} 页面数据")
+            log.info(f"开始爬取 {next_page_url} 页面数据")
 
     finally:
         driver.quit()
@@ -227,13 +228,11 @@ def crawl_with_selenium(url):
 
 def wait_for_download_complete(timeout=30, existing_files=None):
     start_time = time.time()
-    temp_exts = ('.part', '.crdownload')
 
     if existing_files is None:
         existing_files = set(f.name for f in Path(download_dir).glob('*'))
 
     while (time.time() - start_time) < timeout:
-        current_files = set(f.name for f in Path(download_dir).glob('*'))
         new_files = [f for f in Path(download_dir).glob('*.rar') if f.name not in existing_files]
         if new_files:
             # 等待文件大小稳定(不再变化),确保下载完成
@@ -262,7 +261,7 @@ def hierarchical_traversal(root_path, all_records):
     # 按年倒序
     for year_dir in sorted(year_dirs, key=lambda x: x.name, reverse=True):
         # 构造完整的路径:download/shandong/2025/03
-        print(f"\n年份:{year_dir.name} | 省份:jiangsu")
+        log.info(f"\n年份:{year_dir.name} | 省份:jiangsu")
 
         # 提取月份目录
         month_dirs = []
@@ -275,21 +274,21 @@ def hierarchical_traversal(root_path, all_records):
         # 按月倒序输出
         if month_dirs:
             for md in sorted(month_dirs, key=lambda x: x["month"], reverse=True):
-                print(f"  月份:{md['month']:02d} | 路径:{md['path']}")
+                log.info(f"  月份:{md['month']:02d} | 路径:{md['path']}")
                 gov_commodity_jiangsu_import_export.process_folder(md['path'], all_records)
                 gov_commodity_jiangsu_country.process_folder(md['path'])
                 gov_commodity_jiangsu_city.process_folder(md['path'])
 
 if __name__ == "__main__":
     crawl_with_selenium('http://nanjing.customs.gov.cn/nanjing_customs/zfxxgk58/fdzdgknr95/3010051/589289/7e2fcc72-1.html')
-    print(f"江苏南京海关全量数据下载任务完成")
+    log.info(f"江苏南京海关全量数据下载任务完成")
     # 等待5s后执行
     time.sleep(5)
     all_records = base_mysql.get_hs_all()
     hierarchical_traversal(base_country_code.download_dir, all_records)
-    print("江苏南京海关类章、国家、城市所有文件处理完成!")
+    log.info("江苏南京海关类章、国家、城市所有文件处理完成!")
     time.sleep(5)
     base_mysql.update_january_yoy('江苏省')
     base_mysql.update_shandong_yoy('江苏省')
-    print("江苏南京海关城市同比sql处理完成")
+    log.info("江苏南京海关城市同比sql处理完成")
 

+ 32 - 33
jiangsu/gov_commodity_jiangsu_city.py

@@ -5,47 +5,45 @@ import pandas as pd
 
 from utils import base_country_code, base_mysql
 from utils.base_country_code import format_sql_value
+from utils.log import log
 
 city_code_map = {
-    "南京市": "3201",
-    "无锡市": "3202",
-    "徐州市": "3203",
-    "常州市": "3204",
-    "苏州市": "3205",
-    "南通市": "3206",
-    "连云港市": "3207",
-    "淮安市": "3208",
-    "盐城市": "3209",
-    "扬州市": "3210",
-    "镇江市": "3211",
-    "泰州市": "3212",
-    "宿迁市": "3213"
+    "南京市": "320100",
+    "无锡市": "320200",
+    "徐州市": "320300",
+    "常州市": "320400",
+    "苏州市": "320500",
+    "南通市": "320600",
+    "连云港市": "320700",
+    "淮安市": "320800",
+    "盐城市": "320900",
+    "扬州市": "321000",
+    "镇江市": "321100",
+    "泰州市": "321200",
+    "宿迁市": "321300"
 }
 
 ignore_city_code_arr = ['江阴市','宜兴市','常熟市','张家港市','昆山市','吴江市','太仓市','启东市','东台市','仪征市','丹阳市','兴化市']
 
-def get_df(path):
+def get_df(path, year_month):
     global df,  df_type
     file_paths = list(Path(path).glob('*'))
     if not file_paths:
-        print("未找到任何文件")
+        log.info("未找到任何文件")
         return
     if len(file_paths) == 1:
         file_path = file_paths[0]
-        print(f"处理单文件: {file_path.name}")
+        log.info(f"处理单文件: {file_path.name}")
         xls = pd.ExcelFile(file_path)
 
-        sheet_name = base_country_code.find_sheet_by_keyword(file_path, "地")
-        if not sheet_name:
-            print(f"{file_path} 未找到包含 地市 sheet")
-            return None
-        df = pd.read_excel(xls, sheet_name=sheet_name, header=None).iloc[5:]
+        sheet_index = 5 if year_month == '2024-11' else 3
+        df = pd.read_excel(xls, sheet_name=sheet_index, header=None).iloc[5:]
         df_type = 0
 
     else:
         for file in file_paths:
             if "地区" in file.name:
-                print(f"处理多文件: {file.name}")
+                log.info(f"处理多文件: {file.name}")
                 file_path = Path(path) / file
                 df = pd.read_excel(file_path, header=None).iloc[6:]
                 df_type = 1
@@ -57,9 +55,9 @@ def process_folder(path):
     year_month = f'{year}-{month:02d}'
 
     sql_arr = []
-    res = get_df(path)
+    res = get_df(path, year_month)
     if res is None:
-        print(f"{year_month} prov_region_trade 未找到包含 地市 sheet")
+        log.info(f"{year_month} prov_region_trade 未找到包含 地市 sheet")
         return
     df, df_type = res
     if df_type == 0:
@@ -77,11 +75,11 @@ def process_folder(path):
                 flag = True
                 break
         if flag:
-            print(f"忽略 {city_name}")
+            log.info(f"忽略 {city_name}")
             continue
         city_code = city_code_map.get(city_name)
         if not city_code:
-            print(f"未找到省 '{city_name}' 对应市编码")
+            log.info(f"未找到省 '{city_name}' 对应市编码")
             continue
 
         monthly_export, monthly_import, monthly_total = value_row(row, col_total_index)
@@ -92,13 +90,14 @@ def process_folder(path):
         # 组装 SQL 语句
         sql = (f"INSERT INTO t_yujin_crossborder_prov_region_trade "
                f"(crossborder_year, crossborder_year_month, prov_code, prov_name, city_code, city_name, monthly_total, monthly_export, monthly_import,yoy_import_export, yoy_import, yoy_export, create_time) VALUES "
-               f"('{year}', '{year_month}', '320000', '江苏省', '{city_code}', '{city_name}', {format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, '{yoy_import_export}', '{yoy_import}', '{yoy_export}', now());\n")
+               f"('{year}', '{year_month}', '320000', '江苏省', '{city_code}', '{city_name}', {format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, '{yoy_import_export}', '{yoy_import}', '{yoy_export}', now())"
+               f"ON DUPLICATE KEY UPDATE create_time = now(); \n")
         sql_arr.append(sql)
 
-    print(f"√ {year_month} prov_region_trade 成功生成 SQL 文件 size {len(sql_arr)} ")
+    log.info(f"√ {year_month} prov_region_trade 成功生成 SQL 文件 size {len(sql_arr)} ")
     # 解析完后生成sql文件批量入库
     base_mysql.bulk_insert(sql_arr)
-    print(f"√ {year_month} prov_region_trade SQL 存表完成!")
+    log.info(f"√ {year_month} prov_region_trade SQL 存表完成!")
 
 
 def value_row(row,col_total_index):
@@ -115,7 +114,7 @@ def hierarchical_traversal(root_path):
     ]
 
     for year_dir in sorted(year_dirs, key=lambda x: x.name, reverse=True):
-        print(f"\n年份:{year_dir.name} | 省份:jiangsu")
+        log.info(f"\n年份:{year_dir.name} | 省份:jiangsu")
 
         month_dirs = []
         for item in year_dir.iterdir():
@@ -124,13 +123,13 @@ def hierarchical_traversal(root_path):
 
         if month_dirs:
             for md in sorted(month_dirs, key=lambda x: x["month"], reverse=True):
-                print(f"  月份:{md['month']:02d} | 路径:{md['path']}")
+                log.info(f"  月份:{md['month']:02d} | 路径:{md['path']}")
                 process_folder(md['path'])
 
 if __name__ == '__main__':
     hierarchical_traversal(base_country_code.download_dir)
-    print(f"江苏南京海关城市所有文件处理完成!")
+    log.info(f"江苏南京海关城市所有文件处理完成!")
     time.sleep(5)
     base_mysql.update_january_yoy('江苏省')
     base_mysql.update_shandong_yoy('江苏省')
-    print("江苏南京同比sql处理完成")
+    log.info("江苏南京同比sql处理完成")

+ 16 - 18
jiangsu/gov_commodity_jiangsu_country.py

@@ -4,34 +4,31 @@ import pandas as pd
 
 from utils import base_country_code, base_mysql
 from utils.base_country_code import format_sql_value
+from utils.log import log
 
 # 排除地区名单
 EXCLUDE_REGIONS = ["亚洲", "非洲", "欧洲", "拉丁美洲", "北美洲", "大洋洲", "南极洲",
                    "东南亚国家联盟", "欧洲联盟", "亚太经济合作组织",
                    "区域全面经济伙伴关系协定(RCEP)成员国", "共建“一带一路”国家和地区"]
 
-def get_df(path):
+def get_df(path, year_month):
     global df,  df_type
     file_paths = list(Path(path).glob('*'))
     if not file_paths:
-        print("未找到任何文件")
+        log.info("未找到任何文件")
         return
     if len(file_paths) == 1:
         file_path = file_paths[0]
-        print(f"处理单文件: {file_path.name}")
+        log.info(f"处理单文件: {file_path.name}")
         xls = pd.ExcelFile(file_path)
 
-        sheet_name = base_country_code.find_sheet_by_keyword(file_path, "国别")
-        if not sheet_name:
-            print(f"{file_path} 未找到包含 类章 sheet")
-            return None
-        df = pd.read_excel(xls, sheet_name=sheet_name, header=None).iloc[5:]
+        df = pd.read_excel(xls, sheet_name=1, header=None).iloc[5:]
         df_type = 0
 
     else:
         for file in file_paths:
             if "国别" in file.name:
-                print(f"处理多文件: {file.name}")
+                log.info(f"处理多文件: {file.name}")
                 file_path = Path(path) / file
                 df = pd.read_excel(file_path, header=None).iloc[6:]
                 df_type = 1
@@ -44,7 +41,7 @@ def process_folder(path):
 
     sql_arr = []
     try:
-        df, df_type = get_df(path)
+        df, df_type = get_df(path, year_month)
         if df_type == 0:
             country_name_index = 0
             col_total_index, col_monthly_export_index, col_monthly_import_index = 1, 3, 5
@@ -68,7 +65,7 @@ def process_folder(path):
             # 获取国家编码
             country_code = base_country_code.COUNTRY_CODE_MAPPING.get(country_name)
             if not country_code:
-                print(f"{year_month} 未找到国家 '{country_name}' 对应的编码")
+                log.info(f"{year_month} 未找到国家 '{country_name}' 对应的编码")
                 continue
 
             # 提取数据并格式化
@@ -84,16 +81,17 @@ def process_folder(path):
                 f"monthly_total, monthly_export, monthly_import, yoy_import_export, yoy_import, yoy_export, create_time) "
                 f"VALUES ('{year}', '{year_month}', '320000', '江苏省', '{country_code}', '{country_name}', "
                 f"{format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, '{yoy_import_export}', '{yoy_import}', "
-                f"'{yoy_export}', NOW());"
+                f"'{yoy_export}', NOW())"
+                f"ON DUPLICATE KEY UPDATE create_time = now() ;"
             )
             sql_arr.append(sql)
     except Exception as e:
-        print(f"{year_month} 处理时发生异常: {str(e)}")
+        log.info(f"{year_month} 处理时发生异常: {str(e)}")
 
-    print(f"√ {year_month} 成功生成 SQL 条数: {len(sql_arr)}")
+    log.info(f"√ {year_month} 成功生成 SQL 条数: {len(sql_arr)}")
     # 批量插入数据库
     base_mysql.bulk_insert(sql_arr)
-    print(f"√ {year_month} prov_country_trade SQL 存表完成!")
+    log.info(f"√ {year_month} prov_country_trade SQL 存表完成!")
 
 def value_row(row, col_total_index, col_monthly_export_index, col_monthly_import_index):
     def value_special_handler(value):
@@ -117,7 +115,7 @@ def hierarchical_traversal(root_path):
     ]
 
     for year_dir in sorted(year_dirs, key=lambda x: x.name, reverse=True):
-        print(f"\n年份:{year_dir.name} | 省份:jiangsu")
+        log.info(f"\n年份:{year_dir.name} | 省份:jiangsu")
 
         month_dirs = []
         for item in year_dir.iterdir():
@@ -126,10 +124,10 @@ def hierarchical_traversal(root_path):
 
         if month_dirs:
             for md in sorted(month_dirs, key=lambda x: x["month"], reverse=True):
-                print(f"  月份:{md['month']:02d} | 路径:{md['path']}")
+                log.info(f"  月份:{md['month']:02d} | 路径:{md['path']}")
                 process_folder(md['path'])
 
 
 if __name__ == '__main__':
     hierarchical_traversal(base_country_code.download_dir)
-    print("江苏南京海关国别所有文件处理完成!")
+    log.info("江苏南京海关国别所有文件处理完成!")

+ 14 - 16
jiangsu/gov_commodity_jiangsu_import_export.py

@@ -4,7 +4,7 @@ from pathlib import Path
 import pandas as pd
 
 from utils import base_country_code, base_mysql
-from utils.base_country_code import format_sql_value
+from utils.log import log
 
 YEAR_PATTERN = re.compile(r"^\d{4}$")
 MONTH_PATTERN = re.compile(r"^(0[1-9]|1[0-2])$")
@@ -14,25 +14,22 @@ all_records = []
 def process_folder(path, all_records):
     file_paths = list(Path(path).glob('*'))
     if not file_paths:
-        print("未找到任何文件")
+        log.info("未找到任何文件")
         return
     year, month = base_country_code.extract_year_month_from_path(path)
     year_month = f'{year}-{month:02d}'
 
     if len(file_paths) == 1:
         file_path = file_paths[0]
-        print(f"处理单文件: {file_path.name}")
+        log.info(f"处理单文件: {file_path.name}")
         # 读取所有sheet
         xls = pd.ExcelFile(file_path)
         import_df = pd.DataFrame()
         export_df = pd.DataFrame()
         total_df = pd.DataFrame()
-        sheet_name = base_country_code.find_sheet_by_keyword(file_path, "类章")
-        if not sheet_name:
-            print(f"{file_path} 未找到包含 类章 sheet")
-            return
         skip_index = 4 if year_month == '2024-11' else 5
-        df = pd.read_excel(xls, sheet_name=sheet_name, header=None).iloc[skip_index:]
+        sheet_index = 6 if year_month == '2024-11' else 4
+        df = pd.read_excel(xls, sheet_name=sheet_index, header=None).iloc[skip_index:]
         temp_df = df[[0, 5]].rename(columns={0: 'commodity', 5: 'import'})
         temp_df['import'] = pd.to_numeric(temp_df['import'].replace('--', 0), errors='coerce')
         temp_df['import'] = temp_df['import'] * 10000
@@ -55,7 +52,7 @@ def process_folder(path, all_records):
         total_df = pd.DataFrame()
         for file in file_paths:
             if "商品类章" in file.name:
-                print(f"处理多文件: {file.name}")
+                log.info(f"处理多文件: {file.name}")
                 file_path = Path(path) / file
                 df = pd.read_excel(file_path, header=None).iloc[6:]
 
@@ -97,10 +94,10 @@ def save_to_database(import_df, export_df, total_df, year, month, all_records):
         # 找类名确定索引
         result = extract_category_or_chapter(commodity_name, all_records_index)
         if result is None:
-            print(f"未找到商品名称 '{commodity_name}' 对应的ID")
+            log.info(f"未找到商品名称 '{commodity_name}' 对应的ID")
             continue
         if result >= len(all_records):
-            print(f"all_records 已超限 '{commodity_name}' 跳过")
+            log.info(f"all_records 已超限 '{commodity_name}' 跳过")
             continue
         all_records_index = result
 
@@ -114,14 +111,15 @@ def save_to_database(import_df, export_df, total_df, year, month, all_records):
 
         sql = (f"INSERT INTO t_yujin_crossborder_prov_commodity_trade "
                f"(crossborder_year, crossborder_year_month, prov_code, prov_name, commodity_code, commodity_name, monthly_total, monthly_export, monthly_import, create_time, commodity_source) VALUES "
-               f"('{year}', '{year_month}', '320000', '江苏省', '{commodity_code}', '{category_name}', {monthly_total}, {monthly_export}, {monthly_import}, now(), 1);")
+               f"('{year}', '{year_month}', '320000', '江苏省', '{commodity_code}', '{category_name}', {monthly_total}, {monthly_export}, {monthly_import}, now(), 1)"
+               f" ON DUPLICATE KEY UPDATE create_time = now();")
         sql_arr.append(sql)
 
         processed_commodities.add(commodity_code)
 
-    print(f"√ {year_month} 成功生成SQL文件 size {len(sql_arr)} ")
+    log.info(f"√ {year_month} 成功生成SQL文件 size {len(sql_arr)} ")
     base_mysql.bulk_insert(sql_arr)
-    print(f"√ {year_month} prov_commodity_trade SQL 存表完成!")
+    log.info(f"√ {year_month} prov_commodity_trade SQL 存表完成!")
 
 
 def extract_category_or_chapter(text, all_records_index):
@@ -146,7 +144,7 @@ def hierarchical_traversal(root_path, all_records):
     # 按年倒序
     for year_dir in sorted(year_dirs, key=lambda x: x.name, reverse=True):
         # 构造完整的路径:download/shandong/2025/03
-        print(f"\n年份:{year_dir.name} | 省份:jiangsu")
+        log.info(f"\n年份:{year_dir.name} | 省份:jiangsu")
 
         # 提取月份目录
         month_dirs = []
@@ -159,7 +157,7 @@ def hierarchical_traversal(root_path, all_records):
         # 按月倒序输出
         if month_dirs:
             for md in sorted(month_dirs, key=lambda x: x["month"], reverse=True):
-                print(f"  月份:{md['month']:02d} | 路径:{md['path']}")
+                log.info(f"  月份:{md['month']:02d} | 路径:{md['path']}")
                 process_folder(md['path'], all_records)
 
 if __name__ == '__main__':

+ 4 - 29
utils/base_country_code.py

@@ -3,7 +3,8 @@ import re
 from pathlib import Path
 
 import pandas as pd
-from openpyxl import load_workbook
+
+from utils.log import log
 
 YEAR_PATTERN = re.compile(r"^\d{4}$")
 MONTH_PATTERN = re.compile(r"^(0[1-9]|1[0-2])$")
@@ -22,32 +23,6 @@ def format_sql_value(value):
     else:
         return f"'{value}'"
 
-def find_sheet_by_keyword(file_path, keyword):
-    """
-    模糊查找包含关键字的 sheet 名称(支持 .xls 和 .xlsx)
-
-    :param file_path: Excel 文件路径
-    :param keyword: 要匹配的关键字(如 '类章')
-    :return: 匹配到的第一个 sheet 名称,或 None
-    """
-    # 处理 .xlsx 文件
-    if file_path.suffix == ".xlsx":
-        workbook = load_workbook(filename=file_path, read_only=True)
-        sheets = workbook.sheetnames
-    # 处理 .xls 文件
-    elif file_path.suffix == ".xls":
-        import xlrd
-        workbook = xlrd.open_workbook(file_path)
-        sheets = workbook.sheet_names()
-    else:
-        raise ValueError(f"不支持的文件格式:{file_path.suffix}")
-
-    # 精确匹配 + 模糊匹配策略
-    for sheet in sheets:
-        if keyword.lower() in sheet.lower():
-            return sheet
-    return None
-
 def get_previous_month_dir(current_path):
     """生成前月目录路径"""
     try:
@@ -63,7 +38,7 @@ def get_previous_month_dir(current_path):
 
         return current_path.parent.parent / current_path.parent.name / f"{prev_month:02d}"
     except Exception as e:
-        print(f"前月目录生成失败:{str(e)}")
+        log.info(f"前月目录生成失败:{str(e)}")
         return None
 
 COUNTRY_CODE_MAPPING = {
@@ -360,4 +335,4 @@ download_dir_find = os.path.abspath(os.path.join('downloads/demo'))
 
 if __name__ == '__main__':
     year, month = extract_year_month_from_path(Path(download_dir)/'2025'/'02')
-    print(year, month)
+    log.info("%s %s", year, month)

+ 43 - 17
utils/base_mysql.py

@@ -2,6 +2,8 @@ import pymysql
 from sqlalchemy import create_engine, text
 from urllib.parse import quote_plus
 
+from utils.log import log
+
 # 数据库配置
 DB_CONFIG = {
     'host': '10.130.75.149',
@@ -31,14 +33,14 @@ def get_commodity_id(commodity_name):
                 if len(result) == 1:
                     return result[0][0], result[0][1]
                 else:
-                    print(f"查询结果为多条,商品id为:{result},fix_commodity_name:{fix_commodity_name},commodity_name: {commodity_name}")
+                    log.info(f"查询结果为多条,商品id为:{result},fix_commodity_name:{fix_commodity_name},commodity_name: {commodity_name}")
                     sql = "SELECT e.id, e.commodity_name FROM t_yujin_crossborder_prov_commodity_category e WHERE e.commodity_name = %s"
                     cursor.execute(sql, (f"{fix_commodity_name}",))
                     result = cursor.fetchone()
                     if not result:
                         # 用原商品名称再查一次
                         commodity_name = commodity_name.replace("(", "(").replace(")", ")")
-                        print(f"原商品名称查询,commodity_name:{commodity_name}")
+                        log.info(f"原商品名称查询,commodity_name:{commodity_name}")
                         sql = "SELECT e.id, e.commodity_name FROM t_yujin_crossborder_prov_commodity_category e WHERE e.commodity_name = %s"
                         cursor.execute(sql, (f"{commodity_name}",))
                         result = cursor.fetchone()
@@ -51,7 +53,7 @@ def get_commodity_id(commodity_name):
             else:
                 return None, None
     except Exception as e:
-        print(f"查询数据库时发生异常: {str(e)}")
+        log.info(f"查询数据库时发生异常: {str(e)}")
         return None, None
     finally:
         if connection:
@@ -72,12 +74,32 @@ def get_hs_all():
             else:
                 return None
     except Exception as e:
-        print(f"查询数据库时发生异常: {str(e)}")
+        log.info(f"查询数据库时发生异常: {str(e)}")
         return None
     finally:
         if connection:
             connection.close()
 
+def get_code_exist(crossborder_year_month, prov_code):
+    try:
+        # 使用 with 自动管理连接生命周期
+        with pymysql.connect(**DB_CONFIG) as connection:
+            with connection.cursor() as cursor:
+                # 执行查询
+                sql = """
+                    SELECT COUNT(1) 
+                    FROM t_yujin_crossborder_prov_commodity_trade e 
+                    WHERE e.crossborder_year_month = %s 
+                      AND e.prov_code = %s
+                """
+                cursor.execute(sql, (crossborder_year_month, prov_code))
+                result = cursor.fetchone()
+                return int(result[0]) if result and result[0] else 0
+    except Exception as e:
+        log.info(f"[数据库查询异常] 查询条件: {crossborder_year_month}, {prov_code} | 错误详情: {str(e)}")
+        return 0
+
+
 # 对密码进行 URL 编码
 encoded_password = quote_plus(DB_CONFIG["password"])
 
@@ -94,7 +116,7 @@ def bulk_insert(sql_statements):
     :param sql_statements: 包含多个 INSERT 语句的列表
     """
     if not sql_statements:
-        print("未提供有效的 SQL 插入语句,跳过操作")
+        log.info("未提供有效的 SQL 插入语句,跳过操作")
         return
 
     try:
@@ -103,9 +125,9 @@ def bulk_insert(sql_statements):
                 for sql in sql_statements:
                     stmt = text(sql.strip())
                     conn.execute(stmt)
-                print(f"成功执行 {len(sql_statements)} 条 SQL 插入语句")
+                log.info(f"成功执行 {len(sql_statements)} 条 SQL 插入语句")
     except Exception as e:
-        print(f"数据库操作失败: {str(e)}")
+        log.info(f"数据库操作失败: {str(e)}")
         raise
 
 def update_january_yoy(prov_name):
@@ -148,11 +170,11 @@ def update_january_yoy(prov_name):
     try:
         with engine.begin() as conn:
             result = conn.execute(update_sql, {'prov_name': prov_name})
-            print(f"Updated {result.rowcount} rows for {prov_name}")
+            log.info(f"Updated {result.rowcount} rows for {prov_name}")
             return result.rowcount
 
     except Exception as e:
-        print(f"Update failed: {str(e)}")
+        log.info(f"Update failed: {str(e)}")
         raise RuntimeError(f"同比数据更新失败: {str(e)}") from e
 
 def clear_old_shandong_yoy(prov_name):
@@ -174,10 +196,10 @@ def clear_old_shandong_yoy(prov_name):
     try:
         with engine.begin() as conn:
             result = conn.execute(clear_sql, {'prov_name': prov_name})
-            print(f"{prov_name} 旧数据清零记录数: {result.rowcount}")
+            log.info(f"{prov_name} 旧数据清零记录数: {result.rowcount}")
             return result.rowcount
     except Exception as e:
-        print(f"旧数据清零失败: {str(e)}")
+        log.info(f"旧数据清零失败: {str(e)}")
         raise
 
 def update_shandong_yoy(prov_name):
@@ -191,10 +213,10 @@ def update_shandong_yoy(prov_name):
         # 步骤2:计算新数据
         updated = _update_shandong_new_yoy(prov_name)
 
-        print(f"{prov_name} 同比处理完成 | 清零:{cleared} 更新:{updated}")
+        log.info(f"{prov_name} 同比处理完成 | 清零:{cleared} 更新:{updated}")
         return {'cleared': cleared, 'updated': updated}
     except Exception as e:
-        print("{prov_name} 数据处理失败", exc_info=True)
+        log.info(f"{prov_name} 数据处理失败", exc_info=True)
         raise
 
 def _update_shandong_new_yoy(prov_name):
@@ -232,11 +254,15 @@ def _update_shandong_new_yoy(prov_name):
 
     with engine.begin() as conn:
         result = conn.execute(update_sql, {'prov_name': prov_name})
-        print(f"{prov_name} 新数据更新数: {result.rowcount}")
+        log.info(f"{prov_name} 新数据更新数: {result.rowcount}")
         return result.rowcount
 
 
 if __name__ == '__main__':
-    update_january_yoy('浙江省')
-    update_shandong_yoy('浙江省')
-    print("同比sql处理完成")
+    check_year, check_month = 2024, 4
+    count = get_code_exist(f'{check_year}-{check_month:02d}', "340000")
+    print(count)
+
+    # update_january_yoy('浙江省')
+    # update_shandong_yoy('浙江省')
+    # log.info("同比sql处理完成")

+ 7 - 4
utils/crawl_gov_commodity.py

@@ -1,5 +1,8 @@
 import pandas as pd
-from com.zf.crawl import base_mysql
+
+from utils import base_mysql
+from utils.log import log
+
 
 def generate_sql_from_excel(excel_file):
     # 读取 Excel 文件
@@ -21,12 +24,12 @@ def generate_sql_from_excel(excel_file):
                 commodity_name = commodity_name.replace('(', '(').replace(')', ')')
         sql = f"INSERT INTO t_yujin_crossborder_prov_commodity_category (code, level,commodity_code, commodity_name, create_time) VALUES ('{code}', {level}, '{commodity_code}', '{commodity_name}', now());"
         sql_arr.append(sql)
-        print(sql)
+        log.info(sql)
 
-    print(f"√ 成功生成 commodity category SQL 文件 size {len(sql_arr)} ")
+    log.info(f"√ 成功生成 commodity category SQL 文件 size {len(sql_arr)} ")
     # 解析完后生成sql文件批量入库
     base_mysql.bulk_insert(sql_arr)
-    print("trade SQL 存表完成!")
+    log.info("trade SQL 存表完成!")
 
 # 商品书数据初始化
 excel_file = 'C:\\Users\\admin\\Desktop\\海关总署数据梳理.xlsx'  # 输入的 Excel 文件

+ 25 - 48
zhejiang/crawl_gov_zhejiangi_full.py

@@ -1,7 +1,6 @@
 import os
 import random
 import re
-import subprocess
 import time
 from pathlib import Path
 from urllib.parse import urljoin
@@ -18,6 +17,7 @@ import gov_commodity_zhejiang_city
 import gov_commodity_zhejiang_country
 import gov_commodity_zhejiang_import_export
 from utils import base_country_code, base_mysql
+from utils.log import log
 
 download_dir = base_country_code.download_dir
 Path(download_dir).mkdir(parents=True, exist_ok=True)
@@ -62,7 +62,7 @@ def crawl_by_year_tabs(driver, base_url):
     for tab in year_tabs:
         year_text = tab.text.strip()
         if int(year_text[:4]) <= 2022:
-            print(f"{year_text} 后的数据无需下载")
+            log.info(f"{year_text} 后的数据无需下载")
             continue
 
         year_url = tab.get_attribute("href")
@@ -72,7 +72,7 @@ def crawl_by_year_tabs(driver, base_url):
         # 新标签页打开年份页面
         driver.execute_script("window.open(arguments[0]);", year_url)
         driver.switch_to.window(driver.window_handles[-1])
-        print(f"\n正在处理 {year_text} 年份页面")
+        log.info(f"\n正在处理 {year_text} 年份页面")
 
         process_month_tabs(driver, year_text, base_url)
 
@@ -98,7 +98,7 @@ def process_month_tabs(driver, year, base_url):
             # 全量获取所有月份Tab
             month_items = driver.find_elements(By.XPATH, '//ul[@class="nav_tab"]//li')
             if not month_items:
-                print(f"{year}年没有月份Tab,停止处理")
+                log.info(f"{year}年没有月份Tab,停止处理")
                 break
 
             all_found = True
@@ -114,7 +114,7 @@ def process_month_tabs(driver, year, base_url):
                 if not month_text in target_months:
                     continue  # 跳过已处理月份
 
-                print(f"点击月份Tab:{year}-{month_text}")
+                log.info(f"点击月份Tab:{year}-{month_text}")
                 a_tag.click()
 
                 # 处理详情页逻辑
@@ -123,9 +123,9 @@ def process_month_tabs(driver, year, base_url):
                 )
                 detail_link_arr = get_behind_detail_link(driver, base_url)
                 if not detail_link_arr:
-                    print(f"{year}-{month_text} 未找到详情链接")
+                    log.info(f"{year}-{month_text} 未找到详情链接")
                 for detail_link in detail_link_arr:
-                    print(f"{year}-{month_text} 详情链接:{detail_link}")
+                    log.info(f"{year}-{month_text} 详情链接:{detail_link}")
                     driver.get(detail_link)
                     download_file_from_detail_page(driver)
                     driver.back()
@@ -137,24 +137,24 @@ def process_month_tabs(driver, year, base_url):
                 found = True
 
             if not found:
-                print(f"{year}年未找到 {month_text} Tab")
+                log.info(f"{year}年未找到 {month_text} Tab")
                 all_found = False
 
             if all_found:
-                print(f"{year}年所有目标月份处理完成")
+                log.info(f"{year}年所有目标月份处理完成")
                 break
             else:
                 # 部分月份未找到,重新获取元素
                 # retry_count += 1
-                print(f"第 {retry_count} 次重试获取月份Tab...")
+                log.info(f"第 {retry_count} 次重试获取月份Tab...")
                 time.sleep(2)
 
         except StaleElementReferenceException:
-            print("页面刷新,重新获取月份Tab列表...")
+            log.info("页面刷新,重新获取月份Tab列表...")
             # retry_count += 1
             time.sleep(2)
 
-    print(f"{year}年最终处理的月份:{processed_months}")
+    log.info(f"{year}年最终处理的月份:{processed_months}")
 
 def get_behind_detail_link(driver, base_url):
    """获取点击月份Tab后 conList_ul 下所有 li 的 a 标签完整链接"""
@@ -170,7 +170,7 @@ def get_behind_detail_link(driver, base_url):
            href_arr.append(full_url)
        return href_arr
    except Exception as e:
-       print(f"获取详情链接失败: {str(e)}")
+       log.info(f"获取详情链接失败: {str(e)}")
        return []
 
 def download_file_from_detail_page(driver):
@@ -181,7 +181,7 @@ def download_file_from_detail_page(driver):
     try:
         elements = driver.find_elements(By.XPATH, '//div[@class="easysite-news-content"]//div[@id="easysiteText"]//p//a')
         if not elements:
-            print("详情页未找到目标文件链接")
+            log.info("详情页未找到目标文件链接")
             return
 
         for download_btn in elements:
@@ -191,10 +191,10 @@ def download_file_from_detail_page(driver):
             file_url = download_btn.get_attribute("href")
 
             if not file_url.lower().endswith(('.xls', '.xlsx')):
-                print(f"跳过非 Excel 文件: {file_url}")
+                log.info(f"跳过非 Excel 文件: {file_url}")
                 continue
 
-            print(f"正在下载: {file_name} → {file_url}")
+            log.info(f"正在下载: {file_name} → {file_url}")
 
             # 记录下载前的文件列表
             existing_files = set(f.name for f in Path(download_dir).glob('*'))
@@ -207,17 +207,17 @@ def download_file_from_detail_page(driver):
             year, start_month, month = extract_year_and_month(file_name)
             final_path = Path(download_dir) / year / month / f"{file_name}"
             if os.path.exists(final_path):
-                print(f"文件已存在:{file_name} 正在覆盖...")
+                log.info(f"文件已存在:{file_name} 正在覆盖...")
                 os.unlink(final_path)
 
             final_dir = Path(download_dir) / year / month
             final_dir.mkdir(parents=True, exist_ok=True)
-            print(f"√ 正在移动文件 {downloaded_file} 至 {final_path}")
+            log.info(f"√ 正在移动文件 {downloaded_file} 至 {final_path}")
             downloaded_file.rename(final_path)
-            print(f"√ 下载成功:{final_path}")
+            log.info(f"√ 下载成功:{final_path}")
 
     except Exception as e:
-        print(f"详情页处理异常: {str(e)}")
+        log.info(f"详情页处理异常: {str(e)}")
 
 def extract_year_and_month(file_name):
     # 支持两种格式:
@@ -234,29 +234,6 @@ def extract_year_and_month(file_name):
     else:
         raise ValueError(f"无法从文件名中提取年份和月份:{file_name}")
 
-def extract_rar(rar_path, extract_to):
-    """备用解压函数(当 rarfile 失效时使用)"""
-    winrar_path = r"C:\Program Files\WinRAR\Rar.exe"  # 推荐使用 Rar.exe 而非 WinRAR.exe
-    cmd = [winrar_path, 'x', '-y', rar_path, str(extract_to)]
-
-    # 使用 CREATE_NO_WINDOW 防止弹出命令行窗口
-    creationflags = subprocess.CREATE_NO_WINDOW if os.name == 'nt' else 0
-
-    result = subprocess.run(
-        cmd,
-        stdout=subprocess.PIPE,
-        stderr=subprocess.PIPE,
-        creationflags=creationflags  # 关键点:隐藏窗口
-    )
-
-    if result.returncode == 0:
-        print(f"解压成功: {rar_path} → {extract_to}")
-        return True
-    else:
-        print(f"解压失败: {result.stderr.decode('gbk')}")
-        return False
-
-
 def crawl_with_selenium(url):
     driver = webdriver.Firefox(options=configure_stealth_options())
     base_url = 'http://hangzhou.customs.gov.cn'
@@ -320,7 +297,7 @@ def hierarchical_traversal(root_path):
     # 按年倒序
     for year_dir in sorted(year_dirs, key=lambda x: x.name, reverse=True):
         # 构造完整的路径:download/shandong/2025/03
-        print(f"\n年份:{year_dir.name} | 省份:jiangsu")
+        log.info(f"\n年份:{year_dir.name} | 省份:zhejiang")
 
         # 提取月份目录
         month_dirs = []
@@ -333,20 +310,20 @@ def hierarchical_traversal(root_path):
         # 按月倒序输出
         if month_dirs:
             for md in sorted(month_dirs, key=lambda x: x["month"], reverse=True):
-                print(f"  月份:{md['month']:02d} | 路径:{md['path']}")
+                log.info(f"  月份:{md['month']:02d} | 路径:{md['path']}")
                 gov_commodity_zhejiang_import_export.process_folder(md['path'])
                 gov_commodity_zhejiang_country.process_folder(md['path'])
                 gov_commodity_zhejiang_city.process_folder(md['path'])
 
 if __name__ == "__main__":
     crawl_with_selenium('http://hangzhou.customs.gov.cn/hangzhou_customs/575609/zlbd/575612/575612/6430241/6430315/index.html')
-    print(f"浙江杭州海关全量数据下载任务完成")
+    log.info(f"浙江杭州海关全量数据下载任务完成")
     # 等待5s后执行
     time.sleep(5)
     hierarchical_traversal(base_country_code.download_dir)
-    print("浙江杭州海关类章、国家、城市所有文件处理完成!")
+    log.info("浙江杭州海关类章、国家、城市所有文件处理完成!")
     time.sleep(5)
     base_mysql.update_january_yoy('浙江省')
     base_mysql.update_shandong_yoy('浙江省')
-    print("浙江杭州海关城市同比sql处理完成")
+    log.info("浙江杭州海关城市同比sql处理完成")
 

+ 16 - 23
zhejiang/gov_commodity_zhejiang_city.py

@@ -5,6 +5,7 @@ import pandas as pd
 
 from utils import base_country_code, base_mysql
 from utils.base_country_code import format_sql_value
+from utils.log import log
 
 city_code_map = {
     "杭州地区": "330100",
@@ -23,27 +24,19 @@ city_code_map = {
 def get_df(path, year_month):
     file_paths = list(Path(path).glob('*'))
     if not file_paths:
-        print("未找到任何文件")
+        log.info("未找到任何文件")
         return None
-    file_path = file_paths[0]
-    sheet_name = base_country_code.find_sheet_by_keyword(file_path, "十一地市")
-
     flag = True
-    if not sheet_name:
-        print(f"{file_path} 未找到包含 十一地市 sheet")
-        # 23年1-11月数据要在多文件里找
+    file_path = file_paths[0]
+    if len(file_paths) > 1:
         for file_path in file_paths:
             if '十一地市' in file_path.name:
                 file_path = file_path
                 flag = False
                 break
-
-    if not sheet_name and flag:
-        print(f"{path} 未找到包含 十一地市 sheet或文件")
-        return None
     if flag:
         xls = pd.ExcelFile(file_path)
-        df = pd.read_excel(xls, sheet_name=sheet_name, header=None)
+        df = pd.read_excel(xls, sheet_name=0, header=None)
     else:
         df = pd.read_excel(file_path, header=None)
 
@@ -77,7 +70,7 @@ def process_folder(path):
     sql_arr = []
     res = get_df(path, None)
     if res is None:
-        print(f"{year_month} prov_region_trade 未找到包含 地市 sheet")
+        log.info(f"{year_month} prov_region_trade 未找到包含 地市 sheet")
         return
     import_df, export_df, total_df = res
     # 当月数据分组清洗
@@ -89,7 +82,7 @@ def process_folder(path):
         previous_month_dir = base_country_code.get_previous_month_dir(path)
         res = get_df(previous_month_dir, year_month)
         if res is None:
-            print(f"{path} 上月目录里文件未找到包含 地市 sheet")
+            log.info(f"{path} 上月目录里文件未找到包含 地市 sheet")
             return
         prev_import_df, prev_export_df, prev_total_df = res
 
@@ -107,7 +100,7 @@ def process_folder(path):
 
         total_df = pd.merge(total_df, prev_total_df, on='commodity', how='left')
         total_df['total'] = round(total_df['total_x'] - total_df['total_y'], 4)
-        print(f"合并文件: {path}*********{previous_month_dir}")
+        log.info(f"合并文件: {path}*********{previous_month_dir}")
 
     # 合并进出口数据
     merged_df = pd.merge(curr_import, curr_export, on='commodity', how='outer')
@@ -117,7 +110,7 @@ def process_folder(path):
         city_name = str(row['commodity']).strip()
         city_code = city_code_map.get(city_name)
         if not city_code:
-            print(f"未找到省 '{city_name}' 对应市编码")
+            log.info(f"未找到省 '{city_name}' 对应市编码")
             continue
 
         # 提取数据并格式化
@@ -134,12 +127,12 @@ def process_folder(path):
         # 组装 SQL 语句
         sql = (f"INSERT INTO t_yujin_crossborder_prov_region_trade "
                f"(crossborder_year, crossborder_year_month, prov_code, prov_name, city_code, city_name, monthly_total, monthly_export, monthly_import,yoy_import_export, yoy_import, yoy_export, create_time) VALUES "
-               f"('{year}', '{year_month}', '330000', '浙江省', '{city_code}', '{city_name}', {format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, '{yoy_import_export}', '{yoy_import}', '{yoy_export}', now());\n")
+               f"('{year}', '{year_month}', '330000', '浙江省', '{city_code}', '{city_name}', {format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, '{yoy_import_export}', '{yoy_import}', '{yoy_export}', now()) ON DUPLICATE KEY UPDATE create_time = now();\n")
         sql_arr.append(sql)
 
-    print(f"√ {year_month} prov_region_trade 成功生成 SQL 文件 size {len(sql_arr)} ")
+    log.info(f"√ {year_month} prov_region_trade 成功生成 SQL 文件 size {len(sql_arr)} ")
     base_mysql.bulk_insert(sql_arr)
-    print(f"√ {year_month} prov_region_trade SQL 存表完成!")
+    log.info(f"√ {year_month} prov_region_trade SQL 存表完成!")
 
 def hierarchical_traversal(root_path):
     root = Path(root_path)
@@ -149,7 +142,7 @@ def hierarchical_traversal(root_path):
     ]
 
     for year_dir in sorted(year_dirs, key=lambda x: x.name, reverse=True):
-        print(f"\n年份:{year_dir.name} | 省份:jiangsu")
+        log.info(f"\n年份:{year_dir.name} | 省份:zhejiang")
 
         month_dirs = []
         for item in year_dir.iterdir():
@@ -158,15 +151,15 @@ def hierarchical_traversal(root_path):
 
         if month_dirs:
             for md in sorted(month_dirs, key=lambda x: x["month"], reverse=True):
-                print(f"  月份:{md['month']:02d} | 路径:{md['path']}")
+                log.info(f"  月份:{md['month']:02d} | 路径:{md['path']}")
                 process_folder(md['path'])
 
 if __name__ == '__main__':
     hierarchical_traversal(base_country_code.download_dir)
-    print(f"浙江杭州海关城市所有文件处理完成!")
+    log.info(f"浙江杭州海关城市所有文件处理完成!")
     time.sleep(5)
     base_mysql.update_january_yoy('浙江省')
     base_mysql.update_shandong_yoy('浙江省')
-    print("同比sql处理完成")
+    log.info("同比sql处理完成")
     # root = Path(base_country_code.download_dir)/'2024'/'07'
     # process_folder(root)

+ 22 - 29
zhejiang/gov_commodity_zhejiang_country.py

@@ -4,6 +4,7 @@ import pandas as pd
 
 from utils import base_country_code, base_mysql
 from utils.base_country_code import format_sql_value
+from utils.log import log
 
 # 排除地区名单
 EXCLUDE_REGIONS = ["亚洲", "非洲", "欧洲", "拉丁美洲", "北美洲", "大洋洲", "南极洲",
@@ -13,11 +14,11 @@ EXCLUDE_REGIONS = ["亚洲", "非洲", "欧洲", "拉丁美洲", "北美洲", "
 def get_df_country(path, year_month):
     file_paths = list(Path(path).glob('*'))
     if not file_paths:
-        print("未找到任何文件")
+        log.info("未找到任何文件")
         return None
 
     file_path = file_paths[0]
-    print(f"处理文件: {file_path.name}")
+    log.info(f"处理文件: {file_path.name}")
 
     xls = pd.ExcelFile(file_path)
     import_df = pd.DataFrame()
@@ -25,23 +26,15 @@ def get_df_country(path, year_month):
     total_df = pd.DataFrame()
 
     flag = True
-    sheet_name = base_country_code.find_sheet_by_keyword(file_path, "国别")
-    if not sheet_name:
-        print(f"{file_path} 未找到包含 国别 sheet")
-        sheet_name = base_country_code.find_sheet_by_keyword(file_path, "组织")
-        if not sheet_name:
-            print(f"{file_path} 未找到包含 组织 sheet")
-            # 23年1-11月数据要在多文件里找
-            for file_path in file_paths:
-                if '洲贸组织' in file_path.name:
-                    file_path = file_path
-                    flag = False
-                    break
-    if not sheet_name and flag:
-        print(f"{path} 未找到包含 国别 | 组织 | 洲贸组织 sheet或文件")
-        return None
+    file_path = file_paths[0]
+    if len(file_paths) > 1:
+        for file_path in file_paths:
+            if '洲贸组织' in file_path.name:
+                file_path = file_path
+                flag = False
+                break
     if flag:
-        df = pd.read_excel(xls, sheet_name=sheet_name, header=None)
+        df = pd.read_excel(xls, sheet_name=1, header=None)
     else:
         df = pd.read_excel(file_path, header=None)
     temp_df = df[[0, 1]].rename(columns={0: 'commodity', 1: 'total'})
@@ -67,7 +60,7 @@ def get_df_country(path, year_month):
 def process_folder(path):
     res = get_df_country(path, None)
     if not res:
-        print(f"{path} 目录里文件未找到包含 国别 sheet")
+        log.info(f"{path} 目录里文件未找到包含 国别 sheet")
         return
     import_df, export_df, total_df = res
 
@@ -83,7 +76,7 @@ def process_folder(path):
         previous_month_dir = base_country_code.get_previous_month_dir(path)
         res = get_df_country(previous_month_dir, year_month)
         if not res:
-            print(f"{path} 上月目录里文件未找到包含 国别 sheet")
+            log.info(f"{path} 上月目录里文件未找到包含 国别 sheet")
             return
         prev_import_df, prev_export_df, prev_total_df = res
 
@@ -101,7 +94,7 @@ def process_folder(path):
 
         total_df = pd.merge(total_df, prev_total_df, on='commodity', how='left')
         total_df['total'] = round(total_df['total_x'] - total_df['total_y'], 4)
-        print(f"合并文件: {path}*********{previous_month_dir}")
+        log.info(f"合并文件: {path}*********{previous_month_dir}")
 
     # 合并进出口数据
     merged_df = pd.merge(curr_import, curr_export, on='commodity', how='outer')
@@ -119,7 +112,7 @@ def process_folder(path):
         # 获取国家编码
         country_code = base_country_code.COUNTRY_CODE_MAPPING.get(country_name)
         if not country_code:
-            print(f"{year_month} 未找到国家 '{country_name}' 对应的编码")
+            log.info(f"{year_month} 未找到国家 '{country_name}' 对应的编码")
             continue
 
         # 提取数据并格式化
@@ -139,16 +132,16 @@ def process_folder(path):
             f"monthly_total, monthly_export, monthly_import, yoy_import_export, yoy_import, yoy_export, create_time) "
             f"VALUES ('{year}', '{year_month}', '330000', '浙江省', '{country_code}', '{country_name}', "
             f"{format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, '{yoy_import_export}', '{yoy_import}', "
-            f"'{yoy_export}', NOW());"
+            f"'{yoy_export}', NOW()) ON DUPLICATE KEY UPDATE create_time = now();"
         )
         sql_arr.append(sql)
     # except Exception as e:
-    #     print(f"{year_month} 处理时发生异常: {str(e)}")
+    #     log.info(f"{year_month} 处理时发生异常: {str(e)}")
 
-    print(f"√ {year_month} 成功生成 SQL 条数: {len(sql_arr)}")
+    log.info(f"√ {year_month} 成功生成 SQL 条数: {len(sql_arr)}")
     # 批量插入数据库
     base_mysql.bulk_insert(sql_arr)
-    print(f"√ {year_month} prov_country_trade SQL 存表完成!\n")
+    log.info(f"√ {year_month} prov_country_trade SQL 存表完成!\n")
 
 def hierarchical_traversal(root_path):
     root = Path(root_path)
@@ -158,7 +151,7 @@ def hierarchical_traversal(root_path):
     ]
 
     for year_dir in sorted(year_dirs, key=lambda x: x.name, reverse=True):
-        print(f"\n年份:{year_dir.name} | 省份:jiangsu")
+        log.info(f"\n年份:{year_dir.name} | 省份:zhejiang")
 
         month_dirs = []
         for item in year_dir.iterdir():
@@ -167,7 +160,7 @@ def hierarchical_traversal(root_path):
 
         if month_dirs:
             for md in sorted(month_dirs, key=lambda x: x["month"], reverse=True):
-                print(f"  月份:{md['month']:02d} | 路径:{md['path']}")
+                log.info(f"  月份:{md['month']:02d} | 路径:{md['path']}")
                 process_folder(md['path'])
 
 
@@ -176,4 +169,4 @@ if __name__ == '__main__':
 
     root = Path(base_country_code.download_dir) / '2024' / '07'
     process_folder(root)
-    print("浙江杭州海关国别所有文件处理完成!")
+    log.info("浙江杭州海关国别所有文件处理完成!")

+ 27 - 38
zhejiang/gov_commodity_zhejiang_import_export.py

@@ -5,6 +5,7 @@ import pandas as pd
 
 from utils import base_country_code, base_mysql
 from utils.base_country_code import format_sql_value
+from utils.log import log
 
 CUSTOM_COMMODITY_REPLACEMENTS = {
     '稻谷及大米': '稻谷、大米及大米粉',
@@ -43,31 +44,25 @@ def clean_commodity_name(name, preserve_keywords=None):
 def get_df_import_export(path, year_month):
     file_paths = list(Path(path).glob('*'))
     if not file_paths:
-        print("未找到任何文件")
+        log.info("未找到任何文件")
         return None
-    file_path = file_paths[0]
-    print(f"处理文件: {file_path.name}")
-
-    xls = pd.ExcelFile(file_path)
-    import_df = pd.DataFrame()
-    export_df = pd.DataFrame()
 
     flag = True
-    sheet_name = base_country_code.find_sheet_by_keyword(file_path, "主出商品")
-    if not sheet_name:
-        print(f"{file_path} 单文件未找到包含 主出商品 sheet")
-        # 23年1-11月数据要在多文件里找
+    file_path = file_paths[0]
+    if len(file_paths) > 1:
         for file_path in file_paths:
             if '主要出口商品' in file_path.name:
                 file_path = file_path
                 flag = False
                 break
-    if not sheet_name and flag:
-        print(f"{path} 中未找到 主出商品 sheet或文件")
-        return None
+    log.info(f"处理文件: {file_path.name}")
+
+    xls = pd.ExcelFile(file_path)
+    import_df = pd.DataFrame()
+    export_df = pd.DataFrame()
 
     if flag:
-        df = pd.read_excel(xls, sheet_name=sheet_name, header=None).iloc[2:]
+        df = pd.read_excel(xls, sheet_name=4, header=None).iloc[2:]
     else:
         df = pd.read_excel(file_path, header=None).iloc[1:]
     temp_df = df[[0, 1]].rename(columns={0: 'commodity', 1: 'export'})
@@ -83,21 +78,15 @@ def get_df_import_export(path, year_month):
     export_df = pd.concat([export_df, temp_df])
 
     flag_2 = True
-    sheet_name = base_country_code.find_sheet_by_keyword(file_path, "主进商品")
-    if not sheet_name:
-        print(f"{file_path} 单文件未找到包含 主进商品 sheet")
-        # 23年1-11月数据要在多文件里找
+
+    if len(file_paths) > 1:
         for file_path in file_paths:
             if '主要进口商品' in file_path.name:
                 file_path = file_path
                 flag_2 = False
                 break
-    if not sheet_name and flag_2:
-        print(f"{path} 中未找到 主进商品 sheet或文件")
-        return None
-
     if flag_2:
-        df = pd.read_excel(xls, sheet_name=sheet_name, header=None).iloc[2:]
+        df = pd.read_excel(xls, sheet_name=5, header=None).iloc[2:]
     else:
         df = pd.read_excel(file_path, header=None).iloc[1:]
     temp_df = df[[0, 1]].rename(columns={0: 'commodity', 1: 'import'})
@@ -118,7 +107,7 @@ def get_df_import_export(path, year_month):
 def process_folder(path):
     res = get_df_import_export(path, None)
     if not res:
-        print(f"{path} 目录里文件未找到包含 主出、主进商品 sheet")
+        log.info(f"{path} 目录里文件未找到包含 主出、主进商品 sheet")
         return
     import_df, export_df = res
 
@@ -133,7 +122,7 @@ def process_folder(path):
         previous_month_dir = base_country_code.get_previous_month_dir(path)
         res = get_df_import_export(previous_month_dir, year_month)
         if not res:
-            print(f"{path} 上月目录里文件未找到包含 主出、主进商品 sheet")
+            log.info(f"{path} 上月目录里文件未找到包含 主出、主进商品 sheet")
             return
         prev_import_df, prev_export_df = res
 
@@ -147,7 +136,7 @@ def process_folder(path):
 
         curr_export = pd.merge(curr_export, prev_export, on='commodity', how='left')
         curr_export['export'] = round(curr_export['export_x'] - curr_export['export_y'], 4)
-        print(f"合并文件: {path}*********{previous_month_dir}")
+        log.info(f"合并文件: {path}*********{previous_month_dir}")
 
     # 合并进出口数据
     merged_df = pd.merge(curr_import, curr_export, on='commodity', how='outer')
@@ -161,14 +150,14 @@ def save_to_database(merged_df, year, month):
         for _, row in merged_df.iterrows():
             commodity_name = str(row['commodity']).strip()
             if commodity_name == '消费品' or commodity_name == '劳动密集型产品':
-                print(f'{commodity_name} 商品不存在,跳过')
+                log.info(f'{commodity_name} 商品不存在,跳过')
                 continue
             commodity_code, commodity_name_fix = base_mysql.get_commodity_id(commodity_name)
             if not commodity_code:
-                print(f"未找到商品名称 '{commodity_name}' 对应的 ID")
+                log.info(f"未找到商品名称 '{commodity_name}' 对应的 ID")
                 continue
             if not commodity_name_fix or commodity_name_fix in processed_commodities:
-                print(f"已处理过 '{commodity_name_fix}',传入name:{commodity_name}")
+                log.info(f"已处理过 '{commodity_name_fix}',传入name:{commodity_name}")
                 continue
 
             if year == 2025 or (year == 2024 and month in [7, 8, 9, 10, 11, 12]):
@@ -190,19 +179,19 @@ def save_to_database(merged_df, year, month):
 
             sql = (f"INSERT INTO t_yujin_crossborder_prov_commodity_trade "
                    f"(crossborder_year, crossborder_year_month, prov_code, prov_name, commodity_code, commodity_name, monthly_total, monthly_export, monthly_import, create_time) VALUES "
-                   f"('{year}', '{year_month}', '330000', '浙江省', '{commodity_code}', '{commodity_name_fix}', {format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, now());")
+                   f"('{year}', '{year_month}', '330000', '浙江省', '{commodity_code}', '{commodity_name_fix}', {format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, now()) ON DUPLICATE KEY UPDATE create_time = now() ;")
             sql_arr.append(sql)
 
             processed_commodities.add(commodity_name_fix)
-            # print(f'{commodity_name} -> {commodity_name_fix}')
+            # log.info(f'{commodity_name} -> {commodity_name_fix}')
 
     except Exception as e:
-        print(f"{year_month} prov_commodity_trade 生成 SQL 文件时发生异常: {str(e)}")
+        log.info(f"{year_month} prov_commodity_trade 生成 SQL 文件时发生异常: {str(e)}")
 
-    print(f"√ {year_month} prov_commodity_trade 成功生成 SQL 文件 size {len(sql_arr)} ")
+    log.info(f"√ {year_month} prov_commodity_trade 成功生成 SQL 文件 size {len(sql_arr)} ")
     # 解析完后生成sql文件批量入库
     base_mysql.bulk_insert(sql_arr)
-    print(f"√ {year_month} prov_commodity_trade SQL 存表完成!\n")
+    log.info(f"√ {year_month} prov_commodity_trade SQL 存表完成!\n")
 
 def hierarchical_traversal(root_path):
     """分层遍历:省份->年份->月目录"""
@@ -215,7 +204,7 @@ def hierarchical_traversal(root_path):
 
     # 按年倒序
     for year_dir in sorted(year_dirs, key=lambda x: x.name, reverse=True):
-        print(f"\n年份:{year_dir.name} | 省份:jiangsu")
+        log.info(f"\n年份:{year_dir.name} | 省份:jiangsu")
 
         # 提取月份目录
         month_dirs = []
@@ -228,7 +217,7 @@ def hierarchical_traversal(root_path):
         # 按月倒序输出
         if month_dirs:
             for md in sorted(month_dirs, key=lambda x: x["month"], reverse=True):
-                print(f"  月份:{md['month']:02d} | 路径:{md['path']}")
+                log.info(f"  月份:{md['month']:02d} | 路径:{md['path']}")
                 process_folder(md['path'])
 
 if __name__ == '__main__':
@@ -236,4 +225,4 @@ if __name__ == '__main__':
 
     # root = Path(base_country_code.download_dir)/'2023'/'01'
     # process_folder(root)
-    print("浙江杭州海关类章所有文件处理完成!")
+    log.info("浙江杭州海关类章所有文件处理完成!")