소스 검색

crawl add log

zhangfan 1 주 전
부모
커밋
dc30aab348

+ 17 - 16
anhui/crawl_gov_anhui_full.py

@@ -16,6 +16,7 @@ import gov_commodity_anhui_city
 import gov_commodity_anhui_country
 import gov_commodity_anhui_import_export
 from utils import base_country_code, base_mysql
+from utils.log import log
 
 download_dir = base_country_code.download_dir
 Path(download_dir).mkdir(parents=True, exist_ok=True)
@@ -23,7 +24,7 @@ Path(download_dir).mkdir(parents=True, exist_ok=True)
 def configure_stealth_options():
     """增强型反检测配置[1,4](@ref)"""
     opts = FirefoxOptions()
-    print("当前下载路径:", Path(download_dir).resolve())
+    log.info("当前下载路径:", Path(download_dir).resolve())
     # 文件下载配置
     opts.set_preference("browser.download.dir", download_dir)
     opts.set_preference("browser.download.folderList", 2)
@@ -70,7 +71,7 @@ def find_target_links(driver):
             # 新标签页打开链接
             driver.execute_script("window.open(arguments[0]);", link_url)
             driver.switch_to.window(driver.window_handles[-1])
-            print(f"正在处理详情页: {link_url}")
+            log.info(f"正在处理详情页: {link_url}")
 
             try:
                 # 在详情页下载文件
@@ -87,7 +88,7 @@ def find_target_links(driver):
 
         return None
     except Exception as e:
-        print(f"下载时发生异常: {str(e)}")
+        log.info(f"下载时发生异常: {str(e)}")
 
 def download_file_from_detail_page(driver):
     WebDriverWait(driver, 30).until(
@@ -97,7 +98,7 @@ def download_file_from_detail_page(driver):
     try:
         elements = driver.find_elements(By.XPATH, '//div[@id="easysiteText"]//a')
         if not elements:
-            print("详情页未找到目标文件链接")
+            log.info("详情页未找到目标文件链接")
             return None
 
         for download_btn in elements:
@@ -107,7 +108,7 @@ def download_file_from_detail_page(driver):
             if file_name.startswith('2022'):
                 return 'stop'
             if '美元' in file_name or '商品贸易方式' in file_name or '进出口总值' in file_name or '月度表' in file_name:
-                print(f'{file_name} 不需要此文件,跳过')
+                log.info(f'{file_name} 不需要此文件,跳过')
                 continue
 
             file_url = download_btn.get_attribute("href")
@@ -116,10 +117,10 @@ def download_file_from_detail_page(driver):
                 file_url = base_url + file_url
 
             if not file_url.lower().endswith(('.xls', '.xlsx')):
-                print(f"跳过非 Excel 文件: {file_url}")
+                log.info(f"跳过非 Excel 文件: {file_url}")
                 continue
 
-            print(f"正在下载: {file_name} → {file_url}")
+            log.info(f"正在下载: {file_name} → {file_url}")
 
             # 记录下载前的文件列表
             existing_files = set(f.name for f in Path(download_dir).glob('*'))
@@ -132,18 +133,18 @@ def download_file_from_detail_page(driver):
             year, start_month, month = extract_year_and_month(file_name)
             final_path = Path(download_dir) / year / month / f"{file_name}"
             if os.path.exists(final_path):
-                print(f"文件已存在:{file_name} 正在覆盖...")
+                log.info(f"文件已存在:{file_name} 正在覆盖...")
                 os.unlink(final_path)
 
             final_dir = Path(download_dir) / year / month
             final_dir.mkdir(parents=True, exist_ok=True)
-            print(f"√ 正在移动文件 {downloaded_file} 至 {final_path}")
+            log.info(f"√ 正在移动文件 {downloaded_file} 至 {final_path}")
             downloaded_file.rename(final_path)
-            print(f"√ 下载成功:{final_path} \n")
+            log.info(f"√ 下载成功:{final_path} \n")
 
         return None
     except Exception as e:
-        print(f"详情页处理异常: {str(e)}")
+        log.info(f"详情页处理异常: {str(e)}")
         return None
 
 def extract_year_and_month(file_name):
@@ -177,10 +178,10 @@ def extract_rar(rar_path, extract_to):
     )
 
     if result.returncode == 0:
-        print(f"解压成功: {rar_path} → {extract_to}")
+        log.info(f"解压成功: {rar_path} → {extract_to}")
         return True
     else:
-        print(f"解压失败: {result.stderr.decode('gbk')}")
+        log.info(f"解压失败: {result.stderr.decode('gbk')}")
         return False
 
 
@@ -217,7 +218,7 @@ def crawl_with_selenium(url):
             # 获取下一页的URL
             next_page_url = next_page_btn.get_attribute("onclick")
             if not next_page_url:
-                print("已到达最后一页,停止爬取")
+                log.info("已到达最后一页,停止爬取")
                 break
             # 从onclick属性中提取URL
             next_page_url = re.search(r"'(.*?)'", next_page_url).group(1)
@@ -228,7 +229,7 @@ def crawl_with_selenium(url):
             # 访问下一页
             driver.get(next_page_url)
 
-            print(f"开始爬取 {next_page_url} 页面数据")
+            log.info(f"开始爬取 {next_page_url} 页面数据")
 
     finally:
         driver.quit()
@@ -274,7 +275,7 @@ def hierarchical_traversal(root_path):
     # 按年倒序
     for year_dir in sorted(year_dirs, key=lambda x: x.name, reverse=True):
         # 构造完整的路径:download/shandong/2025/03
-        print(f"\n年份:{year_dir.name} | 省份:jiangsu")
+        print(f"\n年份:{year_dir.name} | 省份:anhui")
 
         # 提取月份目录
         month_dirs = []

+ 11 - 10
anhui/gov_commodity_anhui_city.py

@@ -5,6 +5,7 @@ import pandas as pd
 
 from utils import base_country_code, base_mysql
 from utils.base_country_code import format_sql_value
+from utils.log import log
 
 city_code_map = {
     "安徽省合肥市": "3401",
@@ -29,11 +30,11 @@ def get_df(path):
     global df,  df_type
     file_paths = list(Path(path).glob('*'))
     if not file_paths:
-        print("未找到任何文件")
+        log.info("未找到任何文件")
         return
     for file in file_paths:
         if "收发货人" in file.name:
-            print(f"处理多文件: {file.name}")
+            log.info(f"处理多文件: {file.name}")
             file_path = Path(path) / file
             df = pd.read_excel(file_path, header=None).iloc[5:]
             break
@@ -47,7 +48,7 @@ def process_folder(path):
     sql_arr = []
     res = get_df(path)
     if res is None:
-        print(f"{year_month} prov_region_trade 未找到包含 地市 sheet")
+        log.info(f"{year_month} prov_region_trade 未找到包含 地市 sheet")
         return
     df = res
     country_name_index = 0
@@ -57,7 +58,7 @@ def process_folder(path):
         city_name = str(row.values[country_name_index]).strip()
         city_code = city_code_map.get(city_name)
         if not city_code:
-            print(f"未找到省 '{city_name}' 对应市编码")
+            log.info(f"未找到省 '{city_name}' 对应市编码")
             continue
 
         monthly_export, monthly_import, monthly_total, yoy_export, yoy_import, yoy_import_export = value_row(row, month)
@@ -80,12 +81,12 @@ def process_folder(path):
                f"('{year}', '{year_month}', '340000', '安徽省', '{city_code}', '{city_name}', {format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, '{yoy_import_export}', '{yoy_import}', '{yoy_export}', now());\n")
         sql_arr.append(sql)
 
-    print(f"√ {year_month} prov_region_trade 成功生成 SQL 文件 size {len(sql_arr)} ")
+    log.info(f"√ {year_month} prov_region_trade 成功生成 SQL 文件 size {len(sql_arr)} ")
     base_mysql.bulk_insert(sql_arr)
     if month == 2:
-        print(f"√ {year_month} prov_region_trade 成功生成 SQL 文件 size {len(sql_arr_copy)} ")
+        log.info(f"√ {year_month} prov_region_trade 成功生成 SQL 文件 size {len(sql_arr_copy)} ")
         base_mysql.bulk_insert(sql_arr_copy)
-    print(f"√ {year_month} prov_region_trade SQL 存表完成!")
+    log.info(f"√ {year_month} prov_region_trade SQL 存表完成!")
 
 
 def value_row(row, month):
@@ -110,7 +111,7 @@ def hierarchical_traversal(root_path):
     ]
 
     for year_dir in sorted(year_dirs, key=lambda x: x.name, reverse=True):
-        print(f"\n年份:{year_dir.name} | 省份:jiangsu")
+        log.info(f"\n年份:{year_dir.name} | 省份:anhui")
 
         month_dirs = []
         for item in year_dir.iterdir():
@@ -119,9 +120,9 @@ def hierarchical_traversal(root_path):
 
         if month_dirs:
             for md in sorted(month_dirs, key=lambda x: x["month"], reverse=True):
-                print(f"  月份:{md['month']:02d} | 路径:{md['path']}")
+                log.info(f"  月份:{md['month']:02d} | 路径:{md['path']}")
                 process_folder(md['path'])
 
 if __name__ == '__main__':
     hierarchical_traversal(base_country_code.download_dir)
-    print(f"安徽合肥海关城市所有文件处理完成!")
+    log.info(f"安徽合肥海关城市所有文件处理完成!")

+ 11 - 10
anhui/gov_commodity_anhui_country.py

@@ -4,6 +4,7 @@ import pandas as pd
 
 from utils import base_country_code, base_mysql
 from utils.base_country_code import format_sql_value
+from utils.log import log
 
 # 排除地区名单
 EXCLUDE_REGIONS = ["亚洲", "非洲", "欧洲", "拉丁美洲", "北美洲", "大洋洲", "南极洲",
@@ -13,11 +14,11 @@ EXCLUDE_REGIONS = ["亚洲", "非洲", "欧洲", "拉丁美洲", "北美洲", "
 def get_df(path):
     file_paths = list(Path(path).glob('*'))
     if not file_paths:
-        print("未找到任何文件")
+        log.info("未找到任何文件")
         return None
     for file in file_paths:
         if "国别" in file.name:
-            print(f"处理多文件: {file.name}")
+            log.info(f"处理多文件: {file.name}")
             file_path = Path(path) / file
             return pd.read_excel(file_path, header=None).iloc[6:]
     return None
@@ -31,7 +32,7 @@ def process_folder(path):
     # try:
     df = get_df(path)
     if df is None:
-        print("未找到任何文件")
+        log.info("未找到任何文件")
         return None
 
     country_name_index = 0
@@ -53,7 +54,7 @@ def process_folder(path):
         # 获取国家编码
         country_code = base_country_code.COUNTRY_CODE_MAPPING.get(country_name)
         if not country_code:
-            print(f"{year_month} 未找到国家 '{country_name}' 对应的编码")
+            log.info(f"{year_month} 未找到国家 '{country_name}' 对应的编码")
             continue
 
         # 提取数据并格式化
@@ -83,13 +84,13 @@ def process_folder(path):
         )
         sql_arr.append(sql)
 
-    print(f"√ {year_month} 成功生成 SQL 条数: {len(sql_arr)}")
+    log.info(f"√ {year_month} 成功生成 SQL 条数: {len(sql_arr)}")
     # 批量插入数据库
     base_mysql.bulk_insert(sql_arr)
     if month == 2:
-        print(f"√ {year_month} prov_country_trade 成功生成 SQL 文件 size {len(sql_arr_copy)} ")
+        log.info(f"√ {year_month} prov_country_trade 成功生成 SQL 文件 size {len(sql_arr_copy)} ")
         base_mysql.bulk_insert(sql_arr_copy)
-    print(f"√ {year_month} prov_country_trade SQL 存表完成!\n")
+    log.info(f"√ {year_month} prov_country_trade SQL 存表完成!\n")
 
 
 
@@ -119,7 +120,7 @@ def hierarchical_traversal(root_path):
     ]
 
     for year_dir in sorted(year_dirs, key=lambda x: x.name, reverse=True):
