#!/usr/bin/env python # coding: utf-8 # **Chapter 16 – Natural Language Processing with RNNs and Attention** # _This notebook contains all the sample code and solutions to the exercises in chapter 16._ # # # #
# Open In Colab # # #
# # Setup # This project requires Python 3.7 or above: # In[1]: import sys assert sys.version_info >= (3, 7) # And TensorFlow ≥ 2.8: # In[2]: from packaging import version import tensorflow as tf assert version.parse(tf.__version__) >= version.parse("2.8.0") # As we did in earlier chapters, let's define the default font sizes to make the figures prettier: # In[3]: import matplotlib.pyplot as plt plt.rc('font', size=14) plt.rc('axes', labelsize=14, titlesize=14) plt.rc('legend', fontsize=14) plt.rc('xtick', labelsize=10) plt.rc('ytick', labelsize=10) # And let's create the `images/nlp` folder (if it doesn't already exist), and define the `save_fig()` function which is used throughout this notebook to save the figures in high-res for the book: # In[4]: from pathlib import Path IMAGES_PATH = Path() / "images" / "nlp" IMAGES_PATH.mkdir(parents=True, exist_ok=True) def save_fig(fig_id, tight_layout=True, fig_extension="png", resolution=300): path = IMAGES_PATH / f"{fig_id}.{fig_extension}" if tight_layout: plt.tight_layout() plt.savefig(path, format=fig_extension, dpi=resolution) # This chapter can be very slow without a GPU, so let's make sure there's one, or else issue a warning: # In[5]: if not tf.config.list_physical_devices('GPU'): print("No GPU was detected. Neural nets can be very slow without a GPU.") if "google.colab" in sys.modules: print("Go to Runtime > Change runtime type and select a GPU hardware " "accelerator.") if "kaggle_secrets" in sys.modules: print("Go to Settings > Accelerator and select GPU.") # # Generating Shakespearean Text Using a Character RNN # ## Creating the Training Dataset # Let's download the Shakespeare data from Andrej Karpathy's [char-rnn project](https://github.com/karpathy/char-rnn/): # In[6]: import tensorflow as tf shakespeare_url = "https://homl.info/shakespeare" # shortcut URL filepath = tf.keras.utils.get_file("shakespeare.txt", shakespeare_url) with open(filepath) as f: shakespeare_text = f.read() # In[7]: # extra code – shows a short text sample print(shakespeare_text[:80]) # In[8]: # extra code – shows all 39 distinct characters (after converting to lower case) "".join(sorted(set(shakespeare_text.lower()))) # In[9]: text_vec_layer = tf.keras.layers.TextVectorization(split="character", standardize="lower") text_vec_layer.adapt([shakespeare_text]) encoded = text_vec_layer([shakespeare_text])[0] # In[10]: encoded -= 2 # drop tokens 0 (pad) and 1 (unknown), which we will not use n_tokens = text_vec_layer.vocabulary_size() - 2 # number of distinct chars = 39 dataset_size = len(encoded) # total number of chars = 1,115,394 # In[11]: n_tokens # In[12]: dataset_size # In[13]: def to_dataset(sequence, length, shuffle=False, seed=None, batch_size=32): ds = tf.data.Dataset.from_tensor_slices(sequence) ds = ds.window(length + 1, shift=1, drop_remainder=True) ds = ds.flat_map(lambda window_ds: window_ds.batch(length + 1)) if shuffle: ds = ds.shuffle(100_000, seed=seed) ds = ds.batch(batch_size) return ds.map(lambda window: (window[:, :-1], window[:, 1:])).prefetch(1) # In[14]: # extra code – a simple example using to_dataset() # There's just one sample in this dataset: the input represents "to b" and the # output represents "o be" list(to_dataset(text_vec_layer(["To be"])[0], length=4)) # In[15]: length = 100 tf.random.set_seed(42) train_set = to_dataset(encoded[:1_000_000], length=length, shuffle=True, seed=42) valid_set = to_dataset(encoded[1_000_000:1_060_000], length=length) test_set = to_dataset(encoded[1_060_000:], length=length) # ## Building and Training the
Char-RNN Model # **Warning**: the following code may take one or two hours to run, depending on your GPU. Without a GPU, it may take over 24 hours. If you don't want to wait, just skip the next two code cells and run the code below to download a pretrained model. # **Note**: the `GRU` class will only use cuDNN acceleration (assuming you have a GPU) when using the default values for the following arguments: `activation`, `recurrent_activation`, `recurrent_dropout`, `unroll`, `use_bias` and `reset_after`. # In[16]: tf.random.set_seed(42) # extra code – ensures reproducibility on CPU model = tf.keras.Sequential([ tf.keras.layers.Embedding(input_dim=n_tokens, output_dim=16), tf.keras.layers.GRU(128, return_sequences=True), tf.keras.layers.Dense(n_tokens, activation="softmax") ]) model.compile(loss="sparse_categorical_crossentropy", optimizer="nadam", metrics=["accuracy"]) model_ckpt = tf.keras.callbacks.ModelCheckpoint( "my_shakespeare_model", monitor="val_accuracy", save_best_only=True) history = model.fit(train_set, validation_data=valid_set, epochs=10, callbacks=[model_ckpt]) # In[17]: shakespeare_model = tf.keras.Sequential([ text_vec_layer, tf.keras.layers.Lambda(lambda X: X - 2), # no <pad> or <unk> tokens model ]) # If you don't want to wait for training to complete, I've pretrained a model for you. The following code will download it. Uncomment the last line if you want to use it instead of the model trained above. # In[18]: # extra code – downloads a pretrained model url = "https://github.com/ageron/data/raw/main/shakespeare_model.tgz" path = tf.keras.utils.get_file("shakespeare_model.tgz", url, extract=True) model_path = Path(path).with_name("shakespeare_model") #shakespeare_model = tf.keras.models.load_model(model_path) # In[19]: y_proba = shakespeare_model.predict(["To be or not to b"])[0, -1] y_pred = tf.argmax(y_proba) # choose the most probable character ID text_vec_layer.get_vocabulary()[y_pred + 2] # ## Generating Fake Shakespearean Text # In[20]: log_probas = tf.math.log([[0.5, 0.4, 0.1]]) # probas = 50%, 40%, and 10% tf.random.set_seed(42) tf.random.categorical(log_probas, num_samples=8) # draw 8 samples # In[21]: def next_char(text, temperature=1): y_proba = shakespeare_model.predict([text])[0, -1:] rescaled_logits = tf.math.log(y_proba) / temperature char_id = tf.random.categorical(rescaled_logits, num_samples=1)[0, 0] return text_vec_layer.get_vocabulary()[char_id + 2] # In[22]: def extend_text(text, n_chars=50, temperature=1): for _ in range(n_chars): text += next_char(text, temperature) return text # In[23]: tf.random.set_seed(42) # extra code – ensures reproducibility on CPU # In[24]: print(extend_text("To be or not to be", temperature=0.01)) # In[25]: print(extend_text("To be or not to be", temperature=1)) # In[26]: print(extend_text("To be or not to be", temperature=100)) # ## Stateful RNN # In[27]: def to_dataset_for_stateful_rnn(sequence, length): ds = tf.data.Dataset.from_tensor_slices(sequence) ds = ds.window(length + 1, shift=length, drop_remainder=True) ds = ds.flat_map(lambda window: window.batch(length + 1)).batch(1) return ds.map(lambda window: (window[:, :-1], window[:, 1:])).prefetch(1) stateful_train_set = to_dataset_for_stateful_rnn(encoded[:1_000_000], length) stateful_valid_set = to_dataset_for_stateful_rnn(encoded[1_000_000:1_060_000], length) stateful_test_set = to_dataset_for_stateful_rnn(encoded[1_060_000:], length) # In[28]: # extra code – simple example using to_dataset_for_stateful_rnn() list(to_dataset_for_stateful_rnn(tf.range(10), 3)) # If you'd like to have
more than one window per batch, you can use the `to_batched_dataset_for_stateful_rnn()` function instead of `to_dataset_for_stateful_rnn()`: # In[29]: # extra code – shows one way to prepare a batched dataset for a stateful RNN import numpy as np def to_non_overlapping_windows(sequence, length): ds = tf.data.Dataset.from_tensor_slices(sequence) ds = ds.window(length + 1, shift=length, drop_remainder=True) return ds.flat_map(lambda window: window.batch(length + 1)) def to_batched_dataset_for_stateful_rnn(sequence, length, batch_size=32): parts = np.array_split(sequence, batch_size) datasets = tuple(to_non_overlapping_windows(part, length) for part in parts) ds = tf.data.Dataset.zip(datasets).map(lambda *windows: tf.stack(windows)) return ds.map(lambda window: (window[:, :-1], window[:, 1:])).prefetch(1) list(to_batched_dataset_for_stateful_rnn(tf.range(20), length=3, batch_size=2)) # In[30]: tf.random.set_seed(42) # extra code – ensures reproducibility on CPU model = tf.keras.Sequential([ tf.keras.layers.Embedding(input_dim=n_tokens, output_dim=16, batch_input_shape=[1, None]), tf.keras.layers.GRU(128, return_sequences=True, stateful=True), tf.keras.layers.Dense(n_tokens, activation="softmax") ]) # In[31]: class ResetStatesCallback(tf.keras.callbacks.Callback): def on_epoch_begin(self, epoch, logs): self.model.reset_states() # In[32]: # extra code – use a different directory to save the checkpoints model_ckpt = tf.keras.callbacks.ModelCheckpoint( "my_stateful_shakespeare_model", monitor="val_accuracy", save_best_only=True) # **Warning**: the following cell will take a while to run (possibly an hour if you are not using a GPU). # In[33]: model.compile(loss="sparse_categorical_crossentropy", optimizer="nadam", metrics=["accuracy"]) history = model.fit(stateful_train_set, validation_data=stateful_valid_set, epochs=10, callbacks=[ResetStatesCallback(), model_ckpt]) # **Extra Material: converting the stateful RNN to a stateless RNN and using it** # To use the model with different batch sizes, we need to create a stateless copy: # In[34]: stateless_model = tf.keras.Sequential([ tf.keras.layers.Embedding(input_dim=n_tokens, output_dim=16), tf.keras.layers.GRU(128, return_sequences=True), tf.keras.layers.Dense(n_tokens, activation="softmax") ]) # To set the weights, we first need to build the model (so the weights get created): # In[35]: stateless_model.build(tf.TensorShape([None, None])) # In[36]: stateless_model.set_weights(model.get_weights()) # In[37]: shakespeare_model = tf.keras.Sequential([ text_vec_layer, tf.keras.layers.Lambda(lambda X: X - 2), # no <pad> or <unk> tokens stateless_model ]) # In[38]: tf.random.set_seed(42) print(extend_text("to be or not to be", temperature=0.01)) # # Sentiment Analysis # In[39]: import tensorflow_datasets as tfds raw_train_set, raw_valid_set, raw_test_set = tfds.load( name="imdb_reviews", split=["train[:90%]", "train[90%:]", "test"], as_supervised=True ) tf.random.set_seed(42) train_set = raw_train_set.shuffle(5000, seed=42).batch(32).prefetch(1) valid_set = raw_valid_set.batch(32).prefetch(1) test_set = raw_test_set.batch(32).prefetch(1) # In[40]: for review, label in raw_train_set.take(4): print(review.numpy().decode("utf-8")[:200], "...") print("Label:", label.numpy()) # In[41]: vocab_size = 1000 text_vec_layer = tf.keras.layers.TextVectorization(max_tokens=vocab_size) text_vec_layer.adapt(train_set.map(lambda reviews, labels: reviews)) # **Warning**: the following cell will take a few minutes to run and the model will probably not learn anything because we didn't
mask the padding tokens (that's the point of the next section). # In[42]: embed_size = 128 tf.random.set_seed(42) model = tf.keras.Sequential([ text_vec_layer, tf.keras.layers.Embedding(vocab_size, embed_size), tf.keras.layers.GRU(128), tf.keras.layers.Dense(1, activation="sigmoid") ]) model.compile(loss="binary_crossentropy", optimizer="nadam", metrics=["accuracy"]) history = model.fit(train_set, validation_data=valid_set, epochs=2) # ## Masking # **Warning**: the following cell will take a while to run (possibly 30 minutes if you are not using a GPU). # In[43]: embed_size = 128 tf.random.set_seed(42) model = tf.keras.Sequential([ text_vec_layer, tf.keras.layers.Embedding(vocab_size, embed_size, mask_zero=True), tf.keras.layers.GRU(128), tf.keras.layers.Dense(1, activation="sigmoid") ]) model.compile(loss="binary_crossentropy", optimizer="nadam", metrics=["accuracy"]) history = model.fit(train_set, validation_data=valid_set, epochs=5) # Or using manual masking: # In[44]: tf.random.set_seed(42) # extra code – ensures reproducibility on the CPU inputs = tf.keras.layers.Input(shape=[], dtype=tf.string) token_ids = text_vec_layer(inputs) mask = tf.math.not_equal(token_ids, 0) Z = tf.keras.layers.Embedding(vocab_size, embed_size)(token_ids) Z = tf.keras.layers.GRU(128, dropout=0.2)(Z, mask=mask) outputs = tf.keras.layers.Dense(1, activation="sigmoid")(Z) model = tf.keras.Model(inputs=[inputs], outputs=[outputs]) # **Warning**: the following cell will take a while to run (possibly 30 minutes if you are not using a GPU). # In[45]: # extra code – compiles and trains the model, as usual model.compile(loss="binary_crossentropy", optimizer="nadam", metrics=["accuracy"]) history = model.fit(train_set, validation_data=valid_set, epochs=5) # **Extra material: using ragged tensors** # In[46]: text_vec_layer_ragged = tf.keras.layers.TextVectorization( max_tokens=vocab_size, ragged=True) text_vec_layer_ragged.adapt(train_set.map(lambda reviews, labels: reviews)) text_vec_layer_ragged(["Great movie!", "This is DiCaprio's best role."]) # In[47]: text_vec_layer(["Great movie!", "This is DiCaprio's best role."]) # **Warning**: the following cell will take a while to run (possibly 30 minutes if you are not using a GPU). # In[48]: embed_size = 128 tf.random.set_seed(42) model = tf.keras.Sequential([ text_vec_layer_ragged, tf.keras.layers.Embedding(vocab_size, embed_size), tf.keras.layers.GRU(128), tf.keras.layers.Dense(1, activation="sigmoid") ]) model.compile(loss="binary_crossentropy", optimizer="nadam", metrics=["accuracy"]) history = model.fit(train_set, validation_data=valid_set, epochs=5) # ## Reusing Pretrained Embeddings and Language Models # **Warning**: the following cell will take a while to run (possibly an hour if you are not using a GPU). 
# In[49]: import os import tensorflow_hub as hub os.environ["TFHUB_CACHE_DIR"] = "my_tfhub_cache" tf.random.set_seed(42) # extra code – ensures reproducibility on CPU model = tf.keras.Sequential([ hub.KerasLayer("https://tfhub.dev/google/universal-sentence-encoder/4", trainable=True, dtype=tf.string, input_shape=[]), tf.keras.layers.Dense(64, activation="relu"), tf.keras.layers.Dense(1, activation="sigmoid") ]) model.compile(loss="binary_crossentropy", optimizer="nadam", metrics=["accuracy"]) model.fit(train_set, validation_data=valid_set, epochs=10) # # An Encoder–Decoder Network for Neural Machine Translation # In[50]: url = "https://storage.googleapis.com/download.tensorflow.org/data/spa-eng.zip" path = tf.keras.utils.get_file("spa-eng.zip", origin=url, cache_dir="datasets", extract=True) text = (Path(path).with_name("spa-eng") / "spa.txt").read_text() # In[51]: import numpy as np text = text.replace("¡", "").replace("¿", "") pairs = [line.split("\t") for line in text.splitlines()] np.random.seed(42) # extra code – ensures reproducibility on CPU np.random.shuffle(pairs) sentences_en, sentences_es = zip(*pairs) # separates the pairs into 2 lists # In[52]: for i in range(3): print(sentences_en[i], "=>", sentences_es[i]) # In[53]: vocab_size = 1000 max_length = 50 text_vec_layer_en = tf.keras.layers.TextVectorization( vocab_size, output_sequence_length=max_length) text_vec_layer_es = tf.keras.layers.TextVectorization( vocab_size, output_sequence_length=max_length) text_vec_layer_en.adapt(sentences_en) text_vec_layer_es.adapt([f"startofseq {s} endofseq" for s in sentences_es]) # In[54]: text_vec_layer_en.get_vocabulary()[:10] # In[55]: text_vec_layer_es.get_vocabulary()[:10] # In[56]: X_train = tf.constant(sentences_en[:100_000]) X_valid = tf.constant(sentences_en[100_000:]) X_train_dec = tf.constant([f"startofseq {s}" for s in sentences_es[:100_000]]) X_valid_dec = tf.constant([f"startofseq {s}" for s in sentences_es[100_000:]]) Y_train = text_vec_layer_es([f"{s} endofseq" for s in sentences_es[:100_000]]) Y_valid = text_vec_layer_es([f"{s} endofseq" for s in sentences_es[100_000:]]) # In[57]: tf.random.set_seed(42) # extra code – ensures reproducibility on CPU encoder_inputs = tf.keras.layers.Input(shape=[], dtype=tf.string) decoder_inputs = tf.keras.layers.Input(shape=[], dtype=tf.string) # In[58]: embed_size = 128 encoder_input_ids = text_vec_layer_en(encoder_inputs) decoder_input_ids = text_vec_layer_es(decoder_inputs) encoder_embedding_layer = tf.keras.layers.Embedding(vocab_size, embed_size, mask_zero=True) decoder_embedding_layer = tf.keras.layers.Embedding(vocab_size, embed_size, mask_zero=True) encoder_embeddings = encoder_embedding_layer(encoder_input_ids) decoder_embeddings = decoder_embedding_layer(decoder_input_ids) # In[59]: encoder = tf.keras.layers.LSTM(512, return_state=True) encoder_outputs, *encoder_state = encoder(encoder_embeddings) # In[60]: decoder = tf.keras.layers.LSTM(512, return_sequences=True) decoder_outputs = decoder(decoder_embeddings, initial_state=encoder_state) # In[61]: output_layer = tf.keras.layers.Dense(vocab_size, activation="softmax") Y_proba = output_layer(decoder_outputs) # **Warning**: the following cell will take a while to run (possibly a couple hours if you are not using a GPU). 
# In[62]: model = tf.keras.Model(inputs=[encoder_inputs, decoder_inputs], outputs=[Y_proba]) model.compile(loss="sparse_categorical_crossentropy", optimizer="nadam", metrics=["accuracy"]) model.fit((X_train, X_train_dec), Y_train, epochs=10, validation_data=((X_valid, X_valid_dec), Y_valid)) # In[63]: def translate(sentence_en): translation = "" for word_idx in range(max_length): X = np.array([sentence_en]) # encoder input X_dec = np.array(["startofseq " + translation]) # decoder input y_proba = model.predict((X, X_dec))[0, word_idx] # last token's probas predicted_word_id = np.argmax(y_proba) predicted_word = text_vec_layer_es.get_vocabulary()[predicted_word_id] if predicted_word == "endofseq": break translation += " " + predicted_word return translation.strip() # In[64]: translate("I like soccer") # Nice! However, the model struggles with longer sentences: # In[65]: translate("I like soccer and also going to the beach") # ## Bidirectional RNNs # To create a bidirectional recurrent layer, just wrap a regular recurrent layer in a `Bidirectional` layer: # In[66]: tf.random.set_seed(42) # extra code – ensures reproducibility on CPU encoder = tf.keras.layers.Bidirectional( tf.keras.layers.LSTM(256, return_state=True)) # In[67]: encoder_outputs, *encoder_state = encoder(encoder_embeddings) encoder_state = [tf.concat(encoder_state[::2], axis=-1), # short-term (0 & 2) tf.concat(encoder_state[1::2], axis=-1)] # long-term (1 & 3) # **Warning**: the following cell will take a while to run (possibly a couple hours if you are not using a GPU). # In[68]: # extra code — completes the model and trains it decoder = tf.keras.layers.LSTM(512, return_sequences=True) decoder_outputs = decoder(decoder_embeddings, initial_state=encoder_state) output_layer = tf.keras.layers.Dense(vocab_size, activation="softmax") Y_proba = output_layer(decoder_outputs) model = tf.keras.Model(inputs=[encoder_inputs, decoder_inputs], outputs=[Y_proba]) model.compile(loss="sparse_categorical_crossentropy", optimizer="nadam", metrics=["accuracy"]) model.fit((X_train, X_train_dec), Y_train, epochs=10, validation_data=((X_valid, X_valid_dec), Y_valid)) # In[69]: translate("I like soccer") # ## Beam Search # This is a very basic implementation of beam search. I tried to make it readable and understandable, but it's definitely not optimized for speed! The function first uses the model to find the top _k_ words to start the translations (where _k_ is the beam width). For each of the top _k_ translations, it evaluates the conditional probabilities of all possible words it could add to that translation. These extended translations and their probabilities are added to the list of candidates. Once we've gone through all top _k_ translations and all words that could complete them, we keep only the top _k_ candidates with the highest probability, and we iterate over and over until they all finish with an EOS token. The top translation is then returned (after removing its EOS token). # # * Note: If p(S) is the probability of sentence S, and p(W|S) is the conditional probability of the word W given that the translation starts with S, then the probability of the sentence S' = concat(S, W) is p(S') = p(S) * p(W|S). As we add more words, the probability gets smaller and smaller. To avoid the risk of it getting too small, which could cause floating point precision errors, the function keeps track of log probabilities instead of probabilities: recall that log(a\*b) = log(a) + log(b), therefore log(p(S')) = log(p(S)) + log(p(W|S)). 
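# For example, here is a quick numeric check (an illustrative sketch, not part of the book's code) of why summing log probabilities is safer than multiplying raw probabilities: the product of many small probabilities underflows to zero, while the sum of their logs stays well within floating-point range.

import numpy as np  # already imported earlier; repeated so this snippet is self-contained

word_probas = [0.1] * 400               # 400 words, each with conditional probability 0.1
product = np.prod(word_probas)          # 1e-400 underflows to 0.0 in float64
log_sum = np.sum(np.log(word_probas))   # 400 * log(0.1) ≈ -921.0, perfectly representable
print(product, log_sum)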
# In[70]: # extra code – a basic implementation of beam search def beam_search(sentence_en, beam_width, verbose=False): X = np.array([sentence_en]) # encoder input X_dec = np.array(["startofseq"]) # decoder input y_proba = model.predict((X, X_dec))[0, 0] # first token's probas top_k = tf.math.top_k(y_proba, k=beam_width) top_translations = [ # list of best (log_proba, translation) (np.log(word_proba), text_vec_layer_es.get_vocabulary()[word_id]) for word_proba, word_id in zip(top_k.values, top_k.indices) ] # extra code – displays the top first words in verbose mode if verbose: print("Top first words:", top_translations) for idx in range(1, max_length): candidates = [] for log_proba, translation in top_translations: if translation.endswith("endofseq"): candidates.append((log_proba, translation)) continue # translation is finished, so don't try to extend it X = np.array([sentence_en]) # encoder input X_dec = np.array(["startofseq " + translation]) # decoder input y_proba = model.predict((X, X_dec))[0, idx] # last token's proba for word_id, word_proba in enumerate(y_proba): word = text_vec_layer_es.get_vocabulary()[word_id] candidates.append((log_proba + np.log(word_proba), f"{translation} {word}")) top_translations = sorted(candidates, reverse=True)[:beam_width] # extra code – displays the top translation so far in verbose mode if verbose: print("Top translations so far:", top_translations) if all([tr.endswith("endofseq") for _, tr in top_translations]): return top_translations[0][1].replace("endofseq", "").strip() # In[71]: # extra code – shows how the model can make an error sentence_en = "I love cats and dogs" translate(sentence_en) # In[72]: # extra code – shows how beam search can help beam_search(sentence_en, beam_width=3, verbose=True) # The correct translation is in the top 3 sentences found by beam search, but it's not the first. Since we're using a small vocabulary, the \[UNK] token is quite frequent, so you may want to penalize it (e.g., divide its probability by 2 in the beam search function): this will discourage beam search from using it too much. # # Attention Mechanisms # We need to feed all the encoder's outputs to the `Attention` layer, so we must add `return_sequences=True` to the encoder: # In[73]: tf.random.set_seed(42) # extra code – ensures reproducibility on CPU encoder = tf.keras.layers.Bidirectional( tf.keras.layers.LSTM(256, return_sequences=True, return_state=True)) # In[74]: # extra code – this part of the model is exactly the same as earlier encoder_outputs, *encoder_state = encoder(encoder_embeddings) encoder_state = [tf.concat(encoder_state[::2], axis=-1), # short-term (0 & 2) tf.concat(encoder_state[1::2], axis=-1)] # long-term (1 & 3) decoder = tf.keras.layers.LSTM(512, return_sequences=True) decoder_outputs = decoder(decoder_embeddings, initial_state=encoder_state) # And finally, let's add the `Attention` layer and the output layer: # In[75]: attention_layer = tf.keras.layers.Attention() attention_outputs = attention_layer([decoder_outputs, encoder_outputs]) output_layer = tf.keras.layers.Dense(vocab_size, activation="softmax") Y_proba = output_layer(attention_outputs) # **Warning**: the following cell will take a while to run (possibly a couple hours if you are not using a GPU).
# In[76]: model = tf.keras.Model(inputs=[encoder_inputs, decoder_inputs], outputs=[Y_proba]) model.compile(loss="sparse_categorical_crossentropy", optimizer="nadam", metrics=["accuracy"]) model.fit((X_train, X_train_dec), Y_train, epochs=10, validation_data=((X_valid, X_valid_dec), Y_valid)) # In[77]: translate("I like soccer and also going to the beach") # In[78]: beam_search("I like soccer and also going to the beach", beam_width=3, verbose=True) # ## Attention Is All You Need: The Transformer Architecture # ### Positional encodings # In[79]: max_length = 50 # max length in the whole training set embed_size = 128 tf.random.set_seed(42) # extra code – ensures reproducibility on CPU pos_embed_layer = tf.keras.layers.Embedding(max_length, embed_size) batch_max_len_enc = tf.shape(encoder_embeddings)[1] encoder_in = encoder_embeddings + pos_embed_layer(tf.range(batch_max_len_enc)) batch_max_len_dec = tf.shape(decoder_embeddings)[1] decoder_in = decoder_embeddings + pos_embed_layer(tf.range(batch_max_len_dec)) # Alternatively, we can use fixed, non-trainable positional encodings: # In[80]: class PositionalEncoding(tf.keras.layers.Layer): def __init__(self, max_length, embed_size, dtype=tf.float32, **kwargs): super().__init__(dtype=dtype, **kwargs) assert embed_size % 2 == 0, "embed_size must be even" p, i = np.meshgrid(np.arange(max_length), 2 * np.arange(embed_size // 2)) pos_emb = np.empty((1, max_length, embed_size)) pos_emb[0, :, ::2] = np.sin(p / 10_000 ** (i / embed_size)).T pos_emb[0, :, 1::2] = np.cos(p / 10_000 ** (i / embed_size)).T self.pos_encodings = tf.constant(pos_emb.astype(self.dtype)) self.supports_masking = True def call(self, inputs): batch_max_length = tf.shape(inputs)[1] return inputs + self.pos_encodings[:, :batch_max_length] # In[81]: pos_embed_layer = PositionalEncoding(max_length, embed_size) encoder_in = pos_embed_layer(encoder_embeddings) decoder_in = pos_embed_layer(decoder_embeddings) # In[82]: # extra code – this cell generates and saves Figure 16–9 figure_max_length = 201 figure_embed_size = 512 pos_emb = PositionalEncoding(figure_max_length, figure_embed_size) zeros = np.zeros((1, figure_max_length, figure_embed_size), np.float32) P = pos_emb(zeros)[0].numpy() i1, i2, crop_i = 100, 101, 150 p1, p2, p3 = 22, 60, 35 fig, (ax1, ax2) = plt.subplots(nrows=2, ncols=1, sharex=True, figsize=(9, 5)) ax1.plot([p1, p1], [-1, 1], "k--", label="$p = {}$".format(p1)) ax1.plot([p2, p2], [-1, 1], "k--", label="$p = {}$".format(p2), alpha=0.5) ax1.plot(p3, P[p3, i1], "bx", label="$p = {}$".format(p3)) ax1.plot(P[:,i1], "b-", label="$i = {}$".format(i1)) ax1.plot(P[:,i2], "r-", label="$i = {}$".format(i2)) ax1.plot([p1, p2], [P[p1, i1], P[p2, i1]], "bo") ax1.plot([p1, p2], [P[p1, i2], P[p2, i2]], "ro") ax1.legend(loc="center right", fontsize=14, framealpha=0.95) ax1.set_ylabel("$P_{(p,i)}$", rotation=0, fontsize=16) ax1.grid(True, alpha=0.3) ax1.hlines(0, 0, figure_max_length - 1, color="k", linewidth=1, alpha=0.3) ax1.axis([0, figure_max_length - 1, -1, 1]) ax2.imshow(P.T[:crop_i], cmap="gray", interpolation="bilinear", aspect="auto") ax2.hlines(i1, 0, figure_max_length - 1, color="b", linewidth=3) cheat = 2 # need to raise the red line a bit, or else it hides the blue one ax2.hlines(i2+cheat, 0, figure_max_length - 1, color="r", linewidth=3) ax2.plot([p1, p1], [0, crop_i], "k--") ax2.plot([p2, p2], [0, crop_i], "k--", alpha=0.5) ax2.plot([p1, p2], [i2+cheat, i2+cheat], "ro") ax2.plot([p1, p2], [i1, i1], "bo") ax2.axis([0, figure_max_length - 1, 0, crop_i])
ax2.set_xlabel("$p$", fontsize=16) ax2.set_ylabel("$i$", rotation=0, fontsize=16) save_fig("positional_embedding_plot") plt.show() # ### Multi-Head Attention # In[83]: N = 2 # instead of 6 num_heads = 8 dropout_rate = 0.1 n_units = 128 # for the first Dense layer in each Feed Forward block encoder_pad_mask = tf.math.not_equal(encoder_input_ids, 0)[:, tf.newaxis] Z = encoder_in for _ in range(N): skip = Z attn_layer = tf.keras.layers.MultiHeadAttention( num_heads=num_heads, key_dim=embed_size, dropout=dropout_rate) Z = attn_layer(Z, value=Z, attention_mask=encoder_pad_mask) Z = tf.keras.layers.LayerNormalization()(tf.keras.layers.Add()([Z, skip])) skip = Z Z = tf.keras.layers.Dense(n_units, activation="relu")(Z) Z = tf.keras.layers.Dense(embed_size)(Z) Z = tf.keras.layers.Dropout(dropout_rate)(Z) Z = tf.keras.layers.LayerNormalization()(tf.keras.layers.Add()([Z, skip])) # In[84]: decoder_pad_mask = tf.math.not_equal(decoder_input_ids, 0)[:, tf.newaxis] causal_mask = tf.linalg.band_part( # creates a lower triangular matrix tf.ones((batch_max_len_dec, batch_max_len_dec), tf.bool), -1, 0) # In[85]: encoder_outputs = Z # let's save the encoder's final outputs Z = decoder_in # the decoder starts with its own inputs for _ in range(N): skip = Z attn_layer = tf.keras.layers.MultiHeadAttention( num_heads=num_heads, key_dim=embed_size, dropout=dropout_rate) Z = attn_layer(Z, value=Z, attention_mask=causal_mask & decoder_pad_mask) Z = tf.keras.layers.LayerNormalization()(tf.keras.layers.Add()([Z, skip])) skip = Z attn_layer = tf.keras.layers.MultiHeadAttention( num_heads=num_heads, key_dim=embed_size, dropout=dropout_rate) Z = attn_layer(Z, value=encoder_outputs, attention_mask=encoder_pad_mask) Z = tf.keras.layers.LayerNormalization()(tf.keras.layers.Add()([Z, skip])) skip = Z Z = tf.keras.layers.Dense(n_units, activation="relu")(Z) Z = tf.keras.layers.Dense(embed_size)(Z) Z = tf.keras.layers.LayerNormalization()(tf.keras.layers.Add()([Z, skip])) # **Warning**: the following cell will take a while to run (possibly 2 or 3 hours if you are not using a GPU). # In[86]: Y_proba = tf.keras.layers.Dense(vocab_size, activation="softmax")(Z) model = tf.keras.Model(inputs=[encoder_inputs, decoder_inputs], outputs=[Y_proba]) model.compile(loss="sparse_categorical_crossentropy", optimizer="nadam", metrics=["accuracy"]) model.fit((X_train, X_train_dec), Y_train, epochs=10, validation_data=((X_valid, X_valid_dec), Y_valid)) # In[87]: translate("I like soccer and also going to the beach") # # HuggingFace # Install the Transformers and Datasets libraries if we're running on Colab: # In[88]: if "google.colab" in sys.modules: get_ipython().run_line_magic('pip', 'install -q -U transformers') get_ipython().run_line_magic('pip', 'install -q -U datasets') # In[89]: from transformers import pipeline classifier = pipeline("sentiment-analysis") # many other tasks are available result = classifier("The actors were very convincing.") # Models can be very biased. For example, a model may like or dislike some countries depending on the data it was trained on, and how it is used, so use it with care: # In[90]: classifier(["I am from India.", "I am from Iraq."]) # In[91]: model_name = "huggingface/distilbert-base-uncased-finetuned-mnli" classifier_mnli = pipeline("text-classification", model=model_name) classifier_mnli("She loves me.
[SEP] She loves me not.") # In[92]: from transformers import AutoTokenizer, TFAutoModelForSequenceClassification tokenizer = AutoTokenizer.from_pretrained(model_name) model = TFAutoModelForSequenceClassification.from_pretrained(model_name) # In[93]: token_ids = tokenizer(["I like soccer. [SEP] We all love soccer!", "Joe lived for a very long time. [SEP] Joe is old."], padding=True, return_tensors="tf") token_ids # In[94]: token_ids = tokenizer([("I like soccer.", "We all love soccer!"), ("Joe lived for a very long time.", "Joe is old.")], padding=True, return_tensors="tf") token_ids # In[95]: outputs = model(token_ids) outputs # In[96]: Y_probas = tf.keras.activations.softmax(outputs.logits) Y_probas # In[97]: Y_pred = tf.argmax(Y_probas, axis=1) Y_pred # 0 = contradiction, 1 = entailment, 2 = neutral # In[98]: sentences = [("Sky is blue", "Sky is red"), ("I love her", "She loves me")] X_train = tokenizer(sentences, padding=True, return_tensors="tf").data y_train = tf.constant([0, 2]) # contradiction, neutral loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True) model.compile(loss=loss, optimizer="nadam", metrics=["accuracy"]) history = model.fit(X_train, y_train, epochs=2) # # Exercise solutions # ## 1. to 7. # 1. Stateless RNNs can only capture patterns whose length is less than, or equal to, the size of the windows the RNN is trained on. Conversely, stateful RNNs can capture longer-term patterns. However, implementing a stateful RNN is much harder⁠—especially preparing the dataset properly. Moreover, stateful RNNs do not always work better, in part because consecutive batches are not independent and identically distributed (IID). Gradient Descent is not fond of non-IID datasets. # 2. In general, if you translate a sentence one word at a time, the result will be terrible. For example, the French sentence "Je vous en prie" means "You are welcome," but if you translate it one word at a time, you get "I you in pray." Huh? It is much better to read the whole sentence first and then translate it. A plain sequence-to-sequence RNN would start translating a sentence immediately after reading the first word, while an Encoder–Decoder RNN will first read the whole sentence and then translate it. That said, one could imagine a plain sequence-to-sequence RNN that would output silence whenever it is unsure about what to say next (just like human translators do when they must translate a live broadcast). # 3. Variable-length input sequences can be handled by padding the shorter sequences so that all sequences in a batch have the same length, and using masking to ensure the RNN ignores the padding token. For better performance, you may also want to create batches containing sequences of similar sizes. Ragged tensors can hold sequences of variable lengths, and Keras now supports them, which simplifies handling variable-length input sequences (at the time of this writing, it still does not handle ragged tensors as targets on the GPU, though). Regarding variable-length output sequences, if the length of the output sequence is known in advance (e.g., if you know that it is the same as the input sequence), then you just need to configure the loss function so that it ignores tokens that come after the end of the sequence. Similarly, the code that will use the model should ignore tokens beyond the end of the sequence. 
But generally the length of the output sequence is not known ahead of time, so the solution is to train the model so that it outputs an end-of-sequence token at the end of each sequence. # 4. Beam search is a technique used to improve the performance of a trained Encoder–Decoder model, for example in a neural machine translation system. The algorithm keeps track of a short list of the _k_ most promising output sentences (say, the top three), and at each decoder step it tries to extend them by one word; then it keeps only the _k_ most likely sentences. The parameter _k_ is called the _beam width_: the larger it is, the more CPU and RAM will be used, but also the more accurate the system will be. Instead of greedily choosing the most likely next word at each step to extend a single sentence, this technique allows the system to explore several promising sentences simultaneously. Moreover, this technique lends itself well to parallelization. You can implement beam search by writing a custom memory cell. Alternatively, TensorFlow Addons's seq2seq API provides an implementation. # 5. An attention mechanism is a technique initially used in Encoder–Decoder models to give the decoder more direct access to the input sequence, allowing it to deal with longer input sequences. At each decoder time step, the current decoder's state and the full output of the encoder are processed by an alignment model that outputs an alignment score for each input time step. This score indicates which part of the input is most relevant to the current decoder time step. The weighted sum of the encoder outputs (weighted by their alignment scores) is then fed to the decoder, which produces the next decoder state and the output for this time step. The main benefit of using an attention mechanism is the fact that the Encoder–Decoder model can successfully process longer input sequences. Another benefit is that the alignment scores make the model easier to debug and interpret: for example, if the model makes a mistake, you can look at which part of the input it was paying attention to, and this can help diagnose the issue. An attention mechanism is also at the core of the Transformer architecture, in the Multi-Head Attention layers. See the next answer. # 6. The most important layer in the Transformer architecture is the Multi-Head Attention layer (the original Transformer architecture contains 18 of them, including 6 Masked Multi-Head Attention layers). It is at the core of language models such as BERT and GPT-2. Its purpose is to allow the model to identify which words are most aligned with each other, and then improve each word's representation using these contextual clues. # 7. Sampled softmax is used when training a classification model with many classes (e.g., thousands). It computes an approximation of the cross-entropy loss based on the logit predicted by the model for the correct class, and the predicted logits for a sample of incorrect words. This speeds up training considerably compared to computing the softmax over all logits and then estimating the cross-entropy loss. After training, the model can be used normally, using the regular softmax function to compute all the class probabilities based on all the logits. # ## 8. # _Exercise:_ Embedded Reber grammars _were used by Hochreiter and Schmidhuber in [their paper](https://homl.info/93) about LSTMs. They are artificial grammars that produce strings such as "BPBTSXXVPSEPE." Check out Jenny Orr's [nice introduction](https://homl.info/108) to this topic.
Choose a particular embedded Reber grammar (such as the one represented on Jenny Orr's page), then train an RNN to identify whether a string respects that grammar or not. You will first need to write a function capable of generating a training batch containing about 50% strings that respect the grammar, and 50% that don't._ # First we need to build a function that generates strings based on a grammar. The grammar will be represented as a list of possible transitions for each state. A transition specifies the string to output (or a grammar to generate it) and the next state. # In[99]: default_reber_grammar = [ [("B", 1)], # (state 0) =B=>(state 1) [("T", 2), ("P", 3)], # (state 1) =T=>(state 2) or =P=>(state 3) [("S", 2), ("X", 4)], # (state 2) =S=>(state 2) or =X=>(state 4) [("T", 3), ("V", 5)], # and so on... [("X", 3), ("S", 6)], [("P", 4), ("V", 6)], [("E", None)]] # (state 6) =E=>(terminal state) embedded_reber_grammar = [ [("B", 1)], [("T", 2), ("P", 3)], [(default_reber_grammar, 4)], [(default_reber_grammar, 5)], [("T", 6)], [("P", 6)], [("E", None)]] def generate_string(grammar): state = 0 output = [] while state is not None: index = np.random.randint(len(grammar[state])) production, state = grammar[state][index] if isinstance(production, list): production = generate_string(grammar=production) output.append(production) return "".join(output) # Let's generate a few strings based on the default Reber grammar: # In[100]: np.random.seed(42) for _ in range(25): print(generate_string(default_reber_grammar), end=" ") # Looks good. Now let's generate a few strings based on the embedded Reber grammar: # In[101]: np.random.seed(42) for _ in range(25): print(generate_string(embedded_reber_grammar), end=" ") # Okay, now we need a function to generate strings that do not respect the grammar. We could generate a random string, but the task would be a bit too easy, so instead we will generate a string that respects the grammar, and we will corrupt it by changing just one character: # In[102]: POSSIBLE_CHARS = "BEPSTVX" def generate_corrupted_string(grammar, chars=POSSIBLE_CHARS): good_string = generate_string(grammar) index = np.random.randint(len(good_string)) good_char = good_string[index] bad_char = np.random.choice(sorted(set(chars) - set(good_char))) return good_string[:index] + bad_char + good_string[index + 1:] # Let's look at a few corrupted strings: # In[103]: np.random.seed(42) for _ in range(25): print(generate_corrupted_string(embedded_reber_grammar), end=" ") # We cannot feed strings directly to an RNN, so we need to encode them somehow. One option would be to one-hot encode each character. Another option is to use embeddings. Let's go for the second option (but since there are just a handful of characters, one-hot encoding would probably be a good option as well). For embeddings to work, we need to convert each string into a sequence of character IDs. 
Let's write a function for that, using each character's index in the string of possible characters "BEPSTVX": # In[104]: def string_to_ids(s, chars=POSSIBLE_CHARS): return [chars.index(c) for c in s] # In[105]: string_to_ids("BTTTXXVVETE") # We can now generate the dataset, with 50% good strings, and 50% bad strings: # In[106]: def generate_dataset(size): good_strings = [ string_to_ids(generate_string(embedded_reber_grammar)) for _ in range(size // 2) ] bad_strings = [ string_to_ids(generate_corrupted_string(embedded_reber_grammar)) for _ in range(size - size // 2) ] all_strings = good_strings + bad_strings X = tf.ragged.constant(all_strings, ragged_rank=1) y = np.array([[1.] for _ in range(len(good_strings))] + [[0.] for _ in range(len(bad_strings))]) return X, y # In[107]: np.random.seed(42) X_train, y_train = generate_dataset(10000) X_valid, y_valid = generate_dataset(2000) # Let's take a look at the first training sequence: # In[108]: X_train[0] # What class does it belong to? # In[109]: y_train[0] # Perfect! We are ready to create the RNN to identify good strings. We build a simple sequence binary classifier: # In[110]: np.random.seed(42) tf.random.set_seed(42) embedding_size = 5 model = tf.keras.Sequential([ tf.keras.layers.InputLayer(input_shape=[None], dtype=tf.int32, ragged=True), tf.keras.layers.Embedding(input_dim=len(POSSIBLE_CHARS), output_dim=embedding_size), tf.keras.layers.GRU(30), tf.keras.layers.Dense(1, activation="sigmoid") ]) optimizer = tf.keras.optimizers.SGD(learning_rate=0.02, momentum = 0.95, nesterov=True) model.compile(loss="binary_crossentropy", optimizer=optimizer, metrics=["accuracy"]) history = model.fit(X_train, y_train, epochs=20, validation_data=(X_valid, y_valid)) # Now let's test our RNN on two tricky strings: the first one is bad while the second one is good. They only differ by the second to last character. If the RNN gets this right, it shows that it managed to notice the pattern that the second letter should always be equal to the second to last letter. That requires a fairly long short-term memory (which is the reason why we used a GRU cell). # In[111]: test_strings = ["BPBTSSSSSSSXXTTVPXVPXTTTTTVVETE", "BPBTSSSSSSSXXTTVPXVPXTTTTTVVEPE"] X_test = tf.ragged.constant([string_to_ids(s) for s in test_strings], ragged_rank=1) y_proba = model.predict(X_test) print() print("Estimated probability that these are Reber strings:") for index, string in enumerate(test_strings): print("{}: {:.2f}%".format(string, 100 * y_proba[index][0])) # Ta-da! It worked fine. The RNN found the correct answers with very high confidence. :) # ## 9. # _Exercise: Train an Encoder–Decoder model that can convert a date string from one format to another (e.g., from "April 22, 2019" to "2019-04-22")._ # Let's start by creating the dataset. 
We will use random days between 1000-01-01 and 9999-12-31: # In[112]: from datetime import date # cannot use strftime()'s %B format since it depends on the locale MONTHS = ["January", "February", "March", "April", "May", "June", "July", "August", "September", "October", "November", "December"] def random_dates(n_dates): min_date = date(1000, 1, 1).toordinal() max_date = date(9999, 12, 31).toordinal() ordinals = np.random.randint(max_date - min_date, size=n_dates) + min_date dates = [date.fromordinal(ordinal) for ordinal in ordinals] x = [MONTHS[dt.month - 1] + " " + dt.strftime("%d, %Y") for dt in dates] y = [dt.isoformat() for dt in dates] return x, y # Here are a few random dates, displayed in both the input format and the target format: # In[113]: np.random.seed(42) n_dates = 3 x_example, y_example = random_dates(n_dates) print("{:25s}{:25s}".format("Input", "Target")) print("-" * 50) for idx in range(n_dates): print("{:25s}{:25s}".format(x_example[idx], y_example[idx])) # Let's get the list of all possible characters in the inputs: # In[114]: INPUT_CHARS = "".join(sorted(set("".join(MONTHS) + "0123456789, "))) INPUT_CHARS # And here's the list of possible characters in the outputs: # In[115]: OUTPUT_CHARS = "0123456789-" # Let's write a function to convert a string to a list of character IDs, as we did in the previous exercise: # In[116]: def date_str_to_ids(date_str, chars=INPUT_CHARS): return [chars.index(c) for c in date_str] # In[117]: date_str_to_ids(x_example[0], INPUT_CHARS) # In[118]: date_str_to_ids(y_example[0], OUTPUT_CHARS) # In[119]: def prepare_date_strs(date_strs, chars=INPUT_CHARS): X_ids = [date_str_to_ids(dt, chars) for dt in date_strs] X = tf.ragged.constant(X_ids, ragged_rank=1) return (X + 1).to_tensor() # using 0 as the padding token ID def create_dataset(n_dates): x, y = random_dates(n_dates) return prepare_date_strs(x, INPUT_CHARS), prepare_date_strs(y, OUTPUT_CHARS) # In[120]: np.random.seed(42) X_train, Y_train = create_dataset(10000) X_valid, Y_valid = create_dataset(2000) X_test, Y_test = create_dataset(2000) # In[121]: Y_train[0] # ### First version: a very basic seq2seq model # Let's first try the simplest possible model: we feed in the input sequence, which first goes through the encoder (an embedding layer followed by a single LSTM layer), which outputs a vector. This vector then goes through a decoder (a single LSTM layer, followed by a dense output layer), which outputs a sequence of vectors, each representing the estimated probabilities for all possible output characters. # # Since the decoder expects a sequence as input, we repeat the vector (which is output by the encoder) as many times as the longest possible output sequence. # In[122]: embedding_size = 32 max_output_length = Y_train.shape[1] np.random.seed(42) tf.random.set_seed(42) encoder = tf.keras.Sequential([ tf.keras.layers.Embedding(input_dim=len(INPUT_CHARS) + 1, output_dim=embedding_size, input_shape=[None]), tf.keras.layers.LSTM(128) ]) decoder = tf.keras.Sequential([ tf.keras.layers.LSTM(128, return_sequences=True), tf.keras.layers.Dense(len(OUTPUT_CHARS) + 1, activation="softmax") ]) model = tf.keras.Sequential([ encoder, tf.keras.layers.RepeatVector(max_output_length), decoder ]) optimizer = tf.keras.optimizers.Nadam() model.compile(loss="sparse_categorical_crossentropy", optimizer=optimizer, metrics=["accuracy"]) history = model.fit(X_train, Y_train, epochs=20, validation_data=(X_valid, Y_valid)) # Looks great: we reach 100% validation accuracy! Let's use the model to make some predictions.
We will need to be able to convert a sequence of character IDs to a readable string: # In[123]: def ids_to_date_strs(ids, chars=OUTPUT_CHARS): return ["".join([("?" + chars)[index] for index in sequence]) for sequence in ids] # Now we can use the model to convert some dates: # In[124]: X_new = prepare_date_strs(["September 17, 2009", "July 14, 1789"]) # In[125]: ids = model.predict(X_new).argmax(axis=-1) for date_str in ids_to_date_strs(ids): print(date_str) # Perfect! :) # However, since the model was only trained on input strings of length 18 (which is the length of the longest date), it does not perform well if we try to use it to make predictions on shorter sequences: # In[126]: X_new = prepare_date_strs(["May 02, 2020", "July 14, 1789"]) # In[127]: ids = model.predict(X_new).argmax(axis=-1) for date_str in ids_to_date_strs(ids): print(date_str) # Oops! We need to ensure that we always pass sequences of the same length as during training, using padding if necessary. Let's write a little helper function for that: # In[128]: max_input_length = X_train.shape[1] def prepare_date_strs_padded(date_strs): X = prepare_date_strs(date_strs) if X.shape[1] < max_input_length: X = tf.pad(X, [[0, 0], [0, max_input_length - X.shape[1]]]) return X def convert_date_strs(date_strs): X = prepare_date_strs_padded(date_strs) ids = model.predict(X).argmax(axis=-1) return ids_to_date_strs(ids) # In[129]: convert_date_strs(["May 02, 2020", "July 14, 1789"]) # Cool! Granted, there are certainly much easier ways to write a date conversion tool (e.g., using regular expressions or even basic string manipulation), but you have to admit that using neural networks is way cooler. ;-) # However, real-life sequence-to-sequence problems will usually be harder, so for the sake of completeness, let's build a more powerful model. # ### Second version: feeding the shifted targets to the decoder (teacher forcing) # Instead of feeding the decoder a simple repetition of the encoder's output vector, we can feed it the target sequence, shifted by one time step to the right. This way, at each time step the decoder will know what the previous target character was. This should help it tackle more complex sequence-to-sequence problems. # # Since the first output character of each target sequence has no previous character, we will need a new token to represent the start-of-sequence (sos). # # During inference, we won't know the target, so what will we feed the decoder? We can just predict one character at a time, starting with an sos token, then feeding the decoder all the characters that were predicted so far (we will look at this in more detail later in this notebook). # # But if the decoder's LSTM expects to get the previous target as input at each step, how shall we pass it the vector output by the encoder? Well, one option is to ignore the output vector, and instead use the encoder's LSTM state as the initial state of the decoder's LSTM (which requires the encoder's LSTM to have the same number of units as the decoder's LSTM). # # Now let's create the decoder's inputs (for training, validation and testing). The sos token will be represented using the last possible output character's ID + 1.
# In[130]: sos_id = len(OUTPUT_CHARS) + 1 def shifted_output_sequences(Y): sos_tokens = tf.fill(dims=(len(Y), 1), value=sos_id) return tf.concat([sos_tokens, Y[:, :-1]], axis=1) X_train_decoder = shifted_output_sequences(Y_train) X_valid_decoder = shifted_output_sequences(Y_valid) X_test_decoder = shifted_output_sequences(Y_test) # Let's take a look at the decoder's training inputs: # In[131]: X_train_decoder # Now let's build the model. It's not a simple sequential model anymore, so let's use the functional API: # In[132]: encoder_embedding_size = 32 decoder_embedding_size = 32 lstm_units = 128 np.random.seed(42) tf.random.set_seed(42) encoder_input = tf.keras.layers.Input(shape=[None], dtype=tf.int32) encoder_embedding = tf.keras.layers.Embedding( input_dim=len(INPUT_CHARS) + 1, output_dim=encoder_embedding_size)(encoder_input) _, encoder_state_h, encoder_state_c = tf.keras.layers.LSTM( lstm_units, return_state=True)(encoder_embedding) encoder_state = [encoder_state_h, encoder_state_c] decoder_input = tf.keras.layers.Input(shape=[None], dtype=tf.int32) decoder_embedding = tf.keras.layers.Embedding( input_dim=len(OUTPUT_CHARS) + 2, output_dim=decoder_embedding_size)(decoder_input) decoder_lstm_output = tf.keras.layers.LSTM(lstm_units, return_sequences=True)( decoder_embedding, initial_state=encoder_state) decoder_output = tf.keras.layers.Dense(len(OUTPUT_CHARS) + 1, activation="softmax")(decoder_lstm_output) model = tf.keras.Model(inputs=[encoder_input, decoder_input], outputs=[decoder_output]) optimizer = tf.keras.optimizers.Nadam() model.compile(loss="sparse_categorical_crossentropy", optimizer=optimizer, metrics=["accuracy"]) history = model.fit([X_train, X_train_decoder], Y_train, epochs=10, validation_data=([X_valid, X_valid_decoder], Y_valid)) # This model also reaches 100% validation accuracy, but it does so even faster. # Let's once again use the model to make some predictions. This time we need to predict characters one by one. # In[133]: sos_id = len(OUTPUT_CHARS) + 1 def predict_date_strs(date_strs): X = prepare_date_strs_padded(date_strs) Y_pred = tf.fill(dims=(len(X), 1), value=sos_id) for index in range(max_output_length): pad_size = max_output_length - Y_pred.shape[1] X_decoder = tf.pad(Y_pred, [[0, 0], [0, pad_size]]) Y_probas_next = model.predict([X, X_decoder])[:, index:index+1] Y_pred_next = tf.argmax(Y_probas_next, axis=-1, output_type=tf.int32) Y_pred = tf.concat([Y_pred, Y_pred_next], axis=1) return ids_to_date_strs(Y_pred[:, 1:]) # In[134]: predict_date_strs(["July 14, 1789", "May 01, 2020"]) # Works fine! Next, feel free to write a Transformer version. :) # ## 10. # _Exercise: Go through Keras's tutorial for [Natural language image search with a Dual Encoder](https://homl.info/dualtuto). You will learn how to build a model capable of representing both images and text within the same embedding space. This makes it possible to search for images using a text prompt, like in the [CLIP model](https://openai.com/blog/clip/) by OpenAI._ # Just click the link and follow the instructions. # ## 11. # _Exercise: Use the Transformers library to download a pretrained language model capable of generating text (e.g., GPT), and try generating more convincing Shakespearean text. You will need to use the model's `generate()` method—see Hugging Face's documentation for more details._ # First, let's load a pretrained model. In this example, we will use OpenAI's GPT model, with an additional Language Model on top (just a linear layer with weights tied to the input embeddings). 
Let's import it and load the pretrained weights (this will download about 445MB of data to `~/.cache/torch/transformers`): # In[135]: from transformers import TFOpenAIGPTLMHeadModel model = TFOpenAIGPTLMHeadModel.from_pretrained("openai-gpt") # Next we will need a specialized tokenizer for this model. This one will try to use the [spaCy](https://spacy.io/) and [ftfy](https://pypi.org/project/ftfy/) libraries if they are installed, or else it will fall back to BERT's `BasicTokenizer` followed by Byte-Pair Encoding (which should be fine for most use cases). # In[136]: from transformers import OpenAIGPTTokenizer tokenizer = OpenAIGPTTokenizer.from_pretrained("openai-gpt") # Now let's use the tokenizer to tokenize and encode the prompt text: # In[137]: tokenizer("hello everyone") # In[138]: prompt_text = "This royal throne of kings, this sceptred isle" encoded_prompt = tokenizer.encode(prompt_text, add_special_tokens=False, return_tensors="tf") encoded_prompt # Easy! Next, let's use the model to generate text after the prompt. We will generate 5 different sentences, each starting with the prompt text, followed by 40 additional tokens. For an explanation of what all the hyperparameters do, make sure to check out this great [blog post](https://huggingface.co/blog/how-to-generate) by Patrick von Platen (from Hugging Face). You can play around with the hyperparameters to try to obtain better results. # In[139]: num_sequences = 5 length = 40 generated_sequences = model.generate( input_ids=encoded_prompt, do_sample=True, max_length=length + len(encoded_prompt[0]), temperature=1.0, top_k=0, top_p=0.9, repetition_penalty=1.0, num_return_sequences=num_sequences, ) generated_sequences # Now let's decode the generated sequences and print them: # In[140]: for sequence in generated_sequences: text = tokenizer.decode(sequence, clean_up_tokenization_spaces=True) print(text) print("-" * 80) # You can try more recent (and larger) models, such as GPT-2, CTRL, Transformer-XL or XLNet, which are all available as pretrained models in the transformers library, including variants with Language Models on top. The preprocessing steps vary slightly between models, so make sure to check out this [generation example](https://github.com/huggingface/transformers/blob/master/examples/run_generation.py) from the transformers documentation (this example uses PyTorch, but it will work with only a few tweaks, such as adding `TF` at the beginning of the model class name, removing the `.to()` method calls, and using `return_tensors="tf"` instead of `"pt"`). # Hope you enjoyed this chapter! :)