Commit 390420e0 by jscat

titan Data

1. 测试事务成功
with self.engine.begin() as conn:
parent eb531ce4
...@@ -2,14 +2,17 @@ import pandas as pd ...@@ -2,14 +2,17 @@ import pandas as pd
import tushare as ts import tushare as ts
import baostock as bs import baostock as bs
import pymysql import pymysql
import traceback
import time import time
import calendar import calendar
from datetime import datetime from datetime import datetime
import titan_data_settings as settings import titan_data_settings as settings
from urllib import parse from urllib import parse
import logging
from sqlalchemy import create_engine from sqlalchemy import create_engine
from titan_table_structure import Base from titan_table_structure import Base
from dateutil.relativedelta import relativedelta from dateutil.relativedelta import relativedelta
from sqlalchemy import text
''' '''
author: jscat 2021/02/02 author: jscat 2021/02/02
...@@ -25,25 +28,44 @@ class Data: ...@@ -25,25 +28,44 @@ class Data:
def __init__(self, token): def __init__(self, token):
# 创建请求数据线程 # 创建请求数据线程
self.token = token self.token = token
self.engine = create_engine(settings.mysql_url, encoding='utf-8', echo=False) self.engine = create_engine(settings.mysql_url, encoding='utf-8', echo=True)
# truncate数据并且append # truncate数据并且append
def truncate_append(self, data, table_name): def truncate_append(self, data, table_name, info):
"""删除mysql表所有数据,to_sql追加新数据""" """删除mysql表所有数据,to_sql追加新数据"""
conn = self.engine.connect() conn = self.engine.connect()
conn.execute('truncate ' + table_name) conn.execute('truncate ' + table_name)
print(data) print(data)
data.to_sql(table_name, self.engine, if_exists='append', index=False) data.to_sql(table_name, self.engine, if_exists='append', index=False)
self.update_log(table_name, info)
# keep数据并且append # keep数据并且append
def keep_append(self, data, table_name): def keep_append(self, data, table_name, info):
"""保留mysql表所有数据,to_sql追加新数据""" """保留mysql表所有数据,to_sql追加新数据"""
data.to_sql(table_name, self.engine, if_exists='append', index=False) try:
with self.engine.begin() as conn:
print (pd.read_sql(sql=text('select * from sharpe.tbl_AShareDescription'), con=conn))
#print (pd.read_sql(sql=text('truncate table sharpe.tbl_AShareDescription; select * from sharpe.tbl_AShareDescription;'), con=conn))
#conn.execute('truncate table sharpe.tbl_AShareDescription')
df = pd.DataFrame({'OBJECT_ID': "001", 'S_INFO_CODE': "uj"}, index=[0])
df.to_sql("tbl_AShareDescription", conn, if_exists='append', index=False)
print(pd.read_sql(sql=text('select * from sharpe.tbl_AShareDescription'), con=conn))
data.to_sql(table_name, conn, if_exists='append', index=False)
self.update_log(table_name, conn, info)
except Exception as ee:
#logging.error('keep_append fialed: '+info, ee)
self.error_log(table_name, self.engine, info, ee.args)
print(pd.read_sql(sql=text('select * from sharpe.tbl_AShareDescription'), con=self.engine))
# 同步更新日志表 # 同步更新日志表
def update_log(self, table_name, info): def update_log(self, table_name, conn, info):
data = pd.DataFrame({'TARGET_TABLE': table_name, 'UPDATE_INFO': info, 'CREATE_DT': str(datetime.now())}, index=[0]) df = pd.DataFrame({'TARGET_TABLE': table_name, 'UPDATE_INFO': info, 'CREATE_DT': str(datetime.now())}, index=[0])
data.to_sql("tbl_update_log", self.engine, if_exists='append', index=False) df.to_sql("tbl_update_log", conn, if_exists='append', index=False)
# 同步更新错误日志表
def error_log(self, table_name, conn, info, log):
df = pd.DataFrame({'TARGET_TABLE': table_name, 'UPDATE_INFO': info, 'ERROR_INFO': str(log), 'CREATE_DT': str(datetime.now())}, index=[0])
df.to_sql("tbl_error_log", conn, if_exists='append', index=False)
''' '''
start_date='20210101', end_date='20311231' start_date='20210101', end_date='20311231'
...@@ -87,13 +109,12 @@ class Data: ...@@ -87,13 +109,12 @@ class Data:
df_all['SOURCE_TYPE'] = "BS" df_all['SOURCE_TYPE'] = "BS"
df_all['S_INFO_EXCHMARKET'] = "SSE" df_all['S_INFO_EXCHMARKET'] = "SSE"
info = "update record: "+str(start_date)+"_"+str(end_date)
if type == 0: if type == 0:
self.truncate_append(df_all, 'tbl_AShareCalendar') self.truncate_append(df_all, 'tbl_AShareCalendar', info)
else: else:
self.keep_append(df_all, 'tbl_AShareCalendar') self.keep_append(df_all, 'tbl_AShareCalendar', info)
self.update_log('tbl_AShareCalendar', "update record: "+str(start_date)+"_"+str(end_date))
self.engine.dispose();
# print('{}成功导入数据库'.format(date))
# Deprecated # Deprecated
def get_all_stockdata(self, start_date, end_date): def get_all_stockdata(self, start_date, end_date):
...@@ -158,13 +179,13 @@ class Data: ...@@ -158,13 +179,13 @@ class Data:
if __name__=='__main__': if __name__=='__main__':
data = Data(settings.ts_token) data = Data(settings.ts_token)
Base.metadata.create_all(data.engine) start = "2020-02-01"
start = "2019-02-01" for i in range(1):
for i in range(11+2):
date = pd.to_datetime(start) + relativedelta(months=+i) # 当前日期往后推i个月 date = pd.to_datetime(start) + relativedelta(months=+i) # 当前日期往后推i个月
date_str = date.strftime("%Y-%m-%d") date_str = date.strftime("%Y-%m-%d")
start_date, end_date = data.get_current_month_start_and_end(date) start_date, end_date = data.get_current_month_start_and_end(date)
print(start_date, end_date) print(start_date, end_date)
data.get_AShareCalendar(start_date=start_date, end_date=end_date, type=1) data.get_AShareCalendar(start_date=start_date, end_date=end_date, type=1)
time.sleep(10) time.sleep(10)
print("finish")
...@@ -6,4 +6,9 @@ from urllib import parse ...@@ -6,4 +6,9 @@ from urllib import parse
from sqlalchemy import create_engine from sqlalchemy import create_engine
ts_token = "33261e14a0f45680d6afdc86b85bc0c4c80ba6b8dc199d7313a30838" ts_token = "33261e14a0f45680d6afdc86b85bc0c4c80ba6b8dc199d7313a30838"
mysql_url = "mysql://sharpe_dev:123456@47.99.110.89:3306/sharpe?charset=utf8" mysql_url = "mysql+pymysql://sharpe_dev:123456@47.99.110.89:3306/sharpe?charset=utf8"
\ No newline at end of file ip_address = "47.99.110.89"
ip_port = 3306
db_name = "sharpe"
user_name = "sharpe_dev"
password ="123456"
\ No newline at end of file
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论