Commit dbbe6cf2 by jscat

titan data 初始化代码

parent 9279a73a
# encoding=utf-8 # encoding=utf-8
import MySQLdb import MySQLdb
import time
import numpy as np
import collections
import csv
import string
import urllib
import json
import time
import MySQLdb
import datetime
import logging
import re import re
import sys, os import sys, os
import numpy as np
import pandas as pd import pandas as pd
import math import math
......
# -*- coding: utf-8 -*-
# python环境:python3.7
# apscheduler版本:3.6.1
from apscheduler.schedulers.blocking import BlockingScheduler
from threading import Thread
from datetime import datetime
from datetime import datetime, timedelta
from titan_data_function import Data
from titan_table_structure import Base
import titan_data_settings as settings
class Demo:
def __init__(self):
# 创建请求数据线程
self._thread_data = Thread(target=self.get_data)
self.go = Data(settings.ts_token)
def start(self):
"""
线程启动函数
"""
self._thread_data.start()
def get_calendar(self):
"""
获取数据
"""
self.go.get_AShareCalendar("2010-01-01", "2021-02-30", 0)
print("calendar," + str(datetime.now()))
def get_month_calendar(self):
"""
获取数据
"""
today = datetime.today()
start_date, end_date = self.go.get_current_month_start_and_end(str(today))
self.go.get_AShareCalendar(start_date=start_date, end_date=end_date, type=1)
print("calendar," + str(datetime.now()))
def get_data(self):
"""
使用apscheduler定时框架创建定时任务
"""
# 创建调度器对象
scheduler = BlockingScheduler()
# 添加定时任务 每月1号早上8:30更新当月交易日历
scheduler.add_job(self.get_month_calendar, 'cron', month='1-12', day='1', hour='8', minute='30', name="calendar")
# 启动调度器,后台监控定时任务,到点执行
scheduler.start()
if __name__== '__main__':
demo = Demo()
demo.start()
import pandas as pd
import tushare as ts
import baostock as bs
import pymysql
import calendar
import titan_data_settings as settings
from urllib import parse
from sqlalchemy import create_engine
from titan_table_structure import Base
'''
author: jscat 2021/02/02
'''
'''
初始化函数
:param token:tushare pro的token
'''
class Data:
def __init__(self, token):
# 创建请求数据线程
self.token = token
self.engine = create_engine(settings.mysql_url, encoding='utf-8', echo=True)
def truncate_append(self, data, table_name):
"""删除mysql表所有数据,to_sql追加新数据"""
conn = self.engine.connect()
conn.execute('truncate ' + table_name)
print(data)
data.to_sql(table_name, self.engine, if_exists='append', index=False)
self.engine.dispose()
def keep_append(self, data, table_name):
"""保留mysql表所有数据,to_sql追加新数据"""
data.to_sql(table_name, self.engine, if_exists='append', index=False)
self.engine.dispose()
'''
start_date='20210101', end_date='20311231'
FieldTypeComment
OBJECT_ID varchar(100) NULL对象ID
TRADE_DAYS varchar(8) NOT NULLTrading Day, 20210201
S_INFO_EXCHMARKET varchar(40) NOT NULLExchange Name (English), SSE:上海交易所 | SZSE:深圳交易所
SOURCE_TYPE varchar(10) NULLBS: baostock | WD: wind
'''
def get_AShareCalendar(self, start_date, end_date, type):
# 此方法连接数据库,密码可以输入特殊字符串
print('bs连接成功')
lg = bs.login()
# 显示登陆返回信息
print('login respond error_code:' + lg.error_code)
print('login respond error_msg:' + lg.error_msg)
#### 获取交易日信息 ####
rs = bs.query_trade_dates(start_date=start_date, end_date=end_date)
print('query_trade_dates respond error_code:' + rs.error_code)
print('query_trade_dates respond error_msg:' + rs.error_msg)
#### 打印结果集 ####
data_list = []
while (rs.error_code == '0') & rs.next():
# 获取一条记录,将记录合并在一起
data_list.append(rs.get_row_data())
df_all = pd.DataFrame(data_list, columns=rs.fields)
# 删选数据
df_all = df_all[df_all['is_trading_day'] == '1']
df_all = df_all.drop('is_trading_day', axis=1)
# 本地储存前一定要先转化成Wind-Compatible日期格式先
df_all['calendar_date'] = df_all['calendar_date'].str.replace('-', '')
# 对获取的数据列名称进行重命名适应Wind format
df_all = df_all.rename(columns={'calendar_date': 'TRADE_DAYS'})
# 补全其他字段
df_all['OBJECT_ID'] = ""
df_all['SOURCE_TYPE'] = "BS"
df_all['S_INFO_EXCHMARKET'] = "SSE"
if type == 0:
self.truncate_append(df_all, 'tbl_AShareCalendar')
else:
self.keep_append(df_all, 'tbl_AShareCalendar')
# print('{}成功导入数据库'.format(date))
# Deprecated
def get_all_stockdata(self, start_date, end_date):
# 此方法连接数据库,密码可以输入特殊字符串
engine = create_engine(settings.mysql_url)
print('数据库连接成功')
ts.set_token(self.token)
pro = ts.pro_api()
trade_d = pro.trade_cal(exchange='SSE', is_open='1',start_date=start_date,end_date=end_date, fields='cal_date')
for date in trade_d['cal_date'].values:
df_basic = pro.stock_basic(exchange='', list_status='L') #再获取所有股票的基本信息
df_daily = pro.daily(trade_date=date) # 先获得所有股票的行情数据,成交额单位是千元,成交量是手
df_daily_basic = pro.daily_basic(ts_code='', trade_date=date,fields='ts_code, turnover_rate, turnover_rate_f,'
' volume_ratio, pe, pe_ttm, pb, ps, ps_ttm,'
' dv_ratio, dv_ttm, total_share, float_share,'
' free_share, total_mv, circ_mv ') #获取每日指标,单位是万股,万元
df_first = pd.merge(left=df_basic, right=df_daily, on='ts_code', how='outer') # on='ts_code'以ts_code为索引,合并数据,how='outer',取并集
df_all = pd.merge(left=df_first, right=df_daily_basic, on='ts_code', how='outer')
# 数据清洗,删除symbol列数据,跟ts_code数据重复
df_all = df_all.drop('symbol', axis=1)
for w in ['name', 'area', 'industry', 'market']: # 在'name', 'area', 'industry', 'market'列内循环填充NaN值
df_all[w].fillna('问题股', inplace=True)
#df_all['amount'] = df_all['amount'] / 100000 # 千转亿
#df_all['circ_mv'] = df_all['circ_mv'] / 10000 # 万转亿
#df_all['total_mv'] = df_all['total_mv'] / 10000 # 万转亿
df_all['ts_code'] = df_all['ts_code'].astype(str) # 强制转换成str字符串格式
df_all['listart_date'] = pd.to_datetime(df_all['listart_date']) # 本地储存前一定要先转化成日期格式先
df_all['trade_date'] = pd.to_datetime(df_all['trade_date'])
#对获取的股票数据列名称进行重命名以方便阅读
df_all = df_all.rename(columns={'ts_code': '股票代码', 'name': '股票名称', 'area': '所在地域', 'industry': '行业'
, 'market': '市场类型', 'listart_date': '上市日期', 'trade_date': '交易日期', 'change': '涨跌额'
, 'pct_chg': '涨跌幅', 'vol': '成交量(手)', 'amount': '成交额(千元)', 'turnover_rate': '换手率(%)'
, 'turnover_rate_f': '流通换手率', 'volume_ratio': '量比', 'pe': '市盈率', 'pe_ttm': '滚动市盈率'
, 'pb': '市净率', 'ps': '市销率', 'ps_ttm': '滚动市销率', 'dv_ratio': '股息率'
, 'dv_ttm': '滚动股息率', 'total_share': '总股本(万股)', 'float_share': '流通股本 (万股)'
, 'free_share': '自由流通股本(万股)', 'total_mv': '总市值 (万元)', 'circ_mv': '流通市值(万元)'})
#亏损的为空值
engine.execute('drop table if exists {}_ts;'.format(date)) #删除重复的数据表
print('%s is downloading....' % (str(date)))
df_all.to_sql('{}_ts'.format(date),engine,index=False)
print('{}成功导入数据库'.format(date))
# 获取当月起止日期
def get_current_month_start_and_end(self, date):
"""
年份 date(2017-09-08格式)
:param date:
:return:本月第一天日期和本月最后一天日期
"""
if date.count('-') != 2:
raise ValueError('- is error')
year, month = str(date).split('-')[0], str(date).split('-')[1]
end = calendar.monthrange(int(year), int(month))[1]
start_date = '%s-%s-01' % (year, month)
end_date = '%s-%s-%s' % (year, month, end)
return start_date, end_date
if __name__=='__main__':
data = Data(settings.ts_token)
Base.metadata.create_all(data.engine)
data.get_AShareCalendar("2015-01-01", "2015-12-31", 0)
data.get_AShareCalendar("2016-01-01", "2016-12-31", 0)
data.get_AShareCalendar("2017-01-01", "2017-12-31", 0)
data.get_AShareCalendar("2018-01-01", "2018-12-31", 0)
data.get_AShareCalendar("2019-01-01", "2019-12-31", 0)
data.get_AShareCalendar("2020-01-01", "2020-12-31", 0)
import pandas as pd
import tushare as ts
import baostock as bs
import pymysql
from urllib import parse
from sqlalchemy import create_engine
ts_token = "33261e14a0f45680d6afdc86b85bc0c4c80ba6b8dc199d7313a30838"
mysql_url = "mysql://sharpe_dev:123456@47.99.110.89:3306/sharpe?charset=utf8"
\ No newline at end of file
# coding=utf-8
from sqlalchemy import Column, String, Float
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String
import titan_data_settings as settings
# 创建连接
engine = create_engine(settings.mysql_url, encoding='utf-8', echo=True)
# 生成orm基类
Base = declarative_base()
class AShareCalendar(Base):
"""交易日历
"""
__tablename__ = 'tbl_AShareCalendar'
OBJECT_ID = Column(String(100), comment="对象ID") # 对象ID
TRADE_DAYS = Column(String(8), primary_key=True, comment="Trading Day, 20210201") # Trading Day, 20210201
S_INFO_EXCHMARKET = Column(String(40), primary_key=True, comment="Exchange Name (English), SSE:上海交易所 | SZSE:深圳交易所") # Exchange Name (English), SSE:上海交易所 | SZSE:深圳交易所
SOURCE_TYPE = Column(String(10), comment="BS: baostock | WD: wind") # BS: baostock | WD: wind
#
Base.metadata.create_all(engine)
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论