-        print(f"\n年份:{year_dir.name} | 省份:jiangsu")
+        log.info(f"\n年份:{year_dir.name} | 省份:anhui")
 
         month_dirs = []
         for item in year_dir.iterdir():
@@ -128,10 +129,10 @@ def hierarchical_traversal(root_path):
 
         if month_dirs:
             for md in sorted(month_dirs, key=lambda x: x["month"], reverse=True):
-                print(f"  月份:{md['month']:02d} | 路径:{md['path']}")
+                log.info(f"  月份:{md['month']:02d} | 路径:{md['path']}")
                 process_folder(md['path'])
 
 
 if __name__ == '__main__':
     hierarchical_traversal(base_country_code.download_dir)
-    print("安徽合肥海关国别所有文件处理完成!")
+    log.info("安徽合肥海关国别所有文件处理完成!")

+ 12 - 11
anhui/gov_commodity_anhui_import_export.py

@@ -6,6 +6,7 @@ import pandas as pd
 from utils import base_country_code, base_mysql
 
 from utils.base_country_code import format_sql_value
+from utils.log import log
 
 CUSTOM_COMMODITY_REPLACEMENTS = {
     '家具': '家具及其零件',
@@ -43,7 +44,7 @@ def clean_commodity_name(name, preserve_keywords=None):
 def process_folder(path):
     file_paths = list(Path(path).glob('*'))
     if not file_paths:
-        print("未找到任何文件")
+        log.info("未找到任何文件")
         return
     year, month = base_country_code.extract_year_month_from_path(path)
 
@@ -97,11 +98,11 @@ def save_to_database(import_df, export_df, year, month):
         for _, row in merged_df.iterrows():
             commodity_name = str(row['commodity'])
             if commodity_name == '肉类' or commodity_name == '其他' or commodity_name == '干鲜瓜果' or commodity_name == '钟表':
-                print(f'{commodity_name} 商品不存在,跳过')
+                log.info(f'{commodity_name} 商品不存在,跳过')
                 continue
             commodity_code, commodity_name_fix = base_mysql.get_commodity_id(commodity_name)
             if not commodity_code:
-                print(f"未找到商品名称 '{commodity_name}' 对应的 ID")
+                log.info(f"未找到商品名称 '{commodity_name}' 对应的 ID")
                 continue
             if not commodity_name_fix or commodity_name_fix in processed_commodities:
                 continue
@@ -130,18 +131,18 @@ def save_to_database(import_df, export_df, year, month):
             sql_arr.append(sql)
 
             processed_commodities.add(commodity_name_fix)
-            # print(f'{commodity_name} -> {commodity_name_fix}')
+            # log.info(f'{commodity_name} -> {commodity_name_fix}')
 
     except Exception as e:
-        print(f"{year_month} prov_commodity_trade 生成 SQL 文件时发生异常: {str(e)}")
+        log.info(f"{year_month} prov_commodity_trade 生成 SQL 文件时发生异常: {str(e)}")
 
-    print(f"√ {year_month} prov_commodity_trade 成功生成 SQL 文件 size {len(sql_arr)} ")
+    log.info(f"√ {year_month} prov_commodity_trade 成功生成 SQL 文件 size {len(sql_arr)} ")
     # 解析完后生成sql文件批量入库
     base_mysql.bulk_insert(sql_arr)
     if month == 2:
-        print(f"√ {year_month} prov_commodity_trade copy 成功生成 SQL 文件 size {len(sql_arr_copy)} ")
+        log.info(f"√ {year_month} prov_commodity_trade copy 成功生成 SQL 文件 size {len(sql_arr_copy)} ")
         base_mysql.bulk_insert(sql_arr_copy)
-    print(f"√ {year_month} prov_commodity_trade SQL 存表完成!\n")
+    log.info(f"√ {year_month} prov_commodity_trade SQL 存表完成!\n")
 
 def hierarchical_traversal(root_path):
     """分层遍历:省份->年份->月目录"""
@@ -155,7 +156,7 @@ def hierarchical_traversal(root_path):
     # 按年倒序
     for year_dir in sorted(year_dirs, key=lambda x: x.name, reverse=True):
         # 构造完整的路径:download/shandong/2025/03
-        print(f"\n年份:{year_dir.name} | 省份:jiangsu")
+        log.info(f"\n年份:{year_dir.name} | 省份:anhui")
 
         # 提取月份目录
         month_dirs = []
@@ -168,7 +169,7 @@ def hierarchical_traversal(root_path):
         # 按月倒序输出
         if month_dirs:
             for md in sorted(month_dirs, key=lambda x: x["month"], reverse=True):
-                print(f"  月份:{md['month']:02d} | 路径:{md['path']}")
+                log.info(f"  月份:{md['month']:02d} | 路径:{md['path']}")
                 process_folder(md['path'])
 
 if __name__ == '__main__':
@@ -176,4 +177,4 @@ if __name__ == '__main__':
 
     # root = Path(base_country_code.download_dir)/'2025'/'04'
     # process_folder(root)
-    print("安徽合肥海关类章所有文件处理完成!")
+    log.info("安徽合肥海关类章所有文件处理完成!")

+ 16 - 15
hebei/crawl_gov_hebei_full.py

@@ -15,6 +15,7 @@ from utils import base_country_code, base_mysql
 import gov_commodity_hebei_import_export
 import gov_commodity_hebei_country
 import gov_commodity_hebei_city
+from utils.log import log
 
 download_dir = base_country_code.download_dir
 Path(download_dir).mkdir(parents=True, exist_ok=True)
@@ -31,7 +32,7 @@ def get_current_target_titles():
 def configure_stealth_options():
     """增强型反检测配置[1,4](@ref)"""
     opts = FirefoxOptions()
-    print("当前下载路径:", Path(download_dir).resolve())
+    log.info("当前下载路径:", Path(download_dir).resolve())
     # 文件下载配置
     opts.set_preference("browser.download.dir", download_dir)
     opts.set_preference("browser.download.folderList", 2)
@@ -82,7 +83,7 @@ def find_target_links(driver):
 
     element_arr = driver.find_elements(By.XPATH, '//div[@class="list_con"]//ul[@class="easysite-list-modelone"]//a')
     if not element_arr:
-        print("未找到目标标题")
+        log.info("未找到目标标题")
         return None
     for elements in element_arr:
         file_name = elements.text.strip()
@@ -95,9 +96,9 @@ def find_target_links(driver):
             file_url = remove_prefix_from_url(file_url)
 
             if not file_url.lower().endswith(('.xls', '.xlsx')):
-                print(f"跳过非 Excel 文件: {file_url}")
+                log.info(f"跳过非 Excel 文件: {file_url}")
                 continue
-            print(f"正在下载: {file_name} → {file_url}")
+            log.info(f"正在下载: {file_name} → {file_url}")
             # 记录下载前的文件列表
             existing_files = set(f.name for f in Path(download_dir).glob('*'))
             # 随机点击延迟
@@ -107,21 +108,21 @@ def find_target_links(driver):
             try:
                 downloaded_file = wait_for_download_complete(existing_files=existing_files)
             except Exception as e:
-                print(f"下载失败: {str(e)}")
+                log.info(f"下载失败: {str(e)}")
                 continue
             year, start_month, month = extract_year_and_month(file_name)
             final_path = Path(download_dir) / year / month / f"{file_name}.xls"
             if os.path.exists(final_path):
-                print(f"文件已存在:{file_name} 正在覆盖...")
+                log.info(f"文件已存在:{file_name} 正在覆盖...")
                 os.unlink(final_path)
 
             final_dir = Path(download_dir) / year / month
             final_dir.mkdir(parents=True, exist_ok=True)
-            print(f"√ 正在移动文件 {downloaded_file} 至 {final_path}")
+            log.info(f"√ 正在移动文件 {downloaded_file} 至 {final_path}")
             downloaded_file.rename(final_path)
-            print(f"√ 下载成功:{final_path}")
+            log.info(f"√ 下载成功:{final_path}")
         else:
-            print(f'{file_name} 不需要此文件,跳过')
+            log.info(f'{file_name} 不需要此文件,跳过')
             continue
     return None
 
@@ -170,7 +171,7 @@ def crawl_with_selenium(url):
             # 获取下一页的URL
             next_page_url = next_page_btn.get_attribute("onclick")
             if not next_page_url:
-                print("已到达最后一页,停止爬取")
+                log.info("已到达最后一页,停止爬取")
                 break
             # 从onclick属性中提取URL
             next_page_url = re.search(r"'(.*?)'", next_page_url).group(1)
@@ -181,7 +182,7 @@ def crawl_with_selenium(url):
             # 访问下一页
             driver.get(next_page_url)
 
-            print(f"开始爬取 {next_page_url} 页面数据")
+            log.info(f"开始爬取 {next_page_url} 页面数据")
     finally:
         driver.quit()
 
@@ -227,7 +228,7 @@ def hierarchical_traversal(root_path):
     # 按年倒序
     for year_dir in sorted(year_dirs, key=lambda x: x.name, reverse=True):
         # 构造完整的路径:download/shandong/2025/03
-        print(f"\n年份:{year_dir.name} | 省份:jiangsu")
+        log.info(f"\n年份:{year_dir.name} | 省份:hebei")
 
         # 提取月份目录
         month_dirs = []
@@ -240,7 +241,7 @@ def hierarchical_traversal(root_path):
         # 按月倒序输出
         if month_dirs:
             for md in sorted(month_dirs, key=lambda x: x["month"], reverse=True):
-                print(f"  月份:{md['month']:02d} | 路径:{md['path']}")
+                log.info(f"  月份:{md['month']:02d} | 路径:{md['path']}")
                 gov_commodity_hebei_import_export.process_folder(md['path'])
                 gov_commodity_hebei_country.process_folder(md['path'])
                 gov_commodity_hebei_city.process_folder(md['path'])
@@ -251,8 +252,8 @@ if __name__ == "__main__":
     # 等待5s后执行
     time.sleep(5)
     hierarchical_traversal(base_country_code.download_dir)
-    print(f"河北石家庄海关全量数据下载任务完成")
+    log.info(f"河北石家庄海关全量数据下载任务完成")
     time.sleep(5)
     base_mysql.update_january_yoy('河北省')
     base_mysql.update_shandong_yoy('河北省')
-    print("河北石家庄海关城市同比sql处理完成")
+    log.info("河北石家庄海关城市同比sql处理完成")

+ 11 - 10
hebei/gov_commodity_hebei_city.py

@@ -5,6 +5,7 @@ import pandas as pd
 
 from utils import base_country_code, base_mysql
 from utils.base_country_code import format_sql_value
+from utils.log import log
 
 city_code_map = {
     "石家庄市": "130100",
@@ -23,11 +24,11 @@ city_code_map = {
 def get_df(path):
     file_paths = list(Path(path).glob('*'))
     if not file_paths:
-        print("未找到任何文件")
+        log.info("未找到任何文件")
         return None
     for file in file_paths:
         if "地市" in file.name:
-            print(f"处理多文件: {file.name}")
+            log.info(f"处理多文件: {file.name}")
             file_path = Path(path) / file
             return pd.read_excel(file_path, header=None).iloc[5:]
     return None
@@ -38,7 +39,7 @@ def process_folder(path):
 
     df = get_df(path)
     if df is None:
-        print("未找到任何文件")
+        log.info("未找到任何文件")
         return None
 
     if year == 2025 and month >= 3:
@@ -58,7 +59,7 @@ def process_folder(path):
 
         city_code = city_code_map.get(city_name)
         if not city_code:
-            print(f"未找到省 '{city_name}' 对应市编码")
+            log.info(f"未找到省 '{city_name}' 对应市编码")
             continue
 
         monthly_export, monthly_import, monthly_total = value_row(row, col_total_index, col_monthly_export_index, col_monthly_import_index)
@@ -79,13 +80,13 @@ def process_folder(path):
                f"('{year}', '{year_month}', '130000', '河北省', '{city_code}', '{city_name}', {format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, '{yoy_import_export}', '{yoy_import}', '{yoy_export}', now());\n")
         sql_arr.append(sql)
 
-    print(f"√ {year_month} prov_region_trade 成功生成 SQL 文件 size {len(sql_arr)} ")
+    log.info(f"√ {year_month} prov_region_trade 成功生成 SQL 文件 size {len(sql_arr)} ")
     # 解析完后生成sql文件批量入库
     base_mysql.bulk_insert(sql_arr)
     if year_month == '2023-02':
-        print(f"√ {year_month} sql_arr_copy 成功生成 SQL 文件 size {len(sql_arr_copy)} ")
+        log.info(f"√ {year_month} sql_arr_copy 成功生成 SQL 文件 size {len(sql_arr_copy)} ")
         base_mysql.bulk_insert(sql_arr_copy)
-    print(f"√ {year_month} prov_region_trade SQL 存表完成!")
+    log.info(f"√ {year_month} prov_region_trade SQL 存表完成!")
 
 
 def value_row(row, col_total_index, col_monthly_export_index, col_monthly_import_index):
@@ -108,7 +109,7 @@ def hierarchical_traversal(root_path):
     ]
 
     for year_dir in sorted(year_dirs, key=lambda x: x.name, reverse=True):
-        print(f"\n年份:{year_dir.name} | 省份:jiangsu")
+        log.info(f"\n年份:{year_dir.name} | 省份:hebei")
 
         month_dirs = []
         for item in year_dir.iterdir():
@@ -117,9 +118,9 @@ def hierarchical_traversal(root_path):
 
         if month_dirs:
             for md in sorted(month_dirs, key=lambda x: x["month"], reverse=True):
-                print(f"  月份:{md['month']:02d} | 路径:{md['path']}")
+                log.info(f"  月份:{md['month']:02d} | 路径:{md['path']}")
                 process_folder(md['path'])
 
 if __name__ == '__main__':
     hierarchical_traversal(base_country_code.download_dir)
-    print(f"河北石家庄海关城市所有文件处理完成!")
+    log.info(f"河北石家庄海关城市所有文件处理完成!")

+ 11 - 10
hebei/gov_commodity_hebei_country.py

@@ -5,6 +5,7 @@ import pandas as pd
 
 from utils import base_country_code, base_mysql
 from utils.base_country_code import format_sql_value
+from utils.log import log
 
 EXCLUDE_REGIONS = ["亚洲", "非洲", "欧洲", "拉丁美洲", "北美洲", "大洋洲", "南极洲",
                    "东南亚国家联盟", "欧洲联盟", "亚太经济合作组织",
@@ -13,11 +14,11 @@ EXCLUDE_REGIONS = ["亚洲", "非洲", "欧洲", "拉丁美洲", "北美洲", "
 def get_df(path):
     file_paths = list(Path(path).glob('*'))
     if not file_paths:
-        print("未找到任何文件")
+        log.info("未找到任何文件")
         return None
     for file in file_paths:
         if "国" in file.name:
-            print(f"处理多文件: {file.name}")
+            log.info(f"处理多文件: {file.name}")
             file_path = Path(path) / file
             return pd.read_excel(file_path, header=None).iloc[6:]
     return None
@@ -28,7 +29,7 @@ def process_folder(path):
 
     df = get_df(path)
     if df is None:
-        print("未找到任何文件")
+        log.info("未找到任何文件")
         return None
 
     if year == 2025 and month >= 3:
@@ -55,7 +56,7 @@ def process_folder(path):
 
         country_code = base_country_code.COUNTRY_CODE_MAPPING.get(country_name)
         if not country_code:
-            print(f"{year_month} 未找到国家 '{country_name}' 对应国家的编码")
+            log.info(f"{year_month} 未找到国家 '{country_name}' 对应国家的编码")
             continue
 
         monthly_export, monthly_import, monthly_total = value_row(row, col_total_index, col_monthly_export_index, col_monthly_import_index)
@@ -75,13 +76,13 @@ def process_folder(path):
                f"('{year}', '{year_month}', '130000', '河北省', '{country_code}', '{country_name}', {format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, '{yoy_import_export}', '{yoy_import}', '{yoy_export}', now());\n")
         sql_arr.append(sql)
 
-    print(f"√ {year_month} prov_country_trade 成功生成 SQL 文件 size {len(sql_arr)} ")
+    log.info(f"√ {year_month} prov_country_trade 成功生成 SQL 文件 size {len(sql_arr)} ")
     # 解析完后生成sql文件批量入库
     base_mysql.bulk_insert(sql_arr)
     if year_month == '2023-02':
-        print(f"√ {year_month} prov_country_trade 成功生成 SQL 文件 size {len(sql_arr_copy)} ")
+        log.info(f"√ {year_month} prov_country_trade 成功生成 SQL 文件 size {len(sql_arr_copy)} ")
         base_mysql.bulk_insert(sql_arr_copy)
-    print(f"√ {year_month} prov_country_trade SQL 存表完成!")
+    log.info(f"√ {year_month} prov_country_trade SQL 存表完成!")
 
 def value_row(row,col_total_index, col_monthly_export_index, col_monthly_import_index):
     monthly_total = str(row.values[col_total_index]).strip()
@@ -103,7 +104,7 @@ def hierarchical_traversal(root_path):
     ]
 
     for year_dir in sorted(year_dirs, key=lambda x: x.name, reverse=True):
-        print(f"\n年份:{year_dir.name} | 省份:jiangsu")
+        log.info(f"\n年份:{year_dir.name} | 省份:hebei")
 
         month_dirs = []
         for item in year_dir.iterdir():
@@ -112,9 +113,9 @@ def hierarchical_traversal(root_path):
 
         if month_dirs:
             for md in sorted(month_dirs, key=lambda x: x["month"], reverse=True):
-                print(f"  月份:{md['month']:02d} | 路径:{md['path']}")
+                log.info(f"  月份:{md['month']:02d} | 路径:{md['path']}")
                 process_folder(md['path'])
 
 if __name__ == '__main__':
     hierarchical_traversal(base_country_code.download_dir)
-    print(f"河北石家庄海关国家的所有文件处理完成!")
+    log.info(f"河北石家庄海关国家的所有文件处理完成!")

+ 11 - 10
hebei/gov_commodity_hebei_import_export.py

@@ -2,6 +2,7 @@ from pathlib import Path
 
 import pandas as pd
 import re
+from utils.log import log
 
 from utils import base_country_code, base_mysql
 from utils.base_country_code import format_sql_value
@@ -47,7 +48,7 @@ def process_folder(path):
     value_index = 5 if year == 2025 and month >= 3 else 4
     res = df_data(path, name_index, value_index)
     if not res:
-        print(f"{path} 上月目录里文件未找到包含 主出、主进商品 sheet")
+        log.info(f"{path} 上月目录里文件未找到包含 主出、主进商品 sheet")
         return
     export_df, import_df = res
 
@@ -70,7 +71,7 @@ def save_to_database(merged_df, year, month):
             commodity_name = str(row['commodity']).strip()
             commodity_code,commodity_name_fix = base_mysql.get_commodity_id(commodity_name)
             if not commodity_code:
-                print(f"未找到商品名称 '{commodity_name}' 对应的 ID")
+                log.info(f"未找到商品名称 '{commodity_name}' 对应的 ID")
                 continue
             if not commodity_name_fix or commodity_name_fix in processed_commodities:
                 continue
@@ -100,21 +101,21 @@ def save_to_database(merged_df, year, month):
             processed_commodities.add(commodity_name_fix)
 
     except Exception as e:
-        print(f"{year_month} prov_commodity_trade 生成 SQL 文件时发生异常: {str(e)}")
+        log.info(f"{year_month} prov_commodity_trade 生成 SQL 文件时发生异常: {str(e)}")
 
-    print(f"√ {year_month} prov_commodity_trade 成功生成 SQL 文件 size {len(sql_arr)} ")
+    log.info(f"√ {year_month} prov_commodity_trade 成功生成 SQL 文件 size {len(sql_arr)} ")
     # 解析完后生成sql文件批量入库
     base_mysql.bulk_insert(sql_arr)
     if year_month == '2023-02':
-        print(f"√ {year_month} prov_commodity_trade copy 成功生成 SQL 文件 size {len(sql_arr_copy)} ")
+        log.info(f"√ {year_month} prov_commodity_trade copy 成功生成 SQL 文件 size {len(sql_arr_copy)} ")
         base_mysql.bulk_insert(sql_arr_copy)
-    print(f"√ {year_month} prov_commodity_trade SQL 存表完成!")
+    log.info(f"√ {year_month} prov_commodity_trade SQL 存表完成!")
 
 
 def df_data(path, name_index, value_index):
     file_paths = list(Path(path).glob('*'))
     if not file_paths:
-        print("未找到任何文件")
+        log.info("未找到任何文件")
         return None
     import_df = pd.DataFrame()
     export_df = pd.DataFrame()
@@ -158,7 +159,7 @@ def hierarchical_traversal(root_path):
 
     # 按年倒序
     for year_dir in sorted(year_dirs, key=lambda x: x.name, reverse=True):
-        print(f"\n年份:{year_dir.name} | 省份:jiangsu")
+        log.info(f"\n年份:{year_dir.name} | 省份:hebei")
 
         # 提取月份目录
         month_dirs = []
@@ -171,7 +172,7 @@ def hierarchical_traversal(root_path):
         # 按月倒序输出
         if month_dirs:
             for md in sorted(month_dirs, key=lambda x: x["month"], reverse=True):
-                print(f"  月份:{md['month']:02d} | 路径:{md['path']}")
+                log.info(f"  月份:{md['month']:02d} | 路径:{md['path']}")
                 process_folder(md['path'])
 
 if __name__ == '__main__':
@@ -179,4 +180,4 @@ if __name__ == '__main__':
 
     # root = Path(base_country_code.download_dir)/'2023'/'02'
     # process_folder(root)
-    print(f"河北石家庄海关出入口商品所有文件处理完成!")
+    log.info(f"河北石家庄海关出入口商品所有文件处理完成!")

+ 24 - 23
jiangsu/crawl_gov_jiangsu_full.py

@@ -18,6 +18,7 @@ import gov_commodity_jiangsu_city
 import gov_commodity_jiangsu_import_export
 
 from utils import base_country_code, base_mysql
+from utils.log import log
 
 # 显式指定 unrar 路径(根据实际情况修改)
 rarfile.UNRAR_EXECUTABLE = r"C:\Program Files\WinRAR\UnRAR.exe"
@@ -28,7 +29,7 @@ Path(download_dir).mkdir(parents=True, exist_ok=True)
 def configure_stealth_options():
     """增强型反检测配置[1,4](@ref)"""
     opts = FirefoxOptions()
-    print("当前下载路径:", Path(download_dir).resolve())
+    log.info("当前下载路径:", Path(download_dir).resolve())
     # 文件下载配置
     opts.set_preference("browser.download.dir", download_dir)
     opts.set_preference("browser.download.folderList", 2)
@@ -76,7 +77,7 @@ def find_target_links(driver):
         for download_btn in elements:
             # 获取文件名(用于后续判断)
             file_name = download_btn.text.strip()
-            print(f"正在下载: {file_name}")
+            log.info(f"正在下载: {file_name}")
 
             # 记录下载前的文件列表
             existing_files = set(f.name for f in Path(download_dir).glob('*'))
@@ -88,7 +89,7 @@ def find_target_links(driver):
             # 等待文件下载完成
             rar_files = wait_for_download_complete(existing_files=existing_files)
             if not rar_files:
-                print("未找到新下载的 .rar 文件")
+                log.info("未找到新下载的 .rar 文件")
                 continue
 
             downloaded_file = rar_files[0]
@@ -98,20 +99,20 @@ def find_target_links(driver):
                     # 获取压缩包中的第一个 .xls 文件
                     xls_files = [f for f in rf.namelist() if f.endswith('.xls') or f.endswith('.xlsx')]
                     if not xls_files:
-                        print(f"压缩包 {downloaded_file.name} 中没有 .xls 文件")
+                        log.info(f"压缩包 {downloaded_file.name} 中没有 .xls 文件")
                         continue
 
                     for xls_file in xls_files:
                         if xls_file.startswith('2022'):
                             return 'stop'
                         if not xls_file or '美元值' in xls_file or '企业性质' in xls_file or '贸易方式' in xls_file or '按收发货所在地' in xls_file or '主要商品' in xls_file:
-                            print(f"检测到不需要的文件:{xls_file},跳过")
+                            log.info(f"检测到不需要的文件:{xls_file},跳过")
                             continue
                         # 解压到临时目录
                         temp_dir = Path(download_dir) / 'temp'
                         temp_dir.mkdir(parents=True, exist_ok=True)
                         if not extract_rar(downloaded_file, temp_dir):
-                            print(f"解压文件 {downloaded_file.name} 时发生错误")
+                            log.info(f"解压文件 {downloaded_file.name} 时发生错误")
                             continue
                         # 获取解压后的文件路径
                         match = re.search(r"(\d{4})年(\d{1,2})月", xls_file)
@@ -123,34 +124,34 @@ def find_target_links(driver):
                         extracted_file = temp_dir / xls_file
                         final_path = Path(download_dir) / year / month / extracted_file.name
                         if os.path.exists(final_path):
-                            print(f"文件已存在:{extracted_file.name} 正在覆盖...")
+                            log.info(f"文件已存在:{extracted_file.name} 正在覆盖...")
                             os.unlink(final_path)
 
                         final_dir = Path(download_dir) / year / month
                         final_dir.mkdir(parents=True, exist_ok=True)
-                        print(f"√ 正在移动文件 {extracted_file} 至 {final_path}")
+                        log.info(f"√ 正在移动文件 {extracted_file} 至 {final_path}")
                         try:
                             extracted_file.rename(final_path)
-                            print(f"√ 下载成功:{final_path}")
+                            log.info(f"√ 下载成功:{final_path}")
                         except Exception as e:
-                            print(f"文件移动失败: {str(e)}")
+                            log.info(f"文件移动失败: {str(e)}")
 
                     # 删除临时目录(无论是否为空)
                     try:
                         shutil.rmtree(temp_dir)  # 替换 os.rmdir(temp_dir)
                     except Exception as e:
-                        print(f"删除临时目录失败: {str(e)}")
+                        log.info(f"删除临时目录失败: {str(e)}")
 
                 # 删除 .rar 文件
-                print(f"删除 .rar 文件:{downloaded_file}")
+                log.info(f"删除 .rar 文件:{downloaded_file}")
                 os.unlink(downloaded_file)
             else:
-                print(f"文件 {downloaded_file.name} 不是 .rar 文件,请手动处理")
+                log.info(f"文件 {downloaded_file.name} 不是 .rar 文件,请手动处理")
             # 将已处理的文件名加入集合
             processed_files.add(file_name)
         return None
     except Exception as e:
-        print(f"下载时发生异常: {str(e)}")
+        log.info(f"下载时发生异常: {str(e)}")
 
 def extract_rar(rar_path, extract_to):
     """备用解压函数(当 rarfile 失效时使用)"""
@@ -168,10 +169,10 @@ def extract_rar(rar_path, extract_to):
     )
 
     if result.returncode == 0:
