#!/usr/bin/env python
# coding: utf-8

# In[1]:

from pycuber_sc import Cube
import numpy as np
import tensorflow as tf
from collections import deque, defaultdict
import itertools
import sys
import json
from matplotlib import pyplot as plt
plt.rcParams['figure.figsize'] = [30, 10]
import pandas as pd


# In[11]:

# read this http://rubiks.wikia.com/wiki/Notation
action_space = ['u', 'd', 'l', 'r', 'f', 'b', 'u\'', 'd\'', 'l\'', 'r\'', 'f\'', 'b\'']


# In[9]:

class Env:
    def __init__(self, tweaked_times_limit=100, tweaked_times=1):
        """
        @param tweaked_times_limit number of moves the agent may try before giving up
        @param tweaked_times       number of random moves used to scramble the cube on initialization
        """
        self._cube = Cube()
        # scramble the cube with random moves
        for _ in range(tweaked_times):
            self._cube(str(np.random.choice(action_space)))
        self.reset()
        self.nA = len(action_space)
        self.nS = len(self._get_state())
        self._tweaked_times_limit = tweaked_times_limit

    def reset(self):
        """
        Restore the cube to its initial scrambled state when the environment is reset.
        """
        self.cube = self._cube.copy()
        self.tweaked_times = 0
        return self._get_state()

    def step(self, action):
        self.cube(str(action_space[action]))
        self.tweaked_times += 1
        done = self.cube.check() or self.tweaked_times > self._tweaked_times_limit  # solved, or gave up
        # -0.1 is the cost of living; keeping it negative stops the agent from
        # happily collecting small rewards forever instead of solving the cube
        reward = -0.1 if not self.cube.check() else 1
        return self._get_state(), reward, done, None

    def _get_state(self):
        """
        Normalize the state and repack it as a tuple.
        """
        return tuple(s / 5. for s in self.cube.get_state())
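# In[ ]:

# A minimal smoke test of the Env API (an illustrative sketch, not part of the
# original training flow; the `_demo_*` names are introduced only for this example):
# take one random move on a once-scrambled cube and inspect the transition.
# Rewards are -0.1 per unsolved step and 1 on solving.
_demo_env = Env(tweaked_times=1, tweaked_times_limit=3)
_demo_state = _demo_env.reset()
_demo_action = np.random.choice(len(action_space))
_demo_state_prime, _demo_reward, _demo_done, _ = _demo_env.step(_demo_action)
print(len(_demo_state), _demo_reward, _demo_done)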
# In[ ]:

Q = defaultdict(lambda: np.zeros(len(action_space)))


# In[ ]:

json.dump([{'key': key, 'value': list(value)} for key, value in Q.items()], open('./Q.json', 'w'))


# In[ ]:

Q_ = Q.copy()


# In[ ]:

Q = Q_.copy()


# In[14]:

lines = 0
with open('./Q.csv', 'r') as f:
    for line in f:
        lines += 1
print(lines)


# In[7]:

def _reload_Q():
    Q = defaultdict(lambda: np.zeros(len(action_space)))
    Q_stage = json.load(open('./Q.json', 'r'))
    for pair in Q_stage:
        Q[tuple(pair['key'])] = np.array(pair['value'])
    return Q

Q = _reload_Q()


# In[15]:

env = Env()
nA = env.nA
nS = env.nS
with open('./Q.csv', 'w') as f:
    header = ','.join(['s{}'.format(k_) for k_ in range(nS)]) + ',' \
        + ','.join(['v{}'.format(k_) for k_ in range(nA)]) + '\n'
    f.write(header)
    _Q = {}
    for times in range(6):
        for _ in range(5 * 6 ** times):
            env = Env(tweaked_times=times)
            state = env.reset()
            _Q[state] = Q[state]
    for key, value in _Q.items():
        string = ','.join([str(k) for k in key]) + ',' + ','.join([str(v) for v in value]) + '\n'
        f.write(string)


# In[ ]:

env = Env()
nA = env.nA
nS = env.nS
header = ','.join(['s{}'.format(k_) for k_ in range(nS)]) + ',' \
    + ','.join(['v{}'.format(k_) for k_ in range(nA)]) + '\n'


# In[ ]:

header


# In[ ]:

env = Env()
nA = env.nA
nS = env.nS
with open('./Q.csv', 'w') as f:
    header = ','.join(['s{}'.format(k_) for k_ in range(nS)]) + ',' \
        + ','.join(['v{}'.format(k_) for k_ in range(nA)]) + '\n'
    f.write(header)
    for key, value in Q.items():
        string = ','.join([str(k) for k in key]) + ',' + ','.join([str(v) for v in value]) + '\n'
        f.write(string)


# In[16]:

datsv = pd.read_csv('./Q.csv').astype(np.float32)


# In[19]:

class Actor:
    def __init__(self, env, scope='actor'):
        self._num_input = env.nS
        self._num_output = env.nA
        with tf.variable_scope(scope):
            self._x = tf.placeholder(dtype=tf.float32, shape=[None, self._num_input], name='x')
            self._y = tf.placeholder(dtype=tf.float32, shape=[None, self._num_output], name='y')
            self._training = tf.placeholder_with_default(True, shape=(), name='training')
            o0 = tf.layers.dense(self._x, 64, activation=tf.nn.relu, name='output-0')
            d0 = tf.layers.dropout(o0, rate=0.7, training=self._training, name='dropout-0')
            o1 = tf.layers.dense(d0, 64, activation=tf.nn.relu, name='output-1')
            d1 = tf.layers.dropout(o1, rate=0.7, training=self._training, name='dropout-1')
            o2 = tf.layers.dense(d1, 64, activation=tf.nn.relu, name='output-2')
            d2 = tf.layers.dropout(o2, rate=0.7, training=self._training, name='dropout-2')
            # keep the last layer linear and apply softmax separately, so the
            # cross-entropy loss is computed from logits rather than probabilities
            logits = tf.layers.dense(d2, self._num_output, activation=None, name='logits')
            self._pred = tf.nn.softmax(logits, name='pred')
            self._loss = tf.reduce_mean(
                tf.nn.softmax_cross_entropy_with_logits(labels=self._y, logits=logits))
            self._op = tf.train.AdamOptimizer(learning_rate=.01).minimize(self._loss)

    def train(self, sess, x, y):
        assert isinstance(sess, tf.Session)
        loss, _ = sess.run([self._loss, self._op],
                           feed_dict={self._x: x, self._y: y, self._training: True})
        return loss

    def predict(self, sess, x):
        assert isinstance(sess, tf.Session)
        return sess.run(self._pred, feed_dict={self._x: x, self._training: False})


class Critic:
    def __init__(self, env, scope='critic'):
        self._num_input = env.nS
        self._num_output = env.nA
        with tf.variable_scope(scope):
            self._x = tf.placeholder(dtype=tf.float32, shape=[None, self._num_input], name='x')
            self._y = tf.placeholder(dtype=tf.float32, shape=[None, self._num_output], name='y')
            self._action = tf.placeholder(dtype=tf.int32, shape=[None, 1], name='action')
            self._training = tf.placeholder_with_default(True, shape=(), name='training')
            this_batch_size = tf.shape(self._x)[0]
            o0 = tf.layers.dense(self._x, 64, activation=tf.nn.relu, name='output-0')
            d0 = tf.layers.dropout(o0, rate=0.7, training=self._training, name='dropout-0')
            o1 = tf.layers.dense(d0, 64, activation=tf.nn.relu, name='output-1')
            d1 = tf.layers.dropout(o1, rate=0.7, training=self._training, name='dropout-1')
            o2 = tf.layers.dense(d1, 64, activation=tf.nn.relu, name='output-2')
            d2 = tf.layers.dropout(o2, rate=0.7, training=self._training, name='dropout-2')
            self._pred = tf.layers.dense(d2, self._num_output, activation=None, name='pred')

            # pick out the predicted value and the target value of the action actually
            # taken: flatten the [batch, nA] tensors row-major and gather at row * nA + action
            indices = tf.range(this_batch_size, dtype=tf.int32) * self._num_output + tf.squeeze(self._action)
            preds = tf.reshape(tf.gather(tf.reshape(self._pred, shape=[-1]), indices), shape=[1, this_batch_size])
            y = tf.reshape(tf.gather(tf.reshape(self._y, shape=[-1]), indices), shape=[1, this_batch_size])

            # define pre-train methods
            self._pre_train_loss = tf.losses.mean_squared_error(self._y, self._pred)
            self._pre_train_op = tf.train.AdamOptimizer(0.1).minimize(self._pre_train_loss)

            self._loss = tf.losses.mean_squared_error(y, preds)
            self._op = tf.train.AdamOptimizer(0.1).minimize(self._loss)

    def train(self, sess, x, y, action):
        assert isinstance(sess, tf.Session)
        loss, _ = sess.run([self._loss, self._op],
                           feed_dict={self._x: x, self._y: y, self._action: action, self._training: True})
        return loss

    def predict(self, sess, x):
        assert isinstance(sess, tf.Session)
        return sess.run(self._pred, feed_dict={self._x: x, self._training: False})

    def pre_train(self, sess, x, y):
        assert isinstance(sess, tf.Session)
        loss, _ = sess.run([self._pre_train_loss, self._pre_train_op], feed_dict={self._x: x, self._y: y})
        return loss

    def pre_train_loss(self, sess, x, y):
        assert isinstance(sess, tf.Session)
        return sess.run(self._pre_train_loss, feed_dict={self._x: x, self._y: y})
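# In[ ]:

# Illustrative sketch (not part of the original flow; the `_demo_*` names are made up
# for this example): the Critic selects the predicted value of the action actually
# taken by flattening the [batch, nA] output row-major and gathering at index
# row * nA + action. The same indexing, written in plain numpy:
_demo_q = np.arange(12, dtype=np.float32).reshape(3, 4)   # 3 states x 4 actions
_demo_actions = np.array([2, 0, 3])                        # one action per row
_demo_indices = np.arange(3) * 4 + _demo_actions
assert np.allclose(_demo_q.reshape(-1)[_demo_indices],
                   _demo_q[np.arange(3), _demo_actions])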
# In[24]:

tf.reset_default_graph()
sess = tf.Session()
env = Env()
nS = env.nS
nA = env.nA
critic = Critic(env)
num_row = len(datsv)
batch_size = 200
batch_num = int((num_row / batch_size) + 1)
sess.run(tf.global_variables_initializer())


# In[25]:

def _train(d):
    dat = d.sample(frac=1.)
    for batch_idx in range(batch_num):
        batch = dat.iloc[batch_idx * batch_size: (batch_idx + 1) * batch_size, :]
        batch_x = batch.iloc[:, :nS]
        batch_y = batch.iloc[:, nS:]
        critic.pre_train(sess, batch_x, batch_y)
    return critic.pre_train_loss(sess, dat.iloc[:, :nS], dat.iloc[:, nS:])


# In[26]:

losses = []
for epoch_idx in range(50):
    print('\repoch no. {:>10}'.format(epoch_idx), end='')
    losses.append(_train(datsv))


# In[27]:

plt.plot(losses)
plt.show()


# In[ ]:

# test the trained critic network
def get_action(state):
    state = np.array(state).reshape([1, -1])
    preds = critic.predict(sess=sess, x=state)
    return np.argmax(preds)

env = Env(tweaked_times=3, tweaked_times_limit=6)
steps = []
for _ in range(100):
    state = env.reset()
    for t in itertools.count():
        action = get_action(state)
        state_prime, reward, done, _ = env.step(action)
        if done:
            steps.append(t)
            break
        else:
            state = state_prime


# In[ ]:

plt.plot(steps)
plt.show()


# In[ ]:

#epsilon = .05
epsilon = 0

def get_action(state):
    """
    Keep the policy's greediness adjustable. Epsilon is 0 here because, in this
    environment, a purely greedy policy does not trap the agent into forever
    choosing a second-best action (as the number of initial scramble moves grows,
    consider raising epsilon to explore potentially better policies).
    """
    probs = np.ones(nA) * (epsilon / nA)
    probs[np.argmax(Q[state])] += (1. - epsilon)
    return np.random.choice(np.arange(nA), p=probs)

for n in range(0, 6):
    # n is the number of scramble moves applied when the cube is initialized
    # (starting directly with many moves only adds to the trained agent's confusion)
    for i in range(5 * (6 ** n)):
        # 5 * 6 ** n slightly raises the chance of visiting every reachable cube state
        # (generalization is essentially nil; this is brute force in the spirit of Monte Carlo methods)
        # implement Sarsa with one Q in the above cell
        env = Env(tweaked_times=n, tweaked_times_limit=n * 2)
        num_episode = 100  # maximum number of episodes for one initial state
        # discount factor (gamma) used in the temporal-difference (TD) target; see the
        # Bellman equation: https://en.wikipedia.org/wiki/Bellman_equation
        discounter = 0.9
        sum_rewards = deque(maxlen=30)
        avg_sum_rewards = []
        nA = env.nA
        for episode_idx in range(num_episode):
            state = env.reset()
            action = get_action(state)
            td_errors = 0  # accumulated absolute TD error over the episode
            rewards = 0
            for t in itertools.count():
                state_prime, reward, done, _ = env.step(action)
                rewards += reward
                action_prime = get_action(state_prime)
                # see https://en.wikipedia.org/wiki/Temporal_difference_learning
                td_error = reward + discounter * Q[state_prime][action_prime] - Q[state][action]
                Q[state][action] += td_error * 0.3  # 0.3 is the learning rate eta
                td_errors += np.abs(td_error)
                if done:
                    break
                else:
                    state = state_prime
                    action = action_prime
            sum_rewards.append(rewards)
            avg_sum_rewards.append(np.mean(sum_rewards))
            print('\r {} {} {} {:>30}'.format(n, i, episode_idx, td_errors), end='')
            if td_errors < 10 ** -5:
                # if the accumulated error is already tiny there is no need to keep
                # trying (the values for this state have nearly converged)
                break
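# In[ ]:

# For reference, the Sarsa update applied inline above, written as a standalone
# helper (an illustrative sketch; the training loops keep using the inline form):
#     Q(s, a) <- Q(s, a) + eta * (r + gamma * Q(s', a') - Q(s, a))
def _sarsa_update(q_sa, q_sa_prime, reward, gamma=0.9, eta=0.3):
    """Return the updated Q(s, a) together with the TD error it was based on."""
    td_error = reward + gamma * q_sa_prime - q_sa
    return q_sa + eta * td_error, td_error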
# In[ ]:

# implement Sarsa with one Q in the above cell
env = Env(tweaked_times=5)
num_episode = 100
#epsilon = .05
epsilon = 0
discounter = 0.9
sum_rewards = deque(maxlen=30)
avg_sum_rewards = []
nA = env.nA

def get_action(state):
    probs = np.ones(nA) * (epsilon / nA)
    probs[np.argmax(Q[state])] += (1. - epsilon)
    return np.random.choice(np.arange(nA), p=probs)

for episode_idx in range(num_episode):
    state = env.reset()
    action = get_action(state)
    rewards = 0
    for t in itertools.count():
        state_prime, reward, done, _ = env.step(action)
        rewards += reward
        action_prime = get_action(state_prime)
        Q[state][action] += (reward + discounter * Q[state_prime][action_prime] - Q[state][action]) * 0.3
        if done:
            break
        else:
            state = state_prime
            action = action_prime
    sum_rewards.append(rewards)
    avg_sum_rewards.append(np.mean(sum_rewards))

plt.plot(avg_sum_rewards)
plt.show()


# In[ ]:

# Sarsa again, this time starting from a fresh Q table
Q = defaultdict(lambda: np.zeros(len(action_space)))
env = Env()
num_episode = 150
#epsilon = .05
epsilon = 0
discounter = 0.9
sum_rewards = deque(maxlen=30)
avg_sum_rewards = []
nA = env.nA

def get_action(state):
    probs = np.ones(nA) * (epsilon / nA)
    probs[np.argmax(Q[state])] += (1. - epsilon)
    return np.random.choice(np.arange(nA), p=probs)

for episode_idx in range(num_episode):
    state = env.reset()
    action = get_action(state)
    rewards = 0
    for t in itertools.count():
        state_prime, reward, done, _ = env.step(action)
        rewards += reward
        action_prime = get_action(state_prime)
        Q[state][action] += (reward + discounter * Q[state_prime][action_prime] - Q[state][action]) * 0.3
        if done:
            break
        else:
            state = state_prime
            action = action_prime
    sum_rewards.append(rewards)
    avg_sum_rewards.append(np.mean(sum_rewards))

plt.plot(avg_sum_rewards)
plt.show()


# In[ ]:

# Q-learning with an incremental (sample-average) update:
# Q[state][action][0] holds the running mean of the observed action values,
# Q[state][action][1] counts how many times the state-action pair has been visited
Q = defaultdict(lambda: np.zeros([len(action_space), 2]))
env = Env()
num_episode = 150
#epsilon = .05
epsilon = 0
discounter = 0.9
nA = env.nA
sum_rewards = deque(maxlen=30)
avg_sum_rewards1 = []

def get_action(state):
    probs = np.ones(nA) * (epsilon / nA)
    probs[np.argmax(Q[state][:, 0])] += (1. - epsilon)
    return np.random.choice(np.arange(nA), p=probs)

for episode_idx in range(num_episode):
    state = env.reset()
    rewards = 0
    for t in itertools.count():
        action = get_action(state)
        state_prime, reward, done, _ = env.step(action)
        rewards += reward
        action_value = reward + discounter * np.max(np.squeeze(Q[state_prime][:, 0]))
        Q[state][action][0] += (action_value - Q[state][action][0]) / (Q[state][action][1] + 1)
        Q[state][action][1] += 1
        if done:
            break
        else:
            state = state_prime
    sum_rewards.append(rewards)
    avg_sum_rewards1.append(np.mean(sum_rewards))

plt.plot(avg_sum_rewards1)
plt.show()
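# In[ ]:

# Sanity check of the incremental-mean update used above (an illustrative sketch;
# the `_demo_*` names are introduced only for this example): dividing the residual
# by (count + 1) makes the running value the arithmetic mean of the observed targets.
_demo_targets = [1.0, 0.5, -0.1]
_demo_mean, _demo_count = 0.0, 0
for _demo_v in _demo_targets:
    _demo_mean += (_demo_v - _demo_mean) / (_demo_count + 1)
    _demo_count += 1
assert abs(_demo_mean - np.mean(_demo_targets)) < 1e-9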
# In[ ]:

# Q-learning with a constant step size of 0.3
Q = defaultdict(lambda: np.zeros(len(action_space)))
env = Env()
num_episode = 150
#epsilon = .05
epsilon = 0
discounter = 0.9
nA = env.nA
sum_rewards = deque(maxlen=30)
avg_sum_rewards2 = []

def get_action(state):
    probs = np.ones(nA) * (epsilon / nA)
    probs[np.argmax(Q[state])] += (1. - epsilon)
    return np.random.choice(np.arange(nA), p=probs)

for episode_idx in range(num_episode):
    state = env.reset()
    rewards = 0
    for t in itertools.count():
        action = get_action(state)
        state_prime, reward, done, _ = env.step(action)
        rewards += reward
        action_value = reward + discounter * np.max(np.squeeze(Q[state_prime]))
        Q[state][action] += (action_value - Q[state][action]) * .3
        if done:
            break
        else:
            state = state_prime
    sum_rewards.append(rewards)
    avg_sum_rewards2.append(np.mean(sum_rewards))

plt.plot(avg_sum_rewards2)
plt.show()


# In[ ]:

plt.plot(avg_sum_rewards1, label='Q-learning_avg')
plt.plot(avg_sum_rewards, label='Sarsa')
plt.plot(avg_sum_rewards2, label='Q-learning_discount')
plt.legend()
plt.show()


# In[ ]:

Q[env.reset()]


# In[ ]:

action_space[np.argmax(Q[env.reset()][:, 0])]


# In[ ]:

env.cube(str(action_space[np.argmax(Q[env.reset()][:, 0])])).check()


# In[ ]:

# how often does a single random move solve a once-scrambled cube?
env = Env()
ret = []
for _ in range(1200):
    env.reset()
    env.cube(str(np.random.choice(action_space)))
    ret.append(env.cube.check())


# In[ ]:

np.sum(ret)


# In[ ]:

# actor-critic training with a small experience-replay buffer
num_episode = 3000
memory = deque(maxlen=100)
tf.reset_default_graph()
env = Env()
actor, critic = Actor(env), Critic(env)
session = tf.Session()
session.run(tf.global_variables_initializer())
rewards = []
loss_critic = []
rewards_deque = deque(maxlen=80)
epsilons = np.linspace(start=.5, stop=.01, num=num_episode + 1)


# In[ ]:

for episode_idx in range(num_episode):
    epsilon = epsilons[episode_idx]
    state = env.reset()
    for t in itertools.count():
        # epsilon-greedy action selection based on the actor's policy
        probs = np.ones(env.nA) * (epsilon / env.nA)
        probs[np.argmax(np.squeeze(actor.predict(session, np.array(state).reshape(1, env.nS))))] += (1. - epsilon)
        action = np.random.choice(np.arange(env.nA), p=probs)
        state_prime, reward, done, _ = env.step(action)
        rewards.append(reward)
        rewards_deque.append(reward)
        if done:
            break
        else:
            memory.append((state, action, state_prime, reward))
            if len(memory) >= memory.maxlen:
                print('\r in episode #{0}, step #{1}, avg score {2:>8}'.format(episode_idx, t, np.mean(rewards_deque)), end='')
                sys.stdout.flush()
                # sample a replay batch, then take one TD step on the critic and one on the actor
                rp = np.array(memory)[np.random.randint(memory.maxlen, size=50), :]
                state_prime_batch = np.array([a for a in rp[:, 2]])
                reward_batch = rp[:, 3].reshape(-1, 1)
                state_batch = np.array([a for a in rp[:, 0]])
                action_batch = rp[:, 1].reshape(-1, 1)
                state_prime_action_values = critic.predict(session, state_prime_batch)
                td_target = reward_batch + .8 * state_prime_action_values
                td_error = td_target - critic.predict(session, state_batch)
                l_actor = actor.train(session, state_batch, td_error)
                l_critic = critic.train(session, state_batch, td_target, action_batch)
                loss_critic.append(l_critic)
            state = state_prime


# In[ ]:

plt.plot(rewards)
plt.show()


# In[ ]:

plt.plot(loss_critic)
plt.show()
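# In[ ]:

# Illustrative sketch (assumes `actor`, `session`, and `Env` from the cells above are
# still in scope; the `eval_*` names are introduced only for this example): evaluate
# the trained actor greedily on freshly scrambled cubes, mirroring the earlier greedy
# evaluation of the pre-trained critic.
eval_steps = []
eval_env = Env(tweaked_times=1, tweaked_times_limit=6)
for _ in range(100):
    state = eval_env.reset()
    for t in itertools.count():
        probs = np.squeeze(actor.predict(session, np.array(state).reshape(1, eval_env.nS)))
        action = np.argmax(probs)
        state, reward, done, _ = eval_env.step(action)
        if done:
            eval_steps.append(t)
            break

plt.plot(eval_steps)
plt.show()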