#!/usr/bin/env python
# coding: utf-8

# ### Table of Contents
# - [Example Starts Here](#example_starts_here)
# - [init_db()](#init_db)
# - [insert_hist_data(), query_hist_data()](#insert_query_hist_data)
# - [download_insert_hist_data()](#download_insert_hist_data)
# - [get_hist_data()](#get_hist_data)

# In[1]:


# Prepare environment: make the parent directory importable so the local
# `ibstract` package is found when this example runs from its own folder.
import os
import sys
sys.path.insert(0, os.path.abspath('..'))

# Have IPython display the value of every bare expression in a cell,
# not only the last one.
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

from io import StringIO
import pandas as pd
import warnings
warnings.filterwarnings("ignore")


# In[2]:


# Hourly GS TRADES bars (2017-09-05 .. 2017-09-08, UTC timestamps) used as
# the sample data set for the insert/query examples below.
gs1h_csv = StringIO("""
Symbol,DataType,BarSize,TickerTime,opening,high,low,closing,volume,barcount,average
GS,TRADES,1h,2017-09-05 12:00:00+00:00,224.5,224.5,223.98,223.98,23,18,224.302
GS,TRADES,1h,2017-09-05 13:00:00+00:00,224.25,224.25,220.01,220.39,6431,3782,221.423
GS,TRADES,1h,2017-09-05 14:00:00+00:00,220.39,220.7,217.3,218.12,11332,5881,218.82
GS,TRADES,1h,2017-09-05 15:00:00+00:00,218.09,219.64,218.07,219.45,6457,3843,218.795
GS,TRADES,1h,2017-09-05 16:00:00+00:00,219.45,219.46,218.11,218.67,4940,3550,218.633
GS,TRADES,1h,2017-09-05 17:00:00+00:00,218.72,219.19,218.13,218.73,3228,2527,218.657
GS,TRADES,1h,2017-09-05 18:00:00+00:00,218.72,218.86,217.62,217.67,4939,3219,218.285
GS,TRADES,1h,2017-09-05 19:00:00+00:00,217.68,218.3,217.46,217.85,8173,5594,217.747
GS,TRADES,1h,2017-09-05 20:00:00+00:00,217.79,218.85,217.78,217.92,2445,10,217.781
GS,TRADES,1h,2017-09-05 21:00:00+00:00,218.0,218.11,217.91,218.11,8,5,218.022
GS,TRADES,1h,2017-09-05 22:00:00+00:00,218.11,218.17,217.95,217.95,15,11,218.12
GS,TRADES,1h,2017-09-05 23:00:00+00:00,218.15,218.15,217.93,217.93,2,2,218.04
GS,TRADES,1h,2017-09-06 12:00:00+00:00,218.97,219.29,218.97,219.2,28,11,219.159
GS,TRADES,1h,2017-09-06 13:00:00+00:00,219.1,220.78,218.67,219.58,4729,2596,219.796
GS,TRADES,1h,2017-09-06 14:00:00+00:00,219.61,221.02,219.54,220.01,4451,2722,220.49
GS,TRADES,1h,2017-09-06 15:00:00+00:00,219.98,220.2,217.73,218.2,4222,2500,219.02
GS,TRADES,1h,2017-09-06 16:00:00+00:00,218.2,219.83,217.61,219.8,2680,1809,218.335
GS,TRADES,1h,2017-09-06 17:00:00+00:00,219.77,220.5,219.41,219.57,2470,1492,219.954
GS,TRADES,1h,2017-09-06 18:00:00+00:00,219.61,219.8,218.9,219.33,2127,1447,219.428
GS,TRADES,1h,2017-09-06 19:00:00+00:00,219.33,219.7,218.85,219.05,5587,3451,219.363
GS,TRADES,1h,2017-09-06 20:00:00+00:00,218.83,219.09,218.83,218.99,3634,6,218.83
GS,TRADES,1h,2017-09-06 21:00:00+00:00,218.98,218.98,218.98,218.98,2,1,218.98
GS,TRADES,1h,2017-09-06 22:00:00+00:00,218.7,218.7,218.7,218.7,1,1,218.7
GS,TRADES,1h,2017-09-06 23:00:00+00:00,218.69,218.7,218.69,218.7,8,2,218.696
GS,TRADES,1h,2017-09-07 11:00:00+00:00,219.0,219.0,219.0,219.0,1,1,219.0
GS,TRADES,1h,2017-09-07 12:00:00+00:00,219.21,219.4,218.5,218.5,31,16,219.015
GS,TRADES,1h,2017-09-07 13:00:00+00:00,218.57,218.83,216.07,216.31,3338,1726,217.503
GS,TRADES,1h,2017-09-07 14:00:00+00:00,216.35,216.35,214.64,215.77,7048,4299,215.392
GS,TRADES,1h,2017-09-07 15:00:00+00:00,215.74,216.41,214.96,215.28,4571,3190,215.666
GS,TRADES,1h,2017-09-07 16:00:00+00:00,215.24,216.28,215.06,216.07,2191,1541,215.518
GS,TRADES,1h,2017-09-07 17:00:00+00:00,216.04,216.41,215.26,215.6,2058,1495,215.708
GS,TRADES,1h,2017-09-07 18:00:00+00:00,215.58,215.74,215.25,215.37,2206,1509,215.428
GS,TRADES,1h,2017-09-07 19:00:00+00:00,215.4,215.94,214.95,215.83,6582,4149,215.313
GS,TRADES,1h,2017-09-07 20:00:00+00:00,215.84,216.88,215.8,216.02,1869,9,215.846
GS,TRADES,1h,2017-09-07 21:00:00+00:00,215.98,215.98,215.9,215.9,9,6,215.927
GS,TRADES,1h,2017-09-07 22:00:00+00:00,215.9,215.99,215.9,215.99,14,3,215.909
GS,TRADES,1h,2017-09-07 23:00:00+00:00,216.09,216.09,216.09,216.09,1,1,216.09
GS,TRADES,1h,2017-09-08 11:00:00+00:00,215.44,215.44,215.44,215.44,1,1,215.44
GS,TRADES,1h,2017-09-08 12:00:00+00:00,214.9,215.5,214.8,215.5,22,8,215.05
GS,TRADES,1h,2017-09-08 13:00:00+00:00,215.5,218.76,215.1,218.67,5788,3250,217.577
GS,TRADES,1h,2017-09-08 14:00:00+00:00,218.68,219.28,217.78,218.04,4696,3283,218.707
GS,TRADES,1h,2017-09-08 15:00:00+00:00,218.06,218.39,216.82,216.89,2574,1880,217.345
GS,TRADES,1h,2017-09-08 16:00:00+00:00,216.92,217.3,216.67,217.14,2049,1433,217.015
GS,TRADES,1h,2017-09-08 17:00:00+00:00,217.12,217.17,216.15,216.76,2254,1565,216.591
GS,TRADES,1h,2017-09-08 18:00:00+00:00,216.76,217.35,216.61,217.01,1921,1373,217.117
GS,TRADES,1h,2017-09-08 19:00:00+00:00,217.01,217.34,216.69,217.24,3980,2789,217.075
GS,TRADES,1h,2017-09-08 20:00:00+00:00,217.21,217.21,216.71,216.71,2222,6,217.209
GS,TRADES,1h,2017-09-08 21:00:00+00:00,217.0,217.0,217.0,217.0,2,1,217.0
GS,TRADES,1h,2017-09-08 22:00:00+00:00,216.46,216.8,216.46,216.8,3,2,216.698
GS,TRADES,1h,2017-09-08 23:00:00+00:00,216.8,216.8,216.8,216.8,0,0,216.8
""")
gs1h = pd.read_csv(gs1h_csv)


