%load_ext watermark
%watermark -v -p numpy,sklearn,scipy,matplotlib,tensorflow
CPython 3.6.8 IPython 7.2.0 numpy 1.15.4 sklearn 0.20.2 scipy 1.1.0 matplotlib 3.0.2 tensorflow 1.13.1
12장 – 분산 텐서플로
이 노트북은 11장에 있는 모든 샘플 코드와 연습문제 해답을 가지고 있습니다.
파이썬 2와 3을 모두 지원합니다. 공통 모듈을 임포트하고 맷플롯립 그림이 노트북 안에 포함되도록 설정하고 생성한 그림을 저장하기 위한 함수를 준비합니다:
# 파이썬 2와 파이썬 3 지원
from __future__ import division, print_function, unicode_literals
# 공통
import numpy as np
import os
# 일관된 출력을 위해 유사난수 초기화
def reset_graph(seed=42):
tf.reset_default_graph()
tf.set_random_seed(seed)
np.random.seed(seed)
# 맷플롯립 설정
%matplotlib inline
import matplotlib
import matplotlib.pyplot as plt
plt.rcParams['axes.labelsize'] = 14
plt.rcParams['xtick.labelsize'] = 12
plt.rcParams['ytick.labelsize'] = 12
# 그림을 저장할 폴더
PROJECT_ROOT_DIR = "."
CHAPTER_ID = "distributed"
def save_fig(fig_id, tight_layout=True):
path = os.path.join(PROJECT_ROOT_DIR, "images", CHAPTER_ID, fig_id + ".png")
if tight_layout:
plt.tight_layout()
plt.savefig(path, format='png', dpi=300)
import tensorflow as tf
c = tf.constant("Hello distributed TensorFlow!")
server = tf.train.Server.create_local_server()
with tf.Session(server.target) as sess:
print(sess.run(c))
b'Hello distributed TensorFlow!'
cluster_spec = tf.train.ClusterSpec({
"ps": [
"127.0.0.1:2221", # /job:ps/task:0
"127.0.0.1:2222", # /job:ps/task:1
],
"worker": [
"127.0.0.1:2223", # /job:worker/task:0
"127.0.0.1:2224", # /job:worker/task:1
"127.0.0.1:2225", # /job:worker/task:2
]})
task_ps0 = tf.train.Server(cluster_spec, job_name="ps", task_index=0)
task_ps1 = tf.train.Server(cluster_spec, job_name="ps", task_index=1)
task_worker0 = tf.train.Server(cluster_spec, job_name="worker", task_index=0)
task_worker1 = tf.train.Server(cluster_spec, job_name="worker", task_index=1)
task_worker2 = tf.train.Server(cluster_spec, job_name="worker", task_index=2)
reset_graph()
with tf.device("/job:ps"):
a = tf.Variable(1.0, name="a")
with tf.device("/job:worker"):
b = a + 2
with tf.device("/job:worker/task:1"):
c = a + b
WARNING:tensorflow:From /home/haesun/anaconda3/envs/handson-ml/lib/python3.6/site-packages/tensorflow/python/framework/op_def_library.py:263: colocate_with (from tensorflow.python.framework.ops) is deprecated and will be removed in a future version. Instructions for updating: Colocations handled automatically by placer.
with tf.Session("grpc://127.0.0.1:2221") as sess:
sess.run(a.initializer)
print(c.eval())
4.0
reset_graph()
with tf.device(tf.train.replica_device_setter(
ps_tasks=2,
ps_device="/job:ps",
worker_device="/job:worker")):
v1 = tf.Variable(1.0, name="v1") # /job:ps/task:0 (defaults to /cpu:0) 에 할당
v2 = tf.Variable(2.0, name="v2") # /job:ps/task:1 (defaults to /cpu:0) 에 할당
v3 = tf.Variable(3.0, name="v3") # /job:ps/task:0 (defaults to /cpu:0) 에 할당
s = v1 + v2 # /job:worker (defaults to task:0/cpu:0) 에 할당
with tf.device("/task:1"):
p1 = 2 * s # /job:worker/task:1 (defaults to /cpu:0) 에 할당
with tf.device("/cpu:0"):
p2 = 3 * s # /job:worker/task:1/cpu:0 에 할당
config = tf.ConfigProto()
config.log_device_placement = True
with tf.Session("grpc://127.0.0.1:2221", config=config) as sess:
v1.initializer.run()
reset_graph()
default1 = tf.constant([5.])
default2 = tf.constant([6])
default3 = tf.constant([7])
dec = tf.decode_csv(tf.constant("1.,,44"),
record_defaults=[default1, default2, default3])
with tf.Session() as sess:
print(sess.run(dec))
[1.0, 6, 44]
reset_graph()
test_csv = open("my_test.csv", "w")
test_csv.write("x1, x2 , target\n")
test_csv.write("1.,, 0\n")
test_csv.write("4., 5. , 1\n")
test_csv.write("7., 8. , 0\n")
test_csv.close()
filename_queue = tf.FIFOQueue(capacity=10, dtypes=[tf.string], shapes=[()])
filename = tf.placeholder(tf.string)
enqueue_filename = filename_queue.enqueue([filename])
close_filename_queue = filename_queue.close()
reader = tf.TextLineReader(skip_header_lines=1)
key, value = reader.read(filename_queue)
x1, x2, target = tf.decode_csv(value, record_defaults=[[-1.], [-1.], [-1]])
features = tf.stack([x1, x2])
instance_queue = tf.RandomShuffleQueue(
capacity=10, min_after_dequeue=2,
dtypes=[tf.float32, tf.int32], shapes=[[2],[]],
name="instance_q", shared_name="shared_instance_q")
enqueue_instance = instance_queue.enqueue([features, target])
close_instance_queue = instance_queue.close()
minibatch_instances, minibatch_targets = instance_queue.dequeue_up_to(2)
with tf.Session() as sess:
sess.run(enqueue_filename, feed_dict={filename: "my_test.csv"})
sess.run(close_filename_queue)
try:
while True:
sess.run(enqueue_instance)
except tf.errors.OutOfRangeError as ex:
print("더 이상 읽을 파일이 없습니다")
sess.run(close_instance_queue)
try:
while True:
print(sess.run([minibatch_instances, minibatch_targets]))
except tf.errors.OutOfRangeError as ex:
print("더 이상 훈련 샘플이 없습니다")
WARNING:tensorflow:From <ipython-input-12-3ced782caad1>:15: TextLineReader.__init__ (from tensorflow.python.ops.io_ops) is deprecated and will be removed in a future version. Instructions for updating: Queue-based input pipelines have been replaced by `tf.data`. Use `tf.data.TextLineDataset`. 더 이상 읽을 파일이 없습니다 [array([[ 4., 5.], [ 1., -1.]], dtype=float32), array([1, 0], dtype=int32)] [array([[7., 8.]], dtype=float32), array([0], dtype=int32)] 더 이상 훈련 샘플이 없습니다
#coord = tf.train.Coordinator()
#threads = tf.train.start_queue_runners(coord=coord)
#filename_queue = tf.train.string_input_producer(["test.csv"])
#coord.request_stop()
#coord.join(threads)
reset_graph()
filename_queue = tf.FIFOQueue(capacity=10, dtypes=[tf.string], shapes=[()])
filename = tf.placeholder(tf.string)
enqueue_filename = filename_queue.enqueue([filename])
close_filename_queue = filename_queue.close()
reader = tf.TextLineReader(skip_header_lines=1)
key, value = reader.read(filename_queue)
x1, x2, target = tf.decode_csv(value, record_defaults=[[-1.], [-1.], [-1]])
features = tf.stack([x1, x2])
instance_queue = tf.RandomShuffleQueue(
capacity=10, min_after_dequeue=2,
dtypes=[tf.float32, tf.int32], shapes=[[2],[]],
name="instance_q", shared_name="shared_instance_q")
enqueue_instance = instance_queue.enqueue([features, target])
close_instance_queue = instance_queue.close()
minibatch_instances, minibatch_targets = instance_queue.dequeue_up_to(2)
n_threads = 5
queue_runner = tf.train.QueueRunner(instance_queue, [enqueue_instance] * n_threads)
coord = tf.train.Coordinator()
with tf.Session() as sess:
sess.run(enqueue_filename, feed_dict={filename: "my_test.csv"})
sess.run(close_filename_queue)
enqueue_threads = queue_runner.create_threads(sess, coord=coord, start=True)
try:
while True:
print(sess.run([minibatch_instances, minibatch_targets]))
except tf.errors.OutOfRangeError as ex:
print("더 이상 훈련 샘플이 없습니다")
WARNING:tensorflow:From <ipython-input-14-d19da45da613>:24: QueueRunner.__init__ (from tensorflow.python.training.queue_runner_impl) is deprecated and will be removed in a future version. Instructions for updating: To construct input pipelines, use the `tf.data` module. [array([[ 4., 5.], [ 1., -1.]], dtype=float32), array([1, 0], dtype=int32)] [array([[7., 8.]], dtype=float32), array([0], dtype=int32)] 더 이상 훈련 샘플이 없습니다
reset_graph()
def read_and_push_instance(filename_queue, instance_queue):
reader = tf.TextLineReader(skip_header_lines=1)
key, value = reader.read(filename_queue)
x1, x2, target = tf.decode_csv(value, record_defaults=[[-1.], [-1.], [-1]])
features = tf.stack([x1, x2])
enqueue_instance = instance_queue.enqueue([features, target])
return enqueue_instance
filename_queue = tf.FIFOQueue(capacity=10, dtypes=[tf.string], shapes=[()])
filename = tf.placeholder(tf.string)
enqueue_filename = filename_queue.enqueue([filename])
close_filename_queue = filename_queue.close()
instance_queue = tf.RandomShuffleQueue(
capacity=10, min_after_dequeue=2,
dtypes=[tf.float32, tf.int32], shapes=[[2],[]],
name="instance_q", shared_name="shared_instance_q")
minibatch_instances, minibatch_targets = instance_queue.dequeue_up_to(2)
read_and_enqueue_ops = [read_and_push_instance(filename_queue, instance_queue) for i in range(5)]
queue_runner = tf.train.QueueRunner(instance_queue, read_and_enqueue_ops)
with tf.Session() as sess:
sess.run(enqueue_filename, feed_dict={filename: "my_test.csv"})
sess.run(close_filename_queue)
coord = tf.train.Coordinator()
enqueue_threads = queue_runner.create_threads(sess, coord=coord, start=True)
try:
while True:
print(sess.run([minibatch_instances, minibatch_targets]))
except tf.errors.OutOfRangeError as ex:
print("더 이상 훈련 샘플이 없습니다")
[array([[ 4., 5.], [ 1., -1.]], dtype=float32), array([1, 0], dtype=int32)] [array([[7., 8.]], dtype=float32), array([0], dtype=int32)] 더 이상 훈련 샘플이 없습니다
reset_graph()
q = tf.FIFOQueue(capacity=10, dtypes=[tf.float32], shapes=[()])
v = tf.placeholder(tf.float32)
enqueue = q.enqueue([v])
dequeue = q.dequeue()
output = dequeue + 1
config = tf.ConfigProto()
config.operation_timeout_in_ms = 1000
with tf.Session(config=config) as sess:
sess.run(enqueue, feed_dict={v: 1.0})
sess.run(enqueue, feed_dict={v: 2.0})
sess.run(enqueue, feed_dict={v: 3.0})
print(sess.run(output))
print(sess.run(output, feed_dict={dequeue: 5}))
print(sess.run(output))
print(sess.run(output))
try:
print(sess.run(output))
except tf.errors.DeadlineExceededError as ex:
print("dequeue 타임 아웃")
2.0 6.0 3.0 4.0 dequeue 타임 아웃
텐서플로 1.4에서 소개된 Data API를 사용하면 손쉽게 데이터를 효율적으로 읽을 수 있습니다.
tf.reset_default_graph()
0에서 9까지 정수를 세 번 반복한 간단한 데이터셋을 일곱 개씩 배치로 만들어 시작해 보죠:
dataset = tf.data.Dataset.from_tensor_slices(np.arange(10))
dataset = dataset.repeat(3).batch(7)
첫 번째 줄은 0에서 9까지 정수를 담은 데이터셋을 만듭니다. 두 번째 줄은 이 데이터셋의 원소를 세 번 반복하고 일곱 개씩 담은 새로운 데이터셋을 만듭니다. 위에서 볼 수 있듯이 원본 데이터셋에서 여러 변환 메서드를 연결하여 호출하여 적용했습니다.
그다음, 데이터셋을 한 번 순회하는 원-샷-이터레이터(one-shot-iterator)를 만들고, 다음 원소를 지칭하는 텐서를 얻기 위해 get_next()
메서드를 호출합니다.
iterator = dataset.make_one_shot_iterator()
next_element = iterator.get_next()
next_element
를 반복적으로 평가해서 데이터셋을 순회해 보죠. 원소가 별로 없기 때문에 OutOfRangeError
가 발생합니다:
with tf.Session() as sess:
try:
while True:
print(next_element.eval())
except tf.errors.OutOfRangeError:
print("완료")
[0 1 2 3 4 5 6] [7 8 9 0 1 2 3] [4 5 6 7 8 9 0] [1 2 3 4 5 6 7] [8 9] 완료
좋네요! 잘 작동합니다.
늘 그렇듯이 텐서는 그래프를 실행(sess.run()
)할 때마다 한 번만 평가된다는 것을 기억하세요. next_element
에 의존하는 텐서를 여러개 평가하더라도 한 번만 평가됩니다. 또한 next_element
를 동시에 두 번 실행해도 마찬가지입니다:
with tf.Session() as sess:
try:
while True:
print(sess.run([next_element, next_element]))
except tf.errors.OutOfRangeError:
print("완료")
[array([0, 1, 2, 3, 4, 5, 6]), array([0, 1, 2, 3, 4, 5, 6])] [array([7, 8, 9, 0, 1, 2, 3]), array([7, 8, 9, 0, 1, 2, 3])] [array([4, 5, 6, 7, 8, 9, 0]), array([4, 5, 6, 7, 8, 9, 0])] [array([1, 2, 3, 4, 5, 6, 7]), array([1, 2, 3, 4, 5, 6, 7])] [array([8, 9]), array([8, 9])] 완료
interleave()
메서드는 강력하지만 처음에는 이해하기 좀 어렵습니다. 예제를 통해 이해하는 것이 가장 좋습니다:
tf.reset_default_graph()
dataset = tf.data.Dataset.from_tensor_slices(np.arange(10))
dataset = dataset.repeat(3).batch(7)
dataset = dataset.interleave(
lambda v: tf.data.Dataset.from_tensor_slices(v),
cycle_length=3,
block_length=2)
iterator = dataset.make_one_shot_iterator()
next_element = iterator.get_next()
with tf.Session() as sess:
try:
while True:
print(next_element.eval(), end=",")
except tf.errors.OutOfRangeError:
print("완료")
0,1,7,8,4,5,2,3,9,0,6,7,4,5,1,2,8,9,6,3,0,1,2,8,9,3,4,5,6,7,완료
cycle_length=3
이므로 새로운 데이터셋은 이전 데이터셋에서 세 개의 원소를 추출합니다. 즉 [0,1,2,3,4,5,6]
, [7,8,9,0,1,2,3]
, [4,5,6,7,8,9,0]
입니다. 그다음 원소마다 하나의 데이터셋을 만들기 위해 람다(lambda) 함수를 호출합니다. Dataset.from_tensor_slices()
를 사용했기 때문에 각 데이터셋은 차례대로 원소를 반환합니다. 다음 이 세 개의 데이터셋에서 각각 두 개의 아이템(block_length=2
이므로)을 추출합니다. 세 개의 데이터셋의 아이템이 모두 소진될 때까지 반복됩니다. 즉 0,1 (첫 번째에서), 7,8 (두 번째에서), 4,5 (세 번째에서), 2,3 (첫 번째에서), 9,0 (두 번째에서) 등과 같은 식으로 8,9 (세 번째에서), 6 (첫 번째에서), 3 (두 번째에서), 0 (세 번째에서)까지 진행됩니다. 그다음에 원본 데이터셋에서 다음 번 세 개의 원소를 추출하려고 합니다. 하지만 두 개만 남아 있습니다. [1,2,3,4,5,6,7]
와 [8,9]
입니다. 다시 이 원소로부터 데이터셋을 만들고 이 데이텃세의 아이템이 모두 소진될 때까지 두 개의 아이템을 추출합니다. 1,2 (첫 번째에서), 8,9 (두 번째에서), 3,4 (첫 번째에서), 5,6 (첫 번째에서), 7 (첫 번째에서)가 됩니다. 배열의 길이가 다르기 때문에 마지막에는 교대로 배치되지 않았습니다.
from_tensor_slices()
나 from_tensor()
를 기반으로 한 원본 데이터셋을 사용하는 대신 리더 데이터셋을 사용할 수 있습니다. 복잡한 일들을 대부분 대신 처리해 줍니다(예를 들면, 스레드):
tf.reset_default_graph()
filenames = ["my_test.csv"]
dataset = tf.data.TextLineDataset(filenames)
각 줄을 어떻게 디코드해야 하는지는 알려 주어야 합니다:
def decode_csv_line(line):
x1, x2, y = tf.decode_csv(
line, record_defaults=[[-1.], [-1.], [-1.]])
X = tf.stack([x1, x2])
return X, y
그다음, 이 디코딩 함수를 map()
을 사용하여 데이터셋에 있는 각 원소에 적용할 수 있습니다:
dataset = dataset.skip(1).map(decode_csv_line)
마지막으로 원-샷-이터레이터를 만들어 보죠:
it = dataset.make_one_shot_iterator()
X, y = it.get_next()
with tf.Session() as sess:
try:
while True:
X_val, y_val = sess.run([X, y])
print(X_val, y_val)
except tf.errors.OutOfRangeError as ex:
print("완료")
[ 1. -1.] 0.0 [4. 5.] 1.0 [7. 8.] 0.0 완료
Coming soon