In [1]:
%load_ext watermark
%watermark -v -p numpy,sklearn,scipy,matplotlib,tensorflow
CPython 3.6.8
IPython 7.2.0

numpy 1.15.4
sklearn 0.20.2
scipy 1.1.0
matplotlib 3.0.2
tensorflow 1.13.1

12장 – 분산 텐서플로

이 노트북은 11장에 있는 모든 샘플 코드와 연습문제 해답을 가지고 있습니다.

설정

파이썬 2와 3을 모두 지원합니다. 공통 모듈을 임포트하고 맷플롯립 그림이 노트북 안에 포함되도록 설정하고 생성한 그림을 저장하기 위한 함수를 준비합니다:

In [2]:
# 파이썬 2와 파이썬 3 지원
from __future__ import division, print_function, unicode_literals

# 공통
import numpy as np
import os

# 일관된 출력을 위해 유사난수 초기화
def reset_graph(seed=42):
    tf.reset_default_graph()
    tf.set_random_seed(seed)
    np.random.seed(seed)

# 맷플롯립 설정
%matplotlib inline
import matplotlib
import matplotlib.pyplot as plt
plt.rcParams['axes.labelsize'] = 14
plt.rcParams['xtick.labelsize'] = 12
plt.rcParams['ytick.labelsize'] = 12

# 그림을 저장할 폴더
PROJECT_ROOT_DIR = "."
CHAPTER_ID = "distributed"

def save_fig(fig_id, tight_layout=True):
    path = os.path.join(PROJECT_ROOT_DIR, "images", CHAPTER_ID, fig_id + ".png")
    if tight_layout:
        plt.tight_layout()
    plt.savefig(path, format='png', dpi=300)

로컬 서버

In [3]:
import tensorflow as tf
In [4]:
c = tf.constant("Hello distributed TensorFlow!")
server = tf.train.Server.create_local_server()
In [5]:
with tf.Session(server.target) as sess:
    print(sess.run(c))
b'Hello distributed TensorFlow!'

클러스터

In [6]:
cluster_spec = tf.train.ClusterSpec({
    "ps": [
        "127.0.0.1:2221",  # /job:ps/task:0
        "127.0.0.1:2222",  # /job:ps/task:1
    ],
    "worker": [
        "127.0.0.1:2223",  # /job:worker/task:0
        "127.0.0.1:2224",  # /job:worker/task:1
        "127.0.0.1:2225",  # /job:worker/task:2
    ]})
In [7]:
task_ps0 = tf.train.Server(cluster_spec, job_name="ps", task_index=0)
task_ps1 = tf.train.Server(cluster_spec, job_name="ps", task_index=1)
task_worker0 = tf.train.Server(cluster_spec, job_name="worker", task_index=0)
task_worker1 = tf.train.Server(cluster_spec, job_name="worker", task_index=1)
task_worker2 = tf.train.Server(cluster_spec, job_name="worker", task_index=2)

여러 디바이스와 서버에 연산을 할당하기

In [8]:
reset_graph()

with tf.device("/job:ps"):
    a = tf.Variable(1.0, name="a")

with tf.device("/job:worker"):
    b = a + 2

with tf.device("/job:worker/task:1"):
    c = a + b
WARNING:tensorflow:From /home/haesun/anaconda3/envs/handson-ml/lib/python3.6/site-packages/tensorflow/python/framework/op_def_library.py:263: colocate_with (from tensorflow.python.framework.ops) is deprecated and will be removed in a future version.
Instructions for updating:
Colocations handled automatically by placer.
In [9]:
with tf.Session("grpc://127.0.0.1:2221") as sess:
    sess.run(a.initializer)
    print(c.eval())
4.0
In [10]:
reset_graph()

