Explorar o código

1.海关总署年度表,月度表只下载最新月份,数据清洗只执行一次
2.海关总署年度表,月度表数据逻辑清洗只执行一次

zhangfan hai 1 mes
pai
achega
a438020283

+ 0 - 1
crossborder/anhui/crawl_gov_anhui_full.py

@@ -255,7 +255,6 @@ def crawl_with_selenium(url, mark):
         hierarchical_traversal(download_dir)
         print("安徽合肥海关类章、国家、城市所有文件处理完成!")
         time.sleep(5)
-        base_mysql.update_january_yoy('安徽省')
         base_mysql.update_shandong_yoy('安徽省')
         print("安徽合肥海关城市同比sql处理完成")
 

+ 0 - 1
crossborder/guangdong/selenium_guangdong_download.py

@@ -435,7 +435,6 @@ def main():
 
     finally:
         driver.quit()
-        log.info("\n数据清洗入库中...")
 
 
 if __name__ == "__main__":

+ 0 - 1
crossborder/hebei/crawl_gov_hebei_full.py

@@ -231,7 +231,6 @@ def crawl_with_selenium(url, mark):
         hierarchical_traversal(download_dir)
         log.info(f"河北石家庄海关全量数据下载任务完成")
         time.sleep(5)
-        base_mysql.update_january_yoy('河北省')
         base_mysql.update_shandong_yoy('河北省')
         log.info("河北石家庄海关城市同比sql处理完成")
 

+ 0 - 1
crossborder/jiangsu/crawl_gov_jiangsu_full.py

@@ -275,7 +275,6 @@ def crawl_with_selenium(url, mark):
         hierarchical_traversal(download_dir, all_records)
         log.info("江苏南京海关类章、国家、城市所有文件处理完成!")
         time.sleep(5)
-        base_mysql.update_january_yoy('江苏省')
         base_mysql.update_shandong_yoy('江苏省')
         log.info("江苏南京海关城市同比sql处理完成")
 

+ 0 - 1
crossborder/jiangsu/gov_commodity_jiangsu_city.py

@@ -131,6 +131,5 @@ if __name__ == '__main__':
     hierarchical_traversal(download_dir)
     log.info(f"江苏南京海关城市所有文件处理完成!")
     time.sleep(5)
-    base_mysql.update_january_yoy('江苏省')
     base_mysql.update_shandong_yoy('江苏省')
     log.info("江苏南京同比sql处理完成")

+ 3 - 3
crossborder/jiangsu/gov_commodity_jiangsu_import_export.py

@@ -163,8 +163,8 @@ def hierarchical_traversal(root_path, all_records):
 
 if __name__ == '__main__':
     all_records = base_mysql.get_hs_all()
-    hierarchical_traversal(download_dir, all_records)
+    # hierarchical_traversal(download_dir, all_records)
 
-    # root = Path(download_dir)/'2024'/'10'
-    # process_folder(root, all_records)
+    root = Path(download_dir)/'2023'/'01'
+    process_folder(root, all_records)
     print("江苏南京海关类章所有文件处理完成!")

+ 8 - 0
crossborder/quanguo/parse_month_excel.py

@@ -9,6 +9,8 @@ from crossborder.utils.parse_utils import convert_unit, parse_ratio
 
 CURRENT_YEAR = str(datetime.now().year)
 
+_has_executed = False
+
 def is_current_year_data(date_str):
     """
     判断是否为当前年份的数据(如 2025.01)
@@ -69,7 +71,13 @@ def parse_month_table_excel(file_path):
     解析月度汇总表并入库
     :param file_path: Excel 文件路径
     """
+    global _has_executed
+    if _has_executed:
+        return
+    _has_executed = True
+
     log.info(f"月度表数据解析:{file_path}")
