Selaa lähdekoodia

数据清洗单独脚本

01495251 1 kuukausi sitten
vanhempi
commit
931af23721

+ 7 - 1
crossborder/guangdong/guangdong_sub_customs_parse_excel.py

@@ -1,3 +1,4 @@
+import argparse
 from decimal import Decimal, InvalidOperation
 from pathlib import Path
 
@@ -673,6 +674,11 @@ def convert_unit(value):
 # 测试入口
 if __name__ == "__main__":
 
-    traverse_and_process(download_dir, parse_excel, province_name="guangdong")
+    parser = argparse.ArgumentParser(description='海关数据智能抓取系统')
+    parser.add_argument('--year', type=int, default=None,
+                        help='终止年份(如2023),未指定时清洗最新一个月数据')
+    args = parser.parse_args()
+
+    traverse_and_process(download_dir, parse_excel, province_name="guangdong",  year=args.year)
     db_helper = DBHelper()
     db_helper.update_prov_yoy("广东省")

+ 1 - 1
crossborder/guangdong/selenium_guangdong_download.py

@@ -28,7 +28,7 @@ PROV_NAME = "广东省"
 
 db = DBHelper()
 
-
+#广东省海关数据    页面无下载按钮,这里数据为在线读取页面表格数据
 
 def detect_latest_month(driver):
     """三级回溯智能检测最新有效月份(修正年/月匹配逻辑)"""

+ 13 - 5
crossborder/henan/henan_parse_excel.py

@@ -1,3 +1,4 @@
+import argparse
 import re
 from pathlib import Path
 
@@ -5,8 +6,9 @@ import pandas as pd
 
 from crossborder.utils.db_helper import DBHelper
 from crossborder.utils.constants import COUNTRY_CODE_MAPPING, EXCLUDE_REGIONS, DOWNLOAD_DIR
-from crossborder.utils.parse_utils import clean_county_name, clean_commodity_name, convert_wan_to_yuan, find_unmatched_countries, \
-    extract_year_month_from_path, traverse_and_process
+from crossborder.utils.parse_utils import clean_county_name, clean_commodity_name, convert_wan_to_yuan, \
+    find_unmatched_countries, \
+    extract_year_month_from_path, traverse_and_process, parse_value
 
 # 常量配置(新增路径正则校验)
 PROV_CODE = "410000"
@@ -241,8 +243,8 @@ def read_trade_pair(import_path, export_path):
     ))
 
     merged = pd.merge(df_import, df_export, on="commodity_name", how="outer").fillna(0)
-    merged["monthly_import"] = merged["monthly_import"].apply(convert_wan_to_yuan)
-    merged["monthly_export"] = merged["monthly_export"].apply(convert_wan_to_yuan)
+    merged["monthly_import"] = merged["monthly_import"].apply(parse_value)
+    merged["monthly_export"] = merged["monthly_export"].apply(parse_value)
     return merged
 
 
@@ -262,4 +264,10 @@ def calculate_monthly_values(current_data, prev_data):
 
 
 if __name__ == "__main__":
-    traverse_and_process(download_dir, parse_excel, province_name="henan")
+
+    parser = argparse.ArgumentParser(description='海关数据智能抓取系统')
+    parser.add_argument('--year', type=int, default=None,
+                        help='终止年份(如2023),未指定时清洗最新一个月数据')
+    args = parser.parse_args()
+
+    traverse_and_process(download_dir, parse_excel, province_name="henan",  year=args.year)

+ 7 - 11
crossborder/quanguo/data_cleaning_to_db.py

@@ -9,8 +9,8 @@ from crossborder.quanguo.parse_country_table_excel import parse_country_table_ex
 from crossborder.quanguo.parse_month_excel import parse_month_table_excel
 from crossborder.quanguo.parse_region_table_excel import parse_region_table_excel
 from crossborder.quanguo.parse_year_excel import parse_year_table_excel
+from crossborder.utils import base_mysql
 from crossborder.utils.constants import DOWNLOAD_DIR
-from crossborder.utils.db_helper import DBHelper
 from crossborder.utils.log import log
 
 
@@ -86,10 +86,10 @@ def perform_data_cleanup_and_import(current_year):
         raise
     finally:
         log.info("更新省市同比数据!")
-        db =DBHelper()
-        db.update_prov_yoy("河南省")
+        base_mysql.update_shandong_yoy("河南省")
+        base_mysql.update_shandong_yoy_origin("山东省")
 
