
Merge remote-tracking branch 'origin/master'

01495251 · 1 month ago
Parent
Commit 41946141d1

+ 31 - 17
crossborder/anhui/crawl_gov_anhui_full.py

@@ -17,6 +17,8 @@ from crossborder.anhui import gov_commodity_anhui_city, download_dir
 from crossborder.anhui import gov_commodity_anhui_country
 from crossborder.anhui import gov_commodity_anhui_import_export
 from crossborder.utils import base_country_code, base_mysql
+from crossborder.utils.base_country_code import get_last_month
+from crossborder.utils.base_mysql import get_commodity_trade_by_prov_year_month
 from crossborder.utils.dingtalk import send_dingtalk_message
 from crossborder.utils.log import  get_logger
 
@@ -236,7 +238,7 @@ def crawl_with_selenium(url, mark):
             # Get the URL of the next page
             next_page_url = next_page_btn.get_attribute("onclick")
             if not next_page_url:
-                log.info("Reached the last page, stopping the crawl")
+                log.info("Reached the last page, stopping collection")
                 break
             # Extract the URL from the onclick attribute
             next_page_url = re.search(r"'(.*?)'", next_page_url).group(1)
@@ -247,7 +249,7 @@ def crawl_with_selenium(url, mark):
             # Visit the next page
             driver.get(next_page_url)
 
-            log.info(f"Starting to crawl page data from {next_page_url}")
+            log.info(f"Starting to collect page data from {next_page_url}")
 
     finally:
         driver.quit()
@@ -322,22 +324,34 @@ def hierarchical_traversal(root_path):
                 gov_commodity_anhui_city.process_folder(md['path'])
 
 def main():
-    parser = argparse.ArgumentParser(description='Intelligent customs data scraping system')
-    parser.add_argument('--year', type=int, default=None, help='End year (e.g. 2023); when omitted, fetches the latest two months')
-    args = parser.parse_args()
-
-    start_time = time.time()
-    if args.year == 2023:
-        log.info("Performing a full crawl of Anhui customs data")
-        crawl_with_selenium('http://hefei.customs.gov.cn/hefei_customs/zfxxgkzl59/3169584/479584/479585/index.html','all')
-        duration = time.time() - start_time
-        send_dingtalk_message(f'Anhui customs full data crawl finished, took {duration:.2f} s')
-    else:
-        log.info("Performing an incremental crawl of Anhui customs data")
-        res = crawl_with_selenium('http://hefei.customs.gov.cn/hefei_customs/zfxxgkzl59/3169584/479584/479585/index.html','auto')
-        if res == 'finish':
+    try:
+        parser = argparse.ArgumentParser(description='Intelligent customs data scraping system')
+        parser.add_argument('--year', type=int, default=None, help='End year (e.g. 2023); when omitted, fetches the latest two months')
+        args = parser.parse_args()
+
+        start_time = time.time()
+        if args.year == 2023:
+            log.info("Performing a full collection of Anhui customs data")
+            crawl_with_selenium('http://hefei.customs.gov.cn/hefei_customs/zfxxgkzl59/3169584/479584/479585/index.html','all')
             duration = time.time() - start_time
-            send_dingtalk_message(f'Anhui customs incremental data crawl finished, took {duration:.2f} s')
+            minutes, seconds = divmod(duration, 60)
+            send_dingtalk_message(f'[Anhui Customs] Full data collection finished, took {int(minutes)}m {seconds:.1f}s')
+        else:
+            log.info("Performing an incremental collection of Anhui customs data")
+            res = crawl_with_selenium('http://hefei.customs.gov.cn/hefei_customs/zfxxgkzl59/3169584/479584/479585/index.html','auto')
+            if res == 'finish':
+                duration = time.time() - start_time
+                minutes, seconds = divmod(duration, 60)
+                send_dingtalk_message(f'[Anhui Customs] Incremental data collection finished, took {int(minutes)}m {seconds:.1f}s')
+
+                res = get_commodity_trade_by_prov_year_month('安徽省', get_last_month())
+                if res is not None:
+                    send_dingtalk_message(f"[Anhui Customs] commodity_trade query returned {len(res)} records, file generated")
+                else:
+                    send_dingtalk_message("[Anhui Customs] No records found or an error occurred")
+
+    except Exception as e:
+        send_dingtalk_message(f'[Anhui Customs] Error occurred: {e}')
 
 if __name__ == '__main__':
     main()
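
The notification change above replaces the raw seconds figure (`{duration:.2f} s`) with a minutes/seconds split produced by `divmod`. A minimal standalone sketch of that formatting (the `format_duration` helper name is illustrative, not part of the commit):

```python
import time

def format_duration(elapsed_seconds: float) -> str:
    """Format an elapsed time as 'Xm Y.Ys', mirroring the DingTalk messages above."""
    minutes, seconds = divmod(elapsed_seconds, 60)
    return f"{int(minutes)}m {seconds:.1f}s"

start_time = time.time()
time.sleep(1.2)  # stand-in for the crawl/collection work
print(format_duration(time.time() - start_time))  # e.g. "0m 1.2s"
```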

+ 31 - 17
crossborder/hebei/crawl_gov_hebei_full.py

@@ -18,6 +18,8 @@ from crossborder.hebei import gov_commodity_hebei_city
 from crossborder.hebei import gov_commodity_hebei_country
 from crossborder.hebei import gov_commodity_hebei_import_export
 from crossborder.utils import base_country_code, base_mysql
+from crossborder.utils.base_country_code import get_last_month
+from crossborder.utils.base_mysql import get_commodity_trade_by_prov_year_month
 from crossborder.utils.dingtalk import send_dingtalk_message
 from crossborder.utils.log import  get_logger
 
@@ -215,7 +217,7 @@ def crawl_with_selenium(url, mark):
             # Get the URL of the next page
             next_page_url = next_page_btn.get_attribute("onclick")
             if not next_page_url:
-                log.info("Reached the last page, stopping the crawl")
+                log.info("Reached the last page, stopping collection")
                 break
             # Extract the URL from the onclick attribute
             next_page_url = re.search(r"'(.*?)'", next_page_url).group(1)
@@ -226,7 +228,7 @@ def crawl_with_selenium(url, mark):
             # Visit the next page
             driver.get(next_page_url)
 
-            log.info(f"Starting to crawl page data from {next_page_url}")
+            log.info(f"Starting to collect page data from {next_page_url}")
     finally:
         driver.quit()
         # 等待5s后执行
@@ -300,22 +302,34 @@ def hierarchical_traversal(root_path):
 
 
 def main():
-    parser = argparse.ArgumentParser(description='Intelligent customs data scraping system')
-    parser.add_argument('--year', type=int, default=None, help='End year (e.g. 2023); when omitted, fetches the latest two months')
-    args = parser.parse_args()
-
-    start_time = time.time()
-    if args.year == 2023:
-        log.info("Performing a full crawl of Hebei customs data")
-        crawl_with_selenium('http://shijiazhuang.customs.gov.cn/shijiazhuang_customs/zfxxgk43/2988665/2988681/index.html', 'all')
-        duration = time.time() - start_time
-        send_dingtalk_message(f'Hebei customs full data crawl finished, took {duration:.2f} s')
-    else:
-        log.info("Performing an incremental crawl of Hebei customs data")
-        res = crawl_with_selenium('http://shijiazhuang.customs.gov.cn/shijiazhuang_customs/zfxxgk43/2988665/2988681/index.html','auto')
-        if res == 'finish':
+    try:
+        parser = argparse.ArgumentParser(description='Intelligent customs data scraping system')
+        parser.add_argument('--year', type=int, default=None, help='End year (e.g. 2023); when omitted, fetches the latest two months')
+        args = parser.parse_args()
+
+        start_time = time.time()
+        if args.year == 2023:
+            log.info("Performing a full collection of Hebei customs data")
+            crawl_with_selenium('http://shijiazhuang.customs.gov.cn/shijiazhuang_customs/zfxxgk43/2988665/2988681/index.html', 'all')
             duration = time.time() - start_time
-            send_dingtalk_message(f'Hebei customs incremental data crawl finished, took {duration:.2f} s')
+            minutes, seconds = divmod(duration, 60)
+            send_dingtalk_message(f'[Hebei Customs] Full data collection finished, took {int(minutes)}m {seconds:.1f}s')
+        else:
+            log.info("Performing an incremental collection of Hebei customs data")
+            res = crawl_with_selenium('http://shijiazhuang.customs.gov.cn/shijiazhuang_customs/zfxxgk43/2988665/2988681/index.html','auto')
+            if res == 'finish':
+                duration = time.time() - start_time
+                minutes, seconds = divmod(duration, 60)
+                send_dingtalk_message(f'[Hebei Customs] Incremental data collection finished, took {int(minutes)}m {seconds:.1f}s')
+
+                res = get_commodity_trade_by_prov_year_month('河北省', get_last_month())
+                if res is not None:
+                    send_dingtalk_message(f"[Hebei Customs] commodity_trade query returned {len(res)} records, file generated")
+                else:
+                    send_dingtalk_message("[Hebei Customs] No records found or an error occurred")
+
+    except Exception as e:
+        send_dingtalk_message(f"[Hebei Customs] Error occurred: {e}")
 
 if __name__ == '__main__':
     main()
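
The `--year` handling is identical in every province script: the flag defaults to None and only the literal value 2023 selects the full-collection branch. A small sketch of how the argparse setup behaves (the argument values passed below are illustrative):

```python
import argparse

parser = argparse.ArgumentParser(description='Intelligent customs data scraping system')
parser.add_argument('--year', type=int, default=None,
                    help='End year (e.g. 2023); when omitted, fetches the latest two months')

full = parser.parse_args(['--year', '2023'])
print(full.year == 2023)   # True  -> full collection branch ('all')

incremental = parser.parse_args([])
print(incremental.year)    # None  -> incremental collection branch ('auto')
```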

+ 30 - 17
crossborder/jiangsu/crawl_gov_jiangsu_full.py

@@ -23,6 +23,8 @@ from crossborder.jiangsu import gov_commodity_jiangsu_city
 from crossborder.jiangsu import gov_commodity_jiangsu_import_export
 
 from crossborder.utils import base_country_code, base_mysql
+from crossborder.utils.base_country_code import get_last_month
+from crossborder.utils.base_mysql import get_commodity_trade_by_prov_year_month
 from crossborder.utils.dingtalk import send_dingtalk_message
 from crossborder.utils.log import  get_logger
 
@@ -256,7 +258,7 @@ def crawl_with_selenium(url, mark):
             # Get the URL of the next page
             next_page_url = next_page_btn.get_attribute("onclick")
             if not next_page_url:
-                log.info("Reached the last page, stopping the crawl")
+                log.info("Reached the last page, stopping collection")
                 break
             # Extract the URL from the onclick attribute
             next_page_url = re.search(r"'(.*?)'", next_page_url).group(1)
@@ -267,7 +269,7 @@ def crawl_with_selenium(url, mark):
             # Visit the next page
             driver.get(next_page_url)
 
-            log.info(f"Starting to crawl page data from {next_page_url}")
+            log.info(f"Starting to collect page data from {next_page_url}")
 
     finally:
         driver.quit()
@@ -337,22 +339,33 @@ def hierarchical_traversal(root_path, all_records):
                 gov_commodity_jiangsu_city.process_folder(md['path'])
 
 def main():
-    parser = argparse.ArgumentParser(description='Intelligent customs data scraping system')
-    parser.add_argument('--year', type=int, default=None, help='End year (e.g. 2023); when omitted, fetches the latest two months')
-    args = parser.parse_args()
-
-    start_time = time.time()
-    if args.year == 2023:
-        log.info("Performing a full crawl of Jiangsu customs data")
-        crawl_with_selenium('http://nanjing.customs.gov.cn/nanjing_customs/zfxxgk58/fdzdgknr95/3010051/589289/7e2fcc72-1.html','all')
-        duration = time.time() - start_time
-        send_dingtalk_message(f'Jiangsu customs full data crawl finished, took {duration:.2f} s')
-    else:
-        log.info("Performing an incremental crawl of Jiangsu customs data")
-        res = crawl_with_selenium('http://nanjing.customs.gov.cn/nanjing_customs/zfxxgk58/fdzdgknr95/3010051/589289/7e2fcc72-1.html','auto')
-        if res == 'finish':
+    try:
+        parser = argparse.ArgumentParser(description='Intelligent customs data scraping system')
+        parser.add_argument('--year', type=int, default=None, help='End year (e.g. 2023); when omitted, fetches the latest two months')
+        args = parser.parse_args()
+
+        start_time = time.time()
+        if args.year == 2023:
+            log.info("Performing a full collection of Jiangsu customs data")
+            crawl_with_selenium('http://nanjing.customs.gov.cn/nanjing_customs/zfxxgk58/fdzdgknr95/3010051/589289/7e2fcc72-1.html','all')
             duration = time.time() - start_time
-            send_dingtalk_message(f'Jiangsu customs incremental data crawl finished, took {duration:.2f} s')
+            minutes, seconds = divmod(duration, 60)
+            send_dingtalk_message(f'[Jiangsu Customs] Full data collection finished, took {int(minutes)}m {seconds:.1f}s')
+        else:
+            log.info("Performing an incremental collection of Jiangsu customs data")
+            res = crawl_with_selenium('http://nanjing.customs.gov.cn/nanjing_customs/zfxxgk58/fdzdgknr95/3010051/589289/7e2fcc72-1.html','auto')
+            if res == 'finish':
+                duration = time.time() - start_time
+                minutes, seconds = divmod(duration, 60)
+                send_dingtalk_message(f'[Jiangsu Customs] Incremental data collection finished, took {int(minutes)}m {seconds:.1f}s')
+
+                res = get_commodity_trade_by_prov_year_month('江苏省', get_last_month())
+                if res is not None:
+                    send_dingtalk_message(f"[Jiangsu Customs] commodity_trade query returned {len(res)} records, file generated")
+                else:
+                    send_dingtalk_message("[Jiangsu Customs] No records found or an error occurred")
+    except Exception as e:
+        send_dingtalk_message(f"[Jiangsu Customs] Error occurred: {e}")
 
 if __name__ == '__main__':
     main()
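
The Anhui, Hebei, Jiangsu and Zhejiang `main()` bodies are now identical apart from the province name and start URL. A possible follow-up, sketched here as a hypothetical helper that is not part of this commit (the names `run_province_crawl` and `label` are illustrative), would factor that duplication out:

```python
import time

from crossborder.utils.base_country_code import get_last_month
from crossborder.utils.base_mysql import get_commodity_trade_by_prov_year_month
from crossborder.utils.dingtalk import send_dingtalk_message


def run_province_crawl(prov_name, label, url, crawl_func, year=None):
    """Run a full (year == 2023) or incremental collection for one province and notify DingTalk."""
    start_time = time.time()
    if year == 2023:
        crawl_func(url, 'all')
        minutes, seconds = divmod(time.time() - start_time, 60)
        send_dingtalk_message(f'[{label}] Full data collection finished, took {int(minutes)}m {seconds:.1f}s')
        return
    if crawl_func(url, 'auto') != 'finish':
        return
    minutes, seconds = divmod(time.time() - start_time, 60)
    send_dingtalk_message(f'[{label}] Incremental data collection finished, took {int(minutes)}m {seconds:.1f}s')
    # prov_name must stay the DB value (e.g. '安徽省'); label is only used in the messages
    rows = get_commodity_trade_by_prov_year_month(prov_name, get_last_month())
    if rows:
        send_dingtalk_message(f'[{label}] commodity_trade query returned {len(rows)} records, file generated')
    else:
        send_dingtalk_message(f'[{label}] No records found or an error occurred')
```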

+ 18 - 5
crossborder/utils/base_country_code.py

@@ -1,10 +1,8 @@
-import os
 import re
-from pathlib import Path
-
+from datetime import datetime
 import pandas as pd
 
-from crossborder.utils.log import  get_logger
+from crossborder.utils.log import get_logger
 
 log = get_logger(__name__)
 
@@ -330,4 +328,19 @@ def extract_year_month_from_path(path):
             raise ValueError(f"无效月份格式:{month_part}")
         return int(year_part), int(month_part)
     except IndexError:
-        raise ValueError("Path structure does not meet the expected format, e.g. .../shandong/2025/04")
+        raise ValueError("Path structure does not meet the expected format, e.g. .../shandong/2025/04")
+
+def get_last_month():
+    # Get the current time
+    today = datetime.today()
+
+    # Compute the previous month
+    if today.month == 1:
+        # If it is January, the previous month is December of last year
+        last_month = today.replace(year=today.year - 1, month=12)
+    else:
+        # Otherwise, just step back one month
+        last_month = today.replace(month=today.month - 1)
+
+    # Return a string in yyyy-MM format
+    return last_month.strftime('%Y-%m')
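
One caveat with `get_last_month` as committed: `datetime.replace` keeps the current day of month, so `today.replace(month=today.month - 1)` raises ValueError when that day does not exist in the previous month (for example when run on March 31 or July 31). A safer variant, shown only as a sketch and not as a change to the commit, normalizes to the first of the month before stepping back:

```python
from datetime import datetime

def get_last_month_safe(today=None):
    """Return the previous month as 'YYYY-MM', avoiding day-of-month overflow."""
    today = today or datetime.today()
    first_of_month = today.replace(day=1)  # day 1 exists in every month
    if first_of_month.month == 1:
        last_month = first_of_month.replace(year=first_of_month.year - 1, month=12)
    else:
        last_month = first_of_month.replace(month=first_of_month.month - 1)
    return last_month.strftime('%Y-%m')

print(get_last_month_safe(datetime(2025, 3, 31)))  # 2025-02 (the committed version would raise here)
print(get_last_month_safe(datetime(2025, 1, 15)))  # 2024-12
```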

+ 61 - 5
crossborder/utils/base_mysql.py

@@ -336,18 +336,74 @@ def _update_shandong_new_yoy_origin(region_name):
         log.info(f"{region_name} 新数据更新数: {result.rowcount}")
         return result.rowcount
 
+def generate_sql_file(results, prov_name, year_month):
+    filename = f"export_{prov_name}_{year_month}.sql"
+
+    with open(filename, 'w', encoding='utf-8') as f:
+        for row in results:
+            # Filter out fields that are not needed (e.g. id)
+            filtered_row = {k: v for k, v in row.items() if k != 'id'}
+
+            # Build the INSERT statement
+            columns = ', '.join([f"`{k}`" for k in filtered_row.keys()])
+            values = ', '.join([
+                f"'{v}'" if isinstance(v, (str,)) else str(v) if v is not None else 'NULL'
+                for v in filtered_row.values()
+            ])
+
+            sql = f"INSERT INTO `t_yujin_crossborder_prov_commodity_trade` ({columns}) VALUES ({values});\n"
+            f.write(sql)
+
+    log.info(f"Successfully generated SQL file: {filename}")
+
+def get_commodity_trade_by_prov_year_month(prov_name, year_month):
+    connection = None
+    try:
+        # Connect to the database
+        db_config = DB_CONFIG.copy()
+        db_config['password'] = get_decrypted_password()
+        connection = pymysql.connect(**db_config)
+
+        # Use DictCursor to get dict-shaped rows
+        with connection.cursor(pymysql.cursors.DictCursor) as cursor:
+            # Execute the query
+            sql = "SELECT * FROM t_yujin_crossborder_prov_commodity_trade WHERE prov_name = %s AND crossborder_year_month = %s"
+            cursor.execute(sql, (prov_name, year_month))
+
+            # Fetch the results
+            results = cursor.fetchall()
+
+            # Generate the SQL file
+            if results:
+                generate_sql_file(results, prov_name, year_month)
+
+            return results
+
+    except Exception as e:
+        log.error(f"get_commodity_trade_by_prov_year_month error: {str(e)}")
+        return None
+    finally:
+        if connection:
+            connection.close()
+
 if __name__ == '__main__':
-    commodity_code, commodity_name_fix = get_commodity_id('农产品')
-    print(commodity_code, commodity_name_fix)
+    res = get_commodity_trade_by_prov_year_month('江苏省','2025-04')
+    if res is not None:
+        print(f"Found {len(res)} records")
+    else:
+        print("No records found or an error occurred")
+
+    # commodity_code, commodity_name_fix = get_commodity_id('农产品')
+    # print(commodity_code, commodity_name_fix)
     # check_year, check_month = 2024, 4
     # count = get_code_exist(f'{check_year}-{check_month:02d}', "340000")
     # print(count)
 
     # Update prefecture-level city year-over-year values in the new table
-    for province in provinces:
-        update_shandong_yoy(province)
+    # for province in provinces:
+    #     update_shandong_yoy(province)
 
     # Update province year-over-year values in the old table
     # for province in provinces:
     #     update_shandong_yoy_origin(province)
-    log.info("Year-over-year SQL processing finished")
+    # log.info("Year-over-year SQL processing finished")
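
`generate_sql_file` interpolates values into the INSERT statements with plain f-strings, so string values containing single quotes break the quoting, and non-string values such as `datetime` objects (if any columns hold them) fall into the unquoted `str(v)` branch. A sketch of a more defensive value renderer, assuming the same dict-shaped rows (the helper name and the sample row are illustrative):

```python
from datetime import date, datetime
from decimal import Decimal

def render_sql_value(v):
    """Render a Python value as a SQL literal with minimal escaping (sketch only)."""
    if v is None:
        return 'NULL'
    if isinstance(v, (int, float, Decimal)):
        return str(v)
    if isinstance(v, datetime):
        return f"'{v.strftime('%Y-%m-%d %H:%M:%S')}'"
    if isinstance(v, date):
        return f"'{v.isoformat()}'"
    # Fall back to a quoted string, doubling single quotes per the SQL standard
    return "'" + str(v).replace("'", "''") + "'"

row = {'prov_name': '安徽省', 'note': "it's a test", 'amount': Decimal('12.5'), 'create_time': None}
print(', '.join(render_sql_value(v) for v in row.values()))
# -> '安徽省', 'it''s a test', 12.5, NULL
```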

+ 29 - 16
crossborder/zhejiang/crawl_gov_zhejiang_full.py

@@ -16,6 +16,8 @@ from selenium.webdriver.common.by import By
 from selenium.webdriver.support import expected_conditions as EC
 from selenium.webdriver.support.ui import WebDriverWait
 
+from crossborder.utils.base_country_code import get_last_month
+from crossborder.utils.base_mysql import get_commodity_trade_by_prov_year_month
 from crossborder.utils.dingtalk import send_dingtalk_message
 from crossborder.zhejiang import download_dir
 from crossborder.zhejiang import gov_commodity_zhejiang_city
@@ -56,7 +58,7 @@ def configure_stealth_options():
     return opts
 
 def crawl_by_year_tabs(driver, base_url, year_month):
-    """Crawl data by navigating the year tabs"""
+    """Collect data by navigating the year tabs"""
     years = ['2023年', '2024年', '2025年']
     WebDriverWait(driver, 30).until(
         EC.presence_of_element_located((By.CLASS_NAME, "portlet"))
@@ -384,22 +386,33 @@ def hierarchical_traversal(root_path):
                 gov_commodity_zhejiang_city.process_folder(md['path'])
 
 def main():
-    parser = argparse.ArgumentParser(description='Intelligent customs data scraping system')
-    parser.add_argument('--year', type=int, default=None, help='End year (e.g. 2023); when omitted, fetches the latest two months')
-    args = parser.parse_args()
-
-    start_time = time.time()
-    if args.year == 2023:
-        log.info("Performing a full crawl of Zhejiang customs data")
-        crawl_with_selenium('http://hangzhou.customs.gov.cn/hangzhou_customs/575609/zlbd/575612/575612/6430241/6430315/index.html', 'all')
-        duration = time.time() - start_time
-        send_dingtalk_message(f'Zhejiang customs full data crawl finished, took {duration:.2f} s')
-    else:
-        log.info("Performing an incremental crawl of Zhejiang customs data")
-        res = crawl_with_selenium('http://hangzhou.customs.gov.cn/hangzhou_customs/575609/zlbd/575612/575612/6430241/6430315/index.html','auto')
-        if res == 'finish':
+    try:
+        parser = argparse.ArgumentParser(description='Intelligent customs data scraping system')
+        parser.add_argument('--year', type=int, default=None, help='End year (e.g. 2023); when omitted, fetches the latest two months')
+        args = parser.parse_args()
+
+        start_time = time.time()
+        if args.year == 2023:
+            log.info("Performing a full collection of Zhejiang customs data")
+            crawl_with_selenium('http://hangzhou.customs.gov.cn/hangzhou_customs/575609/zlbd/575612/575612/6430241/6430315/index.html', 'all')
             duration = time.time() - start_time
-            send_dingtalk_message(f'Zhejiang customs incremental data crawl finished, took {duration:.2f} s')
+            minutes, seconds = divmod(duration, 60)
+            send_dingtalk_message(f'[Zhejiang Customs] Full data collection finished, took {int(minutes)}m {seconds:.1f}s')
+        else:
+            log.info("Performing an incremental collection of Zhejiang customs data")
+            res = crawl_with_selenium('http://hangzhou.customs.gov.cn/hangzhou_customs/575609/zlbd/575612/575612/6430241/6430315/index.html','auto')
+            if res == 'finish':
+                duration = time.time() - start_time
+                minutes, seconds = divmod(duration, 60)
+                send_dingtalk_message(f'[Zhejiang Customs] Incremental data collection finished, took {int(minutes)}m {seconds:.1f}s')
+
+                res = get_commodity_trade_by_prov_year_month('浙江省', get_last_month())
+                if res is not None:
+                    send_dingtalk_message(f"[Zhejiang Customs] commodity_trade query returned {len(res)} records, file generated")
+                else:
+                    send_dingtalk_message("[Zhejiang Customs] No records found or an error occurred")
+    except Exception as e:
+        send_dingtalk_message(f"[Zhejiang Customs] Error occurred: {e}")
 
 if __name__ == '__main__':
     main()
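
One behavioural note on the post-crawl check used in all four scripts: `get_commodity_trade_by_prov_year_month` returns None only when an exception occurs, while a query that finds no rows returns an empty result, which still satisfies `res is not None`. In that case the "file generated" notice is sent even though `generate_sql_file` only runs for non-empty results. A sketch of a caller that distinguishes the three cases (the message texts are illustrative):

```python
from crossborder.utils.base_country_code import get_last_month
from crossborder.utils.base_mysql import get_commodity_trade_by_prov_year_month
from crossborder.utils.dingtalk import send_dingtalk_message

res = get_commodity_trade_by_prov_year_month('浙江省', get_last_month())
if res is None:
    send_dingtalk_message('[Zhejiang Customs] commodity_trade query failed, check the logs')
elif not res:
    send_dingtalk_message('[Zhejiang Customs] No commodity_trade rows for last month yet')
else:
    send_dingtalk_message(f'[Zhejiang Customs] {len(res)} rows exported to the SQL file')
```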