# base_mysql.py (17 KB)
import os
from urllib.parse import quote_plus

import pymysql
from sqlalchemy import create_engine, text

from utils.log import log
  5. provinces = [
  6. "北京市", "天津市", "上海市", "重庆市",
  7. "河北省", "山西省", "辽宁省", "吉林省",
  8. "黑龙江省", "江苏省", "浙江省", "安徽省",
  9. "福建省", "江西省", "山东省", "河南省",
  10. "湖北省", "湖南省", "广东省", "海南省",
  11. "四川省", "贵州省", "云南省", "陕西省",
  12. "甘肃省", "青海省", "台湾省",
  13. "内蒙古自治区", "广西壮族自治区", "西藏自治区",
  14. "宁夏回族自治区", "新疆维吾尔自治区"
  15. ]
  16. # 数据库配置
  17. DB_CONFIG = {
  18. 'host': '10.130.75.149',
  19. 'port': 3307,
  20. 'user': 'yto_crm',
  21. 'password': '%3sFUlsolaRI',
  22. 'database': 'crm_uat',
  23. 'charset': 'utf8mb4'
  24. }
  25. def get_commodity_id(commodity_name):
  26. """根据商品名称查询数据库,获取商品 ID 和商品名称"""
  27. fix_commodity_name = commodity_name
  28. if commodity_name.endswith(")") or commodity_name.endswith(")"):
  29. fix_commodity_name = commodity_name.rsplit("(")[0] or commodity_name.rsplit("(")[0]
  30. fix_commodity_name = fix_commodity_name.replace('*', '').replace('#', '').replace('“', '').replace('”', '').replace('。', '')
  31. try:
  32. # 连接数据库
  33. connection = pymysql.connect(**DB_CONFIG)
  34. with connection.cursor() as cursor:
  35. # 执行查询
  36. sql = "SELECT e.id, e.commodity_name FROM t_yujin_crossborder_prov_commodity_category e WHERE e.commodity_name like %s"
  37. cursor.execute(sql, (f"{fix_commodity_name}%",))
  38. result = cursor.fetchall()
  39. if result:
  40. if len(result) == 1:
  41. return result[0][0], result[0][1]
  42. else:
  43. log.info(f"查询结果为多条,商品id为:{result},fix_commodity_name:{fix_commodity_name},commodity_name: {commodity_name}")
  44. sql = "SELECT e.id, e.commodity_name FROM t_yujin_crossborder_prov_commodity_category e WHERE e.commodity_name = %s"
  45. cursor.execute(sql, (f"{fix_commodity_name}",))
  46. result = cursor.fetchone()
  47. if not result:
  48. # 用原商品名称再查一次
  49. commodity_name = commodity_name.replace("(", "(").replace(")", ")")
  50. log.info(f"原商品名称查询,commodity_name:{commodity_name}")
  51. sql = "SELECT e.id, e.commodity_name FROM t_yujin_crossborder_prov_commodity_category e WHERE e.commodity_name = %s"
  52. cursor.execute(sql, (f"{commodity_name}",))
  53. result = cursor.fetchone()
  54. if result:
  55. return result[0], result[1]
  56. else:
  57. return None, None
  58. else:
  59. return result[0], result[1]
  60. else:
  61. return None, None
  62. except Exception as e:
  63. log.info(f"查询数据库时发生异常: {str(e)}")
  64. return None, None
  65. finally:
  66. if connection:
  67. connection.close()
  68. def get_hs_all():
  69. try:
  70. # 连接数据库
  71. connection = pymysql.connect(**DB_CONFIG)
  72. with connection.cursor() as cursor:
  73. # 执行查询
  74. sql = "SELECT e.id,e.category_name FROM t_yujin_crossborder_hs_category e"
  75. cursor.execute(sql)
  76. all_records = cursor.fetchall()
  77. if all_records:
  78. return all_records
  79. else:
  80. return None
  81. except Exception as e:
  82. log.info(f"查询数据库时发生异常: {str(e)}")
  83. return None
  84. finally:
  85. if connection:
  86. connection.close()
  87. def get_code_exist(crossborder_year_month, prov_code):
  88. try:
  89. # 使用 with 自动管理连接生命周期
  90. with pymysql.connect(**DB_CONFIG) as connection:
  91. with connection.cursor() as cursor:
  92. # 执行查询
  93. sql = """
  94. SELECT COUNT(1)
  95. FROM t_yujin_crossborder_prov_commodity_trade e
  96. WHERE e.crossborder_year_month = %s
  97. AND e.prov_code = %s
  98. """
  99. cursor.execute(sql, (crossborder_year_month, prov_code))
  100. result = cursor.fetchone()
  101. return int(result[0]) if result and result[0] else 0
  102. except Exception as e:
  103. log.info(f"[数据库查询异常] 查询条件: {crossborder_year_month}, {prov_code} | 错误详情: {str(e)}")
  104. return 0
  105. # 对密码进行 URL 编码
  106. encoded_password = quote_plus(DB_CONFIG["password"])
  107. # 构建 SQLAlchemy 引擎
  108. engine = create_engine(
  109. f"mysql+pymysql://{DB_CONFIG['user']}:{encoded_password}@{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['database']}?charset={DB_CONFIG['charset']}",
  110. pool_size=5,
  111. max_overflow=10
  112. )
  113. def bulk_insert(sql_statements):
  114. """
  115. 批量执行 SQL 插入语句
  116. :param sql_statements: 包含多个 INSERT 语句的列表
  117. """
  118. if not sql_statements:
  119. log.info("未提供有效的 SQL 插入语句,跳过操作")
  120. return
  121. try:
  122. with engine.connect() as conn:
  123. with conn.begin():
  124. for sql in sql_statements:
  125. stmt = text(sql.strip())
  126. conn.execute(stmt)
  127. log.info(f"成功执行 {len(sql_statements)} 条 SQL 插入语句")
  128. except Exception as e:
  129. log.info(f"数据库操作失败: {str(e)}")
  130. raise
  131. def update_january_yoy(prov_name):
  132. """
  133. 更新指定省份1月份同比数据
  134. :param prov_name: 省份名称,默认为福建省
  135. """
  136. update_sql = text("""
  137. UPDATE t_yujin_crossborder_prov_region_trade AS curr
  138. INNER JOIN t_yujin_crossborder_prov_region_trade AS prev
  139. ON curr.city_code = prev.city_code
  140. AND prev.crossborder_year_month = DATE_FORMAT(
  141. DATE_SUB(
  142. STR_TO_DATE(CONCAT(curr.crossborder_year_month, '-01'), '%Y-%m-%d'),
  143. INTERVAL 1 YEAR
  144. ),
  145. '%Y-01'
  146. )
  147. SET
  148. curr.yoy_import_export = COALESCE (
  149. ROUND(
  150. (curr.monthly_total - prev.monthly_total) / NULLIF (prev.monthly_total, 0) * 100, 4
  151. ), 0.0000
  152. ), curr.yoy_import = COALESCE (
  153. ROUND(
  154. (curr.monthly_import - prev.monthly_import) / NULLIF (prev.monthly_import, 0) * 100, 4
  155. ), 0.0000
  156. ), curr.yoy_export = COALESCE (
  157. ROUND(
  158. (curr.monthly_export - prev.monthly_export) / NULLIF (prev.monthly_export, 0) * 100, 4
  159. ), 0.0000
  160. )
  161. WHERE
  162. curr.prov_name = :prov_name
  163. AND curr.crossborder_year_month LIKE '%-01'
  164. AND curr.crossborder_year_month
  165. > '2023-01'
  166. """)
  167. try:
  168. with engine.begin() as conn:
  169. result = conn.execute(update_sql, {'prov_name': prov_name})
  170. log.info(f"Updated {result.rowcount} rows for {prov_name}")
  171. return result.rowcount
  172. except Exception as e:
  173. log.info(f"Update failed: {str(e)}")
  174. raise RuntimeError(f"同比数据更新失败: {str(e)}") from e
  175. def clear_old_shandong_yoy(prov_name):
  176. """
  177. 清理山东省2024年前数据的同比指标
  178. """
  179. clear_sql = text("""
  180. UPDATE t_yujin_crossborder_prov_region_trade
  181. SET yoy_import_export = 0.0000,
  182. yoy_export = 0.0000,
  183. yoy_import = 0.0000
  184. WHERE prov_name = :prov_name
  185. AND crossborder_year_month < '2024-01'
  186. AND (yoy_import_export != 0
  187. OR yoy_export != 0
  188. OR yoy_import != 0) -- 优化:仅更新非零记录
  189. """)
  190. try:
  191. with engine.begin() as conn:
  192. result = conn.execute(clear_sql, {'prov_name': prov_name})
  193. log.info(f"{prov_name} 旧数据清零记录数: {result.rowcount}")
  194. return result.rowcount
  195. except Exception as e:
  196. log.info(f"旧数据清零失败: {str(e)}")
  197. raise
  198. def update_shandong_yoy(prov_name):
  199. """
  200. 完整更新山东省同比数据(包含新旧数据处理)
  201. """
  202. try:
  203. # 步骤1:清理旧数据
  204. cleared = clear_old_shandong_yoy(prov_name)
  205. # 步骤2:计算新数据
  206. updated = _update_shandong_new_yoy(prov_name)
  207. log.info(f"{prov_name} 同比处理完成 | 清零:{cleared} 更新:{updated}")
  208. return {'cleared': cleared, 'updated': updated}
  209. except Exception as e:
  210. log.info("{prov_name} 数据处理失败", exc_info=True)
  211. raise
  212. def _update_shandong_new_yoy(prov_name):
  213. """
  214. 处理2024年及之后的山东省数据(内部方法)
  215. """
  216. update_sql = text("""UPDATE t_yujin_crossborder_prov_region_trade AS curr
  217. INNER JOIN t_yujin_crossborder_prov_region_trade AS prev
  218. ON curr.city_code = prev.city_code
  219. AND prev.crossborder_year_month = DATE_FORMAT(
  220. DATE_SUB(
  221. STR_TO_DATE(CONCAT(curr.crossborder_year_month, '-01'), '%Y-%m-%d'),
  222. INTERVAL 1 YEAR
  223. ),
  224. '%Y-%m'
  225. )
  226. SET
  227. curr.yoy_import_export = COALESCE (
  228. TRUNCATE((curr.monthly_total - prev.monthly_total) / NULLIF (prev.monthly_total, 0) * 100, 4),
  229. 0.0000
  230. ),
  231. curr.yoy_import = COALESCE (
  232. TRUNCATE((curr.monthly_import - prev.monthly_import) / NULLIF (prev.monthly_import, 0) * 100, 4),
  233. 0.0000
  234. ),
  235. curr.yoy_export = COALESCE (
  236. TRUNCATE((curr.monthly_export - prev.monthly_export) / NULLIF (prev.monthly_export, 0) * 100, 4),
  237. 0.0000
  238. )
  239. WHERE
  240. curr.prov_name = :prov_name
  241. AND curr.crossborder_year_month >= '2024-01'
  242. AND prev.monthly_total IS NOT NULL
  243. """)
  244. with engine.begin() as conn:
  245. result = conn.execute(update_sql, {'prov_name': prov_name})
  246. log.info(f"{prov_name} 新数据更新数: {result.rowcount}")
  247. return result.rowcount
  248. def update_january_yoy_origin(region_name):
  249. """
  250. 更新指定省份1月份同比数据
  251. :param region_name: 省份名称,默认为福建省
  252. """
  253. update_sql = text("""
  254. UPDATE t_yujin_crossborder_region_trade AS curr
  255. INNER JOIN t_yujin_crossborder_region_trade AS prev
  256. ON curr.region_code = prev.region_code
  257. AND prev.year_month = DATE_FORMAT(
  258. DATE_SUB(
  259. STR_TO_DATE(CONCAT(curr.year_month, '-01'), '%Y-%m-%d'),
  260. INTERVAL 1 YEAR
  261. ),
  262. '%Y-01'
  263. )
  264. SET
  265. curr.ytd_total = COALESCE (
  266. ROUND(
  267. (curr.monthly_total - prev.monthly_total) / NULLIF (prev.monthly_total, 0) * 100, 4
  268. ), 0.0000
  269. ), curr.ytd_import = COALESCE (
  270. ROUND(
  271. (curr.monthly_import - prev.monthly_import) / NULLIF (prev.monthly_import, 0) * 100, 4
  272. ), 0.0000
  273. ), curr.ytd_export = COALESCE (
  274. ROUND(
  275. (curr.monthly_export - prev.monthly_export) / NULLIF (prev.monthly_export, 0) * 100, 4
  276. ), 0.0000
  277. )
  278. WHERE
  279. curr.region_name = :region_name
  280. AND curr.year_month LIKE '%-01'
  281. AND curr.year_month
  282. > '2023-01'
  283. """)
  284. try:
  285. with engine.begin() as conn:
  286. result = conn.execute(update_sql, {'region_name': region_name})
  287. log.info(f"Updated {result.rowcount} rows for {region_name}")
  288. return result.rowcount
  289. except Exception as e:
  290. log.info(f"Update failed: {str(e)}")
  291. raise RuntimeError(f"同比数据更新失败: {str(e)}") from e
  292. def clear_old_shandong_yoy_origin(region_name):
  293. """
  294. 清理山东省2024年前数据的同比指标
  295. """
  296. clear_sql = text("""
  297. UPDATE t_yujin_crossborder_region_trade
  298. SET ytd_total = 0.0000,
  299. ytd_export = 0.0000,
  300. ytd_import = 0.0000
  301. WHERE region_name = :region_name
  302. AND `year_month` < '2024-01'
  303. AND (ytd_total != 0
  304. OR ytd_export != 0
  305. OR ytd_import != 0) -- 优化:仅更新非零记录
  306. """)
  307. try:
  308. with engine.begin() as conn:
  309. result = conn.execute(clear_sql, {'region_name': region_name})
  310. log.info(f"{region_name} 旧数据清零记录数: {result.rowcount}")
  311. return result.rowcount
  312. except Exception as e:
  313. log.info(f"旧数据清零失败: {str(e)}")
  314. raise
  315. def update_shandong_yoy_origin(region_name):
  316. """
  317. 完整更新山东省同比数据(包含新旧数据处理)
  318. """
  319. try:
  320. # 步骤1:清理旧数据
  321. cleared = clear_old_shandong_yoy_origin(region_name)
  322. # 步骤2:计算新数据
  323. updated = _update_shandong_new_yoy_origin(region_name)
  324. log.info(f"{region_name} 同比处理完成 | 清零:{cleared} 更新:{updated}")
  325. return {'cleared': cleared, 'updated': updated}
  326. except Exception as e:
  327. log.info("{region_name} 数据处理失败", exc_info=True)
  328. raise
  329. def _update_shandong_new_yoy_origin(region_name):
  330. """
  331. 处理2024年及之后的山东省数据(内部方法)
  332. """
  333. update_sql = text("""UPDATE t_yujin_crossborder_region_trade AS curr
  334. INNER JOIN t_yujin_crossborder_region_trade AS prev
  335. ON curr.region_code = prev.region_code
  336. AND prev.year_month = DATE_FORMAT(
  337. DATE_SUB(
  338. STR_TO_DATE(CONCAT(curr.year_month, '-01'), '%Y-%m-%d'),
  339. INTERVAL 1 YEAR
  340. ),
  341. '%Y-%m'
  342. )
  343. SET
  344. curr.ytd_total = COALESCE (
  345. TRUNCATE((curr.monthly_total - prev.monthly_total) / NULLIF (prev.monthly_total, 0) * 100, 4),
  346. 0.0000
  347. ),
  348. curr.ytd_import = COALESCE (
  349. TRUNCATE((curr.monthly_import - prev.monthly_import) / NULLIF (prev.monthly_import, 0) * 100, 4),
  350. 0.0000
  351. ),
  352. curr.ytd_export = COALESCE (
  353. TRUNCATE((curr.monthly_export - prev.monthly_export) / NULLIF (prev.monthly_export, 0) * 100, 4),
  354. 0.0000
  355. )
  356. WHERE
  357. curr.region_name = :region_name
  358. AND curr.year_month >= '2024-01'
  359. AND prev.monthly_total IS NOT NULL
  360. """)
  361. with engine.begin() as conn:
  362. result = conn.execute(update_sql, {'region_name': region_name})
  363. log.info(f"{region_name} 新数据更新数: {result.rowcount}")
  364. return result.rowcount
  365. if __name__ == '__main__':
  366. # check_year, check_month = 2024, 4
  367. # count = get_code_exist(f'{check_year}-{check_month:02d}', "340000")
  368. # print(count)
  369. # 新表更新地级市同比
  370. for province in provinces:
  371. update_january_yoy(province)
  372. update_shandong_yoy(province)
  373. # 旧表更新省份同比
  374. # for province in provinces:
  375. # update_january_yoy_origin(province)
  376. # update_shandong_yoy_origin(province)
  377. log.info("同比sql处理完成")