# ## Example starts here
# ------

# In[3]:


import pytz
import asyncio
import aiomysql.sa as aiosa
from sqlalchemy import create_engine, MetaData
from ibstract import IB
from ibstract import MarketDataBlock
from ibstract import HistDataReq
from ibstract import init_db, insert_hist_data, query_hist_data
from ibstract import download_insert_hist_data
from ibstract import get_hist_data
from ibstract.utils import dtest


# ### `init_db()` : Initialize MySQL database by creating one table for each security type.

# In[4]:


# Connection settings of the local test database used throughout this example.
db_info = {
    'host': '127.0.0.1',
    'user': 'root',
    'password': 'ibstract',
    'db': 'ibstract_test',
}
init_db(db_info)


# In[5]:


# Synchronous SQLAlchemy engine, used only to inspect and clean the tables.
mysql_url = "mysql+pymysql://{}:{}@{}/{}".format(
    db_info['user'],
    db_info['password'],
    db_info['host'],
    db_info['db'],
)
engine = create_engine(mysql_url, echo=False)

# Load the table definitions that now exist in the database.
meta = MetaData()
meta.reflect(bind=engine)

print("Tables in Ibstract MySQL database:")
list(meta.tables.keys())
print("Columns in Stock table:")
meta.tables['Stock'].columns.values()


