| 
					
				 | 
			
			
				@@ -5,7 +5,7 @@ import pandas as pd 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 from crossborder.zhejiang import download_dir 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 from crossborder.utils import base_country_code, base_mysql 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 from crossborder.utils.base_country_code import format_sql_value 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-from crossborder.utils.log import  get_logger 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+from crossborder.utils.log import get_logger 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 log = get_logger(__name__) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -88,23 +88,33 @@ def process_folder(path): 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         prev_export = prev_export_df.groupby('commodity')['export'].sum().reset_index() 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         prev_total_df = prev_total_df.groupby('commodity')['total'].sum().reset_index() 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        # 差值计算 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        curr_import = pd.merge(curr_import, prev_import, on='commodity', how='left') 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        curr_import['import'] = round(curr_import['import_x'] - curr_import['import_y'], 4) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        # 新增字段标准化逻辑 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        curr_import['commodity'] = curr_import['commodity'].str.strip().str.split('(|\\(').str[0] 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        prev_import['commodity'] = prev_import['commodity'].str.strip().str.split('(|\\(').str[0] 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        curr_export['commodity'] = curr_export['commodity'].str.strip().str.split('(|\\(').str[0] 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        prev_export['commodity'] = prev_export['commodity'].str.strip().str.split('(|\\(').str[0] 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        total_df['commodity'] = total_df['commodity'].str.strip().str.split('(|\\(').str[0] 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        prev_total_df['commodity'] = prev_total_df['commodity'].str.strip().str.split('(|\\(').str[0] 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        curr_export = pd.merge(curr_export, prev_export, on='commodity', how='left') 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        curr_export['export'] = round(curr_export['export_x'] - curr_export['export_y'], 4) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        # 差值计算优化 - 开始 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        curr_import = pd.merge(curr_import, prev_import, on='commodity', how='left').fillna(0) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        curr_import['import'] = (curr_import['import_x'] - curr_import['import_y']).round(4) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        curr_export = pd.merge(curr_export, prev_export, on='commodity', how='left').fillna(0) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        curr_export['export'] = (curr_export['export_x'] - curr_export['export_y']).round(4) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        total_df = pd.merge(total_df, prev_total_df, on='commodity', how='left').fillna(0) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        total_df['total'] = (total_df['total_x'] - total_df['total_y']).round(4) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+        # 差值计算优化 - 结束 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        total_df = pd.merge(total_df, prev_total_df, on='commodity', how='left') 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-        total_df['total'] = round(total_df['total_x'] - total_df['total_y'], 4) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         log.info(f"合并文件: {path}*********{previous_month_dir}") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    # 合并进出口数据 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    merged_df = pd.merge(curr_import, curr_export, on='commodity', how='outer') 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    merged_df = pd.merge(merged_df, total_df, on='commodity', how='outer') 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    # 合并进出口数据优化 - 开始 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    merged_df = pd.merge(curr_import, curr_export, on='commodity', how='outer').fillna(0) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    merged_df = pd.merge(merged_df, total_df, on='commodity', how='outer').fillna(0) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    # 合并进出口数据优化 - 结束 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     sql_arr = [] 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    # try: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     for _, row in merged_df.iterrows(): 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         country_name = str(row['commodity']).strip() 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         if country_name.endswith(")") or country_name.endswith(")"): 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -138,8 +148,6 @@ def process_folder(path): 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				             f"'{yoy_export}', NOW()) ON DUPLICATE KEY UPDATE create_time = now();" 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         ) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				         sql_arr.append(sql) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    # except Exception as e: 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    #     log.info(f"{year_month} 处理时发生异常: {str(e)}") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     log.info(f"√ {year_month} 成功生成 SQL 条数: {len(sql_arr)}") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				     # 批量插入数据库 
			 | 
		
	
	
		
			
				| 
					
				 | 
			
			
				@@ -168,8 +176,9 @@ def hierarchical_traversal(root_path): 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				 if __name__ == '__main__': 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    # hierarchical_traversal(download_dir) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    hierarchical_traversal(download_dir) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+ 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    # root = Path(download_dir)/'2024'/'10' 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    # process_folder(root) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				  
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    root = Path(download_dir) / '2024' / '07' 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    process_folder(root) 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				-    log.info("浙江省海关国别所有文件处理完成!") 
			 | 
		
	
		
			
				 | 
				 | 
			
			
				+    log.info("浙江省海关国别所有文件处理完成!") 
			 |