import numpy as np
import pandas as pd
import indicators as ind #indicators.pyのインポート
from backtest import Backtest,BacktestReport
dataM1 = pd.read_csv('DAT_ASCII_GBPUSD_M1_2016.csv', sep=';',
names=('Time','Open','High','Low','Close', ''),
index_col='Time', parse_dates=True)
dataM1.index += pd.offsets.Hour(7) #7時間のオフセット
ohlc = ind.TF_ohlc(dataM1, 'H') #1時間足データの作成
今回は最適化するパラメータの値の組み合わせを増やすために、2本の移動平均の交差システムに決済用のシグナルを追加。決済用のシグナルは、以下のように定義。
買いポジションの決済:終値が決済用移動平均を下抜けたとき
売りポジションの決済:終値が決済用移動平均を上抜けたとき
このシステムでは、パラメータは3個。今回はそれぞれのパラメータを以下のような範囲で探索することにします。
SlowMAperiod = np.arange(7, 151) #長期移動平均期間の範囲
FastMAperiod = np.arange(5, 131) #短期移動平均期間の範囲
ExitMAperiod = np.arange(3, 111) #決済用移動平均期間の範囲
遺伝的アルゴリズムのメインルーチンは、前回のランダムサーチとほぼ同じです。
パラメータをランダムに探すところを、後述の遺伝的処理に置き換えるだけです。
また売買シグナルは上記のルールのように決済用のシグナルを追加しています。
もう一つ、前回からの変更点は、SlowMAperiod, FastMAperiod, ExitMAperiodの三つのパラメータの範囲をPrangeというリストにまとめて各関数に渡すようにした点です。
こうすることで、パラメータの数が増えてもそのまま対応することができます。
def Optimize(ohlc, Prange):
def shift(x, n=1): return np.concatenate((np.zeros(n), x[:-n])) #シフト関数
SlowMA = np.empty([len(Prange[0]), len(ohlc)]) #長期移動平均
for i in range(len(Prange[0])):
SlowMA[i] = ind.iMA(ohlc, Prange[0][i])
FastMA = np.empty([len(Prange[1]), len(ohlc)]) #短期移動平均
for i in range(len(Prange[1])):
FastMA[i] = ind.iMA(ohlc, Prange[1][i])
ExitMA = np.empty([len(Prange[2]), len(ohlc)]) #決済用移動平均
for i in range(len(Prange[2])):
ExitMA[i] = ind.iMA(ohlc, Prange[2][i])
Close = ohlc['Close'].values #終値
M = 20 #個体数
Eval = np.zeros([M, 6]) #評価項目
Param = InitParam(Prange, M) #パラメータ初期化
gens = 0 #世代数
while gens < 100:
for k in range(M):
i0 = Param[k,0]
i1 = Param[k,1]
i2 = Param[k,2]
#買いエントリーシグナル
BuyEntry = (FastMA[i1] > SlowMA[i0]) & (shift(FastMA[i1]) <= shift(SlowMA[i0]))
#売りエントリーシグナル
SellEntry = (FastMA[i1] < SlowMA[i0]) & (shift(FastMA[i1]) >= shift(SlowMA[i0]))
#買いエグジットシグナル
BuyExit = (Close < ExitMA[i2]) & (shift(Close) >= shift(ExitMA[i2]))
#売りエグジットシグナル
SellExit = (Close > ExitMA[i2]) & (shift(Close) <= shift(ExitMA[i2]))
#バックテスト
Trade, PL = Backtest(ohlc, BuyEntry, SellEntry, BuyExit, SellExit)
Eval[k] = BacktestReport(Trade, PL)
# 世代の交代
Param = Evolution(Param, Eval[:,0], Prange)
gens += 1
print(gens, Eval[0,0])
Slow = Prange[0][Param[:,0]]
Fast = Prange[1][Param[:,1]]
Exit = Prange[2][Param[:,2]]
return pd.DataFrame({'Slow':Slow, 'Fast':Fast, 'Exit':Exit, 'Profit': Eval[:,0], 'Trades':Eval[:,1],
'Average':Eval[:,2],'PF':Eval[:,3], 'MDD':Eval[:,4], 'RF':Eval[:,5]},
columns=['Slow','Fast','Exit','Profit','Trades','Average','PF','MDD','RF'])
上の関数で、GA用に追加した関数は、InitParam()とEvolution()です。まず、InitParam()は、各個体のパラメータの初期化です。
from numpy.random import randint,choice
#パラメータ初期化
def InitParam(Prange, M):
Param = randint(len(Prange[0]), size=M)
for i in range(1,len(Prange)):
Param = np.vstack((Param, randint(len(Prange[i]), size=M)))
return Param.T
Evolution()は、次のようにいくつかの遺伝的処理を含みます。
#遺伝的処理
def Evolution(Param, Eval, Prange):
#エリート保存付きルーレット選択
Param = Param[np.argsort(Eval)[::-1]] #ソート
R = Eval-min(Eval)
R = R/sum(R)
idx = choice(len(Eval), size=len(Eval), replace=True, p=R)
idx[0] = 0 #エリート保存
Param = Param[idx]
#1点交叉
def Evolution(Param, Eval, Prange):
N = 10
idx = choice(np.arange(1,len(Param)), size=N, replace=False)
for i in range(0,N,2):
ix = idx[i:i+2]
p = randint(1,len(Prange))
Param[ix] = np.hstack((Param[ix][:,:p], Param[ix][:,p:][::-1]))
#近傍生成
def Evolution(Param, Eval, Prange):
N = 10
idx = choice(np.arange(1,len(Param)), size=N, replace=False)
diff = choice([-1,1], size=N).reshape(N,1)
for i in range(N):
p = randint(len(Prange))
Param[idx[i]][p:p+1] = (Param[idx[i]][p]+diff[i]+len(Prange[p]))%len(Prange[p])
#突然変異
def Evolution(Param, Eval, Prange):
N = 2
idx = choice(np.arange(1,len(Param)), size=N, replace=False)
for i in range(N):
p = randint(len(Prange))
Param[idx[i]][p:p+1] = randint(len(Prange[p]))
return Param
それぞれの処理について説明します。
現在の個体の中から次世代に残す個体を選択します。
選択の方法はいくつかあるのですが、Numpyの関数でルーレット選択に便利な関数があったので、それを使ってみます。
ルーレット選択はバックテストの評価値である適応度の大きさに応じて、確率的に次世代に残す個体を選択する方法です。
適応度が高いほど残りやすくなります。
今回使った関数は、numpy.random.choice()という関数で、リストから必要な個数分ランダムに選択するものですが、オプションの引数にpという選択確率のリストを付けると、その確率に合わせて選択してくれます。
これはルーレット選択そのものです。次のようなコードになります。
def Evolution(Param, Eval, Prange):
#エリート保存付きルーレット選択
Param = Param[np.argsort(Eval)[::-1]] #ソート
R = Eval-min(Eval)
R = R/sum(R)
idx = choice(len(Eval), size=len(Eval), replace=True, p=R)
idx[0] = 0 #エリート保存
Param = Param[idx]
ただ、確率がマイナスだと都合が悪いので、適応度の最小値が0になるように補正してあります。
またルーレット選択だけだと、適応度が高くても運悪く選ばれないこともあるので、適応度をソートして最も高い個体(エリート)は必ず次世代に残るようにしています。
次に遺伝子の交叉を行います。これは、二つの個体を選んで、遺伝子情報の一部を互いに交換することです。
交叉の方法もいくつかありますが、ここではパラメータの並びの1か所を選んで、その前後を交換する方法にしました。
def Evolution(Param, Eval, Prange):
N = 10
idx = choice(np.arange(1,len(Param)), size=N, replace=False)
for i in range(0,N,2):
ix = idx[i:i+2]
p = randint(1,len(Prange))
Param[ix] = np.hstack((Param[ix][:,:p], Param[ix][:,p:][::-1]))
ここでもchoice()を使って交叉する個体の個数分の乱数列を生成します。replace=Falseを付けることで、重複のない乱数列が得られます。
そして、2個ずつの個体をixとして選択して、交叉点pの後半のデータを入れ換えることで交叉を実現しています。
通常のGAでは、選択、交叉、突然変異で進化のシミュレーションを行うのですが、今回のように交叉点をパラメータの切れ目のところに限定していると、いつの間にか同じ個体だらけになってしまい、進化が止まってしまいます。
かといって、突然変異を多くすると、ランダムサーチに近くなってしまうので、あまり効率がよくありません。
そこで、今回は、パラメータの一部を+1、あるいは-1する変化を施します。
いわゆる近傍解というやつで、局所探索アルゴリズムでよく利用されるものです。
def Evolution(Param, Eval, Prange):
N = 10
idx = choice(np.arange(1,len(Param)), size=N, replace=False)
diff = choice([-1,1], size=N).reshape(N,1)
for i in range(N):
p = randint(len(Prange))
Param[idx[i]][p:p+1] = (Param[idx[i]][p]+diff[i]+len(Prange[p]))%len(Prange[p])
交叉と同じく近傍を生成する個体を選択します。
そして、どのパラメータを変化させるかをまた乱数で決めて、そのパラメータを1だけ変化させます。
最後に突然変異を実行します。
これもいくつかの方法がありますが、選択した個体のパラメータの一部を新しく乱数で書き換えます。
GAの場合、局所解から抜け出すには、突然変異が重要なのですが、これを多用するとランダム性が高くなってしまうので、ここでは、2個程度にしておきます。
def Evolution(Param, Eval, Prange):
N = 2
idx = choice(np.arange(1,len(Param)), size=N, replace=False)
for i in range(N):
p = randint(len(Prange))
Param[idx[i]][p:p+1] = randint(len(Prange[p]))
以上のように定義した関数を使って遺伝的アルゴリズムを実行してみます。
result = Optimize(ohlc, [SlowMAperiod, FastMAperiod, ExitMAperiod])
result.sort_values('Profit', ascending=False)
GAも乱数を使っているので、結果は毎回異なります。
以下は結果の一例で、世代毎に最も高い適応度を示したものです。
エリート保存しているので、高い適応度は順次更新されます。
この問題の最適解は調べていないのでわかりませんが、この結果は、まあまあ高い値だと思います。
そもそもGAの目的は、最適解を求めることではなく、総当たりするより短い時間で準最適解を求めることです。
実際、シストレのパラメータで最適解を求めたところで、そのシステムが別の期間でも同じ結果を出すわけではありませんね。
そういうことなので、200万通りの組み合わせの中から2000回の試行でまあまあの解が得られたのであれば、よしとするべきでしょうね。
遺伝的アルゴリズムは、たくさんの個体を評価してその適応度により選択、交叉などの遺伝的処理を行うのですが、それぞれの個体の評価は完全に独立しているので、並列処理に適しています。ですが今回は並列処理をする前の時間を測っときました。。。
Corei7の8スレッドだと11秒だそうです。(私は、Corei5なので下記の結果になりました。)
※並列処理については、時間があれば取り組みたいと思います。
import time
start = time.perf_counter()
result = Optimize(ohlc, [SlowMAperiod, FastMAperiod, ExitMAperiod])
print("elapsed_time = {0} sec".format(time.perf_counter()-start))