-def process_all_region_tables():
+def main():
     """
     按年份倒序处理(如:2025 -> 2024 -> 2023),每个月份也按倒序处理,
     解析所有'收发货人所在地表'文件。
@@ -138,14 +138,10 @@ def process_all_region_tables():
             log.error(f"{year} 年数据处理失败: {str(e)}")
         finally:
             log.info("更新省市同比数据!")
-            db = DBHelper()
-            db.update_prov_yoy("河南省")
-
-
-# if __name__ == "__main__":
-#     process_all_region_tables()
-
+            base_mysql.update_shandong_yoy("河南省")
+            base_mysql.update_shandong_yoy_origin("山东省")
 
 
 if __name__ == "__main__":
+    # process_all_region_tables()
     perform_data_cleanup_and_import(2025)

+ 8 - 1
crossborder/shandong/shandong_parse_excel.py

@@ -1,3 +1,4 @@
+import argparse
 import re
 from pathlib import Path
 
@@ -315,7 +316,13 @@ def calculate_monthly_values(current_data, prev_data):
 
 
 if __name__ == "__main__":
-    traverse_and_process(download_dir, parse_excel, province_name="shandong")
+
+    parser = argparse.ArgumentParser(description='海关数据智能抓取系统')
+    parser.add_argument('--year', type=int, default=None,
+                        help='终止年份(如2023),未指定时清洗最新一个月数据')
+    args = parser.parse_args()
+
+    traverse_and_process(download_dir, parse_excel, province_name="shandong", year=args.year)
     log.info("\n山东省地级市数据同比更新中...")
     db_helper = DBHelper()
     db_helper.update_prov_yoy("山东省")

+ 2 - 1
crossborder/utils/constants.py

@@ -6,7 +6,8 @@ PROJECT_ROOT = Path(os.path.abspath(os.path.dirname(__file__))).parent.parent
 
 if sys.platform.startswith('linux'):
     # Linux环境指定为/home目录
-    DOWNLOAD_DIR = Path.home() / 'downloads'
+    DOWNLOAD_DIR = r'/home/crossborder/downloads'
+    Path.home()
 else:
     # Windows保持原有结构(项目根目录下的downloads文件夹)
     DOWNLOAD_DIR = PROJECT_ROOT / 'downloads'

+ 1 - 1
crossborder/utils/log.py

@@ -14,7 +14,7 @@ project_root = Path(os.path.abspath(os.path.dirname(__file__))).parent.parent
 
 if sys.platform.startswith('linux'):
     # Linux环境指定为/home目录
-    log_dir = Path.home() / 'logs'
+    log_dir = r'/home/crossborder/logs'
 else:
     log_dir = project_root / 'logs'
 

+ 17 - 8
crossborder/utils/parse_utils.py

@@ -98,17 +98,26 @@ def find_unmatched_countries(final_df):
             log.info(f"   - {name}")
 
 def extract_year_month_from_path(path):
+    """
+    从路径中提取年份和月份,兼容文件路径(如 .../shandong/2025/04/信息收集和环境.xls)
+    """
     parts = path.parts
+
     try:
-        year_part = parts[-2]
-        month_part = parts[-1]
-        if not YEAR_PATTERN.match(year_part):
-            raise ValueError(f"无效年份格式:{year_part}")
-        if not MONTH_PATTERN.match(month_part):
-            raise ValueError(f"无效月份格式:{month_part}")
-        return int(year_part), int(month_part)
+        # 如果是文件路径,尝试向上查找直到找到年份和月份目录
+        idx = -1
+        while len(parts) + idx >= 2:
+            year_part = parts[idx - 1]
+            month_part = parts[idx]
+            if YEAR_PATTERN.match(year_part) and MONTH_PATTERN.match(month_part):
+                return int(year_part), int(month_part)
+            idx -= 1
+
+        # 如果没找到符合条件的结构
+        raise ValueError("路径结构不符合要求,示例:.../shandong/2025/04 或 .../shandong/2025/04/信息收集和环境.xls")
+
     except IndexError:
-        raise ValueError("路径结构不符合要求,示例:.../shandong/2025/04")
+        raise ValueError("路径结构不符合要求,示例:.../shandong/2025/04 或 .../shandong/2025/04/信息收集和环境.xls")
 
 #获取上月目录
 def get_previous_month_dir(current_path):

+ 1 - 0
pyproject.toml

@@ -96,6 +96,7 @@ run-jiangsu = "crossborder.jiangsu.gov_commodity_jiangsu_country:main"
 run-hebei = "crossborder.hebei.crawl_gov_hebei_full:main"
 run-zhejiang = "crossborder.zhejiang.crawl_gov_zhejiang_full:main"
 run-quanguo = "crossborder.quanguo.selenium_download:main"
+run-total-city = "crossborder.data_cleaning_to_db:main"
 run-crossborder = "crossborder.cli:main"
 
 [build-system]