外汇历史数据建库

  本文记录建立外汇历史数据库的过程,选取货币对EURUSD,数据源来自forextester,从2001开始。

  数据源选用forextester:http://www.forextester.com/data/datasources
,免费的数据源我看介绍是来自Forexite.Ltd。支持的pair有AUDJPY、AUDUSD、CHFJPY、EURCAD、EURCHF、EURGBP、EURJPY、EURUSD、GBPCHF、GBPJPY、GBPUSD、NZDJPY、NZDUSD、USDCAD、USDJPY、USDCHF、XAGUSD、XAUUSD,从2001年起,按月刷新。GMT/Bid。

  首先,一个时刻要注意的是时间的统一问题,已经校验过数据源的时间应该是GMT无疑,从数据质量上来看不适合去做10pip以下目标的研究,两个问题:1.M1以下的数据是只有OHLC;2.本身外汇无中心交易,源和手头交易平台以及第三方报价渠道校验均存在这细微的不稳定差异。

用resample的方法直接生成多TF:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 存入历史数据
import pandas as pd

df = pd.read_table(r'C:\Users\jiawei\DesktopEURUSD.txt')
df1 = df['<TICKER>,<DTYYYYMMDD>,<TIME>,<OPEN>,<HIGH>,<LOW>,<CLOSE>,<VOL>'].str.split(',', expand=True)
df1.columns = ['TICKER', 'YMD', 'HMS', 'OPEN', 'HIGH', 'LOW', 'CLOSE', 'VOL']
df1['DATETIME'] = pd.to_datetime(df1['YMD'] + df1['HMS'], format='%Y%m%d%H%M%S')
M1 = pd.DataFrame()
M1['OPEN'] = df1['OPEN'].astype(float)
M1['HIGH'] = df1['HIGH'].astype(float)
M1['LOW'] = df1['LOW'].astype(float)
M1['CLOSE'] = df1['CLOSE'].astype(float)
M1['VOL'] = df1['VOL'].astype(int)
M1 = M1.set_index(df1['DATETIME'])
# 整理得到M5,M15,M30,H1,H4,D1并存入sqlite
from sqlalchemy import create_engine

engine = create_engine(r'sqlite:///D:\Forex\EURUSD.sqlite3', echo=True)
M1.to_sql('M1', engine, if_exists='replace')
CHANGE_DICT = {'OPEN': 'first', 'HIGH': 'max', 'LOW': 'min', 'CLOSE': 'last', 'VOL': 'sum'}
M1.resample('5T', closed='left', label='left').apply(CHANGE_DICT).dropna().to_sql('M5', engine, if_exists='replace')
M1.resample('15T', closed='left', label='left').apply(CHANGE_DICT).dropna().to_sql('M15', engine, if_exists='replace')
M1.resample('30T', closed='left', label='left').apply(CHANGE_DICT).dropna().to_sql('M30', engine, if_exists='replace')
M1.resample('60T', closed='left', label='left').apply(CHANGE_DICT).dropna().to_sql('H1', engine, if_exists='replace')
M1.resample('240T', closed='left', label='left').apply(CHANGE_DICT).dropna().to_sql('H4', engine, if_exists='replace')
M1.resample('1440T', closed='left', label='left').apply(CHANGE_DICT).dropna().to_sql('D1', engine, if_exists='replace')

附一个取数的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 取数
import pandas as pd
from sqlalchemy import create_engine

engine = create_engine(r'sqlite:///D:\Forex\EURUSD.sqlite3', echo=True)
sql = "select * from D1 where DATETIME>='2018-04' and DATETIME<'2018-05'"
df = pd.read_sql(sql, engine)
# 用pyecharts画下K线图展示一下轮廓
from pyecharts import Kline

v = df[['OPEN', 'HIGH', 'LOW', 'CLOSE']].values
kline = Kline("K 线图示例")
kline.add("四月日K", df['DATETIME'].values, v)
kline.render()

  18/11/15补充一个终极的解决方案,直接在forextester上下载压缩包就好了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
from sqlalchemy import create_engine
import pandas as pd
import zipfile
import os


# import to DB
def PairToDB(pair, data_path, DB_path):
print('Start to read source data of', pair)
df = pd.read_table(data_path)
print('Done! Start to perpare M1.')
df = df['<TICKER>,<DTYYYYMMDD>,<TIME>,<OPEN>,<HIGH>,<LOW>,<CLOSE>,<VOL>'].str.split(',', expand=True)
df.columns = ['TICKER', 'YMD', 'HMS', 'OPEN', 'HIGH', 'LOW', 'CLOSE', 'VOL']
df['DATETIME'] = pd.to_datetime(df['YMD'] + df['HMS'], format='%Y%m%d%H%M%S')
M1 = pd.DataFrame()
M1['OPEN'] = df['OPEN'].astype(float)
M1['HIGH'] = df['HIGH'].astype(float)
M1['LOW'] = df['LOW'].astype(float)
M1['CLOSE'] = df['CLOSE'].astype(float)
M1['VOL'] = df['VOL'].astype(int)
M1 = M1.set_index(df['DATETIME'])

# importing
engine = create_engine(DB_path, echo=False)
print('Start to change TF and import', pair)
M1.to_sql('M1', engine, if_exists='replace')
print('M1 finish!')
CHANGE_DICT = {'OPEN': 'first', 'HIGH': 'max', 'LOW': 'min', 'CLOSE': 'last', 'VOL': 'sum'}
M1.resample('5T', closed='left', label='left').apply(CHANGE_DICT).dropna().to_sql('M5', engine, if_exists='replace')
print('M5 finish!')
M1.resample('15T', closed='left', label='left').apply(CHANGE_DICT).dropna().to_sql('M15', engine,
if_exists='replace')
print('M15 finish!')
M1.resample('30T', closed='left', label='left').apply(CHANGE_DICT).dropna().to_sql('M30', engine,
if_exists='replace')
print('M30 finish!')
M1.resample('60T', closed='left', label='left').apply(CHANGE_DICT).dropna().to_sql('H1', engine,
if_exists='replace')
print('H1 finish!')
M1.resample('240T', closed='left', label='left').apply(CHANGE_DICT).dropna().to_sql('H4', engine,
if_exists='replace')
print('H4 finish!')
M1.resample('1440T', closed='left', label='left').apply(CHANGE_DICT).dropna().to_sql('D1', engine,
if_exists='replace')
print('D1 finish!')
return True


if __name__ == '__main__':
workfolder = "C:\\Users\\jiawei\\Desktop"
DBfolder = "D:\\Forex"

# extract zip first
file_list = os.listdir(workfolder)
for file_name in file_list:
if os.path.splitext(file_name)[1] == '.zip':
file_zip = zipfile.ZipFile(os.path.join(workfolder, file_name), 'r')
for file in file_zip.namelist():
file_zip.extract(file, workfolder)
file_zip.close()

# dealing with every txt file
file_list = os.listdir(workfolder)
for file_name in file_list:
if os.path.splitext(file_name)[1] == '.txt':
pair = os.path.splitext(file_name)[0]
data_path = os.path.join(workfolder, file_name)
DB_path = 'sqlite:///' + os.path.join(DBfolder, pair) + '.sqlite3'
PairToDB(pair, data_path, DB_path)
print('Done!')