with tf.device(tf.train.replica_device_setter(
        ps_tasks=2,
        ps_device="/job:ps",
        worker_device="/job:worker")):
    v1 = tf.Variable(1.0, name="v1")  # /job:ps/task:0 (defaults to /cpu:0) 에 할당
    v2 = tf.Variable(2.0, name="v2")  # /job:ps/task:1 (defaults to /cpu:0) 에 할당
    v3 = tf.Variable(3.0, name="v3")  # /job:ps/task:0 (defaults to /cpu:0) 에 할당
    s = v1 + v2            # /job:worker (defaults to task:0/cpu:0) 에 할당
    with tf.device("/task:1"):
        p1 = 2 * s         # /job:worker/task:1 (defaults to /cpu:0) 에 할당
        with tf.device("/cpu:0"):
            p2 = 3 * s     # /job:worker/task:1/cpu:0 에 할당

config = tf.ConfigProto()
config.log_device_placement = True

with tf.Session("grpc://127.0.0.1:2221", config=config) as sess:
    v1.initializer.run()

리더 (Reader) - 예전 방법

In [11]:
reset_graph()

default1 = tf.constant([5.])
default2 = tf.constant([6])
default3 = tf.constant([7])
dec = tf.decode_csv(tf.constant("1.,,44"),
                    record_defaults=[default1, default2, default3])
with tf.Session() as sess:
    print(sess.run(dec))
[1.0, 6, 44]
In [12]:
reset_graph()

test_csv = open("my_test.csv", "w")
test_csv.write("x1, x2 , target\n")
test_csv.write("1.,, 0\n")
test_csv.write("4., 5. , 1\n")
test_csv.write("7., 8. , 0\n")
test_csv.close()

filename_queue = tf.FIFOQueue(capacity=10, dtypes=[tf.string], shapes=[()])
filename = tf.placeholder(tf.string)
enqueue_filename = filename_queue.enqueue([filename])
close_filename_queue = filename_queue.close()

reader = tf.TextLineReader(skip_header_lines=1)
key, value = reader.read(filename_queue)

x1, x2, target = tf.decode_csv(value, record_defaults=[[-1.], [-1.], [-1]])
features = tf.stack([x1, x2])

instance_queue = tf.RandomShuffleQueue(
    capacity=10, min_after_dequeue=2,
    dtypes=[tf.float32, tf.int32], shapes=[[2],[]],
    name="instance_q", shared_name="shared_instance_q")
enqueue_instance = instance_queue.enqueue([features, target])
close_instance_queue = instance_queue.close()

minibatch_instances, minibatch_targets = instance_queue.dequeue_up_to(2)

with tf.Session() as sess:
    sess.run(enqueue_filename, feed_dict={filename: "my_test.csv"})
    sess.run(close_filename_queue)
    try:
        while True:
            sess.run(enqueue_instance)
    except tf.errors.OutOfRangeError as ex:
        print("더 이상 읽을 파일이 없습니다")
    sess.run(close_instance_queue)
    try:
        while True:
            print(sess.run([minibatch_instances, minibatch_targets]))
    except tf.errors.OutOfRangeError as ex:
        print("더 이상 훈련 샘플이 없습니다")
