import os

from sqlalchemy import create_engine
from sqlalchemy import update
from sqlalchemy.dialects.mysql import insert
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
def signature(pd_table, conn, keys, data_iter):
    """Upsert rows into the MySQL table named by ``pd_table.name``.

    Every row is written with ``INSERT ... ON DUPLICATE KEY UPDATE``, so a
    row whose primary/unique key already exists is overwritten with the new
    values instead of raising a duplicate-key error.

    NOTE(review): the parameter list matches the callable accepted by
    ``pandas.DataFrame.to_sql(method=...)``; presumably that is the intended
    caller -- confirm.

    Args:
        pd_table: Object whose ``name`` attribute is the target table name.
        conn: SQLAlchemy engine/connection used both for schema reflection
            and as the session bind.
        keys: Column names, in the same order as the values of each row.
        data_iter: Iterable of rows; each row is a sequence of values.

    Raises:
        KeyError: If ``pd_table.name`` is not among the reflected tables.
    """
    base = declarative_base()
    base.metadata.reflect(conn)
    table = base.metadata.tables[pd_table.name]
    session = sessionmaker(bind=conn)()
    try:
        for item in data_iter:
            data = dict(zip(keys, item))
            stmt = insert(table).values(**data).on_duplicate_key_update(
                **data)
            session.execute(stmt)
        # Commit once after all rows have been sent.
        session.commit()
    finally:
        # Always release the connection, even when a statement fails;
        # close() discards any work that was not committed.
        session.close()
def update_pe_from_csv(pd_table, conn, keys, data_iter):
    """Update existing rows of ``pd_table.name``, matching on ``symbol``.

    For every incoming row an ``UPDATE`` is issued against the rows whose
    ``symbol`` column equals the row's ``'symbol'`` value; all columns in
    ``keys`` (including ``symbol`` itself) are set to the row's values.
    Rows with no match in the table are left untouched (no insert happens).

    Args:
        pd_table: Object whose ``name`` attribute is the target table name.
        conn: SQLAlchemy engine/connection used both for schema reflection
            and as the session bind.
        keys: Column names, in the same order as the values of each row;
            must contain ``'symbol'``.
        data_iter: Iterable of rows; each row is a sequence of values.

    Raises:
        KeyError: If the table was not reflected, or ``'symbol'`` is not
            one of ``keys``.
    """
    base = declarative_base()
    base.metadata.reflect(conn)
    table = base.metadata.tables[pd_table.name]
    session = sessionmaker(bind=conn)()
    try:
        for item in data_iter:
            data = dict(zip(keys, item))
            stmt = update(table).where(
                table.c.symbol == data['symbol']).values(**data)
            session.execute(stmt)
        # Commit once after all rows have been sent.
        session.commit()
    finally:
        # Always release the connection, even when a statement fails;
        # close() discards any work that was not committed.
        session.close()
def update_pe(pd_table, conn, keys, data_iter):
    """Update existing rows of ``pd_table.name``, matching on ``ts_code``.

    For every incoming row an ``UPDATE`` is issued against the rows whose
    ``ts_code`` column equals the row's ``'ts_code'`` value; all columns in
    ``keys`` (including ``ts_code`` itself) are set to the row's values.
    Rows with no match in the table are left untouched (no insert happens).

    Args:
        pd_table: Object whose ``name`` attribute is the target table name.
        conn: SQLAlchemy engine/connection used both for schema reflection
            and as the session bind.
        keys: Column names, in the same order as the values of each row;
            must contain ``'ts_code'``.
        data_iter: Iterable of rows; each row is a sequence of values.

    Raises:
        KeyError: If the table was not reflected, or ``'ts_code'`` is not
            one of ``keys``.
    """
    base = declarative_base()
    base.metadata.reflect(conn)
    table = base.metadata.tables[pd_table.name]
    session = sessionmaker(bind=conn)()
    try:
        for item in data_iter:
            data = dict(zip(keys, item))
            stmt = update(table).where(
                table.c.ts_code == data['ts_code']).values(**data)
            session.execute(stmt)
        # Commit once after all rows have been sent.
        session.commit()
    finally:
        # Always release the connection, even when a statement fails;
        # close() discards any work that was not committed.
        session.close()
def insert_update_from_http(pd_table, conn, results):
    """Upsert a sequence of row dicts into a MySQL table.

    Every dict in ``results`` is written with
    ``INSERT ... ON DUPLICATE KEY UPDATE``, so a row whose primary/unique key
    already exists is overwritten with the new values.

    Args:
        pd_table: Target table name as a string. For consistency with the
            other helpers in this module, an object exposing the table name
            through a ``name`` attribute is accepted as well.
        conn: SQLAlchemy engine/connection used both for schema reflection
            and as the session bind.
        results: Iterable of mappings from column name to value.

    Raises:
        KeyError: If the table is not among the reflected tables.
    """
    # A plain string has no ``name`` attribute, so it is used unchanged.
    table_name = getattr(pd_table, 'name', pd_table)
    base = declarative_base()
    base.metadata.reflect(conn)
    table = base.metadata.tables[table_name]
    session = sessionmaker(bind=conn)()
    try:
        for data in results:
            stmt = insert(table).values(**data).on_duplicate_key_update(
                **data)
            session.execute(stmt)
        # Commit once after all rows have been sent.
        session.commit()
    finally:
        # Always release the connection, even when a statement fails;
        # close() discards any work that was not committed.
        session.close()
# Module-level engine for the ``tusharepro`` MySQL database (pymysql driver).
# The connection URL may be supplied through the TUSHAREPRO_DB_URL
# environment variable so that credentials do not have to live in source;
# the literal below is kept only as a backward-compatible default.
engine = create_engine(
    os.environ.get(
        'TUSHAREPRO_DB_URL',
        'mysql+pymysql://wangchenxi:123456@localhost:3306/tusharepro'),
    echo=False,
    pool_size=30,
    # NOTE(review): pool_recycle is expressed in seconds; 4 looks unusually
    # short and forces near-constant reconnects -- confirm the intended value.
    pool_recycle=4,
    pool_pre_ping=True)
# 版权声明：除特别注明外，本站所有文章均为王晨曦个人站点原创
# 转载请注明：出处来自王晨曦个人站点 » sqlalchemy代码举例