# ### Coroutines `insert_hist_data()`, `query_hist_data()` : Insert / Read MarketDataBlock() to/from MySQL database

# In[6]:


# The MarketDataBlock() to be inserted.
blk_gs1h = MarketDataBlock(gs1h)
blk_gs1h.tz_convert('US/Eastern')
blk_gs1h.df.head()


# In[7]:


async def run(loop, blk):
    """Insert ``blk`` into the 'Stock' table and read a slice of it back.

    Args:
        loop: asyncio event loop handed to the aiomysql engine.
        blk: data block passed to ``insert_hist_data()``.

    Returns:
        Whatever ``query_hist_data()`` yields for GS / TRADES / 1h between
        ``dtest(2017, 9, 6, 10)`` and ``dtest(2017, 9, 6, 14)``
        (presumably a MarketDataBlock; the caller reads its ``.df``).
    """
    # Named `aio_engine` so it does not shadow the module-level
    # synchronous SQLAlchemy `engine`.
    aio_engine = await aiosa.create_engine(
        user=db_info['user'], db=db_info['db'],
        host=db_info['host'], password=db_info['password'],
        loop=loop, echo=False)
    try:
        await insert_hist_data(aio_engine, 'Stock', blk)
        data = await query_hist_data(
            aio_engine, 'Stock', 'GS', 'TRADES', '1h',
            dtest(2017, 9, 6, 10), dtest(2017, 9, 6, 14))
    finally:
        # Release the connection pool even if the insert or query failed.
        aio_engine.close()
        await aio_engine.wait_closed()
    return data


loop = asyncio.get_event_loop()
blk_readback = loop.run_until_complete(run(loop, blk_gs1h))
blk_readback.df


# ### Coroutine `download_insert_hist_data()` : Download historical data and insert part of it to MySQL database

# In[8]:


# Clear Stock table.
# Engine.execute() no longer exists in SQLAlchemy 2.0; an explicit
# transaction block works on both 1.x and 2.x and commits on exit.
with engine.begin() as conn:
    conn.execute(meta.tables['Stock'].delete())


# A user coroutine to download historical data and insert to MySQL database
async def run(loop, req, broker, insert_limit):
    """Download data for ``req``, insert part of it, then query it back.

    Args:
        loop: asyncio event loop handed to the aiomysql engine.
        req: request object; its ``SecType``, ``Symbol``, ``DataType`` and
            ``BarSize`` attributes are reused for the read-back query.
        broker: broker client passed to ``download_insert_hist_data()``.
        insert_limit: passed through to ``download_insert_hist_data()``;
            the caller supplies a (start, end) pair.

    Returns:
        Tuple ``(blk_download, blk_readback)``: the result of the download
        call and the result of querying the database between
        ``dtest(2017, 8, 1)`` and ``dtest(2017, 10, 1)``.
    """
    aio_engine = await aiosa.create_engine(
        user=db_info['user'], db=db_info['db'],
        host=db_info['host'], password=db_info['password'],
        loop=loop, echo=False)
    try:
        blk_download = await download_insert_hist_data(
            req, broker, aio_engine, insert_limit)
        blk_readback = await query_hist_data(
            aio_engine, req.SecType, req.Symbol, req.DataType, req.BarSize,
            start=dtest(2017, 8, 1), end=dtest(2017, 10, 1))
    finally:
        # Release the connection pool even if download or query failed.
        aio_engine.close()
        await aio_engine.wait_closed()
    return blk_download, blk_readback


