base_mysql.py 14 KB


  1. from urllib.parse import quote_plus
  2. import pymysql
  3. from sqlalchemy import text, create_engine
  4. from crossborder.utils.crypto_utils import AESCryptor
  5. from crossborder.utils.log import get_logger
  6. log = get_logger(__name__)
  7. provinces = [
  8. "北京市", "天津市", "上海市", "重庆市",
  9. "河北省", "山西省", "辽宁省", "吉林省",
  10. "黑龙江省", "江苏省", "浙江省", "安徽省",
  11. "福建省", "江西省", "山东省", "河南省",
  12. "湖北省", "湖南省", "广东省", "海南省",
  13. "四川省", "贵州省", "云南省", "陕西省",
  14. "甘肃省", "青海省", "台湾省",
  15. "内蒙古自治区", "广西壮族自治区", "西藏自治区",
  16. "宁夏回族自治区", "新疆维吾尔自治区"
  17. ]
  18. cryptor = AESCryptor("uat_ff419620e7047a3c372e2513c5a2b9a5")
  19. # 数据库配置
  20. DB_CONFIG = {
  21. 'host': '10.130.75.139',
  22. 'port': 3307,
  23. 'user': 'crm_uat',
  24. 'password': '&8%biuKNqDYZdXe3',
  25. 'database': 'crm_uat',
  26. 'charset': 'utf8mb4'
  27. }
  28. # 修改解密函数
  29. def get_decrypted_password():
  30. encrypted_pass = DB_CONFIG['password']
  31. if encrypted_pass.startswith("ENC("):
  32. try:
  33. return cryptor.decrypt(encrypted_pass)
  34. except Exception as e:
  35. log.error(f"密码解密失败: {str(e)}")
  36. raise
  37. return encrypted_pass
  38. # 在 base_mysql.py 模块加载时自动完成解密
  39. def initialize_engine():
  40. """初始化数据库引擎(包含密码解密)"""
  41. db_config = DB_CONFIG.copy()
  42. # db_config['password'] = get_decrypted_password()
  43. #
  44. # # 对密码进行 URL 编码
  45. # encoded_password = quote_plus(db_config["password"])
  46. # 构建 SQLAlchemy 引擎
  47. return create_engine(
  48. f"mysql+pymysql://{db_config['user']}:{db_config['password']}@{db_config['host']}:{db_config['port']}/{db_config['database']}?charset={db_config['charset']}",
  49. pool_size=5,
  50. max_overflow=10
  51. )
  52. # 全局引擎实例
  53. engine = initialize_engine()
  54. def get_commodity_id(commodity_name):
  55. """根据商品名称查询数据库,获取商品 ID 和商品名称"""
  56. fix_commodity_name = commodity_name
  57. if commodity_name.endswith(")") or commodity_name.endswith(")"):
  58. fix_commodity_name = commodity_name.rsplit("(")[0] or commodity_name.rsplit("(")[0]
  59. fix_commodity_name = fix_commodity_name.replace('*', '').replace('#', '').replace('“', '').replace('”', '').replace('。', '')
  60. connection = None
  61. try:
  62. # 连接数据库
  63. db_config = DB_CONFIG.copy()
  64. db_config['password'] = get_decrypted_password()
  65. connection = pymysql.connect(**db_config)
  66. with connection.cursor() as cursor:
  67. # 执行查询
  68. sql = "SELECT e.id, e.commodity_name FROM t_yujin_crossborder_prov_commodity_category e WHERE e.commodity_name like %s"
  69. cursor.execute(sql, (f"{fix_commodity_name}%",))
  70. result = cursor.fetchall()
  71. if result:
  72. if len(result) == 1:
  73. return result[0][0], result[0][1]
  74. else:
  75. log.info(f"查询结果为多条,商品id为:{result},fix_commodity_name:{fix_commodity_name},commodity_name: {commodity_name}")
  76. sql = "SELECT e.id, e.commodity_name FROM t_yujin_crossborder_prov_commodity_category e WHERE e.commodity_name = %s"
  77. cursor.execute(sql, (f"{fix_commodity_name}",))
  78. result = cursor.fetchone()
  79. if not result:
  80. # 用原商品名称再查一次
  81. commodity_name = commodity_name.replace("(", "(").replace(")", ")")
  82. log.info(f"原商品名称查询,commodity_name:{commodity_name}")
  83. sql = "SELECT e.id, e.commodity_name FROM t_yujin_crossborder_prov_commodity_category e WHERE e.commodity_name = %s"
  84. cursor.execute(sql, (f"{commodity_name}",))
  85. result = cursor.fetchone()
  86. if result:
  87. return result[0], result[1]
  88. else:
  89. return None, None
  90. else:
  91. return result[0], result[1]
  92. else:
  93. return None, None
  94. except Exception as e:
  95. log.info(f"查询数据库时发生异常: {str(e)}")
  96. return None, None
  97. finally:
  98. if connection:
  99. connection.close()
  100. def get_hs_all():
  101. try:
  102. # 连接数据库
  103. db_config = DB_CONFIG.copy()
  104. db_config['password'] = get_decrypted_password()
  105. connection = pymysql.connect(**db_config)
  106. with connection.cursor() as cursor:
  107. # 执行查询
  108. sql = "SELECT e.id,e.category_name FROM t_yujin_crossborder_hs_category e"
  109. cursor.execute(sql)
  110. all_records = cursor.fetchall()
  111. if all_records:
  112. return all_records
  113. else:
  114. return None
  115. except Exception as e:
  116. log.info(f"查询数据库时发生异常: {str(e)}")
  117. return None
  118. finally:
  119. if connection:
  120. connection.close()
  121. def get_code_exist(crossborder_year_month, prov_code):
  122. try:
  123. # 使用 with 自动管理连接生命周期
  124. db_config = DB_CONFIG.copy()
  125. db_config['password'] = get_decrypted_password()
  126. with pymysql.connect(**db_config) as connection:
  127. with connection.cursor() as cursor:
  128. # 执行查询
  129. sql = """
  130. SELECT COUNT(1)
  131. FROM t_yujin_crossborder_prov_commodity_trade e
  132. WHERE e.crossborder_year_month = %s
  133. AND e.prov_code = %s
  134. """
  135. cursor.execute(sql, (crossborder_year_month, prov_code))
  136. result = cursor.fetchone()
  137. return int(result[0]) if result and result[0] else 0
  138. except Exception as e:
  139. log.info(f"[数据库查询异常] 查询条件: {crossborder_year_month}, {prov_code} | 错误详情: {str(e)}")
  140. return 0
  141. def bulk_insert(sql_statements):
  142. """
  143. 批量执行 SQL 插入语句
  144. :param sql_statements: 包含多个 INSERT 语句的列表
  145. """
  146. if not sql_statements:
  147. log.info("未提供有效的 SQL 插入语句,跳过操作")
  148. return
  149. try:
  150. with engine.connect() as conn:
  151. with conn.begin():
  152. for sql in sql_statements:
  153. stmt = text(sql.strip())
  154. conn.execute(stmt)
  155. log.info(f"成功执行 {len(sql_statements)} 条 SQL 插入语句")
  156. except Exception as e:
  157. log.info(f"数据库操作失败: {str(e)}")
  158. raise
  159. def clear_old_shandong_yoy(prov_name):
  160. """
  161. 清理山东省2024年前数据的同比指标
  162. """
  163. clear_sql = text("""
  164. UPDATE t_yujin_crossborder_prov_region_trade
  165. SET yoy_import_export = 0.0000,
  166. yoy_export = 0.0000,
  167. yoy_import = 0.0000
  168. WHERE prov_name = :prov_name
  169. AND crossborder_year_month < '2024-01'
  170. AND (yoy_import_export != 0
  171. OR yoy_export != 0
  172. OR yoy_import != 0) -- 优化:仅更新非零记录
  173. """)
  174. try:
  175. with engine.begin() as conn:
  176. result = conn.execute(clear_sql, {'prov_name': prov_name})
  177. log.info(f"{prov_name} 旧数据清零记录数: {result.rowcount}")
  178. return result.rowcount
  179. except Exception as e:
  180. log.info(f"旧数据清零失败: {str(e)}")
  181. raise
  182. def update_shandong_yoy(prov_name):
  183. """
  184. 完整更新山东省同比数据(包含新旧数据处理)
  185. """
  186. try:
  187. # 步骤1:清理旧数据
  188. cleared = clear_old_shandong_yoy(prov_name)
  189. # 步骤2:计算新数据
  190. updated = _update_shandong_new_yoy(prov_name)
  191. log.info(f"{prov_name} 同比处理完成 | 清零:{cleared} 更新:{updated}")
  192. return {'cleared': cleared, 'updated': updated}
  193. except Exception as e:
  194. log.info("{prov_name} 数据处理失败", exc_info=True)
  195. raise
  196. def _update_shandong_new_yoy(prov_name):
  197. """
  198. 处理2024年及之后的山东省数据(内部方法)
  199. """
  200. update_sql = text("""UPDATE t_yujin_crossborder_prov_region_trade AS curr
  201. INNER JOIN t_yujin_crossborder_prov_region_trade AS prev
  202. ON curr.city_code = prev.city_code
  203. AND prev.crossborder_year_month = DATE_FORMAT(
  204. DATE_SUB(
  205. STR_TO_DATE(CONCAT(curr.crossborder_year_month, '-01'), '%Y-%m-%d'),
  206. INTERVAL 1 YEAR
  207. ),
  208. '%Y-%m'
  209. )
  210. SET
  211. curr.yoy_import_export = COALESCE (
  212. TRUNCATE((curr.monthly_total - prev.monthly_total) / NULLIF (prev.monthly_total, 0) * 100, 4),
  213. 0.0000
  214. ),
  215. curr.yoy_import = COALESCE (
  216. TRUNCATE((curr.monthly_import - prev.monthly_import) / NULLIF (prev.monthly_import, 0) * 100, 4),
  217. 0.0000
  218. ),
  219. curr.yoy_export = COALESCE (
  220. TRUNCATE((curr.monthly_export - prev.monthly_export) / NULLIF (prev.monthly_export, 0) * 100, 4),
  221. 0.0000
  222. )
  223. WHERE
  224. curr.prov_name = :prov_name
  225. AND curr.crossborder_year_month >= '2024-01'
  226. AND prev.monthly_total IS NOT NULL
  227. """)
  228. with engine.begin() as conn:
  229. result = conn.execute(update_sql, {'prov_name': prov_name})
  230. log.info(f"{prov_name} 新数据更新数: {result.rowcount}")
  231. return result.rowcount
  232. def clear_old_shandong_yoy_origin(region_name):
  233. """
  234. 清理山东省2024年前数据的同比指标
  235. """
  236. clear_sql = text("""
  237. UPDATE t_yujin_crossborder_region_trade
  238. SET ytd_total = 0.0000,
  239. ytd_export = 0.0000,
  240. ytd_import = 0.0000
  241. WHERE region_name = :region_name
  242. AND `year_month` < '2024-01'
  243. AND (ytd_total != 0
  244. OR ytd_export != 0
  245. OR ytd_import != 0) -- 优化:仅更新非零记录
  246. """)
  247. try:
  248. with engine.begin() as conn:
  249. result = conn.execute(clear_sql, {'region_name': region_name})
  250. log.info(f"{region_name} 旧数据清零记录数: {result.rowcount}")
  251. return result.rowcount
  252. except Exception as e:
  253. log.info(f"旧数据清零失败: {str(e)}")
  254. raise
  255. def update_shandong_yoy_origin(region_name):
  256. """
  257. 完整更新山东省同比数据(包含新旧数据处理)
  258. """
  259. try:
  260. # 步骤1:清理旧数据
  261. cleared = clear_old_shandong_yoy_origin(region_name)
  262. # 步骤2:计算新数据
  263. updated = _update_shandong_new_yoy_origin(region_name)
  264. log.info(f"{region_name} 同比处理完成 | 清零:{cleared} 更新:{updated}")
  265. return {'cleared': cleared, 'updated': updated}
  266. except Exception as e:
  267. log.info("{region_name} 数据处理失败", exc_info=True)
  268. raise
  269. def _update_shandong_new_yoy_origin(region_name):
  270. """
  271. 处理2024年及之后的山东省数据(内部方法)
  272. """
  273. update_sql = text("""UPDATE t_yujin_crossborder_region_trade AS curr
  274. INNER JOIN t_yujin_crossborder_region_trade AS prev
  275. ON curr.region_code = prev.region_code
  276. AND prev.year_month = DATE_FORMAT(
  277. DATE_SUB(
  278. STR_TO_DATE(CONCAT(curr.year_month, '-01'), '%Y-%m-%d'),
  279. INTERVAL 1 YEAR
  280. ),
  281. '%Y-%m'
  282. )
  283. SET
  284. curr.ytd_total = COALESCE (
  285. TRUNCATE((curr.monthly_total - prev.monthly_total) / NULLIF (prev.monthly_total, 0) * 100, 4),
  286. 0.0000
  287. ),
  288. curr.ytd_import = COALESCE (
  289. TRUNCATE((curr.monthly_import - prev.monthly_import) / NULLIF (prev.monthly_import, 0) * 100, 4),
  290. 0.0000
  291. ),
  292. curr.ytd_export = COALESCE (
  293. TRUNCATE((curr.monthly_export - prev.monthly_export) / NULLIF (prev.monthly_export, 0) * 100, 4),
  294. 0.0000
  295. )
  296. WHERE
  297. curr.region_name = :region_name
  298. AND curr.year_month >= '2024-01'
  299. AND prev.monthly_total IS NOT NULL
  300. """)
  301. with engine.begin() as conn:
  302. result = conn.execute(update_sql, {'region_name': region_name})
  303. log.info(f"{region_name} 新数据更新数: {result.rowcount}")
  304. return result.rowcount
  305. if __name__ == '__main__':
  306. # commodity_code, commodity_name_fix = get_commodity_id('农产品')
  307. # print(commodity_code, commodity_name_fix)
  308. # check_year, check_month = 2024, 4
  309. # count = get_code_exist(f'{check_year}-{check_month:02d}', "340000")
  310. # print(count)
  311. # 新表更新地级市同比
  312. # for province in provinces:
  313. # update_shandong_yoy(province)
  314. update_shandong_yoy('浙江省')
  315. # 旧表更新省份同比
  316. # for province in provinces:
  317. # update_shandong_yoy_origin(province)
  318. # log.info("同比sql处理完成")