-        print(f"解压成功: {rar_path} → {extract_to}")
+        log.info(f"解压成功: {rar_path} → {extract_to}")
         return True
     else:
-        print(f"解压失败: {result.stderr.decode('gbk')}")
+        log.info(f"解压失败: {result.stderr.decode('gbk')}")
         return False
 
 
@@ -208,7 +209,7 @@ def crawl_with_selenium(url):
             # 获取下一页的URL
             next_page_url = next_page_btn.get_attribute("onclick")
             if not next_page_url:
-                print("已到达最后一页,停止爬取")
+                log.info("已到达最后一页,停止爬取")
                 break
             # 从onclick属性中提取URL
             next_page_url = re.search(r"'(.*?)'", next_page_url).group(1)
@@ -219,7 +220,7 @@ def crawl_with_selenium(url):
             # 访问下一页
             driver.get(next_page_url)
 
-            print(f"开始爬取 {next_page_url} 页面数据")
+            log.info(f"开始爬取 {next_page_url} 页面数据")
 
     finally:
         driver.quit()
@@ -262,7 +263,7 @@ def hierarchical_traversal(root_path, all_records):
     # 按年倒序
     for year_dir in sorted(year_dirs, key=lambda x: x.name, reverse=True):
         # 构造完整的路径:download/shandong/2025/03
