Forráskód Böngészése

拱北海关2月份数据异常修复

01495251 1 hónapja
szülő
commit
4ee11fbf3c

+ 6 - 11
crossborder/guangdong/guangdong_gongbei_parse_excel.py

@@ -196,23 +196,18 @@ def detect_column_mapping(cols, base_mapping, city_name):
             city_index = i
             break
 
-    if city_index is None:
-        # 如果没有找到城市名称,使用基准映射
-        return base_mapping
-
     if city_index == 0:
         # 城市在第0列 - 基准情况
         return base_mapping
     else:
         # 城市在其他列 - 创建偏移映射
-        offset = city_index  # 因为基准映射中city_name在第0列
         return {
-            'monthly_total': base_mapping['monthly_total'] + offset - 1,
-            'yoy_import_export': base_mapping['yoy_import_export'] + offset - 1,
-            'monthly_export': base_mapping['monthly_export'] + offset - 1,
-            'yoy_export': base_mapping['yoy_export'] + offset - 1,
-            'monthly_import': base_mapping['monthly_import'] + offset - 1,
-            'yoy_import': base_mapping['yoy_import'] + offset - 1
+            'monthly_total': base_mapping['monthly_total']  - 1,
+            'yoy_import_export': base_mapping['yoy_import_export']  - 1,
+            'monthly_export': base_mapping['monthly_export'] - 1,
+            'yoy_export': base_mapping['yoy_export']  - 1,
+            'monthly_import': base_mapping['monthly_import'] +  - 1,
+            'yoy_import': base_mapping['yoy_import']  - 1
         }
 
 

+ 10 - 1
crossborder/guangdong/selenium_guangdong_city.py

@@ -16,6 +16,7 @@ from crossborder.utils.db_helper import DBHelper
 from crossborder.guangdong.guangdong_sub_customs_parse_excel import parse_excel
 from crossborder.utils.constants import DOWNLOAD_DIR
 from crossborder.utils.constants import GUANGDONG_CUSTOMS_URL
+from crossborder.utils.dingtalk import send_dingtalk_message
 from crossborder.utils.download_utils import configure_stealth_options, generate_month_sequence, download_excel, download_excel2, \
     batch_download_excel
 from crossborder.utils.log import  get_logger
@@ -262,10 +263,12 @@ def random_sleep(base=2, variance=5):
 
 def main():
     """主入口(优化参数处理逻辑)"""
+    global target_months
     parser = argparse.ArgumentParser(description='海关数据智能抓取系统')
     parser.add_argument('--year', type=int, default=None,
                         help='终止年份(如2023),未指定时抓取最新两个月')
     args = parser.parse_args()
+    start_time = time.time()
     driver = webdriver.Firefox(options=configure_stealth_options(download_dir))
     for customs_name in GUANGDONG_CUSTOMS_URL.keys():
         try:
@@ -304,6 +307,12 @@ def main():
     db_helper = DBHelper()
     db_helper.update_prov_yoy("广东省")
     log.info("\n广东省地级市数据同比更新结束")
+    duration = time.time() - start_time
+    minutes, seconds = divmod(duration, 60)  # 转换为分钟和秒
+    message = f'【广东省-广州海关、深圳海关、拱北海关、汕头海关、江门海关、黄埔海关、湛江海关】{len(target_months)}个月份数据已采集完毕,总耗时:{int(minutes)}分{seconds:.1f}秒'
+    send_dingtalk_message(message)
 
 if __name__ == "__main__":
-    main()
+    main()
+    # db_helper = DBHelper()
+    # db_helper.update_prov_yoy("广东省")

+ 17 - 14
crossborder/utils/db_helper.py

@@ -177,29 +177,32 @@ class DBHelper:
             log.info(f"{prov_name}新数据更新数: {result.rowcount}")
             return result.rowcount
 
-
     def query(self, sql, params=None, return_df=True):
-        """
-        执行带参数的SQL语句(支持批量插入/更新)
-
-        :param sql: 参数化的SQL语句(如含%s、%s等)
-        :param params_list: 参数列表,每个元素是一个tuple或dict(根据SQL风格而定)
-        :return: 受影响行数
-        """
         try:
             with self.engine.connect() as conn:
                 if return_df:
-                    # 使用pandas直接读取为DataFrame
-                    result = pd.read_sql(sql, conn, params=params)
-                    log.info(f"查询成功,返回 {len(result)} 条记录")
-                    return result
+                    # 替代方法:使用 SQLAlchemy 结果代理直接创建 DataFrame
+                    result_proxy = conn.execute(text(sql), params or {})
+
+                    # 更健壮的方式获取列名
+                    columns = [col_desc[0] for col_desc in result_proxy.cursor.description]
+
+                    # 获取所有数据
+                    data = result_proxy.fetchall()
+
+                    # 手动创建 DataFrame
+                    df = pd.DataFrame(data, columns=columns)
+                    log.info(f"查询成功,返回 {len(df)} 条记录")
+                    return df
                 else:
-                    # 返回原始结果
-                    result = conn.execute(sql, params or {}).fetchall()
+                    result = conn.execute(text(sql), params or {}).fetchall()
                     log.info(f"查询成功,返回 {len(result)} 条记录")
                     return result
         except Exception as e:
             log.error(f"查询失败: {str(e)}")
+            # 添加详细信息日志
+            log.error(f"SQL: {sql}")
+            log.error(f"Params: {params}")
             raise