|
@@ -1,10 +1,10 @@
|
|
|
from urllib.parse import quote_plus
|
|
|
|
|
|
import pymysql
|
|
|
-from sqlalchemy import create_engine, text
|
|
|
+from sqlalchemy import text, create_engine
|
|
|
|
|
|
-from crossborder.utils.log import log
|
|
|
from crossborder.utils.crypto_utils import AESCryptor
|
|
|
+from crossborder.utils.log import log
|
|
|
|
|
|
provinces = [
|
|
|
"北京市", "天津市", "上海市", "重庆市",
|
|
@@ -42,6 +42,26 @@ def get_decrypted_password():
|
|
|
raise
|
|
|
return encrypted_pass
|
|
|
|
|
|
+
|
|
|
+# 在 base_mysql.py 模块加载时自动完成解密
|
|
|
+def initialize_engine():
|
|
|
+ """初始化数据库引擎(包含密码解密)"""
|
|
|
+ db_config = DB_CONFIG.copy()
|
|
|
+ db_config['password'] = get_decrypted_password()
|
|
|
+
|
|
|
+ # 对密码进行 URL 编码
|
|
|
+ encoded_password = quote_plus(db_config["password"])
|
|
|
+
|
|
|
+ # 构建 SQLAlchemy 引擎
|
|
|
+ return create_engine(
|
|
|
+ f"mysql+pymysql://{db_config['user']}:{encoded_password}@{db_config['host']}:{db_config['port']}/{db_config['database']}?charset={db_config['charset']}",
|
|
|
+ pool_size=5,
|
|
|
+ max_overflow=10
|
|
|
+ )
|
|
|
+
|
|
|
+# 全局引擎实例
|
|
|
+engine = initialize_engine()
|
|
|
+
|
|
|
def get_commodity_id(commodity_name):
|
|
|
"""根据商品名称查询数据库,获取商品 ID 和商品名称"""
|
|
|
fix_commodity_name = commodity_name
|
|
@@ -143,83 +163,16 @@ def bulk_insert(sql_statements):
|
|
|
log.info("未提供有效的 SQL 插入语句,跳过操作")
|
|
|
return
|
|
|
|
|
|
- connection = None
|
|
|
try:
|
|
|
- # 使用解密后的密码创建连接
|
|
|
- db_config = DB_CONFIG.copy()
|
|
|
- db_config['password'] = get_decrypted_password()
|
|
|
-
|
|
|
- # 创建连接并开启事务
|
|
|
- connection = pymysql.connect(**db_config)
|
|
|
- connection.begin() # 显式开始事务
|
|
|
-
|
|
|
- with connection.cursor() as cursor:
|
|
|
- # 遍历执行所有 SQL 语句
|
|
|
- for sql in sql_statements:
|
|
|
- # 移除 SQL 两端空白并执行
|
|
|
- cursor.execute(sql.strip())
|
|
|
-
|
|
|
- # 提交事务
|
|
|
- connection.commit()
|
|
|
- log.info(f"成功执行 {len(sql_statements)} 条 SQL 插入语句")
|
|
|
-
|
|
|
+ with engine.connect() as conn:
|
|
|
+ with conn.begin():
|
|
|
+ for sql in sql_statements:
|
|
|
+ stmt = text(sql.strip())
|
|
|
+ conn.execute(stmt)
|
|
|
+ log.info(f"成功执行 {len(sql_statements)} 条 SQL 插入语句")
|
|
|
except Exception as e:
|
|
|
- # 回滚事务并记录错误
|
|
|
- if connection:
|
|
|
- connection.rollback()
|
|
|
log.info(f"数据库操作失败: {str(e)}")
|
|
|
raise
|
|
|
- finally:
|
|
|
- # 确保连接关闭
|
|
|
- if connection:
|
|
|
- connection.close()
|
|
|
-
|
|
|
-def update_january_yoy(prov_name):
|
|
|
- """
|
|
|
- 更新指定省份1月份同比数据
|
|
|
- :param prov_name: 省份名称,默认为福建省
|
|
|
- """
|
|
|
- update_sql = text("""
|
|
|
- UPDATE t_yujin_crossborder_prov_region_trade AS curr
|
|
|
- INNER JOIN t_yujin_crossborder_prov_region_trade AS prev
|
|
|
- ON curr.city_code = prev.city_code
|
|
|
- AND prev.crossborder_year_month = DATE_FORMAT(
|
|
|
- DATE_SUB(
|
|
|
- STR_TO_DATE(CONCAT(curr.crossborder_year_month, '-01'), '%Y-%m-%d'),
|
|
|
- INTERVAL 1 YEAR
|
|
|
- ),
|
|
|
- '%Y-01'
|
|
|
- )
|
|
|
- SET
|
|
|
- curr.yoy_import_export = COALESCE (
|
|
|
- ROUND(
|
|
|
- (curr.monthly_total - prev.monthly_total) / NULLIF (prev.monthly_total, 0) * 100, 4
|
|
|
- ), 0.0000
|
|
|
- ), curr.yoy_import = COALESCE (
|
|
|
- ROUND(
|
|
|
- (curr.monthly_import - prev.monthly_import) / NULLIF (prev.monthly_import, 0) * 100, 4
|
|
|
- ), 0.0000
|
|
|
- ), curr.yoy_export = COALESCE (
|
|
|
- ROUND(
|
|
|
- (curr.monthly_export - prev.monthly_export) / NULLIF (prev.monthly_export, 0) * 100, 4
|
|
|
- ), 0.0000
|
|
|
- )
|
|
|
- WHERE
|
|
|
- curr.prov_name = :prov_name
|
|
|
- AND curr.crossborder_year_month LIKE '%-01'
|
|
|
- AND curr.crossborder_year_month
|
|
|
- > '2023-01'
|
|
|
- """)
|
|
|
-
|
|
|
- try:
|
|
|
- with engine.begin() as conn:
|
|
|
- result = conn.execute(update_sql, {'prov_name': prov_name})
|
|
|
- log.info(f"Updated {result.rowcount} rows for {prov_name}")
|
|
|
- return result.rowcount
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- log.info(f"Update failed: {str(e)}")
|
|
|
- raise RuntimeError(f"同比数据更新失败: {str(e)}") from e
|
|
|
|
|
|
def clear_old_shandong_yoy(prov_name):
|
|
|
"""
|
|
@@ -301,53 +254,6 @@ def _update_shandong_new_yoy(prov_name):
|
|
|
log.info(f"{prov_name} 新数据更新数: {result.rowcount}")
|
|
|
return result.rowcount
|
|
|
|
|
|
-def update_january_yoy_origin(region_name):
|
|
|
- """
|
|
|
- 更新指定省份1月份同比数据
|
|
|
- :param region_name: 省份名称,默认为福建省
|
|
|
- """
|
|
|
- update_sql = text("""
|
|
|
- UPDATE t_yujin_crossborder_region_trade AS curr
|
|
|
- INNER JOIN t_yujin_crossborder_region_trade AS prev
|
|
|
- ON curr.region_code = prev.region_code
|
|
|
- AND prev.year_month = DATE_FORMAT(
|
|
|
- DATE_SUB(
|
|
|
- STR_TO_DATE(CONCAT(curr.year_month, '-01'), '%Y-%m-%d'),
|
|
|
- INTERVAL 1 YEAR
|
|
|
- ),
|
|
|
- '%Y-01'
|
|
|
- )
|
|
|
- SET
|
|
|
- curr.ytd_total = COALESCE (
|
|
|
- ROUND(
|
|
|
- (curr.monthly_total - prev.monthly_total) / NULLIF (prev.monthly_total, 0) * 100, 4
|
|
|
- ), 0.0000
|
|
|
- ), curr.ytd_import = COALESCE (
|
|
|
- ROUND(
|
|
|
- (curr.monthly_import - prev.monthly_import) / NULLIF (prev.monthly_import, 0) * 100, 4
|
|
|
- ), 0.0000
|
|
|
- ), curr.ytd_export = COALESCE (
|
|
|
- ROUND(
|
|
|
- (curr.monthly_export - prev.monthly_export) / NULLIF (prev.monthly_export, 0) * 100, 4
|
|
|
- ), 0.0000
|
|
|
- )
|
|
|
- WHERE
|
|
|
- curr.region_name = :region_name
|
|
|
- AND curr.year_month LIKE '%-01'
|
|
|
- AND curr.year_month
|
|
|
- > '2023-01'
|
|
|
- """)
|
|
|
-
|
|
|
- try:
|
|
|
- with engine.begin() as conn:
|
|
|
- result = conn.execute(update_sql, {'region_name': region_name})
|
|
|
- log.info(f"Updated {result.rowcount} rows for {region_name}")
|
|
|
- return result.rowcount
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- log.info(f"Update failed: {str(e)}")
|
|
|
- raise RuntimeError(f"同比数据更新失败: {str(e)}") from e
|
|
|
-
|
|
|
def clear_old_shandong_yoy_origin(region_name):
|
|
|
"""
|
|
|
清理山东省2024年前数据的同比指标
|
|
@@ -436,12 +342,10 @@ if __name__ == '__main__':
|
|
|
# print(count)
|
|
|
|
|
|
# 新表更新地级市同比
|
|
|
- # for province in provinces:
|
|
|
- # update_january_yoy(province)
|
|
|
- # update_shandong_yoy(province)
|
|
|
+ for province in provinces:
|
|
|
+ update_shandong_yoy(province)
|
|
|
|
|
|
# 旧表更新省份同比
|
|
|
# for province in provinces:
|
|
|
- # update_january_yoy_origin(province)
|
|
|
# update_shandong_yoy_origin(province)
|
|
|
log.info("同比sql处理完成")
|