-        print(f"\n年份:{year_dir.name} | 省份:jiangsu")
+        log.info(f"\n年份:{year_dir.name} | 省份:jiangsu")
 
         # 提取月份目录
         month_dirs = []
@@ -275,21 +276,21 @@ def hierarchical_traversal(root_path, all_records):
         # 按月倒序输出
         if month_dirs:
             for md in sorted(month_dirs, key=lambda x: x["month"], reverse=True):
-                print(f"  月份:{md['month']:02d} | 路径:{md['path']}")
+                log.info(f"  月份:{md['month']:02d} | 路径:{md['path']}")
                 gov_commodity_jiangsu_import_export.process_folder(md['path'], all_records)
                 gov_commodity_jiangsu_country.process_folder(md['path'])
                 gov_commodity_jiangsu_city.process_folder(md['path'])
 
 if __name__ == "__main__":
     crawl_with_selenium('http://nanjing.customs.gov.cn/nanjing_customs/zfxxgk58/fdzdgknr95/3010051/589289/7e2fcc72-1.html')
-    print(f"江苏南京海关全量数据下载任务完成")
+    log.info(f"江苏南京海关全量数据下载任务完成")
     # 等待5s后执行
     time.sleep(5)
     all_records = base_mysql.get_hs_all()
     hierarchical_traversal(base_country_code.download_dir, all_records)
-    print("江苏南京海关类章、国家、城市所有文件处理完成!")
+    log.info("江苏南京海关类章、国家、城市所有文件处理完成!")
     time.sleep(5)
     base_mysql.update_january_yoy('江苏省')
     base_mysql.update_shandong_yoy('江苏省')
-    print("江苏南京海关城市同比sql处理完成")
+    log.info("江苏南京海关城市同比sql处理完成")
 

+ 14 - 13
jiangsu/gov_commodity_jiangsu_city.py

@@ -5,6 +5,7 @@ import pandas as pd
 
 from utils import base_country_code, base_mysql
 from utils.base_country_code import format_sql_value
+from utils.log import log
 
 city_code_map = {
     "南京市": "3201",
@@ -28,16 +29,16 @@ def get_df(path):
     global df,  df_type
     file_paths = list(Path(path).glob('*'))
     if not file_paths:
-        print("未找到任何文件")
+        log.info("未找到任何文件")
         return
     if len(file_paths) == 1:
         file_path = file_paths[0]
-        print(f"处理单文件: {file_path.name}")
+        log.info(f"处理单文件: {file_path.name}")
         xls = pd.ExcelFile(file_path)
 
         sheet_name = base_country_code.find_sheet_by_keyword(file_path, "地")
         if not sheet_name:
-            print(f"{file_path} 未找到包含 地市 sheet")
+            log.info(f"{file_path} 未找到包含 地市 sheet")
             return None
         df = pd.read_excel(xls, sheet_name=sheet_name, header=None).iloc[5:]
         df_type = 0
@@ -45,7 +46,7 @@ def get_df(path):
     else:
         for file in file_paths:
             if "地区" in file.name:
-                print(f"处理多文件: {file.name}")
+                log.info(f"处理多文件: {file.name}")
                 file_path = Path(path) / file
                 df = pd.read_excel(file_path, header=None).iloc[6:]
                 df_type = 1
@@ -59,7 +60,7 @@ def process_folder(path):
     sql_arr = []
     res = get_df(path)
     if res is None:
-        print(f"{year_month} prov_region_trade 未找到包含 地市 sheet")
+        log.info(f"{year_month} prov_region_trade 未找到包含 地市 sheet")
         return
     df, df_type = res
     if df_type == 0:
@@ -77,11 +78,11 @@ def process_folder(path):
                 flag = True
                 break
         if flag:
-            print(f"忽略 {city_name}")
+            log.info(f"忽略 {city_name}")
             continue
         city_code = city_code_map.get(city_name)
         if not city_code:
-            print(f"未找到省 '{city_name}' 对应市编码")
+            log.info(f"未找到省 '{city_name}' 对应市编码")
             continue
 
         monthly_export, monthly_import, monthly_total = value_row(row, col_total_index)
@@ -95,10 +96,10 @@ def process_folder(path):
                f"('{year}', '{year_month}', '320000', '江苏省', '{city_code}', '{city_name}', {format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, '{yoy_import_export}', '{yoy_import}', '{yoy_export}', now());\n")
         sql_arr.append(sql)
 
