Commit be597e4d by jscat

titan data

1. 新增A股资料数据
2. 新增scheduler接口
parent d54240e1
......@@ -33,25 +33,39 @@ class Demo:
def get_month_calendar(self):
    """Refresh the trading calendar for the current month.

    Asks ``self.go`` for the start and end date of the month containing
    today, passes that range to ``self.go.get_AShareCalendar`` with
    ``type=1``, and prints a ``calendar,<timestamp>`` marker when done.
    """
    # The helper receives today's date as a string, not a datetime object
    today_text = str(datetime.today())
    month_start, month_end = self.go.get_current_month_start_and_end(today_text)
    self.go.get_AShareCalendar(start_date=month_start, end_date=month_end, type=1)
    finished_at = datetime.now()
    print("calendar," + str(finished_at))
def get_description(self):
    """Refresh the A-share description data.

    Calls ``self.go.get_AShareDescription`` with the current timestamp and
    ``type=0``, then prints a ``description,<timestamp>`` marker.
    """
    self.go.get_AShareDescription(date=datetime.now(), type=0)
    finished_at = datetime.now()
    print("description," + str(finished_at))
def get_data(self):
    """Register the cron jobs on an apscheduler scheduler and start it.

    Jobs registered:
      * ``get_month_calendar`` at 08:30 on day 1 of every month.
      * ``get_description`` Monday to Friday at 08:00, 09:00 and 12:30.
    """
    scheduler = BlockingScheduler()

    # Monthly trading-calendar refresh
    scheduler.add_job(self.get_month_calendar, 'cron', month='1-12', day='1', hour='8', minute='30', name="calendar")

    # Weekday A-share description refreshes, one job per (hour, minute) slot
    description_slots = ((8, 0), (9, 0), (12, 30))
    for slot_hour, slot_minute in description_slots:
        scheduler.add_job(self.get_description, 'cron', day_of_week='mon-fri', hour=slot_hour, minute=slot_minute, name="description")

    # Hand control to the scheduler; jobs run when their trigger fires
    scheduler.start()
if __name__ == '__main__':
    # Script entry point: announce start-up, then run the Demo instance.
    print("start")
    Demo().start()