+
     db = DBHelper()
     sql_template = """
     INSERT INTO t_yujin_crossborder_monthly_summary 

+ 5 - 4
crossborder/quanguo/parse_year_excel.py

@@ -1,6 +1,7 @@
 from datetime import datetime
 import xlrd
 from crossborder.utils.db_helper import DBHelper
+from crossborder.utils.log import log
 from crossborder.utils.parse_utils import convert_unit, parse_ratio
 
 _parse_executed = False  # 模块级变量,控制执行次数
@@ -29,7 +30,7 @@ def get_upsert_sql():
 def parse_year_table_excel(file):
     global _parse_executed
     if _parse_executed:
-        print("⚠️ parse_year_table_excel 已执行过,不再重复执行")
+        log.info("⚠️ parse_year_table_excel 已执行过,不再重复执行")
         return
 
     db_helper = DBHelper()
@@ -40,7 +41,7 @@ def parse_year_table_excel(file):
         workbook = xlrd.open_workbook(file)
         sheet = workbook.sheet_by_index(0)
     except Exception as e:
-        print(f"文件读取失败: {e}")
+        log.error(f"文件读取失败: {e}")
         return
 
     sql = get_upsert_sql()
@@ -68,10 +69,10 @@ def parse_year_table_excel(file):
     # 使用 DBHelper 执行 SQL 插入
     try:
         affected_rows = db_helper.execute_sql_with_params(sql, params_list)
-        print(f"成功处理 {len(params_list)} 条数据,受影响行数:{affected_rows}")
+        log.info(f"成功处理 {len(params_list)} 条数据,受影响行数:{affected_rows}")
         _parse_executed = True
     except Exception as e:
-        print(f"数据库操作失败: {e}")
+        log.error(f"数据库操作失败: {e}")
         raise
 
 

+ 18 - 11
crossborder/quanguo/selenium_download.py

@@ -18,7 +18,7 @@ from crossborder.utils.log import log
 
 base_url = "http://www.customs.gov.cn/customs/302249/zfxxgk/2799825/302274/302277/6348926/index.html"
 download_dir = DOWNLOAD_DIR / "total"
-
+downloaded_tables = set()  # 已下载的表格名集合
 
 
 
@@ -113,14 +113,13 @@ def go_to_year_page(driver, year):
         return False
 
 
-def crawl_with_selenium(driver, base_url, year, latest_only=False):
+def crawl_with_selenium(driver, year, latest_only=False):
     """主抓取函数"""
-    driver.get(base_url)
-
-    if not go_to_year_page(driver, year):
-        log.warning(f"{year} 页面不可用,跳过")
-        return
-    log.info(f"开始抓取 {year} 年数据:{driver.current_url}")
+    if year < datetime.now().year:
+        if not go_to_year_page(driver, year):
+            log.warning(f"{year} 页面不可用,跳过")
+            return
+    log.info(f"开始抓取 {year} 年数据,当前标题: {driver.title}")
     try:
         while True:
             table = WebDriverWait(driver, 20).until(
@@ -146,7 +145,7 @@ def crawl_with_selenium(driver, base_url, year, latest_only=False):
             time.sleep(random.uniform(1, 3))
 
     except StaleElementReferenceException:
-        log.info("检测到元素失效,自动刷新表格")
+        log.error("检测到元素失效,自动刷新表格")
         driver.refresh()
         WebDriverWait(driver, 30).until(
             EC.presence_of_element_located((By.CSS_SELECTOR, f"#yb{year}RMB"))
@@ -158,9 +157,14 @@ def sanitize_filename(filename):
 
 
 def handle_month_data(driver, table_name, month_links, year, latest_only):
+    global downloaded_tables
     main_window = driver.current_window_handle
     for idx, month_data in enumerate(month_links):
         if 1 <= month_data[0] <= 12:
+            # 年度表月度表只下载一次(最新月份数据)
+            if "进出口商品总值表" in table_name and table_name in downloaded_tables:
+                log.info(f"【{table_name}】已下载过,跳过")
+                continue
             # 新标签页策略(防止主页面DOM变更)
             driver.switch_to.window(main_window)
             driver.execute_script(f"window.open('{month_data[1]}', '_blank_{idx}')")
@@ -169,6 +173,8 @@ def handle_month_data(driver, table_name, month_links, year, latest_only):
             month_num, link = month_data
             try:
                download_excel(driver, link, year, month_num, table_name, download_dir)
+               # 下载成功后将表格名加入集合
+               downloaded_tables.add(table_name)
             except Exception as e:
                log.info(f"【异常】下载失败: {str(e)}")
             time.sleep(random.uniform(0.5, 1.5))  # 下载间隔
@@ -190,11 +196,12 @@ if __name__ == "__main__":
     options = configure_stealth_options(download_dir)
     driver = webdriver.Firefox(options=options)
 
+    base_url = "http://www.customs.gov.cn/customs/302249/zfxxgk/2799825/302274/302277/6348926/index.html"
+    driver.get(base_url)
     try:
         for year in years_to_crawl:
-            base_url = "http://www.customs.gov.cn/customs/302249/zfxxgk/2799825/302274/302277/6348926/index.html"
             log.info(f"\n【{year}年】开始抓取...".center(66, "-"))
-            crawl_with_selenium(driver, base_url, year=year, latest_only=args.year is None)
+            crawl_with_selenium(driver, year=year, latest_only=args.year is None)
     finally:
         driver.quit()
         log.info("【海关总署】全年数据抓取结束".center(66, "*"))

+ 30 - 126
crossborder/utils/base_mysql.py

@@ -1,10 +1,10 @@
 from urllib.parse import quote_plus
 
 import pymysql
-from sqlalchemy import create_engine, text
+from sqlalchemy import text, create_engine
 
-from crossborder.utils.log import log
 from crossborder.utils.crypto_utils import AESCryptor
+from crossborder.utils.log import log
 
 provinces = [
     "北京市", "天津市", "上海市", "重庆市",
@@ -42,6 +42,26 @@ def get_decrypted_password():
             raise
     return encrypted_pass
 
+
+# 在 base_mysql.py 模块加载时自动完成解密
+def initialize_engine():
+    """初始化数据库引擎(包含密码解密)"""
+    db_config = DB_CONFIG.copy()
+    db_config['password'] = get_decrypted_password()
+
+    # 对密码进行 URL 编码
+    encoded_password = quote_plus(db_config["password"])
+
+    # 构建 SQLAlchemy 引擎
+    return create_engine(
+        f"mysql+pymysql://{db_config['user']}:{encoded_password}@{db_config['host']}:{db_config['port']}/{db_config['database']}?charset={db_config['charset']}",
+        pool_size=5,
+        max_overflow=10
+    )
+
+# 全局引擎实例
+engine = initialize_engine()
+
 def get_commodity_id(commodity_name):
     """根据商品名称查询数据库,获取商品 ID 和商品名称"""
     fix_commodity_name = commodity_name
@@ -143,83 +163,16 @@ def bulk_insert(sql_statements):
         log.info("未提供有效的 SQL 插入语句,跳过操作")
         return
 
-    connection = None
     try:
-        # 使用解密后的密码创建连接
-        db_config = DB_CONFIG.copy()
-        db_config['password'] = get_decrypted_password()
-
-        # 创建连接并开启事务
-        connection = pymysql.connect(**db_config)
-        connection.begin()  # 显式开始事务
-
-        with connection.cursor() as cursor:
-            # 遍历执行所有 SQL 语句
-            for sql in sql_statements:
-                # 移除 SQL 两端空白并执行
-                cursor.execute(sql.strip())
-
-            # 提交事务
-            connection.commit()
-            log.info(f"成功执行 {len(sql_statements)} 条 SQL 插入语句")
-
+        with engine.connect() as conn:
+            with conn.begin():
+                for sql in sql_statements:
+                    stmt = text(sql.strip())
+                    conn.execute(stmt)
+                log.info(f"成功执行 {len(sql_statements)} 条 SQL 插入语句")
     except Exception as e:
-        # 回滚事务并记录错误
-        if connection:
-            connection.rollback()
         log.info(f"数据库操作失败: {str(e)}")
         raise
-    finally:
-        # 确保连接关闭
-        if connection:
-            connection.close()
-
-def update_january_yoy(prov_name):
-    """
-    更新指定省份1月份同比数据
-    :param prov_name: 省份名称,默认为福建省
-    """
-    update_sql = text("""
-                      UPDATE t_yujin_crossborder_prov_region_trade AS curr
-                          INNER JOIN t_yujin_crossborder_prov_region_trade AS prev
-                      ON curr.city_code = prev.city_code
-                          AND prev.crossborder_year_month = DATE_FORMAT(
-                          DATE_SUB(
-                          STR_TO_DATE(CONCAT(curr.crossborder_year_month, '-01'), '%Y-%m-%d'),
-                          INTERVAL 1 YEAR
-                          ),
-                          '%Y-01'
-                          )
-                          SET
-                              curr.yoy_import_export = COALESCE (
-                              ROUND(
-                              (curr.monthly_total - prev.monthly_total) / NULLIF (prev.monthly_total, 0) * 100, 4
-                              ), 0.0000
-                              ), curr.yoy_import = COALESCE (
-                              ROUND(
-                              (curr.monthly_import - prev.monthly_import) / NULLIF (prev.monthly_import, 0) * 100, 4
-                              ), 0.0000
-                              ), curr.yoy_export = COALESCE (
-                              ROUND(
-                              (curr.monthly_export - prev.monthly_export) / NULLIF (prev.monthly_export, 0) * 100, 4
-                              ), 0.0000
-                              )
-                      WHERE
-                          curr.prov_name = :prov_name
-                        AND curr.crossborder_year_month LIKE '%-01'
-                        AND curr.crossborder_year_month
-                          > '2023-01'
-                      """)
-
-    try:
-        with engine.begin() as conn:
-            result = conn.execute(update_sql, {'prov_name': prov_name})
-            log.info(f"Updated {result.rowcount} rows for {prov_name}")
-            return result.rowcount
-
-    except Exception as e:
-        log.info(f"Update failed: {str(e)}")
-        raise RuntimeError(f"同比数据更新失败: {str(e)}") from e
 
 def clear_old_shandong_yoy(prov_name):
     """
