1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
| from sqlalchemy import create_engine import pandas as pd import zipfile import os
def PairToDB(pair, data_path, DB_path): print('Start to read source data of', pair) df = pd.read_table(data_path) print('Done! Start to perpare M1.') df = df['<TICKER>,<DTYYYYMMDD>,<TIME>,<OPEN>,<HIGH>,<LOW>,<CLOSE>,<VOL>'].str.split(',', expand=True) df.columns = ['TICKER', 'YMD', 'HMS', 'OPEN', 'HIGH', 'LOW', 'CLOSE', 'VOL'] df['DATETIME'] = pd.to_datetime(df['YMD'] + df['HMS'], format='%Y%m%d%H%M%S') M1 = pd.DataFrame() M1['OPEN'] = df['OPEN'].astype(float) M1['HIGH'] = df['HIGH'].astype(float) M1['LOW'] = df['LOW'].astype(float) M1['CLOSE'] = df['CLOSE'].astype(float) M1['VOL'] = df['VOL'].astype(int) M1 = M1.set_index(df['DATETIME'])
engine = create_engine(DB_path, echo=False) print('Start to change TF and import', pair) M1.to_sql('M1', engine, if_exists='replace') print('M1 finish!') CHANGE_DICT = {'OPEN': 'first', 'HIGH': 'max', 'LOW': 'min', 'CLOSE': 'last', 'VOL': 'sum'} M1.resample('5T', closed='left', label='left').apply(CHANGE_DICT).dropna().to_sql('M5', engine, if_exists='replace') print('M5 finish!') M1.resample('15T', closed='left', label='left').apply(CHANGE_DICT).dropna().to_sql('M15', engine, if_exists='replace') print('M15 finish!') M1.resample('30T', closed='left', label='left').apply(CHANGE_DICT).dropna().to_sql('M30', engine, if_exists='replace') print('M30 finish!') M1.resample('60T', closed='left', label='left').apply(CHANGE_DICT).dropna().to_sql('H1', engine, if_exists='replace') print('H1 finish!') M1.resample('240T', closed='left', label='left').apply(CHANGE_DICT).dropna().to_sql('H4', engine, if_exists='replace') print('H4 finish!') M1.resample('1440T', closed='left', label='left').apply(CHANGE_DICT).dropna().to_sql('D1', engine, if_exists='replace') print('D1 finish!') return True
if __name__ == '__main__': workfolder = "C:\\Users\\jiawei\\Desktop" DBfolder = "D:\\Forex"
file_list = os.listdir(workfolder) for file_name in file_list: if os.path.splitext(file_name)[1] == '.zip': file_zip = zipfile.ZipFile(os.path.join(workfolder, file_name), 'r') for file in file_zip.namelist(): file_zip.extract(file, workfolder) file_zip.close()
file_list = os.listdir(workfolder) for file_name in file_list: if os.path.splitext(file_name)[1] == '.txt': pair = os.path.splitext(file_name)[0] data_path = os.path.join(workfolder, file_name) DB_path = 'sqlite:///' + os.path.join(DBfolder, pair) + '.sqlite3' PairToDB(pair, data_path, DB_path) print('Done!')
|