base_mysql.py 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242
  1. import pymysql
  2. from sqlalchemy import create_engine, text
  3. from urllib.parse import quote_plus
  4. # 数据库配置
  5. DB_CONFIG = {
  6. 'host': '10.130.75.149',
  7. 'port': 3307,
  8. 'user': 'yto_crm',
  9. 'password': '%3sFUlsolaRI',
  10. 'database': 'crm_uat',
  11. 'charset': 'utf8mb4'
  12. }
  13. def get_commodity_id(commodity_name):
  14. """根据商品名称查询数据库,获取商品 ID 和商品名称"""
  15. fix_commodity_name = commodity_name
  16. if commodity_name.endswith(")") or commodity_name.endswith(")"):
  17. fix_commodity_name = commodity_name.rsplit("(")[0] or commodity_name.rsplit("(")[0]
  18. fix_commodity_name = fix_commodity_name.replace('*', '').replace('#', '').replace('“', '').replace('”', '').replace('。', '')
  19. try:
  20. # 连接数据库
  21. connection = pymysql.connect(**DB_CONFIG)
  22. with connection.cursor() as cursor:
  23. # 执行查询
  24. sql = "SELECT e.id, e.commodity_name FROM t_yujin_crossborder_prov_commodity_category e WHERE e.commodity_name like %s"
  25. cursor.execute(sql, (f"{fix_commodity_name}%",))
  26. result = cursor.fetchall()
  27. if result:
  28. if len(result) == 1:
  29. return result[0][0], result[0][1]
  30. else:
  31. print(f"查询结果为多条,商品id为:{result},fix_commodity_name:{fix_commodity_name},commodity_name: {commodity_name}")
  32. sql = "SELECT e.id, e.commodity_name FROM t_yujin_crossborder_prov_commodity_category e WHERE e.commodity_name = %s"
  33. cursor.execute(sql, (f"{fix_commodity_name}",))
  34. result = cursor.fetchone()
  35. if not result:
  36. # 用原商品名称再查一次
  37. commodity_name = commodity_name.replace("(", "(").replace(")", ")")
  38. print(f"原商品名称查询,commodity_name:{commodity_name}")
  39. sql = "SELECT e.id, e.commodity_name FROM t_yujin_crossborder_prov_commodity_category e WHERE e.commodity_name = %s"
  40. cursor.execute(sql, (f"{commodity_name}",))
  41. result = cursor.fetchone()
  42. if result:
  43. return result[0], result[1]
  44. else:
  45. return None, None
  46. else:
  47. return result[0], result[1]
  48. else:
  49. return None, None
  50. except Exception as e:
  51. print(f"查询数据库时发生异常: {str(e)}")
  52. return None, None
  53. finally:
  54. if connection:
  55. connection.close()
  56. def get_hs_all():
  57. try:
  58. # 连接数据库
  59. connection = pymysql.connect(**DB_CONFIG)
  60. with connection.cursor() as cursor:
  61. # 执行查询
  62. sql = "SELECT e.id,e.category_name FROM t_yujin_crossborder_hs_category e"
  63. cursor.execute(sql)
  64. all_records = cursor.fetchall()
  65. if all_records:
  66. return all_records
  67. else:
  68. return None
  69. except Exception as e:
  70. print(f"查询数据库时发生异常: {str(e)}")
  71. return None
  72. finally:
  73. if connection:
  74. connection.close()
  75. # 对密码进行 URL 编码
  76. encoded_password = quote_plus(DB_CONFIG["password"])
  77. # 构建 SQLAlchemy 引擎
  78. engine = create_engine(
  79. f"mysql+pymysql://{DB_CONFIG['user']}:{encoded_password}@{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['database']}?charset={DB_CONFIG['charset']}",
  80. pool_size=5,
  81. max_overflow=10
  82. )
  83. def bulk_insert(sql_statements):
  84. """
  85. 批量执行 SQL 插入语句
  86. :param sql_statements: 包含多个 INSERT 语句的列表
  87. """
  88. if not sql_statements:
  89. print("未提供有效的 SQL 插入语句,跳过操作")
  90. return
  91. try:
  92. with engine.connect() as conn:
  93. with conn.begin():
  94. for sql in sql_statements:
  95. stmt = text(sql.strip())
  96. conn.execute(stmt)
  97. print(f"成功执行 {len(sql_statements)} 条 SQL 插入语句")
  98. except Exception as e:
  99. print(f"数据库操作失败: {str(e)}")
  100. raise
  101. def update_january_yoy(prov_name):
  102. """
  103. 更新指定省份1月份同比数据
  104. :param prov_name: 省份名称,默认为福建省
  105. """
  106. update_sql = text("""
  107. UPDATE t_yujin_crossborder_prov_region_trade AS curr
  108. INNER JOIN t_yujin_crossborder_prov_region_trade AS prev
  109. ON curr.city_code = prev.city_code
  110. AND prev.crossborder_year_month = DATE_FORMAT(
  111. DATE_SUB(
  112. STR_TO_DATE(CONCAT(curr.crossborder_year_month, '-01'), '%Y-%m-%d'),
  113. INTERVAL 1 YEAR
  114. ),
  115. '%Y-01'
  116. )
  117. SET
  118. curr.yoy_import_export = COALESCE (
  119. ROUND(
  120. (curr.monthly_total - prev.monthly_total) / NULLIF (prev.monthly_total, 0) * 100, 4
  121. ), 0.0000
  122. ), curr.yoy_import = COALESCE (
  123. ROUND(
  124. (curr.monthly_import - prev.monthly_import) / NULLIF (prev.monthly_import, 0) * 100, 4
  125. ), 0.0000
  126. ), curr.yoy_export = COALESCE (
  127. ROUND(
  128. (curr.monthly_export - prev.monthly_export) / NULLIF (prev.monthly_export, 0) * 100, 4
  129. ), 0.0000
  130. )
  131. WHERE
  132. curr.prov_name = :prov_name
  133. AND curr.crossborder_year_month LIKE '%-01'
  134. AND curr.crossborder_year_month
  135. > '2023-01'
  136. """)
  137. try:
  138. with engine.begin() as conn:
  139. result = conn.execute(update_sql, {'prov_name': prov_name})
  140. print(f"Updated {result.rowcount} rows for {prov_name}")
  141. return result.rowcount
  142. except Exception as e:
  143. print(f"Update failed: {str(e)}")
  144. raise RuntimeError(f"同比数据更新失败: {str(e)}") from e
  145. def clear_old_shandong_yoy(prov_name):
  146. """
  147. 清理山东省2024年前数据的同比指标
  148. """
  149. clear_sql = text("""
  150. UPDATE t_yujin_crossborder_prov_region_trade
  151. SET yoy_import_export = 0.0000,
  152. yoy_export = 0.0000,
  153. yoy_import = 0.0000
  154. WHERE prov_name = :prov_name
  155. AND crossborder_year_month < '2024-01'
  156. AND (yoy_import_export != 0
  157. OR yoy_export != 0
  158. OR yoy_import != 0) -- 优化:仅更新非零记录
  159. """)
  160. try:
  161. with engine.begin() as conn:
  162. result = conn.execute(clear_sql, {'prov_name': prov_name})
  163. print(f"{prov_name} 旧数据清零记录数: {result.rowcount}")
  164. return result.rowcount
  165. except Exception as e:
  166. print(f"旧数据清零失败: {str(e)}")
  167. raise
  168. def update_shandong_yoy(prov_name):
  169. """
  170. 完整更新山东省同比数据(包含新旧数据处理)
  171. """
  172. try:
  173. # 步骤1:清理旧数据
  174. cleared = clear_old_shandong_yoy(prov_name)
  175. # 步骤2:计算新数据
  176. updated = _update_shandong_new_yoy(prov_name)
  177. print(f"{prov_name} 同比处理完成 | 清零:{cleared} 更新:{updated}")
  178. return {'cleared': cleared, 'updated': updated}
  179. except Exception as e:
  180. print("{prov_name} 数据处理失败", exc_info=True)
  181. raise
  182. def _update_shandong_new_yoy(prov_name):
  183. """
  184. 处理2024年及之后的山东省数据(内部方法)
  185. """
  186. update_sql = text("""UPDATE t_yujin_crossborder_prov_region_trade AS curr
  187. INNER JOIN t_yujin_crossborder_prov_region_trade AS prev
  188. ON curr.city_code = prev.city_code
  189. AND prev.crossborder_year_month = DATE_FORMAT(
  190. DATE_SUB(
  191. STR_TO_DATE(CONCAT(curr.crossborder_year_month, '-01'), '%Y-%m-%d'),
  192. INTERVAL 1 YEAR
  193. ),
  194. '%Y-%m'
  195. )
  196. SET
  197. curr.yoy_import_export = COALESCE (
  198. TRUNCATE((curr.monthly_total - prev.monthly_total) / NULLIF (prev.monthly_total, 0) * 100, 4),
  199. 0.0000
  200. ),
  201. curr.yoy_import = COALESCE (
  202. TRUNCATE((curr.monthly_import - prev.monthly_import) / NULLIF (prev.monthly_import, 0) * 100, 4),
  203. 0.0000
  204. ),
  205. curr.yoy_export = COALESCE (
  206. TRUNCATE((curr.monthly_export - prev.monthly_export) / NULLIF (prev.monthly_export, 0) * 100, 4),
  207. 0.0000
  208. )
  209. WHERE
  210. curr.prov_name = :prov_name
  211. AND curr.crossborder_year_month >= '2024-01'
  212. AND prev.monthly_total IS NOT NULL
  213. """)
  214. with engine.begin() as conn:
  215. result = conn.execute(update_sql, {'prov_name': prov_name})
  216. print(f"{prov_name} 新数据更新数: {result.rowcount}")
  217. return result.rowcount
  218. if __name__ == '__main__':
  219. update_january_yoy('浙江省')
  220. update_shandong_yoy('浙江省')
  221. print("同比sql处理完成")