- 🤖 See full list of Machine Learning Experiments on GitHub
- ▶️ Interactive Demo: try this model and other machine learning experiments in action
In this experiment we will use a character-based Recurrent Neural Network (RNN) to generate Shakespeare-like text, based on the Shakespeare dataset from The Unreasonable Effectiveness of Recurrent Neural Networks blog post.
For this experiment we will use TensorFlow v2 with its Keras API.
_Inspired by Text generation with an RNN_
# Selecting TensorFlow v2 (the command is relevant for Colab only).
# %tensorflow_version 2.x
import tensorflow as tf
import matplotlib.pyplot as plt
import numpy as np
import platform
import time
import pathlib
import os
print('Python version:', platform.python_version())
print('TensorFlow version:', tf.__version__)
print('Keras version:', tf.keras.__version__)
cache_dir = './tmp'
dataset_file_name = 'shakespeare.txt'
dataset_file_origin = 'https://storage.googleapis.com/download.tensorflow.org/data/shakespeare.txt'
dataset_file_path = tf.keras.utils.get_file(
fname=dataset_file_name,
origin=dataset_file_origin,
cache_dir=pathlib.Path(cache_dir).absolute()
)
print(dataset_file_path)
# Reading the dataset file.
text = open(dataset_file_path, mode='r').read()
print('Length of text: {} characters'.format(len(text)))
# Take a look at the first 250 characters in text.
print(text[:250])
# The unique characters in the file
vocab = sorted(set(text))
print('{} unique characters'.format(len(vocab)))
print('vocab:', vocab)
Before feeding the text to our RNN we need to convert the text from a sequence of characters to a sequence of numbers. To do so we will detect all unique characters in the text, form a vocabulary out of them, and replace each character with its index in the vocabulary.
# Map characters to their indices in vocabulary.
char2index = {char: index for index, char in enumerate(vocab)}
print('{')
for char, _ in zip(char2index, range(20)):
print(' {:4s}: {:3d},'.format(repr(char), char2index[char]))
print(' ...\n}')
# Map character indices to characters from vocabulary.
index2char = np.array(vocab)
print(index2char)
# Convert chars in text to indices.
text_as_int = np.array([char2index[char] for char in text])
print('text_as_int length: {}'.format(len(text_as_int)))
print('{} --> {}'.format(repr(text[:15]), repr(text_as_int[:15])))
# The maximum length (in characters) we want for a single input sequence.
sequence_length = 100
examples_per_epoch = len(text) // (sequence_length + 1)
print('examples_per_epoch:', examples_per_epoch)
# Create training dataset.
char_dataset = tf.data.Dataset.from_tensor_slices(text_as_int)
for char in char_dataset.take(5):
print(index2char[char.numpy()])
# Generate batched sequences out of the char_dataset.
sequences = char_dataset.batch(sequence_length + 1, drop_remainder=True)
# The number of sequences is the same as examples_per_epoch.
print('Sequences count: {}'.format(len(list(sequences.as_numpy_iterator()))))
print()
# Sequence examples.
for item in sequences.take(5):
print(repr(''.join(index2char[item.numpy()])))
# sequences shape:
# - 11043 sequences
# - Each sequence of length 101
#
#
# 101 101 101
# [(.....) (.....) ... (.....)]
#
# <---------- 11043 ----------->
For each sequence, duplicate and shift it to form the input and target text. For example, say `sequence_length` is 4 and our text is `Hello`. The input sequence would be `Hell`, and the target sequence `ello`.
def split_input_target(chunk):
input_text = chunk[:-1]
target_text = chunk[1:]
return input_text, target_text
dataset = sequences.map(split_input_target)
# Dataset size is the same as examples_per_epoch.
# But each element of a sequence now has a length of `sequence_length`
# and not `sequence_length + 1`.
print('dataset size: {}'.format(len(list(dataset.as_numpy_iterator()))))
for input_example, target_example in dataset.take(1):
print('Input sequence size:', repr(len(input_example.numpy())))
print('Target sequence size:', repr(len(target_example.numpy())))
print()
print('Input:', repr(''.join(index2char[input_example.numpy()])))
print('Target:', repr(''.join(index2char[target_example.numpy()])))
# dataset shape:
# - 11043 sequences
# - Each sequence is a tuple of 2 sub-sequences of length 100 (input_text and target_text)
#
#
# 100 100 100
# /(.....)\ /(.....)\ ... /(.....)\ <-- input_text
# \(.....)/ \(.....)/ \(.....)/ <-- target_text
#
# <----------- 11043 ------------->
Each index of these vectors is processed as one time step. For the input at time step 0, the model receives the index for "F" and tries to predict the index for "i" as the next character. At the next time step it does the same thing, but the RNN considers the previous step's context in addition to the current input character.
for i, (input_idx, target_idx) in enumerate(zip(input_example[:5], target_example[:5])):
print('Step {:2d}'.format(i))
print(' input: {} ({:s})'.format(input_idx, repr(index2char[input_idx])))
print(' expected output: {} ({:s})'.format(target_idx, repr(index2char[target_idx])))
We used `tf.data` to split the text into manageable sequences. But before feeding this data into the model, we need to shuffle the data and pack it into batches.
# Batch size.
BATCH_SIZE = 64
# Buffer size to shuffle the dataset (TF data is designed to work
# with possibly infinite sequences, so it doesn't attempt to shuffle
# the entire sequence in memory. Instead, it maintains a buffer in
# which it shuffles elements).
BUFFER_SIZE = 10000
dataset = dataset.shuffle(BUFFER_SIZE).batch(BATCH_SIZE, drop_remainder=True)
dataset
print('Batched dataset size: {}'.format(len(list(dataset.as_numpy_iterator()))))
for input_text, target_text in dataset.take(1):
print('1st batch: input_text:', input_text)
print()
print('1st batch: target_text:', target_text)
# dataset shape:
# - 172 batches
# - 64 sequences per batch
# - Each sequence is a tuple of 2 sub-sequences of length 100 (input_text and target_text)
#
#
# 100 100 100 100 100 100
# |/(.....)\ /(.....)\ ... /(.....)\| ... |/(.....)\ /(.....)\ ... /(.....)\| <-- input_text
# |\(.....)/ \(.....)/ \(.....)/| ... |\(.....)/ \(.....)/ \(.....)/| <-- target_text
#
# <------------- 64 ----------------> <------------- 64 ---------------->
#
# <--------------------------------- 172 ----------------------------------->
Use `tf.keras.Sequential` to define the model. For this simple example three layers are used to define our model:
- `tf.keras.layers.Embedding`: the input layer, a trainable lookup table that maps each character index to a vector with `embedding_dim` dimensions;
- `tf.keras.layers.LSTM`: a type of RNN with `rnn_units` units;
- `tf.keras.layers.Dense`: the output layer, with `vocab_size` outputs.
# Let's do a quick detour and see how the Embedding layer works.
# It takes a batch of character-index sequences as input.
# It encodes every character of every sequence to a vector of tmp_embedding_size length.
tmp_vocab_size = 10
tmp_embedding_size = 5
tmp_input_length = 8
tmp_batch_size = 2
tmp_model = tf.keras.models.Sequential()
tmp_model.add(tf.keras.layers.Embedding(
input_dim=tmp_vocab_size,
output_dim=tmp_embedding_size,
input_length=tmp_input_length
))
# The model will take as input an integer matrix of size (batch, input_length).
# The largest integer (i.e. char index) in the input should be no larger than 9 (tmp_vocab_size - 1).
# Now model.output_shape == (None, 8, 5), i.e. (batch, tmp_input_length, tmp_embedding_size),
# where None is the batch dimension.
tmp_input_array = np.random.randint(
low=0,
high=tmp_vocab_size,
size=(tmp_batch_size, tmp_input_length)
)
tmp_model.compile('rmsprop', 'mse')
tmp_output_array = tmp_model.predict(tmp_input_array)
print('tmp_input_array shape:', tmp_input_array.shape)
print('tmp_input_array:')
print(tmp_input_array)
print()
print('tmp_output_array shape:', tmp_output_array.shape)
print('tmp_output_array:')
print(tmp_output_array)
# Length of the vocabulary in chars.
vocab_size = len(vocab)
# The embedding dimension.
embedding_dim = 256
# Number of RNN units.
rnn_units = 1024
def build_model(vocab_size, embedding_dim, rnn_units, batch_size):
model = tf.keras.models.Sequential()
model.add(tf.keras.layers.Embedding(
input_dim=vocab_size,
output_dim=embedding_dim,
batch_input_shape=[batch_size, None]
))
model.add(tf.keras.layers.LSTM(
units=rnn_units,
return_sequences=True,
stateful=True,
recurrent_initializer=tf.keras.initializers.GlorotNormal()
))
model.add(tf.keras.layers.Dense(vocab_size))
return model
model = build_model(vocab_size, embedding_dim, rnn_units, BATCH_SIZE)
model.summary()
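As a quick cross-check of the summary above, the parameter counts can be derived by hand from the layer definitions (a sketch based on the standard LSTM parameter formula; it is not part of the original notebook):
# Hand-derived parameter counts as a sanity check against model.summary().
# An LSTM has 4 gates, each with a kernel, a recurrent kernel, and a bias.
embedding_params = vocab_size * embedding_dim
lstm_params = 4 * ((embedding_dim + rnn_units + 1) * rnn_units)
dense_params = (rnn_units + 1) * vocab_size
print('Total params:', embedding_params + lstm_params + dense_params)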
tf.keras.utils.plot_model(
model,
show_shapes=True,
show_layer_names=True,
)
For each character the model looks up the embedding, runs the LSTM one time step with the embedding as input, and applies the dense layer to generate logits predicting the log-likelihood of the next character:
Image source: Text generation with an RNN notebook.
for input_example_batch, target_example_batch in dataset.take(1):
example_batch_predictions = model(input_example_batch)
print(example_batch_predictions.shape, "# (batch_size, sequence_length, vocab_size)")
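To make these shapes concrete, the same batch can be traced through the layers one by one (a sketch that relies on the layer order defined in build_model; note that the stateful LSTM keeps its internal state between calls):
# Trace the batch through each layer to see how the shape evolves:
# (64, 100) -> (64, 100, 256) -> (64, 100, 1024) -> (64, 100, 65).
tmp_x = input_example_batch
for tmp_layer in model.layers:
    tmp_x = tmp_layer(tmp_x)
    print(tmp_layer.name, tmp_x.shape)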
To get actual predictions from the model we need to sample from the output distribution to obtain character indices. This distribution is defined by the logits over the character vocabulary.
print('Prediction for the 1st letter of the batch 1st sequence:')
print(example_batch_predictions[0, 0])
# Quick overview of how tf.random.categorical() works.
# logits is 2-D Tensor with shape [batch_size, num_classes].
# Each slice [i, :] represents the unnormalized log-probabilities for all classes.
# In the example below we say that the probability for class "0" is low but the
# probability for class "2" is much higher.
tmp_logits = [
    [-0.95, 0, 0.95],
]
# Let's generate 5 samples. Each sample is a class index. Class probabilities
# are being taken into account (we expect to see more samples of class "2").
tmp_samples = tf.random.categorical(
logits=tmp_logits,
num_samples=5
)
print(tmp_samples)
sampled_indices = tf.random.categorical(
logits=example_batch_predictions[0],
num_samples=1
)
sampled_indices.shape
sampled_indices = tf.squeeze(
input=sampled_indices,
axis=-1
).numpy()
sampled_indices.shape
sampled_indices
print('Input:\n', repr(''.join(index2char[input_example_batch[0]])))
print()
print('Next char prediction:\n', repr(''.join(index2char[sampled_indices])))
for i, (input_idx, sample_idx) in enumerate(zip(input_example_batch[0][:5], sampled_indices[:5])):
print('Prediction {:2d}'.format(i))
print(' input: {} ({:s})'.format(input_idx, repr(index2char[input_idx])))
print(' next predicted: {} ({:s})'.format(sample_idx, repr(index2char[sample_idx])))
At this point the problem can be treated as a standard classification problem. Given the previous RNN state, and the input this time step, predict the class of the next character.
# An objective function.
# The function is any callable with the signature scalar_loss = fn(y_true, y_pred).
def loss(labels, logits):
return tf.keras.losses.sparse_categorical_crossentropy(
y_true=labels,
y_pred=logits,
from_logits=True
)
example_batch_loss = loss(target_example_batch, example_batch_predictions)
print("Prediction shape: ", example_batch_predictions.shape, " # (batch_size, sequence_length, vocab_size)")
print("scalar_loss: ", example_batch_loss.numpy().mean())
adam_optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
model.compile(
optimizer=adam_optimizer,
loss=loss
)
# Directory where the checkpoints will be saved.
checkpoint_dir = 'tmp/checkpoints'
os.makedirs(checkpoint_dir, exist_ok=True)
# Name of the checkpoint files
checkpoint_prefix = os.path.join(checkpoint_dir, 'ckpt_{epoch}')
checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
filepath=checkpoint_prefix,
save_weights_only=True
)
EPOCHS=40
history = model.fit(
x=dataset,
epochs=EPOCHS,
callbacks=[
checkpoint_callback
]
)
def render_training_history(training_history):
loss = training_history.history['loss']
plt.title('Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.plot(loss, label='Training set')
plt.legend()
plt.grid(linestyle='--', linewidth=1, alpha=0.5)
plt.show()
render_training_history(history)
To keep this prediction step simple, use a batch size of 1. Because of the way the RNN state is passed from time step to time step, the model only accepts a fixed batch size once built. To run the model with a different `batch_size`, we need to rebuild the model and restore the weights from the checkpoint.
tf.train.latest_checkpoint(checkpoint_dir)
simplified_batch_size = 1
model = build_model(vocab_size, embedding_dim, rnn_units, batch_size=1)
model.load_weights(tf.train.latest_checkpoint(checkpoint_dir))
model.build(tf.TensorShape([simplified_batch_size, None]))
model.summary()
The following code block generates the text:
- It starts by choosing a start string, initializing the RNN state and setting the number of characters to generate.
- It gets the prediction distribution of the next character using the start string and the RNN state.
- Then it uses a categorical distribution to sample the index of the predicted character, and uses this predicted character as the next input to the model.
- The RNN state returned by the model is fed back into the model so that it now has more context, instead of only one character. After predicting the next character, the modified RNN states are again fed back into the model, which is how the model builds up context from the previously predicted characters.
Image source: Text generation with an RNN notebook.
# num_generate
# - number of characters to generate.
#
# temperature
# - Low temperature results in more predictable text.
# - Higher temperature results in more surprising text.
# - Experiment to find the best setting.
def generate_text(model, start_string, num_generate = 1000, temperature=1.0):
# Evaluation step (generating text using the learned model)
# Converting our start string to numbers (vectorizing).
input_indices = [char2index[s] for s in start_string]
input_indices = tf.expand_dims(input_indices, 0)
# Empty string to store our results.
text_generated = []
# Here batch size == 1.
model.reset_states()
for char_index in range(num_generate):
predictions = model(input_indices)
# remove the batch dimension
predictions = tf.squeeze(predictions, 0)
# Using a categorical distribution to predict the character returned by the model.
predictions = predictions / temperature
predicted_id = tf.random.categorical(
predictions,
num_samples=1
)[-1,0].numpy()
# We pass the predicted character as the next input to the model
# along with the previous hidden state.
input_indices = tf.expand_dims([predicted_id], 0)
text_generated.append(index2char[predicted_id])
return (start_string + ''.join(text_generated))
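The effect of temperature can be illustrated on a toy distribution (a minimal sketch with made-up logits, unrelated to the model): dividing the logits by a temperature below 1 sharpens the softmax, while a temperature above 1 flattens it.
# Minimal sketch with made-up logits: lower temperature sharpens the
# distribution (more predictable samples), higher temperature flattens it.
tmp_logits = tf.constant([2.0, 1.0, 0.1])
for tmp_temperature in [0.5, 1.0, 2.0]:
    tmp_probs = tf.nn.softmax(tmp_logits / tmp_temperature)
    print('temperature={}: {}'.format(tmp_temperature, tmp_probs.numpy()))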
# Generate the text with default temperature (1.0).
print(generate_text(model, start_string=u"ROMEO: "))
# Generate the text with higher temperature to get more unexpected results.
print(generate_text(model, start_string=u"ROMEO: ", temperature=1.5))
model_name = 'text_generation_shakespeare_rnn.h5'
model.save(model_name, save_format='h5')
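As a quick check that the saved file is usable, it can be loaded back for inference (compile=False skips deserializing the custom loss function, which is enough for text generation):
# A minimal sketch: reload the saved HDF5 model for inference only.
restored_model = tf.keras.models.load_model(model_name, compile=False)
restored_model.summary()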
To use this model on the web we need to convert it into a format that is understandable by TensorFlow.js. To do so we may use tfjs-converter as follows:
tensorflowjs_converter --input_format keras \
./experiments/text_generation_shakespeare_rnn/text_generation_shakespeare_rnn.h5 \
./demos/public/models/text_generation_shakespeare_rnn
You may find this experiment in the Demo app and play around with it right in your browser to see how the model performs in real life.