......@@ -13,6 +13,7 @@ from sqlalchemy import create_engine
from titan_table_structure import Base
from dateutil.relativedelta import relativedelta
from sqlalchemy import text
import re
'''
author: jscat 2021/02/02
......@@ -28,40 +29,32 @@ class Data:
def __init__(self, token):
    """Store the API token and create the SQLAlchemy engine.

    Args:
        token: value kept on the instance as ``self.token``; it is not
            used further inside this constructor.
    """
    self.token = token
    # Create exactly one engine. An earlier revision built a second engine
    # with echo=True and immediately overwrote it; echo=False keeps the
    # emitted SQL out of the log output.
    self.engine = create_engine(settings.mysql_url, encoding='utf-8', echo=False)
# Replace the table contents: delete all rows, then append the new data
def delete_append(self, data, table_name, info):
    """Delete every row of a MySQL table, then append ``data`` to it.

    The DELETE, the ``to_sql`` append and the log-table update all run
    inside one ``self.engine.begin()`` block, so they share a single
    connection and transaction. DELETE is used rather than TRUNCATE
    because TRUNCATE could not be rolled back (in MySQL it causes an
    implicit commit).

    Failures are not re-raised: they are logged with a traceback and
    recorded through ``self.error_log``.

    Args:
        data: object exposing pandas' ``to_sql`` (a DataFrame).
        table_name: name of the target table; concatenated into the SQL
            text, so it must come from trusted code, never user input.
        info: free-text description forwarded to ``update_log`` /
            ``error_log`` and included in the log message.
    """
    try:
        with self.engine.begin() as conn:
            conn.execute('DELETE FROM ' + table_name)
            data.to_sql(table_name, conn, if_exists='append', index=False)
            self.update_log(table_name, conn, info)
    except Exception as ee:
        # Lazy %-style argument: passing the exception as a bare extra
        # argument without a placeholder breaks log-record formatting.
        logging.exception('delete_append failed: %s', info)
        self.error_log(table_name, self.engine, info, ee.args)


# Former name of delete_append, kept so existing callers keep working
def truncate_append(self, data, table_name, info):
    """Deprecated alias: delegates to ``delete_append`` unchanged."""
    self.delete_append(data, table_name, info)
# Append the new data while keeping the rows already in the table
def keep_append(self, data, table_name, info):
    """Append ``data`` to a MySQL table without removing existing rows.

    The ``to_sql`` append and the log-table update run inside one
    ``self.engine.begin()`` block, so they share a single connection and
    transaction. No DELETE/TRUNCATE statement is issued here: leftover
    rollback experiments that deleted from a hard-coded table and dumped
    it to stdout have been removed.

    Failures are not re-raised: they are logged with a traceback and
    recorded through ``self.error_log``.

    Args:
        data: object exposing pandas' ``to_sql`` (a DataFrame).
        table_name: name of the target table.
        info: free-text description forwarded to ``update_log`` /
            ``error_log`` and included in the log message.
    """
    try:
        with self.engine.begin() as conn:
            data.to_sql(table_name, conn, if_exists='append', index=False)
            self.update_log(table_name, conn, info)
    except Exception as ee:
        # Lazy %-style argument: passing the exception as a bare extra
        # argument without a placeholder breaks log-record formatting.
        logging.exception('keep_append failed: %s', info)
        self.error_log(table_name, self.engine, info, ee.args)
# 同步更新日志表
def update_log(self, table_name, conn, info):
......@@ -76,11 +69,11 @@ class Data:
'''
start_date='20210101', end_date='20311231'
Field Type Comment
OBJECT_ID varchar(100) NULL 对象ID
TRADE_DAYS varchar(8) NOT NULL Trading Day, 20210201
S_INFO_EXCHMARKET varchar(40) NOT NULL Exchange Name (English), SSE:上海交易所 | SZSE:深圳交易所
SOURCE_TYPE varchar(10) NULL BS: baostock | WD: wind
'''
def get_AShareCalendar(self, start_date, end_date, type):
# 此方法连接数据库,密码可以输入特殊字符串
......@@ -115,12 +108,64 @@ class Data:
df_all['SOURCE_TYPE'] = "BS"
df_all['S_INFO_EXCHMARKET'] = "SSE"
# data operation
info = "update record: "+str(start_date)+"_"+str(end_date)
if type == 0:
self.truncate_append(df_all, 'tbl_AShareCalendar', info)
self.delete_append(df_all, 'tbl_AShareCalendar', info)
else:
self.keep_append(df_all, 'tbl_AShareCalendar', info)
'''
start_date='20210101', end_date='20311231'
OBJECT_ID varchar(100) NULL 对象ID
S_INFO_CODE varchar(40) NOT NULL 交易代码
S_INFO_NAME varchar(50) NULL 证券简称
S_INFO_COMPNAME varchar(100) NULL 公司中文名字
S_INFO_COMPNAMEENG varchar(100) NULL 公司英文名字
S_INFO_ISINCODE varchar(40) NULL ISIN CODE
S_INFO_EXCHMARKET varchar(40) NULL 交易所, SSE: 上交所; SZSE:深交所
S_INFO_LISTBOARD varchar(10) NULL 上市板类型; 434004000:主板; 434003000:中小企业板; 434001000:创业板
S_INFO_LISTBOARDNAME varchar(10) NULL 上市板, 主板, 创业板, 中小企业板
'''
def get_AShareDescription(self, date, type):
    """Download the basic security list from baostock and store the stocks.

    Logs in to baostock, runs ``query_stock_basic``, keeps the rows whose
    ``type`` and ``status`` are both ``'1'``, derives ``S_INFO_CODE``,
    ``S_INFO_EXCHMARKET`` and ``S_INFO_NAME`` from the raw columns and
    writes the result to ``tbl_AShareDescription``.

    If the query reports an error or yields no usable rows, nothing is
    written: with ``type == 0`` an empty frame would otherwise delete every
    existing row and replace it with nothing.

    Args:
        date: only used to build the "update record" text passed on to the
            storage helpers.
        type: ``0`` replaces the table contents via ``delete_append``; any
            other value appends via ``keep_append``. (The name shadows the
            builtin but is kept for callers that pass it by keyword.)
    """
    # NOTE(review): the session opened here is never closed with
    # bs.logout(); confirm whether other jobs rely on it staying open.
    lg = bs.login()
    print('login respond error_code:' + lg.error_code)
    print('login respond error_msg:' + lg.error_msg)

    rs = bs.query_stock_basic()
    print('query_stock_basic respond error_code:' + rs.error_code)
    print('query_stock_basic respond error_msg:' + rs.error_msg)

    # Collect the result rows; `and` stops iterating once an error is reported
    data_list = []
    while rs.error_code == '0' and rs.next():
        data_list.append(rs.get_row_data())

    if rs.error_code != '0' or not data_list:
        logging.error('get_AShareDescription aborted, query returned no data: %s %s',
                      rs.error_code, rs.error_msg)
        return

    df = pd.DataFrame(data_list, columns=rs.fields)
    print(len(df))
    # Keep rows with type == '1' and status == '1'; copy so the column
    # assignments below act on an independent frame, not a view of a slice
    df = df[(df['type'] == '1') & (df['status'] == '1')].copy()
    print(len(df))

    if df.empty:
        logging.error('get_AShareDescription aborted, no rows left after filtering')
        return

    # `code` looks like "<exchange>.<number>": split once, reuse both parts
    code_parts = df['code'].str.split('.')
    df['S_INFO_CODE'] = code_parts.str[1]
    df['S_INFO_EXCHMARKET'] = code_parts.str[0].replace({'sh': 'SSE', 'sz': 'SZSE'})
    df['S_INFO_NAME'] = df['code_name']
    df = df.drop(['code', 'code_name', 'ipoDate', 'outDate', 'type', 'status'], axis=1)

    # data operation
    info = "update record: " + str(date)
    if type == 0:
        self.delete_append(df, 'tbl_AShareDescription', info)
    else:
        self.keep_append(df, 'tbl_AShareDescription', info)
# Deprecated
def get_all_stockdata(self, start_date, end_date):
......@@ -183,7 +228,7 @@ class Data:
end_date = '%s-%s-%s' % (year, month, end)
return start_date, end_date
if __name__=='__main__':
if __name__=='__test__':
data = Data(settings.ts_token)
start = "2020-02-01"
for i in range(1):
......@@ -195,3 +240,8 @@ if __name__=='__main__':
time.sleep(10)
print("finish")
if __name__ == '__main__':
    # Manual run: refresh the A-share description table once (type=0).
    data = Data(settings.ts_token)
    data.get_AShareDescription(date=datetime.now(), type=0)
    print("finish")
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论