# Arguments
req = HistDataReq('Stock', 'GS', '1d', '5d', dtest(2017, 9, 7))
broker = IB('127.0.0.1', 4002)
# Only insert partial data between (SQL inclusive) 8/31 and 9/5
insert_limit = (dtest(2017, 8, 31), dtest(2017, 9, 5))

# Run loop
loop = asyncio.get_event_loop()
blk_download, blk_readback = loop.run_until_complete(
    run(loop, req, broker, insert_limit))
blk_download.df
blk_readback.df


# ### Coroutine `get_hist_data()` : Try to query data from local MySQL database. If any part is not found, download and insert back to MySQL all missing parts concurrently and asynchronously.
# #### `get_hist_data()` executes in these steps:
# 1. Try to query historical data from local MySQL for the input user request.
# 2. Determine missing data parts and corresponding start-end date/time gaps. Create multiple HistDataReq() requests for these gaps.
# 3. Concurrently download these requests from broker API.
# 4. Concurrently combine downloaded data pieces with the data from local MySQL data block.
# 5. Return the combined data per the input user request.

# In[9]:


# Use data above
blk_gs_3days = blk_readback

# Clear Stock table.
# Engine.execute() no longer exists in SQLAlchemy 2.0; an explicit
# transaction block works on both 1.x and 2.x and commits on exit.
with engine.begin() as conn:
    conn.execute(meta.tables['Stock'].delete())


# Populate database with some data
async def populate_db(blk_db):
    """Insert ``blk_db`` into the 'Stock' table and return what is stored.

    Args:
        blk_db: data block passed to ``insert_hist_data()``.

    Returns:
        Whatever ``query_hist_data()`` yields for GS / TRADES / 1d between
        ``dtest(2017, 1, 1)`` and ``dtest(2017, 12, 31)``.
    """
    # NOTE: `loop` is the module-level event loop; it must be assigned
    # before this coroutine is actually run.
    aio_engine = await aiosa.create_engine(
        user=db_info['user'], db=db_info['db'],
        host=db_info['host'], password=db_info['password'],
        loop=loop, echo=False)
    try:
        await insert_hist_data(aio_engine, 'Stock', blk_db)
        data_exist_in_db = await query_hist_data(
            aio_engine, 'Stock', 'GS', 'TRADES', '1d',
            dtest(2017, 1, 1), dtest(2017, 12, 31))
    finally:
        # Release the connection pool even if the insert or query failed.
        aio_engine.close()
        await aio_engine.wait_closed()
    return data_exist_in_db


# Insert and query local MySQL database
loop = asyncio.get_event_loop()
blk_db = loop.run_until_complete(populate_db(blk_gs_3days))
blk_db.df


# In[10]:


# A user coroutine to get wider range of historical data than those existing in MySQL.
# Data existing in MySQL will not be downloaded, but combined with downloaded data.
async def run(req, broker, loop):
    """Fetch the data for ``req`` through ``get_hist_data()``.

    Args:
        req: request object passed to ``get_hist_data()``.
        broker: broker client passed to ``get_hist_data()``.
        loop: asyncio event loop, added to the MySQL settings under 'loop'.

    Returns:
        The result of ``get_hist_data()``.
    """
    # The MySQL settings are the shared connection info plus the event loop.
    mysql_settings = {**db_info, 'loop': loop}
    return await get_hist_data(req, broker, mysql=mysql_settings)


# Request daily data of 8 days, from 8/29 - 9/8.
# Data from 8/31 - 9/5 exist in local database and will not be downloaded.
req = HistDataReq('Stock', 'GS', '1d', '8d', dtest(2017, 9, 9))
broker = IB('127.0.0.1', 4002)

loop = asyncio.get_event_loop()
blk_ret = loop.run_until_complete(run(req, broker, loop))
blk_ret.df