Kaynağa Gözat

crawl data fix

zhangfan 1 ay önce
ebeveyn
işleme
70063e265b

+ 8 - 8
crossborder/anhui/gov_commodity_anhui_import_export.py

@@ -110,19 +110,20 @@ def save_to_database(import_df, export_df, year, month):
             if not commodity_name_fix or commodity_name_fix in processed_commodities:
                 continue
 
+            # nan不转为0而是作为null存储
             monthly_import = round(row['import'] * 10000, 4)
             monthly_export = round(row['export'] * 10000, 4)
-            monthly_total = round(
-                (0 if pd.isna(monthly_import) else monthly_import) +
-                (0 if pd.isna(monthly_export) else monthly_export),
-                4
-            )
+            monthly_import_handle = monthly_import if pd.notna(row['import']) else 0.0
+            monthly_export_handle = monthly_export if pd.notna(row['export']) else 0.0
+            monthly_total = round(monthly_import_handle + monthly_export_handle, 4)
 
             if month == 2:
                 year_month_2 = f'{year}-01'
                 monthly_import = round(monthly_import / 2, 4)
                 monthly_export = round(monthly_export / 2, 4)
-                monthly_total = round(monthly_import + monthly_export, 4)
+                monthly_import_handle = monthly_import if pd.notna(row['import']) else 0.0
+                monthly_export_handle = monthly_export if pd.notna(row['export']) else 0.0
+                monthly_total = round(monthly_import_handle + monthly_export_handle, 4)
                 sql = (f"INSERT INTO t_yujin_crossborder_prov_commodity_trade "
                        f"(crossborder_year, crossborder_year_month, prov_code, prov_name, commodity_code, commodity_name, monthly_total, monthly_export, monthly_import, create_time) VALUES "
                        f"('{year}', '{year_month_2}', '340000', '安徽省', '{commodity_code}', '{commodity_name_fix}', {format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, now())"
@@ -136,7 +137,6 @@ def save_to_database(import_df, export_df, year, month):
             sql_arr.append(sql)
 
             processed_commodities.add(commodity_name_fix)
-            # log.info(f'{commodity_name} -> {commodity_name_fix}')
 
     except Exception as e:
         log.info(f"{year_month} prov_commodity_trade 生成 SQL 文件时发生异常: {str(e)}")
@@ -180,6 +180,6 @@ def hierarchical_traversal(root_path):
 if __name__ == '__main__':
     hierarchical_traversal(download_dir)
 
-    # root = Path(base_country_code.download_dir)/'2025'/'04'
+    # root = Path(download_dir)/'2023'/'02'
     # process_folder(root)
     log.info("安徽省海关类章所有文件处理完成!")

+ 13 - 10
crossborder/hebei/gov_commodity_hebei_import_export.py

@@ -47,9 +47,7 @@ def clean_commodity_name(name, preserve_keywords=None):
 def process_folder(path):
     year, month = base_country_code.extract_year_month_from_path(path)
 
-    name_index = 1 if year == 2025 and month >= 3 else 0
-    value_index = 5 if year == 2025 and month >= 3 else 4
-    res = df_data(path, name_index, value_index)
+    res = df_data(path, year, month)
     if not res:
         log.info(f"{path} 上月目录里文件未找到包含 主出、主进商品 sheet")
         return
@@ -81,16 +79,16 @@ def save_to_database(merged_df, year, month):
 
             monthly_import = round(row['import'], 4)
             monthly_export = round(row['export'], 4)
-            monthly_total = round(
-                (0 if pd.isna(monthly_import) else monthly_import) +
-                (0 if pd.isna(monthly_export) else monthly_export),
-                4
-            )
+            monthly_import_handle = monthly_import if pd.notna(row['import']) else 0.0
+            monthly_export_handle = monthly_export if pd.notna(row['export']) else 0.0
+            monthly_total = round(monthly_import_handle + monthly_export_handle, 4)
 
             if year_month == '2023-02':
                 monthly_import = round(monthly_import / 2, 4)
                 monthly_export = round(monthly_export / 2, 4)
-                monthly_total = round(monthly_import + monthly_export, 4)
+                monthly_import_handle = monthly_import if pd.notna(row['import']) else 0.0
+                monthly_export_handle = monthly_export if pd.notna(row['export']) else 0.0
+                monthly_total = round(monthly_import_handle + monthly_export_handle, 4)
                 sql = (f"INSERT INTO t_yujin_crossborder_prov_commodity_trade "
                        f"(crossborder_year, crossborder_year_month, prov_code, prov_name, commodity_code, commodity_name, monthly_total, monthly_export, monthly_import, create_time) VALUES "
                        f"('2023', '2023-01', '130000', '河北省', '{commodity_code}', '{commodity_name_fix}', {format_sql_value(monthly_total)}, {format_sql_value(monthly_export)}, {format_sql_value(monthly_import)}, now())"
@@ -117,7 +115,12 @@ def save_to_database(merged_df, year, month):
     log.info(f"√ {year_month} prov_commodity_trade SQL 存表完成!")
 
 
-def df_data(path, name_index, value_index):
+def df_data(path, year, month):
+    name_index = 1 if year == 2025 and month >= 3 else 0
+    value_index = 5 if year == 2025 and month >= 3 else 4
+    if year == 2023 and (month == 2 or month == 3) :
+        value_index = 1
+
     file_paths = list(Path(path).glob('*'))
     if not file_paths:
         log.info("未找到任何文件")

+ 16 - 25
crossborder/zhejiang/gov_commodity_zhejiang_country.py

@@ -5,7 +5,7 @@ import pandas as pd
 from crossborder.zhejiang import download_dir
 from crossborder.utils import base_country_code, base_mysql
 from crossborder.utils.base_country_code import format_sql_value
-from crossborder.utils.log import get_logger
+from crossborder.utils.log import  get_logger
 
 log = get_logger(__name__)
 
@@ -88,33 +88,23 @@ def process_folder(path):
         prev_export = prev_export_df.groupby('commodity')['export'].sum().reset_index()
         prev_total_df = prev_total_df.groupby('commodity')['total'].sum().reset_index()
 
-        # 新增字段标准化逻辑
-        curr_import['commodity'] = curr_import['commodity'].str.strip().str.split('(|\\(').str[0]
-        prev_import['commodity'] = prev_import['commodity'].str.strip().str.split('(|\\(').str[0]
-        curr_export['commodity'] = curr_export['commodity'].str.strip().str.split('(|\\(').str[0]
-        prev_export['commodity'] = prev_export['commodity'].str.strip().str.split('(|\\(').str[0]
-        total_df['commodity'] = total_df['commodity'].str.strip().str.split('(|\\(').str[0]
-        prev_total_df['commodity'] = prev_total_df['commodity'].str.strip().str.split('(|\\(').str[0]
+        # 差值计算
+        curr_import = pd.merge(curr_import, prev_import, on='commodity', how='left')
+        curr_import['import'] = round(curr_import['import_x'] - curr_import['import_y'], 4)
 
-        # 差值计算优化 - 开始
-        curr_import = pd.merge(curr_import, prev_import, on='commodity', how='left').fillna(0)
-        curr_import['import'] = (curr_import['import_x'] - curr_import['import_y']).round(4)
-
-        curr_export = pd.merge(curr_export, prev_export, on='commodity', how='left').fillna(0)
-        curr_export['export'] = (curr_export['export_x'] - curr_export['export_y']).round(4)
-
-        total_df = pd.merge(total_df, prev_total_df, on='commodity', how='left').fillna(0)
-        total_df['total'] = (total_df['total_x'] - total_df['total_y']).round(4)
-        # 差值计算优化 - 结束
+        curr_export = pd.merge(curr_export, prev_export, on='commodity', how='left')
+        curr_export['export'] = round(curr_export['export_x'] - curr_export['export_y'], 4)
 
+        total_df = pd.merge(total_df, prev_total_df, on='commodity', how='left')
+        total_df['total'] = round(total_df['total_x'] - total_df['total_y'], 4)
         log.info(f"合并文件: {path}*********{previous_month_dir}")
 
-    # 合并进出口数据优化 - 开始
-    merged_df = pd.merge(curr_import, curr_export, on='commodity', how='outer').fillna(0)
-    merged_df = pd.merge(merged_df, total_df, on='commodity', how='outer').fillna(0)
-    # 合并进出口数据优化 - 结束
+    # 合并进出口数据
+    merged_df = pd.merge(curr_import, curr_export, on='commodity', how='outer')
+    merged_df = pd.merge(merged_df, total_df, on='commodity', how='outer')
 
     sql_arr = []
+    # try:
     for _, row in merged_df.iterrows():
         country_name = str(row['commodity']).strip()
         if country_name.endswith(")") or country_name.endswith(")"):
@@ -148,6 +138,8 @@ def process_folder(path):
             f"'{yoy_export}', NOW()) ON DUPLICATE KEY UPDATE create_time = now();"
         )
         sql_arr.append(sql)
+    # except Exception as e:
+    #     log.info(f"{year_month} 处理时发生异常: {str(e)}")
 
     log.info(f"√ {year_month} 成功生成 SQL 条数: {len(sql_arr)}")
     # 批量插入数据库
@@ -178,7 +170,6 @@ def hierarchical_traversal(root_path):
 if __name__ == '__main__':
     hierarchical_traversal(download_dir)
 
-    # root = Path(download_dir)/'2024'/'10'
+    # root = Path(download_dir) / '2024' / '07'
     # process_folder(root)
-
-    log.info("浙江省海关国别所有文件处理完成!")
+    log.info("浙江省海关国别所有文件处理完成!")