-    print(f"√ {year_month} prov_region_trade 成功生成 SQL 文件 size {len(sql_arr)} ")
+    log.info(f"√ {year_month} prov_region_trade 成功生成 SQL 文件 size {len(sql_arr)} ")
     # 解析完后生成sql文件批量入库
     base_mysql.bulk_insert(sql_arr)
-    print(f"√ {year_month} prov_region_trade SQL 存表完成!")
+    log.info(f"√ {year_month} prov_region_trade SQL 存表完成!")
 
 
 def value_row(row,col_total_index):
@@ -115,7 +116,7 @@ def hierarchical_traversal(root_path):
     ]
 
     for year_dir in sorted(year_dirs, key=lambda x: x.name, reverse=True):
-        print(f"\n年份:{year_dir.name} | 省份:jiangsu")
+        log.info(f"\n年份:{year_dir.name} | 省份:jiangsu")
 
         month_dirs = []
         for item in year_dir.iterdir():
@@ -124,13 +125,13 @@ def hierarchical_traversal(root_path):
 
         if month_dirs:
             for md in sorted(month_dirs, key=lambda x: x["month"], reverse=True):
-                print(f"  月份:{md['month']:02d} | 路径:{md['path']}")
+                log.info(f"  月份:{md['month']:02d} | 路径:{md['path']}")
                 process_folder(md['path'])
 
 if __name__ == '__main__':
     hierarchical_traversal(base_country_code.download_dir)
-    print(f"江苏南京海关城市所有文件处理完成!")
+    log.info(f"江苏南京海关城市所有文件处理完成!")
     time.sleep(5)
     base_mysql.update_january_yoy('江苏省')
     base_mysql.update_shandong_yoy('江苏省')
-    print("江苏南京同比sql处理完成")
+    log.info("江苏南京同比sql处理完成")

+ 12 - 11
jiangsu/gov_commodity_jiangsu_country.py

@@ -4,6 +4,7 @@ import pandas as pd
 
 from utils import base_country_code, base_mysql
 from utils.base_country_code import format_sql_value
+from utils.log import log
 
 # 排除地区名单
 EXCLUDE_REGIONS = ["亚洲", "非洲", "欧洲", "拉丁美洲", "北美洲", "大洋洲", "南极洲",
@@ -14,16 +15,16 @@ def get_df(path):
     global df,  df_type
     file_paths = list(Path(path).glob('*'))
     if not file_paths:
-        print("未找到任何文件")
+        log.info("未找到任何文件")
         return
     if len(file_paths) == 1:
         file_path = file_paths[0]
-        print(f"处理单文件: {file_path.name}")
+        log.info(f"处理单文件: {file_path.name}")
         xls = pd.ExcelFile(file_path)
 
         sheet_name = base_country_code.find_sheet_by_keyword(file_path, "国别")
         if not sheet_name:
-            print(f"{file_path} 未找到包含 类章 sheet")
+            log.info(f"{file_path} 未找到包含 类章 sheet")
             return None
         df = pd.read_excel(xls, sheet_name=sheet_name, header=None).iloc[5:]
         df_type = 0
@@ -31,7 +32,7 @@ def get_df(path):
     else:
         for file in file_paths:
             if "国别" in file.name:
-                print(f"处理多文件: {file.name}")
+                log.info(f"处理多文件: {file.name}")
                 file_path = Path(path) / file
                 df = pd.read_excel(file_path, header=None).iloc[6:]
                 df_type = 1
@@ -68,7 +69,7 @@ def process_folder(path):
             # 获取国家编码
             country_code = base_country_code.COUNTRY_CODE_MAPPING.get(country_name)
             if not country_code:
-                print(f"{year_month} 未找到国家 '{country_name}' 对应的编码")
+                log.info(f"{year_month} 未找到国家 '{country_name}' 对应的编码")
                 continue
 
             # 提取数据并格式化
@@ -88,12 +89,12 @@ def process_folder(path):
             )
             sql_arr.append(sql)
     except Exception as e:
-        print(f"{year_month} 处理时发生异常: {str(e)}")
+        log.info(f"{year_month} 处理时发生异常: {str(e)}")
 
-    print(f"√ {year_month} 成功生成 SQL 条数: {len(sql_arr)}")
+    log.info(f"√ {year_month} 成功生成 SQL 条数: {len(sql_arr)}")
     # 批量插入数据库
     base_mysql.bulk_insert(sql_arr)
-    print(f"√ {year_month} prov_country_trade SQL 存表完成!")
+    log.info(f"√ {year_month} prov_country_trade SQL 存表完成!")
 
 def value_row(row, col_total_index, col_monthly_export_index, col_monthly_import_index):
     def value_special_handler(value):
@@ -117,7 +118,7 @@ def hierarchical_traversal(root_path):
     ]
 
     for year_dir in sorted(year_dirs, key=lambda x: x.name, reverse=True):
-        print(f"\n年份:{year_dir.name} | 省份:jiangsu")
+        log.info(f"\n年份:{year_dir.name} | 省份:jiangsu")
 
         month_dirs = []
         for item in year_dir.iterdir():
@@ -126,10 +127,10 @@ def hierarchical_traversal(root_path):
 
         if month_dirs:
             for md in sorted(month_dirs, key=lambda x: x["month"], reverse=True):
-                print(f"  月份:{md['month']:02d} | 路径:{md['path']}")
+                log.info(f"  月份:{md['month']:02d} | 路径:{md['path']}")
                 process_folder(md['path'])
 
 
 if __name__ == '__main__':
     hierarchical_traversal(base_country_code.download_dir)
-    print("江苏南京海关国别所有文件处理完成!")
+    log.info("江苏南京海关国别所有文件处理完成!")

+ 11 - 10
jiangsu/gov_commodity_jiangsu_import_export.py

@@ -5,6 +5,7 @@ import pandas as pd
 
 from utils import base_country_code, base_mysql
 from utils.base_country_code import format_sql_value
+from utils.log import log
 
 YEAR_PATTERN = re.compile(r"^\d{4}$")
 MONTH_PATTERN = re.compile(r"^(0[1-9]|1[0-2])$")
@@ -14,14 +15,14 @@ all_records = []
 def process_folder(path, all_records):
     file_paths = list(Path(path).glob('*'))
     if not file_paths:
-        print("未找到任何文件")
+        log.info("未找到任何文件")
         return
     year, month = base_country_code.extract_year_month_from_path(path)
     year_month = f'{year}-{month:02d}'
 
     if len(file_paths) == 1:
         file_path = file_paths[0]
-        print(f"处理单文件: {file_path.name}")
+        log.info(f"处理单文件: {file_path.name}")
         # 读取所有sheet
         xls = pd.ExcelFile(file_path)
         import_df = pd.DataFrame()
@@ -29,7 +30,7 @@ def process_folder(path, all_records):
         total_df = pd.DataFrame()
         sheet_name = base_country_code.find_sheet_by_keyword(file_path, "类章")
         if not sheet_name:
-            print(f"{file_path} 未找到包含 类章 sheet")
+            log.info(f"{file_path} 未找到包含 类章 sheet")
             return
         skip_index = 4 if year_month == '2024-11' else 5
         df = pd.read_excel(xls, sheet_name=sheet_name, header=None).iloc[skip_index:]
@@ -55,7 +56,7 @@ def process_folder(path, all_records):
         total_df = pd.DataFrame()
         for file in file_paths:
             if "商品类章" in file.name:
-                print(f"处理多文件: {file.name}")
+                log.info(f"处理多文件: {file.name}")
                 file_path = Path(path) / file
                 df = pd.read_excel(file_path, header=None).iloc[6:]
 
@@ -97,10 +98,10 @@ def save_to_database(import_df, export_df, total_df, year, month, all_records):
         # 找类名确定索引
         result = extract_category_or_chapter(commodity_name, all_records_index)
         if result is None:
-            print(f"未找到商品名称 '{commodity_name}' 对应的ID")
+            log.info(f"未找到商品名称 '{commodity_name}' 对应的ID")
             continue
         if result >= len(all_records):
-            print(f"all_records 已超限 '{commodity_name}' 跳过")
+            log.info(f"all_records 已超限 '{commodity_name}' 跳过")
             continue
         all_records_index = result
 
@@ -119,9 +120,9 @@ def save_to_database(import_df, export_df, total_df, year, month, all_records):
 
         processed_commodities.add(commodity_code)
 
-    print(f"√ {year_month} 成功生成SQL文件 size {len(sql_arr)} ")
+    log.info(f"√ {year_month} 成功生成SQL文件 size {len(sql_arr)} ")
     base_mysql.bulk_insert(sql_arr)
-    print(f"√ {year_month} prov_commodity_trade SQL 存表完成!")
+    log.info(f"√ {year_month} prov_commodity_trade SQL 存表完成!")
 
 
 def extract_category_or_chapter(text, all_records_index):
@@ -146,7 +147,7 @@ def hierarchical_traversal(root_path, all_records):
     # 按年倒序
     for year_dir in sorted(year_dirs, key=lambda x: x.name, reverse=True):
         # 构造完整的路径:download/shandong/2025/03
-        print(f"\n年份:{year_dir.name} | 省份:jiangsu")
+        log.info(f"\n年份:{year_dir.name} | 省份:jiangsu")
 
         # 提取月份目录
         month_dirs = []
@@ -159,7 +160,7 @@ def hierarchical_traversal(root_path, all_records):
         # 按月倒序输出
         if month_dirs:
             for md in sorted(month_dirs, key=lambda x: x["month"], reverse=True):
-                print(f"  月份:{md['month']:02d} | 路径:{md['path']}")
+                log.info(f"  月份:{md['month']:02d} | 路径:{md['path']}")
                 process_folder(md['path'], all_records)
 
 if __name__ == '__main__':

+ 4 - 2
utils/base_country_code.py

@@ -5,6 +5,8 @@ from pathlib import Path
 import pandas as pd
 from openpyxl import load_workbook
 
+from utils.log import log
+
 YEAR_PATTERN = re.compile(r"^\d{4}$")
 MONTH_PATTERN = re.compile(r"^(0[1-9]|1[0-2])$")
 
@@ -63,7 +65,7 @@ def get_previous_month_dir(current_path):
 
         return current_path.parent.parent / current_path.parent.name / f"{prev_month:02d}"
     except Exception as e:
