Explorar o código

crawl code review

zhangfan hai 3 días
pai
achega
b7c4409e39

+ 52 - 33
anhui/crawl_gov_anhui_full.py

@@ -1,8 +1,9 @@
 import os
 import random
 import re
-import subprocess
+import sys
 import time
+from datetime import datetime, timedelta
 from pathlib import Path
 
 from faker import Faker
@@ -24,7 +25,7 @@ Path(download_dir).mkdir(parents=True, exist_ok=True)
 def configure_stealth_options():
     """增强型反检测配置[1,4](@ref)"""
     opts = FirefoxOptions()
-    log.info("当前下载路径:", Path(download_dir).resolve())
+    print("当前下载路径:", Path(download_dir).resolve())
     # 文件下载配置
     opts.set_preference("browser.download.dir", download_dir)
     opts.set_preference("browser.download.folderList", 2)
@@ -50,7 +51,7 @@ def configure_stealth_options():
     opts.add_argument("--headless")
     return opts
 
-def find_target_links(driver):
+def find_target_links(driver, year_month):
     """点击列表页链接进入详情页下载文件"""
     WebDriverWait(driver, 30).until(
         EC.presence_of_element_located((By.ID, "conRight"))
@@ -75,7 +76,7 @@ def find_target_links(driver):
 
             try:
                 # 在详情页下载文件
-                download_result = download_file_from_detail_page(driver)
+                download_result = download_file_from_detail_page(driver, year_month)
                 if download_result == 'stop':
                     return 'stop'
                 processed_urls.add(link_url)
@@ -90,7 +91,7 @@ def find_target_links(driver):
     except Exception as e:
         log.info(f"下载时发生异常: {str(e)}")
 
-def download_file_from_detail_page(driver):
+def download_file_from_detail_page(driver, year_month):
     WebDriverWait(driver, 30).until(
         EC.presence_of_element_located((By.ID, "easysiteText"))
     )
@@ -105,8 +106,13 @@ def download_file_from_detail_page(driver):
             file_name = download_btn.text.strip()
             if not file_name:
                 continue
-            if file_name.startswith('2022'):
-                return 'stop'
+            if year_month is None:
+                if file_name.startswith('2022'):
+                    return 'stop'
+            else:
+                if not file_name.startswith(year_month):
+                    log.info(f"非 {year_month} 文件: {file_name}, stop")
+                    return 'stop'
             if '美元' in file_name or '商品贸易方式' in file_name or '进出口总值' in file_name or '月度表' in file_name:
                 log.info(f'{file_name} 不需要此文件,跳过')
                 continue
@@ -162,32 +168,44 @@ def extract_year_and_month(file_name):
     else:
         raise ValueError(f"无法从文件名中提取年份和月份:{file_name}")
 
-def extract_rar(rar_path, extract_to):
-    """备用解压函数(当 rarfile 失效时使用)"""
-    winrar_path = r"C:\Program Files\WinRAR\Rar.exe"  # 推荐使用 Rar.exe 而非 WinRAR.exe
-    cmd = [winrar_path, 'x', '-y', rar_path, str(extract_to)]
-
-    # 使用 CREATE_NO_WINDOW 防止弹出命令行窗口
-    creationflags = subprocess.CREATE_NO_WINDOW if os.name == 'nt' else 0
-
-    result = subprocess.run(
-        cmd,
-        stdout=subprocess.PIPE,
-        stderr=subprocess.PIPE,
-        creationflags=creationflags  # 关键点:隐藏窗口
-    )
-
-    if result.returncode == 0:
-        log.info(f"解压成功: {rar_path} → {extract_to}")
-        return True
-    else:
-        log.info(f"解压失败: {result.stderr.decode('gbk')}")
-        return False
-
-
-def crawl_with_selenium(url):
+def detect_latest_month(driver, url):
+    driver.get(url)
+    current_date = datetime.now()
+    for offset in range(0, 3):
+        check_date = current_date - timedelta(days=offset * 30)
+        check_year = check_date.year
+        check_month = check_date.month
+
+        target_title = f"{check_year}年{check_month}月"
+        try:
+            WebDriverWait(driver, 10).until(
+                EC.presence_of_element_located((By.XPATH, f'//a[contains(@title, "{target_title}")]'))
+            )
+            log.info(f"已找到最新月份数据 {check_year}-{check_month}")
+            # 看是否已存表,已存则跳过;
+            count = base_mysql.get_code_exist(f'{check_year}-{check_month:02d}', "340000")
+            if count > 0:
+                log.info(f"已存在 {check_year}-{check_month} 数据,跳过")
+                continue
+            return f"{check_year}年{check_month}月"
+        except:
+            log.info(f"未找到 {target_title}")
+            continue
+    log.info("三个月内未找到有效数据")
+    return None
+
+def crawl_with_selenium(url, mark):
     driver = webdriver.Firefox(options=configure_stealth_options())
 
+    year_month = None
+    if 'increment' == mark:
+        res = detect_latest_month(driver, url)
+        if res is None:
+            log.info("安徽省海关没有最新数据更新")
+            sys.exit(0)
+        year_month = res
+        print(f"检测到最新有效数据:{year_month}")
+
     try:
         # 注入反检测脚本
         driver.execute_script("""
@@ -202,7 +220,7 @@ def crawl_with_selenium(url):
 
         while True:
             # 访问当前页
-            result = find_target_links(driver)
+            result = find_target_links(driver, year_month)
             if result == 'stop':
                 break
 
@@ -294,7 +312,8 @@ def hierarchical_traversal(root_path):
                 gov_commodity_anhui_city.process_folder(md['path'])
 
 if __name__ == "__main__":
-    crawl_with_selenium('http://hefei.customs.gov.cn/hefei_customs/zfxxgkzl59/3169584/479584/479585/index.html')
+    crawl_with_selenium('http://hefei.customs.gov.cn/hefei_customs/zfxxgkzl59/3169584/479584/479585/index.html', 'all')
+    # crawl_with_selenium('http://hefei.customs.gov.cn/hefei_customs/zfxxgkzl59/3169584/479584/479585/index.html', 'increment')
     print(f"安徽合肥海关全量数据下载任务完成")
     # 等待5s后执行
     time.sleep(5)

+ 4 - 2
anhui/gov_commodity_anhui_city.py

@@ -72,13 +72,15 @@ def process_folder(path):
             yoy_import_export, yoy_import, yoy_export = 0, 0, 0
             sql = (f"INSERT INTO t_yujin_crossborder_prov_region_trade "
                    f"(crossborder_year, crossborder_year_month, prov_code, prov_name, city_code, city_name, monthly_total, monthly_export, monthly_import,yoy_import_export, yoy_import, yoy_export, create_time) VALUES "
-                   f"('{year}', '{year_month_2}', '340000', '安徽省', '{city_code}', '{city_name}', {format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, '{yoy_import_export}', '{yoy_import}', '{yoy_export}', now());\n")
+                   f"('{year}', '{year_month_2}', '340000', '安徽省', '{city_code}', '{city_name}', {format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, '{yoy_import_export}', '{yoy_import}', '{yoy_export}', now())"
+                   f"ON DUPLICATE KEY UPDATE create_time = now() ;")
             sql_arr_copy.append(sql)
 
         # 组装 SQL 语句
         sql = (f"INSERT INTO t_yujin_crossborder_prov_region_trade "
                f"(crossborder_year, crossborder_year_month, prov_code, prov_name, city_code, city_name, monthly_total, monthly_export, monthly_import,yoy_import_export, yoy_import, yoy_export, create_time) VALUES "
-               f"('{year}', '{year_month}', '340000', '安徽省', '{city_code}', '{city_name}', {format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, '{yoy_import_export}', '{yoy_import}', '{yoy_export}', now());\n")
+               f"('{year}', '{year_month}', '340000', '安徽省', '{city_code}', '{city_name}', {format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, '{yoy_import_export}', '{yoy_import}', '{yoy_export}', now())"
+               f"ON DUPLICATE KEY UPDATE create_time = now() ;")
         sql_arr.append(sql)
 
     log.info(f"√ {year_month} prov_region_trade 成功生成 SQL 文件 size {len(sql_arr)} ")

+ 3 - 2
anhui/gov_commodity_anhui_country.py

@@ -70,7 +70,8 @@ def process_folder(path):
             yoy_import_export, yoy_import, yoy_export = 0, 0, 0
             sql = (f"INSERT INTO t_yujin_crossborder_prov_country_trade "
                    f"(crossborder_year, crossborder_year_month, prov_code, prov_name, country_code, country_name, monthly_total, monthly_export, monthly_import,yoy_import_export, yoy_import, yoy_export, create_time) VALUES "
-                   f"('{year}', '{year_month_2}', '340000', '安徽省', '{country_code}', '{country_name}', {format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, '{yoy_import_export}', '{yoy_import}', '{yoy_export}', now());\n")
+                   f"('{year}', '{year_month_2}', '340000', '安徽省', '{country_code}', '{country_name}', {format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, '{yoy_import_export}', '{yoy_import}', '{yoy_export}', now())"
+                   f"ON DUPLICATE KEY UPDATE create_time = now() ;")
             sql_arr_copy.append(sql)
 
         # 构建 SQL
@@ -80,7 +81,7 @@ def process_folder(path):
             f"monthly_total, monthly_export, monthly_import, yoy_import_export, yoy_import, yoy_export, create_time) "
             f"VALUES ('{year}', '{year_month}', '340000', '安徽省', '{country_code}', '{country_name}', "
             f"{format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, '{yoy_import_export}', '{yoy_import}', "
-            f"'{yoy_export}', NOW());"
+            f"'{yoy_export}', NOW()) ON DUPLICATE KEY UPDATE create_time = now();"
         )
         sql_arr.append(sql)
 

+ 4 - 2
anhui/gov_commodity_anhui_import_export.py

@@ -122,12 +122,14 @@ def save_to_database(import_df, export_df, year, month):
                 monthly_total = round(monthly_import + monthly_export, 4)
                 sql = (f"INSERT INTO t_yujin_crossborder_prov_commodity_trade "
                        f"(crossborder_year, crossborder_year_month, prov_code, prov_name, commodity_code, commodity_name, monthly_total, monthly_export, monthly_import, create_time) VALUES "
-                       f"('{year}', '{year_month_2}', '340000', '安徽省', '{commodity_code}', '{commodity_name_fix}', {format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, now());")
+                       f"('{year}', '{year_month_2}', '340000', '安徽省', '{commodity_code}', '{commodity_name_fix}', {format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, now())"
+                       f"ON DUPLICATE KEY UPDATE create_time = now() ;")
                 sql_arr_copy.append(sql)
 
             sql = (f"INSERT INTO t_yujin_crossborder_prov_commodity_trade "
                    f"(crossborder_year, crossborder_year_month, prov_code, prov_name, commodity_code, commodity_name, monthly_total, monthly_export, monthly_import, create_time) VALUES "
-                   f"('{year}', '{year_month}', '340000', '安徽省', '{commodity_code}', '{commodity_name_fix}', {format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, now());")
+                   f"('{year}', '{year_month}', '340000', '安徽省', '{commodity_code}', '{commodity_name_fix}', {format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, now())"
+                   f"ON DUPLICATE KEY UPDATE create_time = now() ;")
             sql_arr.append(sql)
 
             processed_commodities.add(commodity_name_fix)

+ 1 - 1
hebei/crawl_gov_hebei_full.py

@@ -32,7 +32,7 @@ def get_current_target_titles():
 def configure_stealth_options():
     """增强型反检测配置[1,4](@ref)"""
     opts = FirefoxOptions()
-    log.info("当前下载路径:", Path(download_dir).resolve())
+    print("当前下载路径:", Path(download_dir).resolve())
     # 文件下载配置
     opts.set_preference("browser.download.dir", download_dir)
     opts.set_preference("browser.download.folderList", 2)

+ 2 - 2
hebei/gov_commodity_hebei_city.py

@@ -71,13 +71,13 @@ def process_folder(path):
             monthly_total = round(float(monthly_total) / 2, 4)
             sql_1 = (f"INSERT INTO t_yujin_crossborder_prov_region_trade "
                    f"(crossborder_year, crossborder_year_month, prov_code, prov_name, city_code, city_name, monthly_total, monthly_export, monthly_import,yoy_import_export, yoy_import, yoy_export, create_time) VALUES "
-                   f"('2023', '2023-01', '130000', '河北省', '{city_code}', '{city_name}', {format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, '{yoy_import_export}', '{yoy_import}', '{yoy_export}', now());\n")
+                   f"('2023', '2023-01', '130000', '河北省', '{city_code}', '{city_name}', {format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, '{yoy_import_export}', '{yoy_import}', '{yoy_export}', now()) ON DUPLICATE KEY UPDATE create_time = now() ;\n")
             sql_arr_copy.append(sql_1)
 
         # 组装 SQL 语句
         sql = (f"INSERT INTO t_yujin_crossborder_prov_region_trade "
                f"(crossborder_year, crossborder_year_month, prov_code, prov_name, city_code, city_name, monthly_total, monthly_export, monthly_import,yoy_import_export, yoy_import, yoy_export, create_time) VALUES "
-               f"('{year}', '{year_month}', '130000', '河北省', '{city_code}', '{city_name}', {format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, '{yoy_import_export}', '{yoy_import}', '{yoy_export}', now());\n")
+               f"('{year}', '{year_month}', '130000', '河北省', '{city_code}', '{city_name}', {format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, '{yoy_import_export}', '{yoy_import}', '{yoy_export}', now()) ON DUPLICATE KEY UPDATE create_time = now() ;\n")
         sql_arr.append(sql)
 
     log.info(f"√ {year_month} prov_region_trade 成功生成 SQL 文件 size {len(sql_arr)} ")

+ 2 - 2
hebei/gov_commodity_hebei_country.py

@@ -68,12 +68,12 @@ def process_folder(path):
             monthly_total = round(float(monthly_total) / 2, 4)
             sql = (f"INSERT INTO t_yujin_crossborder_prov_country_trade "
                    f"(crossborder_year, crossborder_year_month, prov_code, prov_name, country_code, country_name, monthly_total, monthly_export, monthly_import,yoy_import_export, yoy_import, yoy_export, create_time) VALUES "
-                   f"('2023', '2023-01', '130000', '河北省', '{country_code}', '{country_name}', {format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, '{yoy_import_export}', '{yoy_import}', '{yoy_export}', now());\n")
+                   f"('2023', '2023-01', '130000', '河北省', '{country_code}', '{country_name}', {format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, '{yoy_import_export}', '{yoy_import}', '{yoy_export}', now()) ON DUPLICATE KEY UPDATE create_time = now();\n")
             sql_arr_copy.append(sql)
         # 组装 SQL 语句
         sql = (f"INSERT INTO t_yujin_crossborder_prov_country_trade "
                f"(crossborder_year, crossborder_year_month, prov_code, prov_name, country_code, country_name, monthly_total, monthly_export, monthly_import,yoy_import_export, yoy_import, yoy_export, create_time) VALUES "
-               f"('{year}', '{year_month}', '130000', '河北省', '{country_code}', '{country_name}', {format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, '{yoy_import_export}', '{yoy_import}', '{yoy_export}', now());\n")
+               f"('{year}', '{year_month}', '130000', '河北省', '{country_code}', '{country_name}', {format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, '{yoy_import_export}', '{yoy_import}', '{yoy_export}', now()) ON DUPLICATE KEY UPDATE create_time = now();\n")
         sql_arr.append(sql)
 
     log.info(f"√ {year_month} prov_country_trade 成功生成 SQL 文件 size {len(sql_arr)} ")

+ 4 - 2
hebei/gov_commodity_hebei_import_export.py

@@ -90,12 +90,14 @@ def save_to_database(merged_df, year, month):
                 monthly_total = round(monthly_import + monthly_export, 4)
                 sql = (f"INSERT INTO t_yujin_crossborder_prov_commodity_trade "
                        f"(crossborder_year, crossborder_year_month, prov_code, prov_name, commodity_code, commodity_name, monthly_total, monthly_export, monthly_import, create_time) VALUES "
-                       f"('2023', '2023-01', '130000', '河北省', '{commodity_code}', '{commodity_name_fix}', {format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, now());")
+                       f"('2023', '2023-01', '130000', '河北省', '{commodity_code}', '{commodity_name_fix}', {format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, now())"
+                       f"ON DUPLICATE KEY UPDATE create_time = now() ;")
                 sql_arr_copy.append(sql)
 
             sql = (f"INSERT INTO t_yujin_crossborder_prov_commodity_trade "
                    f"(crossborder_year, crossborder_year_month, prov_code, prov_name, commodity_code, commodity_name, monthly_total, monthly_export, monthly_import, create_time) VALUES "
-                   f"('{year}', '{year_month}', '130000', '河北省', '{commodity_code}', '{commodity_name_fix}', {format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, now());")
+                   f"('{year}', '{year_month}', '130000', '河北省', '{commodity_code}', '{commodity_name_fix}', {format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, now())"
+                   f"ON DUPLICATE KEY UPDATE create_time = now() ;")
             sql_arr.append(sql)
 
             processed_commodities.add(commodity_name_fix)

+ 6 - 8
jiangsu/crawl_gov_jiangsu_full.py

@@ -20,16 +20,15 @@ import gov_commodity_jiangsu_import_export
 from utils import base_country_code, base_mysql
 from utils.log import log
 
-# 显式指定 unrar 路径(根据实际情况修改)
-rarfile.UNRAR_EXECUTABLE = r"C:\Program Files\WinRAR\UnRAR.exe"
-# rarfile.UNRAR_EXECUTABLE = "/usr/bin/unrar"  # Linux/macOS
+# rarfile.UNRAR_EXECUTABLE = r"C:\Program Files\WinRAR\UnRAR.exe"
+rarfile.UNRAR_EXECUTABLE = "unrar"
 download_dir = base_country_code.download_dir
 Path(download_dir).mkdir(parents=True, exist_ok=True)
 
 def configure_stealth_options():
     """增强型反检测配置[1,4](@ref)"""
     opts = FirefoxOptions()
-    log.info("当前下载路径:", Path(download_dir).resolve())
+    print("当前下载路径:", Path(download_dir).resolve())
     # 文件下载配置
     opts.set_preference("browser.download.dir", download_dir)
     opts.set_preference("browser.download.folderList", 2)
@@ -155,8 +154,9 @@ def find_target_links(driver):
 
 def extract_rar(rar_path, extract_to):
     """备用解压函数(当 rarfile 失效时使用)"""
-    winrar_path = r"C:\Program Files\WinRAR\Rar.exe"  # 推荐使用 Rar.exe 而非 WinRAR.exe
-    cmd = [winrar_path, 'x', '-y', rar_path, str(extract_to)]
+    # winrar_path = r"C:\Program Files\WinRAR\Rar.exe"  # 推荐使用 Rar.exe 而非 WinRAR.exe
+    # cmd = [winrar_path, 'x', '-y', rar_path, str(extract_to)]
+    cmd = ["unrar", 'x', '-y', rar_path, str(extract_to)]
 
     # 使用 CREATE_NO_WINDOW 防止弹出命令行窗口
     creationflags = subprocess.CREATE_NO_WINDOW if os.name == 'nt' else 0
@@ -228,13 +228,11 @@ def crawl_with_selenium(url):
 
 def wait_for_download_complete(timeout=30, existing_files=None):
     start_time = time.time()
-    temp_exts = ('.part', '.crdownload')
 
     if existing_files is None:
         existing_files = set(f.name for f in Path(download_dir).glob('*'))
 
     while (time.time() - start_time) < timeout:
-        current_files = set(f.name for f in Path(download_dir).glob('*'))
         new_files = [f for f in Path(download_dir).glob('*.rar') if f.name not in existing_files]
         if new_files:
             # 等待文件大小稳定(不再变化),确保下载完成

+ 6 - 8
jiangsu/gov_commodity_jiangsu_city.py

@@ -25,7 +25,7 @@ city_code_map = {
 
 ignore_city_code_arr = ['江阴市','宜兴市','常熟市','张家港市','昆山市','吴江市','太仓市','启东市','东台市','仪征市','丹阳市','兴化市']
 
-def get_df(path):
+def get_df(path, year_month):
     global df,  df_type
     file_paths = list(Path(path).glob('*'))
     if not file_paths:
@@ -36,11 +36,8 @@ def get_df(path):
         log.info(f"处理单文件: {file_path.name}")
         xls = pd.ExcelFile(file_path)
 
-        sheet_name = base_country_code.find_sheet_by_keyword(file_path, "地")
-        if not sheet_name:
-            log.info(f"{file_path} 未找到包含 地市 sheet")
-            return None
-        df = pd.read_excel(xls, sheet_name=sheet_name, header=None).iloc[5:]
+        sheet_index = 5 if year_month == '2024-11' else 3
+        df = pd.read_excel(xls, sheet_name=sheet_index, header=None).iloc[5:]
         df_type = 0
 
     else:
@@ -58,7 +55,7 @@ def process_folder(path):
     year_month = f'{year}-{month:02d}'
 
     sql_arr = []
-    res = get_df(path)
+    res = get_df(path, year_month)
     if res is None:
         log.info(f"{year_month} prov_region_trade 未找到包含 地市 sheet")
         return
@@ -93,7 +90,8 @@ def process_folder(path):
         # 组装 SQL 语句
         sql = (f"INSERT INTO t_yujin_crossborder_prov_region_trade "
                f"(crossborder_year, crossborder_year_month, prov_code, prov_name, city_code, city_name, monthly_total, monthly_export, monthly_import,yoy_import_export, yoy_import, yoy_export, create_time) VALUES "
-               f"('{year}', '{year_month}', '320000', '江苏省', '{city_code}', '{city_name}', {format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, '{yoy_import_export}', '{yoy_import}', '{yoy_export}', now());\n")
+               f"('{year}', '{year_month}', '320000', '江苏省', '{city_code}', '{city_name}', {format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, '{yoy_import_export}', '{yoy_import}', '{yoy_export}', now())"
+               f"ON DUPLICATE KEY UPDATE create_time = now(); \n")
         sql_arr.append(sql)
 
     log.info(f"√ {year_month} prov_region_trade 成功生成 SQL 文件 size {len(sql_arr)} ")

+ 5 - 8
jiangsu/gov_commodity_jiangsu_country.py

@@ -11,7 +11,7 @@ EXCLUDE_REGIONS = ["亚洲", "非洲", "欧洲", "拉丁美洲", "北美洲", "
                    "东南亚国家联盟", "欧洲联盟", "亚太经济合作组织",
                    "区域全面经济伙伴关系协定(RCEP)成员国", "共建“一带一路”国家和地区"]
 
-def get_df(path):
+def get_df(path, year_month):
     global df,  df_type
     file_paths = list(Path(path).glob('*'))
     if not file_paths:
@@ -22,11 +22,7 @@ def get_df(path):
         log.info(f"处理单文件: {file_path.name}")
         xls = pd.ExcelFile(file_path)
 
-        sheet_name = base_country_code.find_sheet_by_keyword(file_path, "国别")
-        if not sheet_name:
-            log.info(f"{file_path} 未找到包含 类章 sheet")
-            return None
-        df = pd.read_excel(xls, sheet_name=sheet_name, header=None).iloc[5:]
+        df = pd.read_excel(xls, sheet_name=1, header=None).iloc[5:]
         df_type = 0
 
     else:
@@ -45,7 +41,7 @@ def process_folder(path):
 
     sql_arr = []
     try:
-        df, df_type = get_df(path)
+        df, df_type = get_df(path, year_month)
         if df_type == 0:
             country_name_index = 0
             col_total_index, col_monthly_export_index, col_monthly_import_index = 1, 3, 5
@@ -85,7 +81,8 @@ def process_folder(path):
                 f"monthly_total, monthly_export, monthly_import, yoy_import_export, yoy_import, yoy_export, create_time) "
                 f"VALUES ('{year}', '{year_month}', '320000', '江苏省', '{country_code}', '{country_name}', "
                 f"{format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, '{yoy_import_export}', '{yoy_import}', "
-                f"'{yoy_export}', NOW());"
+                f"'{yoy_export}', NOW())"
+                f"ON DUPLICATE KEY UPDATE create_time = now() ;"
             )
             sql_arr.append(sql)
     except Exception as e:

+ 4 - 7
jiangsu/gov_commodity_jiangsu_import_export.py

@@ -4,7 +4,6 @@ from pathlib import Path
 import pandas as pd
 
 from utils import base_country_code, base_mysql
-from utils.base_country_code import format_sql_value
 from utils.log import log
 
 YEAR_PATTERN = re.compile(r"^\d{4}$")
@@ -28,12 +27,9 @@ def process_folder(path, all_records):
         import_df = pd.DataFrame()
         export_df = pd.DataFrame()
         total_df = pd.DataFrame()
-        sheet_name = base_country_code.find_sheet_by_keyword(file_path, "类章")
-        if not sheet_name:
-            log.info(f"{file_path} 未找到包含 类章 sheet")
-            return
         skip_index = 4 if year_month == '2024-11' else 5
-        df = pd.read_excel(xls, sheet_name=sheet_name, header=None).iloc[skip_index:]
+        sheet_index = 6 if year_month == '2024-11' else 4
+        df = pd.read_excel(xls, sheet_name=sheet_index, header=None).iloc[skip_index:]
         temp_df = df[[0, 5]].rename(columns={0: 'commodity', 5: 'import'})
         temp_df['import'] = pd.to_numeric(temp_df['import'].replace('--', 0), errors='coerce')
         temp_df['import'] = temp_df['import'] * 10000
@@ -115,7 +111,8 @@ def save_to_database(import_df, export_df, total_df, year, month, all_records):
 
         sql = (f"INSERT INTO t_yujin_crossborder_prov_commodity_trade "
                f"(crossborder_year, crossborder_year_month, prov_code, prov_name, commodity_code, commodity_name, monthly_total, monthly_export, monthly_import, create_time, commodity_source) VALUES "
-               f"('{year}', '{year_month}', '320000', '江苏省', '{commodity_code}', '{category_name}', {monthly_total}, {monthly_export}, {monthly_import}, now(), 1);")
+               f"('{year}', '{year_month}', '320000', '江苏省', '{commodity_code}', '{category_name}', {monthly_total}, {monthly_export}, {monthly_import}, now(), 1)"
+               f"ON DUPLICATE KEY UPDATE create_time = now() ;")
         sql_arr.append(sql)
 
         processed_commodities.add(commodity_code)

+ 0 - 27
utils/base_country_code.py

@@ -3,7 +3,6 @@ import re
 from pathlib import Path
 
 import pandas as pd
-from openpyxl import load_workbook
 
 from utils.log import log
 
@@ -24,32 +23,6 @@ def format_sql_value(value):
     else:
         return f"'{value}'"
 
-def find_sheet_by_keyword(file_path, keyword):
-    """
-    模糊查找包含关键字的 sheet 名称(支持 .xls 和 .xlsx)
-
-    :param file_path: Excel 文件路径
-    :param keyword: 要匹配的关键字(如 '类章')
-    :return: 匹配到的第一个 sheet 名称,或 None
-    """
-    # 处理 .xlsx 文件
-    if file_path.suffix == ".xlsx":
-        workbook = load_workbook(filename=file_path, read_only=True)
-        sheets = workbook.sheetnames
-    # 处理 .xls 文件
-    elif file_path.suffix == ".xls":
-        import xlrd
-        workbook = xlrd.open_workbook(file_path)
-        sheets = workbook.sheet_names()
-    else:
-        raise ValueError(f"不支持的文件格式:{file_path.suffix}")
-
-    # 精确匹配 + 模糊匹配策略
-    for sheet in sheets:
-        if keyword.lower() in sheet.lower():
-            return sheet
-    return None
-
 def get_previous_month_dir(current_path):
     """生成前月目录路径"""
     try:

+ 27 - 3
utils/base_mysql.py

@@ -80,6 +80,26 @@ def get_hs_all():
         if connection:
             connection.close()
 
+def get_code_exist(crossborder_year_month, prov_code):
+    try:
+        # 使用 with 自动管理连接生命周期
+        with pymysql.connect(**DB_CONFIG) as connection:
+            with connection.cursor() as cursor:
+                # 执行查询
+                sql = """
+                    SELECT COUNT(1) 
+                    FROM t_yujin_crossborder_prov_commodity_trade e 
+                    WHERE e.crossborder_year_month = %s 
+                      AND e.prov_code = %s
+                """
+                cursor.execute(sql, (crossborder_year_month, prov_code))
+                result = cursor.fetchone()
+                return int(result[0]) if result and result[0] else 0
+    except Exception as e:
+        log.info(f"[数据库查询异常] 查询条件: {crossborder_year_month}, {prov_code} | 错误详情: {str(e)}")
+        return 0
+
+
 # 对密码进行 URL 编码
 encoded_password = quote_plus(DB_CONFIG["password"])
 
@@ -239,6 +259,10 @@ def _update_shandong_new_yoy(prov_name):
 
 
 if __name__ == '__main__':
-    update_january_yoy('浙江省')
-    update_shandong_yoy('浙江省')
-    log.info("同比sql处理完成")
+    check_year, check_month = 2024, 4
+    count = get_code_exist(f'{check_year}-{check_month:02d}', "340000")
+    print(count)
+
+    # update_january_yoy('浙江省')
+    # update_shandong_yoy('浙江省')
+    # log.info("同比sql处理完成")

+ 1 - 25
zhejiang/crawl_gov_zhejiangi_full.py

@@ -1,7 +1,6 @@
 import os
 import random
 import re
-import subprocess
 import time
 from pathlib import Path
 from urllib.parse import urljoin
@@ -26,7 +25,7 @@ Path(download_dir).mkdir(parents=True, exist_ok=True)
 def configure_stealth_options():
     """增强型反检测配置[1,4](@ref)"""
     opts = FirefoxOptions()
-    log.info("当前下载路径:", Path(download_dir).resolve())
+    print("当前下载路径:", Path(download_dir).resolve())
     # 文件下载配置
     opts.set_preference("browser.download.dir", download_dir)
     opts.set_preference("browser.download.folderList", 2)
@@ -235,29 +234,6 @@ def extract_year_and_month(file_name):
     else:
         raise ValueError(f"无法从文件名中提取年份和月份:{file_name}")
 
-def extract_rar(rar_path, extract_to):
-    """备用解压函数(当 rarfile 失效时使用)"""
-    winrar_path = r"C:\Program Files\WinRAR\Rar.exe"  # 推荐使用 Rar.exe 而非 WinRAR.exe
-    cmd = [winrar_path, 'x', '-y', rar_path, str(extract_to)]
-
-    # 使用 CREATE_NO_WINDOW 防止弹出命令行窗口
-    creationflags = subprocess.CREATE_NO_WINDOW if os.name == 'nt' else 0
-
-    result = subprocess.run(
-        cmd,
-        stdout=subprocess.PIPE,
-        stderr=subprocess.PIPE,
-        creationflags=creationflags  # 关键点:隐藏窗口
-    )
-
-    if result.returncode == 0:
-        log.info(f"解压成功: {rar_path} → {extract_to}")
-        return True
-    else:
-        log.info(f"解压失败: {result.stderr.decode('gbk')}")
-        return False
-
-
 def crawl_with_selenium(url):
     driver = webdriver.Firefox(options=configure_stealth_options())
     base_url = 'http://hangzhou.customs.gov.cn'

+ 4 - 12
zhejiang/gov_commodity_zhejiang_city.py

@@ -26,25 +26,17 @@ def get_df(path, year_month):
     if not file_paths:
         log.info("未找到任何文件")
         return None
-    file_path = file_paths[0]
-    sheet_name = base_country_code.find_sheet_by_keyword(file_path, "十一地市")
-
     flag = True
-    if not sheet_name:
-        log.info(f"{file_path} 未找到包含 十一地市 sheet")
-        # 23年1-11月数据要在多文件里找
+    file_path = file_paths[0]
+    if len(file_paths) > 1:
         for file_path in file_paths:
             if '十一地市' in file_path.name:
                 file_path = file_path
                 flag = False
                 break
-
-    if not sheet_name and flag:
-        log.info(f"{path} 未找到包含 十一地市 sheet或文件")
-        return None
     if flag:
         xls = pd.ExcelFile(file_path)
-        df = pd.read_excel(xls, sheet_name=sheet_name, header=None)
+        df = pd.read_excel(xls, sheet_name=0, header=None)
     else:
         df = pd.read_excel(file_path, header=None)
 
@@ -135,7 +127,7 @@ def process_folder(path):
         # 组装 SQL 语句
         sql = (f"INSERT INTO t_yujin_crossborder_prov_region_trade "
                f"(crossborder_year, crossborder_year_month, prov_code, prov_name, city_code, city_name, monthly_total, monthly_export, monthly_import,yoy_import_export, yoy_import, yoy_export, create_time) VALUES "
-               f"('{year}', '{year_month}', '330000', '浙江省', '{city_code}', '{city_name}', {format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, '{yoy_import_export}', '{yoy_import}', '{yoy_export}', now());\n")
+               f"('{year}', '{year_month}', '330000', '浙江省', '{city_code}', '{city_name}', {format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, '{yoy_import_export}', '{yoy_import}', '{yoy_export}', now()) ON DUPLICATE KEY UPDATE create_time = now();\n")
         sql_arr.append(sql)
 
     log.info(f"√ {year_month} prov_region_trade 成功生成 SQL 文件 size {len(sql_arr)} ")

+ 9 - 17
zhejiang/gov_commodity_zhejiang_country.py

@@ -26,23 +26,15 @@ def get_df_country(path, year_month):
     total_df = pd.DataFrame()
 
     flag = True
-    sheet_name = base_country_code.find_sheet_by_keyword(file_path, "国别")
-    if not sheet_name:
-        log.info(f"{file_path} 未找到包含 国别 sheet")
-        sheet_name = base_country_code.find_sheet_by_keyword(file_path, "组织")
-        if not sheet_name:
-            log.info(f"{file_path} 未找到包含 组织 sheet")
-            # 23年1-11月数据要在多文件里找
-            for file_path in file_paths:
-                if '洲贸组织' in file_path.name:
-                    file_path = file_path
-                    flag = False
-                    break
-    if not sheet_name and flag:
-        log.info(f"{path} 未找到包含 国别 | 组织 | 洲贸组织 sheet或文件")
-        return None
+    file_path = file_paths[0]
+    if len(file_paths) > 1:
+        for file_path in file_paths:
+            if '洲贸组织' in file_path.name:
+                file_path = file_path
+                flag = False
+                break
     if flag:
-        df = pd.read_excel(xls, sheet_name=sheet_name, header=None)
+        df = pd.read_excel(xls, sheet_name=1, header=None)
     else:
         df = pd.read_excel(file_path, header=None)
     temp_df = df[[0, 1]].rename(columns={0: 'commodity', 1: 'total'})
@@ -140,7 +132,7 @@ def process_folder(path):
             f"monthly_total, monthly_export, monthly_import, yoy_import_export, yoy_import, yoy_export, create_time) "
             f"VALUES ('{year}', '{year_month}', '330000', '浙江省', '{country_code}', '{country_name}', "
             f"{format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, '{yoy_import_export}', '{yoy_import}', "
-            f"'{yoy_export}', NOW());"
+            f"'{yoy_export}', NOW()) ON DUPLICATE KEY UPDATE create_time = now();"
         )
         sql_arr.append(sql)
     # except Exception as e:

+ 12 - 24
zhejiang/gov_commodity_zhejiang_import_export.py

@@ -46,29 +46,23 @@ def get_df_import_export(path, year_month):
     if not file_paths:
         log.info("未找到任何文件")
         return None
-    file_path = file_paths[0]
-    log.info(f"处理文件: {file_path.name}")
-
-    xls = pd.ExcelFile(file_path)
-    import_df = pd.DataFrame()
-    export_df = pd.DataFrame()
 
     flag = True
-    sheet_name = base_country_code.find_sheet_by_keyword(file_path, "主出商品")
-    if not sheet_name:
-        log.info(f"{file_path} 单文件未找到包含 主出商品 sheet")
-        # 23年1-11月数据要在多文件里找
+    file_path = file_paths[0]
+    if len(file_paths) > 1:
         for file_path in file_paths:
             if '主要出口商品' in file_path.name:
                 file_path = file_path
                 flag = False
                 break
-    if not sheet_name and flag:
-        log.info(f"{path} 中未找到 主出商品 sheet或文件")
-        return None
+    log.info(f"处理文件: {file_path.name}")
+
+    xls = pd.ExcelFile(file_path)
+    import_df = pd.DataFrame()
+    export_df = pd.DataFrame()
 
     if flag:
-        df = pd.read_excel(xls, sheet_name=sheet_name, header=None).iloc[2:]
+        df = pd.read_excel(xls, sheet_name=4, header=None).iloc[2:]
     else:
         df = pd.read_excel(file_path, header=None).iloc[1:]
     temp_df = df[[0, 1]].rename(columns={0: 'commodity', 1: 'export'})
@@ -84,21 +78,15 @@ def get_df_import_export(path, year_month):
     export_df = pd.concat([export_df, temp_df])
 
     flag_2 = True
-    sheet_name = base_country_code.find_sheet_by_keyword(file_path, "主进商品")
-    if not sheet_name:
-        log.info(f"{file_path} 单文件未找到包含 主进商品 sheet")
-        # 23年1-11月数据要在多文件里找
+
+    if len(file_paths) > 1:
         for file_path in file_paths:
             if '主要进口商品' in file_path.name:
                 file_path = file_path
                 flag_2 = False
                 break
-    if not sheet_name and flag_2:
-        log.info(f"{path} 中未找到 主进商品 sheet或文件")
-        return None
-
     if flag_2:
-        df = pd.read_excel(xls, sheet_name=sheet_name, header=None).iloc[2:]
+        df = pd.read_excel(xls, sheet_name=5, header=None).iloc[2:]
     else:
         df = pd.read_excel(file_path, header=None).iloc[1:]
     temp_df = df[[0, 1]].rename(columns={0: 'commodity', 1: 'import'})
@@ -191,7 +179,7 @@ def save_to_database(merged_df, year, month):
 
             sql = (f"INSERT INTO t_yujin_crossborder_prov_commodity_trade "
                    f"(crossborder_year, crossborder_year_month, prov_code, prov_name, commodity_code, commodity_name, monthly_total, monthly_export, monthly_import, create_time) VALUES "
-                   f"('{year}', '{year_month}', '330000', '浙江省', '{commodity_code}', '{commodity_name_fix}', {format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, now());")
+                   f"('{year}', '{year_month}', '330000', '浙江省', '{commodity_code}', '{commodity_name_fix}', {format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, now()) ON DUPLICATE KEY UPDATE create_time = now() ;")
             sql_arr.append(sql)
 
             processed_commodities.add(commodity_name_fix)