guangdong_sub_customs_parse_excel.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556
from decimal import Decimal, InvalidOperation
from pathlib import Path

import pandas as pd
from openpyxl import load_workbook

from db_helper import DBHelper
from utils.constants import DOWNLOAD_DIR, GUANGDONG_CITY
from utils.log import log
from utils.parse_utils import traverse_and_process, extract_year_month_from_path, get_previous_month_dir
# Province-level identifiers written into every output row by parse_excel.
PROV_CODE = "440000"
PROV_NAME = "广东省"
# Root directory handed to traverse_and_process by the __main__ entry point.
download_dir = DOWNLOAD_DIR / "guangdong"
# Module-level DB helper, constructed at import time.
# NOTE(review): the active code in this file does not reference `db` (the
# bulk insert in parse_excel is commented out and builds its own DBHelper);
# confirm whether this import-time instantiation is still needed.
db = DBHelper()
  14. def match_customs_file(filename, customs_name, year, month):
  15. """匹配海关文件"""
  16. filename_lower = filename.lower()
  17. if customs_name == "广州海关":
  18. return "7地市进出口综合统计" in filename_lower
  19. elif customs_name == "深圳海关":
  20. return "深圳海关综合统计资料" in filename_lower
  21. elif customs_name == "汕头海关":
  22. return "5市报表" in filename_lower
  23. elif customs_name == "黄埔海关":
  24. return "东莞市进出口企业性质总值表" in filename_lower
  25. elif customs_name == "江门海关":
  26. if "江门市" in filename_lower or "阳江市" in filename_lower:
  27. return "外贸进出口有关情况统计表" in filename_lower
  28. elif customs_name == "湛江海关":
  29. if "湛江市" in filename_lower or "茂名市" in filename_lower:
  30. return "外贸进出口数据" in filename_lower
  31. return False
  32. def process_guangzhou_customs(file_path, year, month):
  33. """处理广州海关数据"""
  34. try:
  35. # 读取Excel文件
  36. wb = load_workbook(file_path, data_only=True)
  37. sheet = wb.worksheets[0]
  38. # 查找包含月份的表头行
  39. month_str = f"{year}年{month}月"
  40. header_row = None
  41. for i in range(1, 4): # 检查前3行
  42. row_values = [str(cell.value).strip() if cell.value else "" for cell in sheet[i]]
  43. if any(month_str in val for val in row_values):
  44. header_row = i
  45. break
  46. if header_row is None:
  47. log.error(f"未找到 {month_str} 的表头")
  48. return pd.DataFrame()
  49. # 确定数据列位置
  50. data_cols = []
  51. for cell in sheet[header_row]:
  52. if cell.value and month_str in str(cell.value):
  53. data_cols.append(cell.column - 1) # 转换为0-based索引
  54. if len(data_cols) < 6:
  55. log.error(f"未找到足够的 {month_str} 数据列")
  56. return pd.DataFrame()
  57. # 提取7地市数据
  58. results = []
  59. target_cities = ["广州市", "深圳市", "东莞市", "汕头市", "江门市", "湛江市", "茂名市"]
  60. for row in sheet.iter_rows(min_row=header_row + 1):
  61. city_cell = row[0].value
  62. if city_cell and "广东省" in str(city_cell):
  63. city_name = str(city_cell).replace("广东省", "").strip()
  64. if city_name in target_cities:
  65. try:
  66. # 获取各列值
  67. total = row[data_cols[0]].value
  68. export = row[data_cols[1]].value
  69. import_val = row[data_cols[2]].value
  70. yoy_total = row[data_cols[3]].value
  71. yoy_export = row[data_cols[4]].value
  72. yoy_import = row[data_cols[5]].value
  73. # 转换数据类型
  74. def convert_value(val):
  75. if isinstance(val, (int, float)):
  76. return Decimal(str(val))
  77. elif isinstance(val, str) and val.replace(".", "").isdigit():
  78. return Decimal(val)
  79. return Decimal(0)
  80. # 添加到结果
  81. results.append({
  82. "city_name": city_name,
  83. "monthly_total": convert_value(total),
  84. "monthly_import": convert_value(import_val),
  85. "monthly_export": convert_value(export),
  86. "yoy_import_export": convert_value(yoy_total),
  87. "yoy_import": convert_value(yoy_import),
  88. "yoy_export": convert_value(yoy_export)
  89. })
  90. except Exception as e:
  91. log.error(f"处理城市 {city_name} 出错: {e}")
  92. return pd.DataFrame(results)
  93. except Exception as e:
  94. log.error(f"处理广州海关文件出错: {str(e)}")
  95. return pd.DataFrame()
  96. def process_shenzhen_customs(file_path, year, month):
  97. """处理深圳海关数据"""
  98. try:
  99. wb = load_workbook(file_path, data_only=True)
  100. results = []
  101. # 处理深圳和惠州两个sheet
  102. for city, sheet_name in [("深圳市", "深圳市进出口(贸易方式)"),
  103. ("惠州市", "惠州市进出口(贸易方式)")]:
  104. try:
  105. if sheet_name in wb.sheetnames:
  106. sheet = wb[sheet_name]
  107. else:
  108. log.warning(f"未找到sheet: {sheet_name}")
  109. continue
  110. # 查找总值行
  111. total_row_idx = None
  112. for i, row in enumerate(sheet.iter_rows(values_only=True), 1):
  113. if row and "总值" in str(row[0]):
  114. total_row_idx = i
  115. break
  116. if total_row_idx is None:
  117. log.error(f"未找到总值行: {sheet_name}")
  118. continue
  119. # 查找包含月份的表头
  120. month_str = f"{year}年{month}月"
  121. header_row = None
  122. data_col = None
  123. for i, row in enumerate(sheet.iter_rows(max_row=3, values_only=True), 1):
  124. if any(month_str in str(cell) for cell in row if cell):
  125. header_row = i
  126. for col_idx, cell_val in enumerate(row):
  127. if cell_val and month_str in str(cell_val):
  128. data_col = col_idx
  129. break
  130. break
  131. if data_col is None:
  132. log.error(f"未找到 {month_str} 列")
  133. continue
  134. # 获取数据值 (亿元转换为万元)
  135. total_value = sheet.cell(row=total_row_idx, column=data_col + 1).value
  136. yoy_value = sheet.cell(row=total_row_idx, column=data_col + 2).value
  137. if total_value is None or yoy_value is None:
  138. log.error(f"{city} 数据为空")
  139. continue
  140. # 转换数据类型
  141. def convert_value(val):
  142. if isinstance(val, (int, float)):
  143. return Decimal(str(val))
  144. elif isinstance(val, str) and val.replace(".", "").isdigit():
  145. return Decimal(val)
  146. return Decimal(0)
  147. # 添加到结果
  148. results.append({
  149. "city_name": city,
  150. "monthly_total": convert_value(total_value) * Decimal('10000'),
  151. "monthly_import": None, # 没有单独的进口/出口数据
  152. "monthly_export": None,
  153. "yoy_import_export": convert_value(yoy_value),
  154. "yoy_import": Decimal(0),
  155. "yoy_export": Decimal(0)
  156. })
  157. except Exception as e:
  158. log.error(f"处理 {city} 数据出错: {str(e)}")
  159. return pd.DataFrame(results)
  160. except Exception as e:
  161. log.error(f"处理深圳海关文件出错: {str(e)}")
  162. return pd.DataFrame()
  163. def process_shantou_customs(file_path, year, month):
  164. """处理汕头海关数据 (逻辑同广州海关)"""
  165. log.info(f"处理汕头海关文件: {file_path.name}")
  166. return process_guangzhou_customs(file_path, year, month)
  167. def process_huangpu_customs(file_path, year, month):
  168. """处理黄埔海关数据"""
  169. try:
  170. wb = load_workbook(file_path, data_only=True)
  171. sheet = wb.active
  172. # 查找合计行
  173. total_row_idx = None
  174. for i, row in enumerate(sheet.iter_rows(values_only=True), 1):
  175. if row and "合计" in str(row[0]):
  176. total_row_idx = i
  177. break
  178. if total_row_idx is None:
  179. log.error("未找到合计行")
  180. return pd.DataFrame()
  181. # 查找包含月份的表头
  182. month_str = f"{year}年{month}月"
  183. header_row = None
  184. data_cols = []
  185. for i in range(1, 4): # 检查前3行
  186. row_values = [str(cell.value) if cell.value else "" for cell in sheet[i]]
  187. if any(month_str in val and "人民币" in val for val in row_values):
  188. header_row = i
  189. for col_idx, val in enumerate(row_values):
  190. if val and month_str in val and "人民币" in val:
  191. data_cols.append(col_idx)
  192. break
  193. if len(data_cols) < 6:
  194. log.error(f"未找到足够的 {month_str} 人民币数据列")
  195. return pd.DataFrame()
  196. # 获取合计行数据
  197. row_values = [cell.value for cell in sheet[total_row_idx]]
  198. # 转换数据类型
  199. def convert_value(val):
  200. if isinstance(val, (int, float)):
  201. return Decimal(str(val))
  202. elif isinstance(val, str) and val.replace(".", "").isdigit():
  203. return Decimal(val)
  204. return Decimal(0)
  205. # 提取数据
  206. results = [{
  207. "city_name": "东莞市",
  208. "monthly_total": convert_value(row_values[data_cols[0]]), # 进出口
  209. "monthly_export": convert_value(row_values[data_cols[1]]), # 出口
  210. "monthly_import": convert_value(row_values[data_cols[2]]), # 进口
  211. "yoy_import_export": convert_value(row_values[data_cols[3]]), # 进出口同比
  212. "yoy_export": convert_value(row_values[data_cols[4]]), # 出口同比
  213. "yoy_import": convert_value(row_values[data_cols[5]]) # 进口同比
  214. }]
  215. return pd.DataFrame(results)
  216. except Exception as e:
  217. log.error(f"处理黄埔海关文件出错: {str(e)}")
  218. return pd.DataFrame()
  219. def process_jiangmen_customs(file_path, year, month):
  220. """处理江门海关数据"""
  221. try:
  222. wb = load_workbook(file_path, data_only=True)
  223. sheet = wb.active
  224. # 从文件名确定城市
  225. city_name = "江门市" if "江门市" in file_path.name else "阳江市"
  226. target_row_name = "江门市进出口商品" if city_name == "江门市" else "阳江市进出口商品总值"
  227. # 查找目标行
  228. target_row_idx = None
  229. for i, row in enumerate(sheet.iter_rows(values_only=True), 1):
  230. if row and target_row_name in str(row[0]):
  231. target_row_idx = i
  232. break
  233. if target_row_idx is None:
  234. log.error(f"未找到 {target_row_name} 行")
  235. return pd.DataFrame()
  236. # 查找包含月份的表头
  237. month_str = f"{year}年{month}月"
  238. header_row = None
  239. data_cols = []
  240. for i in range(1, 4): # 检查前3行
  241. row_values = [str(cell.value) if cell.value else "" for cell in sheet[i]]
  242. if any(month_str in val for val in row_values):
  243. header_row = i
  244. for col_idx, val in enumerate(row_values):
  245. if val and month_str in val:
  246. data_cols.append(col_idx)
  247. break
  248. if len(data_cols) < 6:
  249. log.error(f"未找到足够的 {month_str} 数据列")
  250. return pd.DataFrame()
  251. # 获取目标行数据
  252. row_values = [cell.value for cell in sheet[target_row_idx]]
  253. # 转换数据类型
  254. def convert_value(val):
  255. if isinstance(val, (int, float)):
  256. return Decimal(str(val))
  257. elif isinstance(val, str) and val.replace(".", "").isdigit():
  258. return Decimal(val)
  259. return Decimal(0)
  260. # 提取数据 (亿元转换为万元)
  261. return pd.DataFrame([{
  262. "city_name": city_name,
  263. "monthly_total": convert_value(row_values[data_cols[0]]) * Decimal('10000'), # 进出口
  264. "monthly_export": convert_value(row_values[data_cols[1]]) * Decimal('10000'), # 出口
  265. "monthly_import": convert_value(row_values[data_cols[2]]) * Decimal('10000'), # 进口
  266. "yoy_import_export": convert_value(row_values[data_cols[3]]), # 进出口同比
  267. "yoy_export": convert_value(row_values[data_cols[4]]), # 出口同比
  268. "yoy_import": convert_value(row_values[data_cols[5]]) # 进口同比
  269. }])
  270. except Exception as e:
  271. log.error(f"处理江门海关文件出错: {str(e)}")
  272. return pd.DataFrame()
  273. def process_zhanjiang_customs(file_path, year, month):
  274. """处理湛江海关数据"""
  275. try:
  276. wb = load_workbook(file_path, data_only=True)
  277. sheet = wb.worksheets[0]
  278. # 从文件名确定城市
  279. city_name = "湛江市" if "湛江市" in file_path.name else "茂名市"
  280. # 查找月度数据表格
  281. table_start_row = None
  282. month_str = f"{year}年{month}月"
  283. for i, row in enumerate(sheet.iter_rows(values_only=True), 1):
  284. if row and any(month_str in str(cell) for cell in row if cell):
  285. table_start_row = i
  286. break
  287. if table_start_row is None:
  288. log.error(f"未找到 {month_str} 月度数据表")
  289. return pd.DataFrame()
  290. # 查找目标行(城市名所在行)
  291. target_row_idx = None
  292. for i in range(table_start_row, table_start_row + 20): # 在后续行中查找
  293. row_val = sheet.cell(row=i, column=1).value
  294. if row_val and city_name in str(row_val):
  295. target_row_idx = i
  296. break
  297. if target_row_idx is None:
  298. log.error(f"未找到 {city_name} 数据行")
  299. return pd.DataFrame()
  300. # 提取数据
  301. results = []
  302. for col in [2, 3, 4, 5, 6, 7]: # 依次为进出口、出口、进口、进出口同比、出口同比、进口同比
  303. cell_value = sheet.cell(row=target_row_idx, column=col).value
  304. results.append(cell_value)
  305. # 转换数据类型
  306. def convert_value(val):
  307. if isinstance(val, (int, float)):
  308. return Decimal(str(val))
  309. elif isinstance(val, str) and val.replace(".", "").isdigit():
  310. return Decimal(val)
  311. return Decimal(0)
  312. return pd.DataFrame([{
  313. "city_name": city_name,
  314. "monthly_total": convert_value(results[0]),
  315. "monthly_export": convert_value(results[1]),
  316. "monthly_import": convert_value(results[2]),
  317. "yoy_import_export": convert_value(results[3]),
  318. "yoy_export": convert_value(results[4]),
  319. "yoy_import": convert_value(results[5])
  320. }])
  321. except Exception as e:
  322. log.error(f"处理湛江海关文件出错: {str(e)}")
  323. return pd.DataFrame()
  324. def get_customs_processor(customs_name):
  325. """获取不同海关的处理函数"""
  326. processors = {
  327. "广州海关": process_guangzhou_customs,
  328. "深圳海关": process_shenzhen_customs,
  329. "汕头海关": process_shantou_customs,
  330. "黄埔海关": process_huangpu_customs,
  331. "江门海关": process_jiangmen_customs,
  332. "湛江海关": process_zhanjiang_customs
  333. }
  334. return processors.get(customs_name)
  335. def parse_excel(current_dir):
  336. """主解析入口(优化为单参数模式)
  337. Args:
  338. current_dir (str): 当前月份数据目录(格式:/年份/省份/月份)
  339. """
  340. current_path = Path(current_dir)
  341. year, month = extract_year_month_from_path(current_path)
  342. log.info(f"开始处理 {year}年{month}月 数据: {current_dir}")
  343. try:
  344. # 动态获取前月目录
  345. prev_dir = get_previous_month_dir(current_path) if month != 1 else None
  346. if prev_dir:
  347. log.info(f"上个月数据目录: {prev_dir}")
  348. # 获取当前目录下所有Excel文件
  349. excel_files = list(current_path.glob("*.xls*"))
  350. if not excel_files:
  351. log.warning(f"当前目录下未找到Excel文件: {current_dir}")
  352. return
  353. # 按海关处理每个文件
  354. all_results = pd.DataFrame()
  355. customs_map = {
  356. "广州海关": [],
  357. "深圳海关": [],
  358. "汕头海关": [],
  359. "黄埔海关": [],
  360. "江门海关": [],
  361. "湛江海关": []
  362. }
  363. # 组织文件到不同的海关
  364. for file_path in excel_files:
  365. for customs_name in customs_map.keys():
  366. if match_customs_file(file_path.name, customs_name, year, month):
  367. customs_map[customs_name].append(file_path)
  368. log.info(f"匹配到 {customs_name} 文件: {file_path.name}")
  369. break
  370. # 处理每个海关的文件
  371. for customs_name, file_list in customs_map.items():
  372. processor = get_customs_processor(customs_name)
  373. if not processor:
  374. continue
  375. for file_path in file_list:
  376. # 特殊处理深圳海关和江门海关的2月数据(缺少1月数据)
  377. if customs_name in ["深圳海关", "江门海关"] and month == 2:
  378. # 获取2月份完整数据
  379. df_full = processor(file_path, year, month)
  380. if df_full.empty:
  381. continue
  382. # 创建1月份数据 (取2月份数据的一半)
  383. df_half = df_full.copy()
  384. for col in ['monthly_total', 'monthly_import', 'monthly_export']:
  385. df_half[col] = df_half[col] / 2
  386. # 设置1月份数据
  387. df_half['month'] = 1
  388. # 设置2月份数据 (取2月份数据的一半)
  389. df_full['month'] = 2
  390. for col in ['monthly_total', 'monthly_import', 'monthly_export']:
  391. df_full[col] = df_full[col] / 2
  392. # 合并数据
  393. df_customs = pd.concat([df_half, df_full])
  394. else:
  395. # 正常处理数据
  396. df_customs = processor(file_path, year, month)
  397. if not df_customs.empty:
  398. df_customs['month'] = month
  399. if not df_customs.empty:
  400. all_results = pd.concat([all_results, df_customs])
  401. # 如果没有获取到数据
  402. if all_results.empty:
  403. log.warning(f"未处理到有效数据: {current_dir}")
  404. return
  405. # 添加公共字段
  406. all_results['prov_code'] = PROV_CODE
  407. all_results['prov_name'] = PROV_NAME
  408. all_results['year'] = year
  409. all_results['month'] = all_results.get('month', month)
  410. all_results['crossborder_year_month'] = all_results['year'].astype(str) + '-' + all_results['month'].astype(
  411. str).str.zfill(2)
  412. # 添加城市编码
  413. def get_city_code(row):
  414. return GUANGDONG_CITY.get(row['city_name'], '0000')
  415. all_results['city_code'] = all_results.apply(get_city_code, axis=1)
  416. # 排序并删除重复项
  417. all_results = all_results.sort_values(by=['city_code', 'crossborder_year_month'])
  418. all_results = all_results.drop_duplicates(subset=['crossborder_year_month', 'city_code'], keep='last')
  419. # 打印处理结果
  420. log.info(f"处理完成,共获得 {len(all_results)} 条数据")
  421. # 选择入库字段
  422. final_df = all_results[[
  423. 'crossborder_year_month', 'prov_code', 'prov_name',
  424. 'city_code', 'city_name', 'monthly_total',
  425. 'monthly_import', 'monthly_export', 'yoy_import_export',
  426. 'yoy_import', 'yoy_export'
  427. ]].copy()
  428. # 打印前几条数据
  429. log.info(f"处理后数据示例:\n{final_df.head()}")
  430. # 这里调用DBHelper入库(实际使用时请取消注释)
  431. """
  432. from db_helper import DBHelper
  433. db = DBHelper()
  434. db.bulk_insert(
  435. final_df,
  436. 't_yujin_crossborder_prov_region_trade',
  437. conflict_columns=['crossborder_year_month', 'city_code'],
  438. update_columns=['monthly_total', 'monthly_import', 'monthly_export',
  439. 'yoy_import_export', 'yoy_import', 'yoy_export']
  440. )
  441. """
  442. log.info(f"{current_dir}数据已全部成功处理")
  443. except Exception as e:
  444. log.error(f"处理失败:{current_dir},错误:{str(e)}")
  445. raise
# Script entry point: scan the Guangdong download directory with
# traverse_and_process, using parse_excel as the per-directory handler.
# NOTE(review): presumably traverse_and_process calls parse_excel once per
# month directory; confirm against utils.parse_utils.
if __name__ == "__main__":
    traverse_and_process(download_dir, parse_excel, province_name="guangdong")