-        print(f"前月目录生成失败:{str(e)}")
+        log.info(f"前月目录生成失败:{str(e)}")
         return None
 
 COUNTRY_CODE_MAPPING = {
@@ -360,4 +362,4 @@ download_dir_find = os.path.abspath(os.path.join('downloads/demo'))
 
 if __name__ == '__main__':
     year, month = extract_year_month_from_path(Path(download_dir)/'2025'/'02')
-    print(year, month)
+    log.info(year, month)

+ 17 - 15
utils/base_mysql.py

@@ -2,6 +2,8 @@ import pymysql
 from sqlalchemy import create_engine, text
 from urllib.parse import quote_plus
 
+from utils.log import log
+
 # 数据库配置
 DB_CONFIG = {
     'host': '10.130.75.149',
@@ -31,14 +33,14 @@ def get_commodity_id(commodity_name):
                 if len(result) == 1:
                     return result[0][0], result[0][1]
                 else:
-                    print(f"查询结果为多条,商品id为:{result},fix_commodity_name:{fix_commodity_name},commodity_name: {commodity_name}")
+                    log.info(f"查询结果为多条,商品id为:{result},fix_commodity_name:{fix_commodity_name},commodity_name: {commodity_name}")
                     sql = "SELECT e.id, e.commodity_name FROM t_yujin_crossborder_prov_commodity_category e WHERE e.commodity_name = %s"
                     cursor.execute(sql, (f"{fix_commodity_name}",))
                     result = cursor.fetchone()
                     if not result:
                         # 用原商品名称再查一次
                         commodity_name = commodity_name.replace("(", "(").replace(")", ")")
-                        print(f"原商品名称查询,commodity_name:{commodity_name}")
+                        log.info(f"原商品名称查询,commodity_name:{commodity_name}")
                         sql = "SELECT e.id, e.commodity_name FROM t_yujin_crossborder_prov_commodity_category e WHERE e.commodity_name = %s"
                         cursor.execute(sql, (f"{commodity_name}",))
                         result = cursor.fetchone()
@@ -51,7 +53,7 @@ def get_commodity_id(commodity_name):
             else:
                 return None, None
     except Exception as e:
-        print(f"查询数据库时发生异常: {str(e)}")
+        log.info(f"查询数据库时发生异常: {str(e)}")
         return None, None
     finally:
         if connection:
@@ -72,7 +74,7 @@ def get_hs_all():
             else:
                 return None
     except Exception as e:
-        print(f"查询数据库时发生异常: {str(e)}")
+        log.info(f"查询数据库时发生异常: {str(e)}")
         return None
     finally:
         if connection:
@@ -94,7 +96,7 @@ def bulk_insert(sql_statements):
     :param sql_statements: 包含多个 INSERT 语句的列表
     """
     if not sql_statements:
-        print("未提供有效的 SQL 插入语句,跳过操作")
+        log.info("未提供有效的 SQL 插入语句,跳过操作")
         return
 
     try:
@@ -103,9 +105,9 @@ def bulk_insert(sql_statements):
                 for sql in sql_statements:
                     stmt = text(sql.strip())
                     conn.execute(stmt)
-                print(f"成功执行 {len(sql_statements)} 条 SQL 插入语句")
+                log.info(f"成功执行 {len(sql_statements)} 条 SQL 插入语句")
     except Exception as e:
-        print(f"数据库操作失败: {str(e)}")
+        log.info(f"数据库操作失败: {str(e)}")
         raise
 
 def update_january_yoy(prov_name):
@@ -148,11 +150,11 @@ def update_january_yoy(prov_name):
     try:
         with engine.begin() as conn:
             result = conn.execute(update_sql, {'prov_name': prov_name})
-            print(f"Updated {result.rowcount} rows for {prov_name}")
+            log.info(f"Updated {result.rowcount} rows for {prov_name}")
             return result.rowcount
 
     except Exception as e:
-        print(f"Update failed: {str(e)}")
+        log.info(f"Update failed: {str(e)}")
         raise RuntimeError(f"同比数据更新失败: {str(e)}") from e
 
 def clear_old_shandong_yoy(prov_name):
@@ -174,10 +176,10 @@ def clear_old_shandong_yoy(prov_name):
     try:
         with engine.begin() as conn:
             result = conn.execute(clear_sql, {'prov_name': prov_name})
-            print(f"{prov_name} 旧数据清零记录数: {result.rowcount}")
+            log.info(f"{prov_name} 旧数据清零记录数: {result.rowcount}")
             return result.rowcount
     except Exception as e:
-        print(f"旧数据清零失败: {str(e)}")
+        log.info(f"旧数据清零失败: {str(e)}")
         raise
 
 def update_shandong_yoy(prov_name):
@@ -191,10 +193,10 @@ def update_shandong_yoy(prov_name):
         # 步骤2:计算新数据
         updated = _update_shandong_new_yoy(prov_name)
 
-        print(f"{prov_name} 同比处理完成 | 清零:{cleared} 更新:{updated}")
+        log.info(f"{prov_name} 同比处理完成 | 清零:{cleared} 更新:{updated}")
         return {'cleared': cleared, 'updated': updated}
     except Exception as e:
-        print("{prov_name} 数据处理失败", exc_info=True)
+        log.info("{prov_name} 数据处理失败", exc_info=True)
         raise
 
 def _update_shandong_new_yoy(prov_name):
@@ -232,11 +234,11 @@ def _update_shandong_new_yoy(prov_name):
 
     with engine.begin() as conn:
         result = conn.execute(update_sql, {'prov_name': prov_name})
-        print(f"{prov_name} 新数据更新数: {result.rowcount}")
+        log.info(f"{prov_name} 新数据更新数: {result.rowcount}")
         return result.rowcount
 
 
 if __name__ == '__main__':
     update_january_yoy('浙江省')
     update_shandong_yoy('浙江省')
-    print("同比sql处理完成")
+    log.info("同比sql处理完成")

+ 7 - 4
utils/crawl_gov_commodity.py

@@ -1,5 +1,8 @@
 import pandas as pd
-from com.zf.crawl import base_mysql
+
+from utils import base_mysql
+from utils.log import log
+
 
 def generate_sql_from_excel(excel_file):
     # 读取 Excel 文件
@@ -21,12 +24,12 @@ def generate_sql_from_excel(excel_file):
                 commodity_name = commodity_name.replace('(', '(').replace(')', ')')
         sql = f"INSERT INTO t_yujin_crossborder_prov_commodity_category (code, level,commodity_code, commodity_name, create_time) VALUES ('{code}', {level}, '{commodity_code}', '{commodity_name}', now());"
         sql_arr.append(sql)
-        print(sql)
+        log.info(sql)
 
-    print(f"√ 成功生成 commodity category SQL 文件 size {len(sql_arr)} ")
+    log.info(f"√ 成功生成 commodity category SQL 文件 size {len(sql_arr)} ")
     # 解析完后生成sql文件批量入库
     base_mysql.bulk_insert(sql_arr)
-    print("trade SQL 存表完成!")
+    log.info("trade SQL 存表完成!")
 
 # 商品书数据初始化
 excel_file = 'C:\\Users\\admin\\Desktop\\海关总署数据梳理.xlsx'  # 输入的 Excel 文件

+ 28 - 27
zhejiang/crawl_gov_zhejiangi_full.py

@@ -18,6 +18,7 @@ import gov_commodity_zhejiang_city
 import gov_commodity_zhejiang_country
 import gov_commodity_zhejiang_import_export
 from utils import base_country_code, base_mysql
+from utils.log import log
 
 download_dir = base_country_code.download_dir
 Path(download_dir).mkdir(parents=True, exist_ok=True)
@@ -25,7 +26,7 @@ Path(download_dir).mkdir(parents=True, exist_ok=True)
 def configure_stealth_options():
     """增强型反检测配置[1,4](@ref)"""
     opts = FirefoxOptions()
-    print("当前下载路径:", Path(download_dir).resolve())
+    log.info("当前下载路径:", Path(download_dir).resolve())
     # 文件下载配置
     opts.set_preference("browser.download.dir", download_dir)
     opts.set_preference("browser.download.folderList", 2)
@@ -62,7 +63,7 @@ def crawl_by_year_tabs(driver, base_url):
     for tab in year_tabs:
         year_text = tab.text.strip()
         if int(year_text[:4]) <= 2022:
-            print(f"{year_text} 后的数据无需下载")
+            log.info(f"{year_text} 后的数据无需下载")
             continue
 
         year_url = tab.get_attribute("href")
@@ -72,7 +73,7 @@ def crawl_by_year_tabs(driver, base_url):
         # 新标签页打开年份页面
         driver.execute_script("window.open(arguments[0]);", year_url)
         driver.switch_to.window(driver.window_handles[-1])
-        print(f"\n正在处理 {year_text} 年份页面")
+        log.info(f"\n正在处理 {year_text} 年份页面")
 
         process_month_tabs(driver, year_text, base_url)
 
@@ -98,7 +99,7 @@ def process_month_tabs(driver, year, base_url):
             # 全量获取所有月份Tab
             month_items = driver.find_elements(By.XPATH, '//ul[@class="nav_tab"]//li')
             if not month_items:
-                print(f"{year}年没有月份Tab,停止处理")
+                log.info(f"{year}年没有月份Tab,停止处理")
                 break
 
             all_found = True
@@ -114,7 +115,7 @@ def process_month_tabs(driver, year, base_url):
                 if not month_text in target_months:
                     continue  # 跳过已处理月份
 
-                print(f"点击月份Tab:{year}-{month_text}")
+                log.info(f"点击月份Tab:{year}-{month_text}")
                 a_tag.click()
 
                 # 处理详情页逻辑
@@ -123,9 +124,9 @@ def process_month_tabs(driver, year, base_url):
                 )
                 detail_link_arr = get_behind_detail_link(driver, base_url)
                 if not detail_link_arr:
-                    print(f"{year}-{month_text} 未找到详情链接")
+                    log.info(f"{year}-{month_text} 未找到详情链接")
                 for detail_link in detail_link_arr:
-                    print(f"{year}-{month_text} 详情链接:{detail_link}")
+                    log.info(f"{year}-{month_text} 详情链接:{detail_link}")
                     driver.get(detail_link)
                     download_file_from_detail_page(driver)
                     driver.back()
@@ -137,24 +138,24 @@ def process_month_tabs(driver, year, base_url):
                 found = True
 
             if not found:
-                print(f"{year}年未找到 {month_text} Tab")
+                log.info(f"{year}年未找到 {month_text} Tab")
                 all_found = False
 
             if all_found:
-                print(f"{year}年所有目标月份处理完成")
+                log.info(f"{year}年所有目标月份处理完成")
                 break
             else:
                 # 部分月份未找到,重新获取元素
                 # retry_count += 1
-                print(f"第 {retry_count} 次重试获取月份Tab...")
+                log.info(f"第 {retry_count} 次重试获取月份Tab...")
                 time.sleep(2)
 
         except StaleElementReferenceException:
-            print("页面刷新,重新获取月份Tab列表...")
+            log.info("页面刷新,重新获取月份Tab列表...")
             # retry_count += 1
             time.sleep(2)
 
-    print(f"{year}年最终处理的月份:{processed_months}")
+    log.info(f"{year}年最终处理的月份:{processed_months}")
 
 def get_behind_detail_link(driver, base_url):
    """获取点击月份Tab后 conList_ul 下所有 li 的 a 标签完整链接"""
@@ -170,7 +171,7 @@ def get_behind_detail_link(driver, base_url):
            href_arr.append(full_url)
        return href_arr
    except Exception as e:
-       print(f"获取详情链接失败: {str(e)}")
+       log.info(f"获取详情链接失败: {str(e)}")
        return []
 
 def download_file_from_detail_page(driver):
@@ -181,7 +182,7 @@ def download_file_from_detail_page(driver):
     try:
         elements = driver.find_elements(By.XPATH, '//div[@class="easysite-news-content"]//div[@id="easysiteText"]//p//a')
         if not elements:
-            print("详情页未找到目标文件链接")
+            log.info("详情页未找到目标文件链接")
             return
 
         for download_btn in elements:
@@ -191,10 +192,10 @@ def download_file_from_detail_page(driver):
             file_url = download_btn.get_attribute("href")
 
             if not file_url.lower().endswith(('.xls', '.xlsx')):
-                print(f"跳过非 Excel 文件: {file_url}")
+                log.info(f"跳过非 Excel 文件: {file_url}")
                 continue
 
-            print(f"正在下载: {file_name} → {file_url}")
+            log.info(f"正在下载: {file_name} → {file_url}")
 
             # 记录下载前的文件列表
             existing_files = set(f.name for f in Path(download_dir).glob('*'))
@@ -207,17 +208,17 @@ def download_file_from_detail_page(driver):
             year, start_month, month = extract_year_and_month(file_name)
             final_path = Path(download_dir) / year / month / f"{file_name}"
             if os.path.exists(final_path):
-                print(f"文件已存在:{file_name} 正在覆盖...")
+                log.info(f"文件已存在:{file_name} 正在覆盖...")
                 os.unlink(final_path)
 
             final_dir = Path(download_dir) / year / month
             final_dir.mkdir(parents=True, exist_ok=True)
-            print(f"√ 正在移动文件 {downloaded_file} 至 {final_path}")
+            log.info(f"√ 正在移动文件 {downloaded_file} 至 {final_path}")
             downloaded_file.rename(final_path)
-            print(f"√ 下载成功:{final_path}")
+            log.info(f"√ 下载成功:{final_path}")
 
     except Exception as e:
-        print(f"详情页处理异常: {str(e)}")
+        log.info(f"详情页处理异常: {str(e)}")
 
 def extract_year_and_month(file_name):
     # 支持两种格式:
@@ -250,10 +251,10 @@ def extract_rar(rar_path, extract_to):
     )
 
     if result.returncode == 0:
-        print(f"解压成功: {rar_path} → {extract_to}")
+        log.info(f"解压成功: {rar_path} → {extract_to}")
         return True
     else:
-        print(f"解压失败: {result.stderr.decode('gbk')}")
+        log.info(f"解压失败: {result.stderr.decode('gbk')}")
         return False
 
 
@@ -320,7 +321,7 @@ def hierarchical_traversal(root_path):
     # 按年倒序
     for year_dir in sorted(year_dirs, key=lambda x: x.name, reverse=True):
         # 构造完整的路径:download/shandong/2025/03
-        print(f"\n年份:{year_dir.name} | 省份:jiangsu")
+        log.info(f"\n年份:{year_dir.name} | 省份:zhejiang")
 
         # 提取月份目录
         month_dirs = []
@@ -333,20 +334,20 @@ def hierarchical_traversal(root_path):
         # 按月倒序输出
         if month_dirs:
             for md in sorted(month_dirs, key=lambda x: x["month"], reverse=True):
-                print(f"  月份:{md['month']:02d} | 路径:{md['path']}")
+                log.info(f"  月份:{md['month']:02d} | 路径:{md['path']}")
                 gov_commodity_zhejiang_import_export.process_folder(md['path'])
                 gov_commodity_zhejiang_country.process_folder(md['path'])
                 gov_commodity_zhejiang_city.process_folder(md['path'])
 
 if __name__ == "__main__":
     crawl_with_selenium('http://hangzhou.customs.gov.cn/hangzhou_customs/575609/zlbd/575612/575612/6430241/6430315/index.html')
-    print(f"浙江杭州海关全量数据下载任务完成")
+    log.info(f"浙江杭州海关全量数据下载任务完成")
     # 等待5s后执行
     time.sleep(5)
     hierarchical_traversal(base_country_code.download_dir)
-    print("浙江杭州海关类章、国家、城市所有文件处理完成!")
+    log.info("浙江杭州海关类章、国家、城市所有文件处理完成!")
     time.sleep(5)
     base_mysql.update_january_yoy('浙江省')
     base_mysql.update_shandong_yoy('浙江省')
-    print("浙江杭州海关城市同比sql处理完成")
+    log.info("浙江杭州海关城市同比sql处理完成")
 

+ 14 - 13
zhejiang/gov_commodity_zhejiang_city.py

@@ -5,6 +5,7 @@ import pandas as pd
 
 from utils import base_country_code, base_mysql
 from utils.base_country_code import format_sql_value
+from utils.log import log
 
 city_code_map = {
     "杭州地区": "330100",
@@ -23,14 +24,14 @@ city_code_map = {
 def get_df(path, year_month):
     file_paths = list(Path(path).glob('*'))
     if not file_paths:
-        print("未找到任何文件")
+        log.info("未找到任何文件")
         return None
     file_path = file_paths[0]
     sheet_name = base_country_code.find_sheet_by_keyword(file_path, "十一地市")
 
     flag = True
     if not sheet_name:
-        print(f"{file_path} 未找到包含 十一地市 sheet")
+        log.info(f"{file_path} 未找到包含 十一地市 sheet")
         # 23年1-11月数据要在多文件里找
         for file_path in file_paths:
             if '十一地市' in file_path.name:
@@ -39,7 +40,7 @@ def get_df(path, year_month):
                 break
 
     if not sheet_name and flag:
-        print(f"{path} 未找到包含 十一地市 sheet或文件")
+        log.info(f"{path} 未找到包含 十一地市 sheet或文件")
         return None
     if flag:
         xls = pd.ExcelFile(file_path)
@@ -77,7 +78,7 @@ def process_folder(path):
     sql_arr = []
     res = get_df(path, None)
     if res is None:
-        print(f"{year_month} prov_region_trade 未找到包含 地市 sheet")
+        log.info(f"{year_month} prov_region_trade 未找到包含 地市 sheet")
         return
     import_df, export_df, total_df = res
     # 当月数据分组清洗
@@ -89,7 +90,7 @@ def process_folder(path):
         previous_month_dir = base_country_code.get_previous_month_dir(path)
         res = get_df(previous_month_dir, year_month)
         if res is None:
-            print(f"{path} 上月目录里文件未找到包含 地市 sheet")
+            log.info(f"{path} 上月目录里文件未找到包含 地市 sheet")
             return
         prev_import_df, prev_export_df, prev_total_df = res
 
@@ -107,7 +108,7 @@ def process_folder(path):
 
         total_df = pd.merge(total_df, prev_total_df, on='commodity', how='left')
         total_df['total'] = round(total_df['total_x'] - total_df['total_y'], 4)
-        print(f"合并文件: {path}*********{previous_month_dir}")
+        log.info(f"合并文件: {path}*********{previous_month_dir}")
 
     # 合并进出口数据
     merged_df = pd.merge(curr_import, curr_export, on='commodity', how='outer')
@@ -117,7 +118,7 @@ def process_folder(path):
         city_name = str(row['commodity']).strip()
         city_code = city_code_map.get(city_name)
         if not city_code:
-            print(f"未找到省 '{city_name}' 对应市编码")
+            log.info(f"未找到省 '{city_name}' 对应市编码")
             continue
 
         # 提取数据并格式化
@@ -137,9 +138,9 @@ def process_folder(path):
                f"('{year}', '{year_month}', '330000', '浙江省', '{city_code}', '{city_name}', {format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, '{yoy_import_export}', '{yoy_import}', '{yoy_export}', now());\n")
         sql_arr.append(sql)
 
-    print(f"√ {year_month} prov_region_trade 成功生成 SQL 文件 size {len(sql_arr)} ")
+    log.info(f"√ {year_month} prov_region_trade 成功生成 SQL 文件 size {len(sql_arr)} ")
     base_mysql.bulk_insert(sql_arr)
-    print(f"√ {year_month} prov_region_trade SQL 存表完成!")
+    log.info(f"√ {year_month} prov_region_trade SQL 存表完成!")
 
 def hierarchical_traversal(root_path):
     root = Path(root_path)
@@ -149,7 +150,7 @@ def hierarchical_traversal(root_path):
     ]
 
     for year_dir in sorted(year_dirs, key=lambda x: x.name, reverse=True):
-        print(f"\n年份:{year_dir.name} | 省份:jiangsu")
+        log.info(f"\n年份:{year_dir.name} | 省份:zhejiang")
 
         month_dirs = []
         for item in year_dir.iterdir():
@@ -158,15 +159,15 @@ def hierarchical_traversal(root_path):
 
         if month_dirs:
             for md in sorted(month_dirs, key=lambda x: x["month"], reverse=True):
-                print(f"  月份:{md['month']:02d} | 路径:{md['path']}")
+                log.info(f"  月份:{md['month']:02d} | 路径:{md['path']}")
                 process_folder(md['path'])
 
 if __name__ == '__main__':
     hierarchical_traversal(base_country_code.download_dir)
-    print(f"浙江杭州海关城市所有文件处理完成!")
+    log.info(f"浙江杭州海关城市所有文件处理完成!")
     time.sleep(5)
     base_mysql.update_january_yoy('浙江省')
     base_mysql.update_shandong_yoy('浙江省')
-    print("同比sql处理完成")
+    log.info("同比sql处理完成")
     # root = Path(base_country_code.download_dir)/'2024'/'07'
     # process_folder(root)

+ 16 - 15
zhejiang/gov_commodity_zhejiang_country.py

@@ -4,6 +4,7 @@ import pandas as pd
 
 from utils import base_country_code, base_mysql
 from utils.base_country_code import format_sql_value
+from utils.log import log
 
 # 排除地区名单
 EXCLUDE_REGIONS = ["亚洲", "非洲", "欧洲", "拉丁美洲", "北美洲", "大洋洲", "南极洲",
@@ -13,11 +14,11 @@ EXCLUDE_REGIONS = ["亚洲", "非洲", "欧洲", "拉丁美洲", "北美洲", "
 def get_df_country(path, year_month):
     file_paths = list(Path(path).glob('*'))
     if not file_paths:
-        print("未找到任何文件")
+        log.info("未找到任何文件")
         return None
 
     file_path = file_paths[0]
-    print(f"处理文件: {file_path.name}")
+    log.info(f"处理文件: {file_path.name}")
 
     xls = pd.ExcelFile(file_path)
     import_df = pd.DataFrame()
@@ -27,10 +28,10 @@ def get_df_country(path, year_month):
     flag = True
     sheet_name = base_country_code.find_sheet_by_keyword(file_path, "国别")
     if not sheet_name:
-        print(f"{file_path} 未找到包含 国别 sheet")
+        log.info(f"{file_path} 未找到包含 国别 sheet")
         sheet_name = base_country_code.find_sheet_by_keyword(file_path, "组织")
         if not sheet_name:
-            print(f"{file_path} 未找到包含 组织 sheet")
+            log.info(f"{file_path} 未找到包含 组织 sheet")
             # 23年1-11月数据要在多文件里找
             for file_path in file_paths:
                 if '洲贸组织' in file_path.name:
@@ -38,7 +39,7 @@ def get_df_country(path, year_month):
                     flag = False
                     break
     if not sheet_name and flag:
-        print(f"{path} 未找到包含 国别 | 组织 | 洲贸组织 sheet或文件")
+        log.info(f"{path} 未找到包含 国别 | 组织 | 洲贸组织 sheet或文件")
         return None
     if flag:
         df = pd.read_excel(xls, sheet_name=sheet_name, header=None)
@@ -67,7 +68,7 @@ def get_df_country(path, year_month):
 def process_folder(path):
     res = get_df_country(path, None)
     if not res:
-        print(f"{path} 目录里文件未找到包含 国别 sheet")
+        log.info(f"{path} 目录里文件未找到包含 国别 sheet")
         return
     import_df, export_df, total_df = res
 
@@ -83,7 +84,7 @@ def process_folder(path):
         previous_month_dir = base_country_code.get_previous_month_dir(path)
         res = get_df_country(previous_month_dir, year_month)
         if not res:
-            print(f"{path} 上月目录里文件未找到包含 国别 sheet")
+            log.info(f"{path} 上月目录里文件未找到包含 国别 sheet")
             return
         prev_import_df, prev_export_df, prev_total_df = res
 
@@ -101,7 +102,7 @@ def process_folder(path):
 
         total_df = pd.merge(total_df, prev_total_df, on='commodity', how='left')
         total_df['total'] = round(total_df['total_x'] - total_df['total_y'], 4)
-        print(f"合并文件: {path}*********{previous_month_dir}")
+        log.info(f"合并文件: {path}*********{previous_month_dir}")
 
     # 合并进出口数据
     merged_df = pd.merge(curr_import, curr_export, on='commodity', how='outer')
@@ -119,7 +120,7 @@ def process_folder(path):
         # 获取国家编码
         country_code = base_country_code.COUNTRY_CODE_MAPPING.get(country_name)
         if not country_code:
-            print(f"{year_month} 未找到国家 '{country_name}' 对应的编码")
+            log.info(f"{year_month} 未找到国家 '{country_name}' 对应的编码")
             continue
 
         # 提取数据并格式化
@@ -143,12 +144,12 @@ def process_folder(path):
         )
         sql_arr.append(sql)
     # except Exception as e:
-    #     print(f"{year_month} 处理时发生异常: {str(e)}")
+    #     log.info(f"{year_month} 处理时发生异常: {str(e)}")
 
-    print(f"√ {year_month} 成功生成 SQL 条数: {len(sql_arr)}")
+    log.info(f"√ {year_month} 成功生成 SQL 条数: {len(sql_arr)}")
     # 批量插入数据库
     base_mysql.bulk_insert(sql_arr)
-    print(f"√ {year_month} prov_country_trade SQL 存表完成!\n")
+    log.info(f"√ {year_month} prov_country_trade SQL 存表完成!\n")
 
 def hierarchical_traversal(root_path):
     root = Path(root_path)
@@ -158,7 +159,7 @@ def hierarchical_traversal(root_path):
     ]
 
     for year_dir in sorted(year_dirs, key=lambda x: x.name, reverse=True):
-        print(f"\n年份:{year_dir.name} | 省份:jiangsu")
+        log.info(f"\n年份:{year_dir.name} | 省份:zhejiang")
 
         month_dirs = []
         for item in year_dir.iterdir():
@@ -167,7 +168,7 @@ def hierarchical_traversal(root_path):
 
         if month_dirs:
             for md in sorted(month_dirs, key=lambda x: x["month"], reverse=True):
-                print(f"  月份:{md['month']:02d} | 路径:{md['path']}")
+                log.info(f"  月份:{md['month']:02d} | 路径:{md['path']}")
                 process_folder(md['path'])
 
 
@@ -176,4 +177,4 @@ if __name__ == '__main__':
 
     root = Path(base_country_code.download_dir) / '2024' / '07'
     process_folder(root)
-    print("浙江杭州海关国别所有文件处理完成!")
+    log.info("浙江杭州海关国别所有文件处理完成!")

+ 20 - 19
zhejiang/gov_commodity_zhejiang_import_export.py

@@ -5,6 +5,7 @@ import pandas as pd
 
 from utils import base_country_code, base_mysql
 from utils.base_country_code import format_sql_value
+from utils.log import log
 
 CUSTOM_COMMODITY_REPLACEMENTS = {
     '稻谷及大米': '稻谷、大米及大米粉',
@@ -43,10 +44,10 @@ def clean_commodity_name(name, preserve_keywords=None):
 def get_df_import_export(path, year_month):
     file_paths = list(Path(path).glob('*'))
     if not file_paths:
-        print("未找到任何文件")
+        log.info("未找到任何文件")
         return None
     file_path = file_paths[0]
-    print(f"处理文件: {file_path.name}")
+    log.info(f"处理文件: {file_path.name}")
 
     xls = pd.ExcelFile(file_path)
     import_df = pd.DataFrame()
@@ -55,7 +56,7 @@ def get_df_import_export(path, year_month):
     flag = True
     sheet_name = base_country_code.find_sheet_by_keyword(file_path, "主出商品")
     if not sheet_name:
-        print(f"{file_path} 单文件未找到包含 主出商品 sheet")
+        log.info(f"{file_path} 单文件未找到包含 主出商品 sheet")
         # 23年1-11月数据要在多文件里找
         for file_path in file_paths:
             if '主要出口商品' in file_path.name:
@@ -63,7 +64,7 @@ def get_df_import_export(path, year_month):
                 flag = False
                 break
     if not sheet_name and flag:
-        print(f"{path} 中未找到 主出商品 sheet或文件")
+        log.info(f"{path} 中未找到 主出商品 sheet或文件")
         return None
 
     if flag:
@@ -85,7 +86,7 @@ def get_df_import_export(path, year_month):
     flag_2 = True
     sheet_name = base_country_code.find_sheet_by_keyword(file_path, "主进商品")
     if not sheet_name:
-        print(f"{file_path} 单文件未找到包含 主进商品 sheet")
+        log.info(f"{file_path} 单文件未找到包含 主进商品 sheet")
         # 23年1-11月数据要在多文件里找
         for file_path in file_paths:
             if '主要进口商品' in file_path.name:
@@ -93,7 +94,7 @@ def get_df_import_export(path, year_month):
                 flag_2 = False
                 break
     if not sheet_name and flag_2:
-        print(f"{path} 中未找到 主进商品 sheet或文件")
+        log.info(f"{path} 中未找到 主进商品 sheet或文件")
         return None
 
     if flag_2:
@@ -118,7 +119,7 @@ def get_df_import_export(path, year_month):
 def process_folder(path):
     res = get_df_import_export(path, None)
     if not res:
-        print(f"{path} 目录里文件未找到包含 主出、主进商品 sheet")
+        log.info(f"{path} 目录里文件未找到包含 主出、主进商品 sheet")
         return
     import_df, export_df = res
 
@@ -133,7 +134,7 @@ def process_folder(path):
         previous_month_dir = base_country_code.get_previous_month_dir(path)
         res = get_df_import_export(previous_month_dir, year_month)
         if not res:
-            print(f"{path} 上月目录里文件未找到包含 主出、主进商品 sheet")
+            log.info(f"{path} 上月目录里文件未找到包含 主出、主进商品 sheet")
             return
         prev_import_df, prev_export_df = res
 
@@ -147,7 +148,7 @@ def process_folder(path):
 
         curr_export = pd.merge(curr_export, prev_export, on='commodity', how='left')
         curr_export['export'] = round(curr_export['export_x'] - curr_export['export_y'], 4)
-        print(f"合并文件: {path}*********{previous_month_dir}")
+        log.info(f"合并文件: {path}*********{previous_month_dir}")
 
     # 合并进出口数据
     merged_df = pd.merge(curr_import, curr_export, on='commodity', how='outer')
@@ -161,14 +162,14 @@ def save_to_database(merged_df, year, month):
         for _, row in merged_df.iterrows():
             commodity_name = str(row['commodity']).strip()
             if commodity_name == '消费品' or commodity_name == '劳动密集型产品':
-                print(f'{commodity_name} 商品不存在,跳过')
+                log.info(f'{commodity_name} 商品不存在,跳过')
                 continue
             commodity_code, commodity_name_fix = base_mysql.get_commodity_id(commodity_name)
             if not commodity_code:
-                print(f"未找到商品名称 '{commodity_name}' 对应的 ID")
+                log.info(f"未找到商品名称 '{commodity_name}' 对应的 ID")
                 continue
             if not commodity_name_fix or commodity_name_fix in processed_commodities:
-                print(f"已处理过 '{commodity_name_fix}',传入name:{commodity_name}")
+                log.info(f"已处理过 '{commodity_name_fix}',传入name:{commodity_name}")
                 continue
 
             if year == 2025 or (year == 2024 and month in [7, 8, 9, 10, 11, 12]):
@@ -194,15 +195,15 @@ def save_to_database(merged_df, year, month):
             sql_arr.append(sql)
 
             processed_commodities.add(commodity_name_fix)
-            # print(f'{commodity_name} -> {commodity_name_fix}')
+            # log.info(f'{commodity_name} -> {commodity_name_fix}')
 
     except Exception as e:
-        print(f"{year_month} prov_commodity_trade 生成 SQL 文件时发生异常: {str(e)}")
+        log.info(f"{year_month} prov_commodity_trade 生成 SQL 文件时发生异常: {str(e)}")
 
-    print(f"√ {year_month} prov_commodity_trade 成功生成 SQL 文件 size {len(sql_arr)} ")
+    log.info(f"√ {year_month} prov_commodity_trade 成功生成 SQL 文件 size {len(sql_arr)} ")
     # 解析完后生成sql文件批量入库
     base_mysql.bulk_insert(sql_arr)
-    print(f"√ {year_month} prov_commodity_trade SQL 存表完成!\n")
+    log.info(f"√ {year_month} prov_commodity_trade SQL 存表完成!\n")
 
 def hierarchical_traversal(root_path):
     """分层遍历:省份->年份->月目录"""
@@ -215,7 +216,7 @@ def hierarchical_traversal(root_path):
 
     # 按年倒序
     for year_dir in sorted(year_dirs, key=lambda x: x.name, reverse=True):
-        print(f"\n年份:{year_dir.name} | 省份:jiangsu")
+        log.info(f"\n年份:{year_dir.name} | 省份:jiangsu")
 
         # 提取月份目录
         month_dirs = []
@@ -228,7 +229,7 @@ def hierarchical_traversal(root_path):
         # 按月倒序输出
         if month_dirs:
             for md in sorted(month_dirs, key=lambda x: x["month"], reverse=True):
-                print(f"  月份:{md['month']:02d} | 路径:{md['path']}")
+                log.info(f"  月份:{md['month']:02d} | 路径:{md['path']}")
                 process_folder(md['path'])
 
 if __name__ == '__main__':
@@ -236,4 +237,4 @@ if __name__ == '__main__':
 
     # root = Path(base_country_code.download_dir)/'2023'/'01'
     # process_folder(root)
-    print("浙江杭州海关类章所有文件处理完成!")
+    log.info("浙江杭州海关类章所有文件处理完成!")