#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
This notebook gives a brief introduction to the *Sequence to Sequence Model Architecture*. In this notebook we broadly cover four essential steps of Neural Machine Translation: preparing and cleaning the data, tokenizing and encoding it, defining and training the model, and evaluating the translations with BLEU.
The basic idea behind such a model is the encoder-decoder architecture. These networks are used for a variety of tasks such as text summarization, machine translation, and image captioning. This tutorial provides a hands-on understanding of the concept, explaining the technical jargon wherever necessary. We focus on the task of Neural Machine Translation (NMT), which was the very first testbed for seq2seq models.
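As a rough, self-contained sketch of the encoder-decoder idea, the toy model below compresses the source sequence into a fixed-size context vector and then unrolls that vector into the target sequence. The vocabulary sizes, dimensions, and target length are placeholders rather than values used in this notebook, and the setup cell below should be run first so that TensorFlow 2 is available; the model we actually train is built the same way further down.
import tensorflow as tf
# Toy sketch only: all sizes below are placeholders, not the values used later in this notebook.
src = tf.keras.Input(shape=(None,), dtype='int32')         # source token ids
h = tf.keras.layers.Embedding(input_dim=1000, output_dim=64)(src)
context = tf.keras.layers.LSTM(64)(h)                      # encoder: sequence -> fixed-size context vector
h = tf.keras.layers.RepeatVector(10)(context)              # repeat the context for each target time step
h = tf.keras.layers.LSTM(64, return_sequences=True)(h)     # decoder: context -> output sequence
out = tf.keras.layers.TimeDistributed(
    tf.keras.layers.Dense(1000, activation='softmax'))(h)  # distribution over the target vocabulary
toy_seq2seq = tf.keras.Model(src, out)
toy_seq2seq.summary()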
from __future__ import absolute_import, division, print_function, unicode_literals
try:
%tensorflow_version 2.x
except:
pass
!pip install -q --no-deps tensorflow-addons~=0.6
!pip install nltk
import tensorflow as tf
import tensorflow_addons as tfa
These are the resources you need to download in order to run this notebook:
* Encoder embeddings (optional; pretrained embeddings can reduce training time)
* Decoder embeddings (optional; pretrained embeddings can reduce training time)
The dataset must be downloaded for this notebook to run. The pretrained embeddings can be used as-is, but here we train our own embeddings from scratch; a sketch of how pretrained embeddings could be plugged in follows below.
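For completeness, here is a minimal sketch of how a pretrained embedding matrix could be wired into a Keras Embedding layer. The embedding_matrix below is a hypothetical placeholder (random numbers standing in for real pretrained vectors) and the sizes are made up; this cell is optional and is not used by the rest of the notebook.
import numpy as np
import tensorflow as tf
# Hypothetical placeholder: row i would hold the pretrained vector for token id i.
vocab_size, embedding_dim = 5000, 100                         # made-up sizes
embedding_matrix = np.random.rand(vocab_size, embedding_dim)  # stand-in for real vectors
# Initialize the Embedding layer with the matrix and freeze it so the
# pretrained vectors are not updated during training.
pretrained_embedding = tf.keras.layers.Embedding(
    input_dim=vocab_size,
    output_dim=embedding_dim,
    embeddings_initializer=tf.keras.initializers.Constant(embedding_matrix),
    trainable=False,
    mask_zero=True)
In this notebook we skip this step and let the Embedding layer learn its weights from scratch.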
#download data
print("Downloading Dataset:")
!wget --quiet http://www.manythings.org/anki/deu-eng.zip
!unzip deu-eng.zip
import csv
import string
import re
from pickle import dump
from unicodedata import normalize
from numpy import array
from pickle import load
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import Embedding
from tensorflow.keras.layers import RepeatVector
from tensorflow.keras.layers import TimeDistributed
from tensorflow.keras.callbacks import ModelCheckpoint
from numpy import argmax
from nltk.translate.bleu_score import corpus_bleu
Our dataset is a German-English translation dataset. It contains 152,820 pairs of English-to-German phrases, one pair per line, with a tab separating the two languages. Although the dataset is well organized, it needs cleaning before we can work with it; this removes unnecessary bumps that could come up during training.
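To make that format concrete, here is a tiny illustration of how one such line is split. The example line is made up to show the layout, not read from the file; newer versions of the file may also carry an attribution column, which is why we later keep only the first two fields.
# Illustrative only: a made-up line in the same tab-separated layout as the file.
sample_line = "Hi.\tHallo!"
english, german = sample_line.split('\t')[:2]
print(english, '=>', german)   # Hi. => Hallo!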
# load doc into memory
def load_document(filename):
# open the file as read only
file = open(filename, mode='rt', encoding='utf-8')
# read all text
text = file.read()
# close the file
file.close()
return text
# split a loaded document into sentences
def doc_sep_pair(doc):
lines = doc.strip().split('\n')
pairs = [line.split('\t') for line in lines]
return pairs
# clean a list of lines
def clean_sentences(lines):
cleaned = list()
re_print = re.compile('[^%s]' % re.escape(string.printable))
# prepare translation table
table = str.maketrans('', '', string.punctuation)
for pair in lines:
clean_pair = list()
for line in pair:
# normalizing unicode characters
line = normalize('NFD', line).encode('ascii', 'ignore')
line = line.decode('UTF-8')
# tokenize on white space
line = line.split()
# convert to lowercase
line = [word.lower() for word in line]
# removing punctuation
line = [word.translate(table) for word in line]
# removing non-printable chars from each token
line = [re_print.sub('', w) for w in line]
# removing tokens with numbers
line = [word for word in line if word.isalpha()]
# store as string
clean_pair.append(' '.join(line))
cleaned.append(clean_pair)
return array(cleaned)
# load dataset
filename = 'deu.txt' #change filename if necessary
doc = load_document(filename)
# clean the sentence pairs
pairs = doc_sep_pair(doc)
cleaned_pairs = clean_sentences(pairs)  # use a new name so the function is not shadowed
# uncomment to check the mapping
#for i in range(100):
#    print('[%s] => [%s]' % (cleaned_pairs[i,0], cleaned_pairs[i,1]))
from numpy.random import shuffle
# load a clean dataset
def load_clean_sentences(filename):
return load(open(filename, 'rb'))
# use the cleaned sentence pairs prepared above
raw_data = cleaned_pairs
# reduce dataset size
n_sentences = 10000
data = raw_data[:n_sentences, :2] #extract only english and german sentences
shuffle(data)
# split into train/test
train, test = data[:9000], data[9000:]
# fit a tokenizer on the given lines
def tokenization(lines):
tokenizer = Tokenizer()
tokenizer.fit_on_texts(lines)
return tokenizer
# length of the longest sentence (in words)
def max_length(lines):
return max(len(line.split()) for line in lines)
# integer-encode the lines and pad them to a fixed length
def encode_sequences(tokenizer, length, lines):
# integer encode sequences
X = tokenizer.texts_to_sequences(lines)
# pad sequences with 0 values
X = pad_sequences(X, maxlen=length, padding='post')
return X
# one hot encode target sequence
def encode_output(sequences, vocab_size):
ylist = list()
for sequence in sequences:
encoded = to_categorical(sequence, num_classes=vocab_size)
ylist.append(encoded)
y = array(ylist)
y = y.reshape(sequences.shape[0], sequences.shape[1], vocab_size)
return y
# define NMT model
def define_model(src_vocab, tar_vocab, src_timesteps, tar_timesteps, n_units):
model = Sequential()
model.add(Embedding(src_vocab, n_units, input_length=src_timesteps, mask_zero=True))
model.add(LSTM(n_units))
model.add(RepeatVector(tar_timesteps))
model.add(LSTM(n_units, return_sequences=True))
model.add(TimeDistributed(Dense(tar_vocab, activation='softmax')))
return model
# prepare english tokenizer
eng_tokenizer = tokenization(data[:, 0])
eng_vocab_size = len(eng_tokenizer.word_index) + 1
eng_length = max_length(data[:, 0])
print('English Vocabulary Size: %d' % eng_vocab_size)
print('English Max Length: %d' % (eng_length))
# prepare german tokenizer
ger_tokenizer = tokenization(data[:, 1])
ger_vocab_size = len(ger_tokenizer.word_index) + 1
ger_length = max_length(data[:, 1])
print('German Vocabulary Size: %d' % ger_vocab_size)
print('German Max Length: %d' % (ger_length))
# prepare training data
trainX = encode_sequences(ger_tokenizer, ger_length, train[:, 1])
trainY = encode_sequences(eng_tokenizer, eng_length, train[:, 0])
trainY = encode_output(trainY, eng_vocab_size)
# prepare validation data
testX = encode_sequences(ger_tokenizer, ger_length, test[:, 1])
testY = encode_sequences(eng_tokenizer, eng_length, test[:, 0])
testY = encode_output(testY, eng_vocab_size)
model = define_model(ger_vocab_size, eng_vocab_size, ger_length, eng_length, 256)
# compile the model (you can use LazyAdam from tensorflow-addons if necessary)
# model.compile(optimizer=tfa.optimizers.LazyAdam(0.001), loss=tf.keras.losses.CategoricalCrossentropy(), metrics=['accuracy'])
model.compile(optimizer='adam', loss='categorical_crossentropy')
# checkpoint the best model weights to a separate file (not the dataset file)
model_filename = 'model.h5'
checkpoint = ModelCheckpoint(model_filename, monitor='val_loss', verbose=1, save_best_only=True, mode='min')
# training - tune the hyperparameters if necessary
model.fit(trainX, trainY, epochs=100, batch_size=64, validation_data=(testX, testY), callbacks=[checkpoint])
# map an integer to a word
def int_to_word(integer, tokenizer):
for word, index in tokenizer.word_index.items():
if index == integer:
return word
return None
# generate target given source sequence
def predict_sequence(model, tokenizer, source):
prediction = model.predict(source, verbose=0)[0]
integers = [argmax(vector) for vector in prediction]
target = list()
for i in integers:
word = int_to_word(i, tokenizer)
if word is None:
break
target.append(word)
return ' '.join(target)
# evaluating the model
def evaluate(model, tokenizer, sources, raw_dataset):
actual, predicted = list(), list()
for i, source in enumerate(sources):
# translate encoded source text
source = source.reshape((1, source.shape[0]))
translation = predict_sequence(model, tokenizer, source)
raw_target, raw_src = raw_dataset[i]
if i < 10:
print('src=[%s], target=[%s], predicted=[%s]' % (raw_src, raw_target, translation))
actual.append([raw_target.split()])
predicted.append(translation.split())
# calculate BLEU score
print('BLEU-1: %f' % corpus_bleu(actual, predicted, weights=(1.0, 0, 0, 0)))
print('BLEU-2: %f' % corpus_bleu(actual, predicted, weights=(0.5, 0.5, 0, 0)))
print('BLEU-3: %f' % corpus_bleu(actual, predicted, weights=(0.3, 0.3, 0.3, 0)))
print('BLEU-4: %f' % corpus_bleu(actual, predicted, weights=(0.25, 0.25, 0.25, 0.25)))
# prepare english tokenizer
eng_tokenizer = tokenization(data[:, 0])
eng_vocab_size = len(eng_tokenizer.word_index) + 1
eng_length = max_length(data[:, 0])
# prepare german tokenizer
ger_tokenizer = tokenization(data[:, 1])
ger_vocab_size = len(ger_tokenizer.word_index) + 1
ger_length = max_length(data[:, 1])
# prepare data
trainX = encode_sequences(ger_tokenizer, ger_length, train[:, 1])
testX = encode_sequences(ger_tokenizer, ger_length, test[:, 1])
# test on some training sequences
print('train')
evaluate(model, eng_tokenizer, trainX, train)
# test on some test sequences
print('test')
evaluate(model, eng_tokenizer, testX, test)