WARNING:tensorflow:From <ipython-input-12-3ced782caad1>:15: TextLineReader.__init__ (from tensorflow.python.ops.io_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Queue-based input pipelines have been replaced by `tf.data`. Use `tf.data.TextLineDataset`.
더 이상 읽을 파일이 없습니다
[array([[ 4.,  5.],
       [ 1., -1.]], dtype=float32), array([1, 0], dtype=int32)]
[array([[7., 8.]], dtype=float32), array([0], dtype=int32)]
더 이상 훈련 샘플이 없습니다
In [13]:
#coord = tf.train.Coordinator()
#threads = tf.train.start_queue_runners(coord=coord)
#filename_queue = tf.train.string_input_producer(["test.csv"])
#coord.request_stop()
#coord.join(threads)

QueueRunner와 Coordinator

In [14]:
reset_graph()

filename_queue = tf.FIFOQueue(capacity=10, dtypes=[tf.string], shapes=[()])
filename = tf.placeholder(tf.string)
enqueue_filename = filename_queue.enqueue([filename])
close_filename_queue = filename_queue.close()

reader = tf.TextLineReader(skip_header_lines=1)
key, value = reader.read(filename_queue)

x1, x2, target = tf.decode_csv(value, record_defaults=[[-1.], [-1.], [-1]])
features = tf.stack([x1, x2])

instance_queue = tf.RandomShuffleQueue(
    capacity=10, min_after_dequeue=2,
    dtypes=[tf.float32, tf.int32], shapes=[[2],[]],
    name="instance_q", shared_name="shared_instance_q")
enqueue_instance = instance_queue.enqueue([features, target])
close_instance_queue = instance_queue.close()

minibatch_instances, minibatch_targets = instance_queue.dequeue_up_to(2)

n_threads = 5
queue_runner = tf.train.QueueRunner(instance_queue, [enqueue_instance] * n_threads)
coord = tf.train.Coordinator()

with tf.Session() as sess:
    sess.run(enqueue_filename, feed_dict={filename: "my_test.csv"})
    sess.run(close_filename_queue)
    enqueue_threads = queue_runner.create_threads(sess, coord=coord, start=True)
    try:
        while True:
            print(sess.run([minibatch_instances, minibatch_targets]))
    except tf.errors.OutOfRangeError as ex:
        print("더 이상 훈련 샘플이 없습니다")
WARNING:tensorflow:From <ipython-input-14-d19da45da613>:24: QueueRunner.__init__ (from tensorflow.python.training.queue_runner_impl) is deprecated and will be removed in a future version.
Instructions for updating:
To construct input pipelines, use the `tf.data` module.
[array([[ 4.,  5.],
       [ 1., -1.]], dtype=float32), array([1, 0], dtype=int32)]
[array([[7., 8.]], dtype=float32), array([0], dtype=int32)]
더 이상 훈련 샘플이 없습니다
In [15]:
reset_graph()

def read_and_push_instance(filename_queue, instance_queue):
    reader = tf.TextLineReader(skip_header_lines=1)
    key, value = reader.read(filename_queue)
    x1, x2, target = tf.decode_csv(value, record_defaults=[[-1.], [-1.], [-1]])
    features = tf.stack([x1, x2])
    enqueue_instance = instance_queue.enqueue([features, target])
    return enqueue_instance

filename_queue = tf.FIFOQueue(capacity=10, dtypes=[tf.string], shapes=[()])
filename = tf.placeholder(tf.string)
enqueue_filename = filename_queue.enqueue([filename])
close_filename_queue = filename_queue.close()

instance_queue = tf.RandomShuffleQueue(
    capacity=10, min_after_dequeue=2,
    dtypes=[tf.float32, tf.int32], shapes=[[2],[]],
    name="instance_q", shared_name="shared_instance_q")

minibatch_instances, minibatch_targets = instance_queue.dequeue_up_to(2)

read_and_enqueue_ops = [read_and_push_instance(filename_queue, instance_queue) for i in range(5)]
queue_runner = tf.train.QueueRunner(instance_queue, read_and_enqueue_ops)

with tf.Session() as sess:
    sess.run(enqueue_filename, feed_dict={filename: "my_test.csv"})
    sess.run(close_filename_queue)
    coord = tf.train.Coordinator()
    enqueue_threads = queue_runner.create_threads(sess, coord=coord, start=True)
    try:
        while True:
            print(sess.run([minibatch_instances, minibatch_targets]))
    except tf.errors.OutOfRangeError as ex:
        print("더 이상 훈련 샘플이 없습니다")
[array([[ 4.,  5.],
       [ 1., -1.]], dtype=float32), array([1, 0], dtype=int32)]
[array([[7., 8.]], dtype=float32), array([0], dtype=int32)]
더 이상 훈련 샘플이 없습니다

타임아웃 지정하기

In [16]:
reset_graph()

q = tf.FIFOQueue(capacity=10, dtypes=[tf.float32], shapes=[()])
v = tf.placeholder(tf.float32)
enqueue = q.enqueue([v])
dequeue = q.dequeue()
output = dequeue + 1

config = tf.ConfigProto()
config.operation_timeout_in_ms = 1000

with tf.Session(config=config) as sess:
    sess.run(enqueue, feed_dict={v: 1.0})
    sess.run(enqueue, feed_dict={v: 2.0})
    sess.run(enqueue, feed_dict={v: 3.0})
    print(sess.run(output))
    print(sess.run(output, feed_dict={dequeue: 5}))
    print(sess.run(output))
    print(sess.run(output))
    try:
        print(sess.run(output))
    except tf.errors.DeadlineExceededError as ex:
        print("dequeue 타임 아웃")
2.0
6.0
3.0
4.0
dequeue 타임 아웃

Data API

텐서플로 1.4에서 소개된 Data API를 사용하면 손쉽게 데이터를 효율적으로 읽을 수 있습니다.

In [17]:
tf.reset_default_graph()

0에서 9까지 정수를 세 번 반복한 간단한 데이터셋을 일곱 개씩 배치로 만들어 시작해 보죠:

In [18]:
dataset = tf.data.Dataset.from_tensor_slices(np.arange(10))
dataset = dataset.repeat(3).batch(7)

첫 번째 줄은 0에서 9까지 정수를 담은 데이터셋을 만듭니다. 두 번째 줄은 이 데이터셋의 원소를 세 번 반복하고 일곱 개씩 담은 새로운 데이터셋을 만듭니다. 위에서 볼 수 있듯이 원본 데이터셋에서 여러 변환 메서드를 연결하여 호출하여 적용했습니다.

그다음, 데이터셋을 한 번 순회하는 원-샷-이터레이터(one-shot-iterator)를 만들고, 다음 원소를 지칭하는 텐서를 얻기 위해 get_next() 메서드를 호출합니다.

In [19]:
iterator = dataset.make_one_shot_iterator()
next_element = iterator.get_next()

next_element를 반복적으로 평가해서 데이터셋을 순회해 보죠. 원소가 별로 없기 때문에 OutOfRangeError가 발생합니다:

In [20]:
with tf.Session() as sess:
    try:
        while True:
            print(next_element.eval())
    except tf.errors.OutOfRangeError:
        print("완료")
[0 1 2 3 4 5 6]
[7 8 9 0 1 2 3]
[4 5 6 7 8 9 0]
[1 2 3 4 5 6 7]
[8 9]
완료

좋네요! 잘 작동합니다.

늘 그렇듯이 텐서는 그래프를 실행(sess.run())할 때마다 한 번만 평가된다는 것을 기억하세요. next_element에 의존하는 텐서를 여러개 평가하더라도 한 번만 평가됩니다. 또한 next_element를 동시에 두 번 실행해도 마찬가지입니다:

In [21]:
with tf.Session() as sess:
    try:
        while True:
            print(sess.run([next_element, next_element]))
    except tf.errors.OutOfRangeError:
        print("완료")
[array([0, 1, 2, 3, 4, 5, 6]), array([0, 1, 2, 3, 4, 5, 6])]
[array([7, 8, 9, 0, 1, 2, 3]), array([7, 8, 9, 0, 1, 2, 3])]
[array([4, 5, 6, 7, 8, 9, 0]), array([4, 5, 6, 7, 8, 9, 0])]
[array([1, 2, 3, 4, 5, 6, 7]), array([1, 2, 3, 4, 5, 6, 7])]
[array([8, 9]), array([8, 9])]
완료

interleave() 메서드는 강력하지만 처음에는 이해하기 좀 어렵습니다. 예제를 통해 이해하는 것이 가장 좋습니다:

In [22]:
tf.reset_default_graph()
In [23]:
dataset = tf.data.Dataset.from_tensor_slices(np.arange(10))
dataset = dataset.repeat(3).batch(7)
dataset = dataset.interleave(
    lambda v: tf.data.Dataset.from_tensor_slices(v),
    cycle_length=3,
    block_length=2)
iterator = dataset.make_one_shot_iterator()
next_element = iterator.get_next()
In [24]:
with tf.Session() as sess:
    try:
        while True:
            print(next_element.eval(), end=",")
    except tf.errors.OutOfRangeError:
        print("완료")
0,1,7,8,4,5,2,3,9,0,6,7,4,5,1,2,8,9,6,3,0,1,2,8,9,3,4,5,6,7,완료

cycle_length=3이므로 새로운 데이터셋은 이전 데이터셋에서 세 개의 원소를 추출합니다. 즉 [0,1,2,3,4,5,6], [7,8,9,0,1,2,3], [4,5,6,7,8,9,0] 입니다. 그다음 원소마다 하나의 데이터셋을 만들기 위해 람다(lambda) 함수를 호출합니다. Dataset.from_tensor_slices()를 사용했기 때문에 각 데이터셋은 차례대로 원소를 반환합니다. 다음 이 세 개의 데이터셋에서 각각 두 개의 아이템(block_length=2이므로)을 추출합니다. 세 개의 데이터셋의 아이템이 모두 소진될 때까지 반복됩니다. 즉 0,1 (첫 번째에서), 7,8 (두 번째에서), 4,5 (세 번째에서), 2,3 (첫 번째에서), 9,0 (두 번째에서) 등과 같은 식으로 8,9 (세 번째에서), 6 (첫 번째에서), 3 (두 번째에서), 0 (세 번째에서)까지 진행됩니다. 그다음에 원본 데이터셋에서 다음 번 세 개의 원소를 추출하려고 합니다. 하지만 두 개만 남아 있습니다. [1,2,3,4,5,6,7][8,9] 입니다. 다시 이 원소로부터 데이터셋을 만들고 이 데이텃세의 아이템이 모두 소진될 때까지 두 개의 아이템을 추출합니다. 1,2 (첫 번째에서), 8,9 (두 번째에서), 3,4 (첫 번째에서), 5,6 (첫 번째에서), 7 (첫 번째에서)가 됩니다. 배열의 길이가 다르기 때문에 마지막에는 교대로 배치되지 않았습니다.

리더 (Reader) - 새로운 방법

from_tensor_slices()from_tensor()를 기반으로 한 원본 데이터셋을 사용하는 대신 리더 데이터셋을 사용할 수 있습니다. 복잡한 일들을 대부분 대신 처리해 줍니다(예를 들면, 스레드):

In [25]:
tf.reset_default_graph()
In [26]:
filenames = ["my_test.csv"]
In [27]:
dataset = tf.data.TextLineDataset(filenames)

각 줄을 어떻게 디코드해야 하는지는 알려 주어야 합니다:

In [28]:
def decode_csv_line(line):
    x1, x2, y = tf.decode_csv(
        line, record_defaults=[[-1.], [-1.], [-1.]])
    X = tf.stack([x1, x2])
    return X, y

그다음, 이 디코딩 함수를 map()을 사용하여 데이터셋에 있는 각 원소에 적용할 수 있습니다:

In [29]:
dataset = dataset.skip(1).map(decode_csv_line)

마지막으로 원-샷-이터레이터를 만들어 보죠:

In [30]:
it = dataset.make_one_shot_iterator()
X, y = it.get_next()
In [31]:
with tf.Session() as sess:
    try:
        while True:
            X_val, y_val = sess.run([X, y])
            print(X_val, y_val)
    except tf.errors.OutOfRangeError as ex:
        print("완료")
[ 1. -1.] 0.0
[4. 5.] 1.0
[7. 8.] 0.0
완료

연습문제 해답

Coming soon