@@ -301,53 +254,6 @@ def _update_shandong_new_yoy(prov_name):
         log.info(f"{prov_name} 新数据更新数: {result.rowcount}")
         return result.rowcount
 
-def update_january_yoy_origin(region_name):
-    """
-    更新指定省份1月份同比数据
-    :param region_name: 省份名称,默认为福建省
-    """
-    update_sql = text("""
-                      UPDATE t_yujin_crossborder_region_trade AS curr
-                          INNER JOIN t_yujin_crossborder_region_trade AS prev
-                      ON curr.region_code = prev.region_code
-                          AND prev.year_month = DATE_FORMAT(
-                          DATE_SUB(
-                          STR_TO_DATE(CONCAT(curr.year_month, '-01'), '%Y-%m-%d'),
-                          INTERVAL 1 YEAR
-                          ),
-                          '%Y-01'
-                          )
-                          SET
-                              curr.ytd_total = COALESCE (
-                              ROUND(
-                              (curr.monthly_total - prev.monthly_total) / NULLIF (prev.monthly_total, 0) * 100, 4
-                              ), 0.0000
-                              ), curr.ytd_import = COALESCE (
-                              ROUND(
-                              (curr.monthly_import - prev.monthly_import) / NULLIF (prev.monthly_import, 0) * 100, 4
-                              ), 0.0000
-                              ), curr.ytd_export = COALESCE (
-                              ROUND(
-                              (curr.monthly_export - prev.monthly_export) / NULLIF (prev.monthly_export, 0) * 100, 4
-                              ), 0.0000
-                              )
-                      WHERE
-                          curr.region_name = :region_name
-                        AND curr.year_month LIKE '%-01'
-                        AND curr.year_month
-                          > '2023-01'
-                      """)
-
-    try:
-        with engine.begin() as conn:
-            result = conn.execute(update_sql, {'region_name': region_name})
-            log.info(f"Updated {result.rowcount} rows for {region_name}")
-            return result.rowcount
-
-    except Exception as e:
-        log.info(f"Update failed: {str(e)}")
-        raise RuntimeError(f"同比数据更新失败: {str(e)}") from e
-
 def clear_old_shandong_yoy_origin(region_name):
     """
     清理山东省2024年前数据的同比指标
@@ -436,12 +342,10 @@ if __name__ == '__main__':
     # print(count)
 
     # 新表更新地级市同比
-    # for province in provinces:
-    #     update_january_yoy(province)
-    #     update_shandong_yoy(province)
+    for province in provinces:
+        update_shandong_yoy(province)
 
     # 旧表更新省份同比
     # for province in provinces:
-    #     update_january_yoy_origin(province)
     #     update_shandong_yoy_origin(province)
     log.info("同比sql处理完成")

+ 0 - 1
crossborder/zhejiang/crawl_gov_zhejiang_full.py

@@ -317,7 +317,6 @@ def crawl_with_selenium(url, mark):
         hierarchical_traversal(download_dir)
         log.info("浙江杭州海关类章、国家、城市所有文件处理完成!")
         time.sleep(5)
-        base_mysql.update_january_yoy('浙江省')
         base_mysql.update_shandong_yoy('浙江省')
         log.info("浙江杭州海关城市同比sql处理完成")
 

+ 0 - 1
crossborder/zhejiang/gov_commodity_zhejiang_city.py

@@ -159,7 +159,6 @@ if __name__ == '__main__':
     hierarchical_traversal(download_dir)
     log.info(f"浙江杭州海关城市所有文件处理完成!")
     time.sleep(5)
-    base_mysql.update_january_yoy('浙江省')
     base_mysql.update_shandong_yoy('浙江省')
     log.info("同比sql处理完成")
     # root = Path(download_dir)/'2024'/'07'