"""
We use following lines because we are running on Google Colab
If you are running notebook on a local computer, you don't need this cell
"""
from google.colab import drive
drive.mount('/content/gdrive')
import os
os.chdir('/content/gdrive/My Drive/finch/tensorflow1/free_chat/chinese_gaoq1/main')
%tensorflow_version 1.x
!pip install texar
import tensorflow as tf
import texar.tf as tx
import numpy as np
import copy
from texar.tf.modules import TransformerEncoder, TransformerDecoder
print("TensorFlow Version", tf.__version__)
print('GPU Enabled:', tf.test.is_gpu_available())
def forward(features, labels, mode):
    if isinstance(features, dict):
        words = features['words']
    else:
        words = features

    words_len = tf.count_nonzero(words, 1, dtype=tf.int32)
    is_training = (mode == tf.estimator.ModeKeys.TRAIN)
    batch_sz = tf.shape(words)[0]

    with tf.variable_scope('Embedding'):
        embedding = tf.Variable(np.load('../vocab/char.npy'),
                                dtype=tf.float32,
                                name='fasttext_vectors')
        # Zero out the row for the padding id (0) so padding tokens contribute
        # nothing to the encoder input.
        embedding = tf.concat([tf.zeros(shape=[1, params['embed_dim']]),
                               embedding[1:, :]], axis=0)
        x = tf.nn.embedding_lookup(embedding, words)
        pos_embedder = tx.modules.SinusoidsPositionEmbedder(
            position_size=2*params['max_len'],
            hparams=config_model.position_embedder_hparams)
        # Scale word embeddings by sqrt(hidden_dim) before adding position
        # embeddings, as in "Attention Is All You Need".
        x = (x * config_model.hidden_dim ** 0.5) + pos_embedder(sequence_length=words_len)

    with tf.variable_scope('Encoder'):
        encoder = TransformerEncoder(hparams=config_model.encoder)
        enc_out = encoder(inputs=x, sequence_length=words_len)

    with tf.variable_scope('Decoder'):
        # Tie the decoder's output projection to the transposed input embedding.
        decoder = TransformerDecoder(vocab_size=len(params['char2idx'])+1,
                                     output_layer=tf.transpose(embedding, (1, 0)),
                                     hparams=config_model.decoder)
        start_tokens = tf.fill([batch_sz], 1)  # id 1: start-of-sequence token

        def _embedding_fn(ids, times):
            word_embed = tf.nn.embedding_lookup(embedding, ids)
            pos_embed = pos_embedder(times)
            return word_embed * config_model.hidden_dim ** 0.5 + pos_embed

        predictions = decoder(
            memory=enc_out,
            memory_sequence_length=words_len,
            beam_width=params['beam_width'],
            length_penalty=params['length_penalty'],
            start_tokens=start_tokens,
            end_token=2,  # id 2: end-of-sequence token
            embedding=_embedding_fn,
            max_decoding_length=params['max_len'],
            mode=tf.estimator.ModeKeys.PREDICT)

    # Beam search returns sample ids of shape [batch, length, beam_width];
    # keep only the top_k beams.
    return predictions['sample_id'][:, :, :params['top_k']]
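
# The SinusoidsPositionEmbedder above injects token order with the fixed
# sin/cos scheme from "Attention Is All You Need". A rough numpy sketch of
# that scheme follows, for reference only: texar.tf follows the tensor2tensor
# convention (sin and cos halves concatenated rather than interleaved), so the
# exact column layout may differ from this sketch.
def sinusoid_position_encoding(num_positions, dim):
    # pe[p, 2i]   = sin(p / 10000^(2i/dim))
    # pe[p, 2i+1] = cos(p / 10000^(2i/dim))
    positions = np.arange(num_positions)[:, None]        # [P, 1]
    div = np.power(10000., np.arange(0, dim, 2) / dim)   # [dim/2]
    pe = np.zeros([num_positions, dim], dtype=np.float32)
    pe[:, 0::2] = np.sin(positions / div)
    pe[:, 1::2] = np.cos(positions / div)
    return pe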
def model_fn(features, labels, mode, params):
    # The `params` argument is unused here; forward() reads the module-level
    # `params` dict directly.
    logits_or_ids = forward(features, labels, mode)
    # This script only exports a trained model for serving, so only the
    # PREDICT branch is needed.
    if mode == tf.estimator.ModeKeys.PREDICT:
        return tf.estimator.EstimatorSpec(mode, predictions=logits_or_ids)
class config_model:
    hidden_dim = 300
    num_heads = 8
    dropout_rate = .2
    num_blocks = 6

    position_embedder_hparams = {
        'dim': hidden_dim
    }

    encoder = {
        'dim': hidden_dim,
        'embedding_dropout': dropout_rate,
        'residual_dropout': dropout_rate,
        'num_blocks': num_blocks,
        'initializer': {
            'type': 'variance_scaling_initializer',
            'kwargs': {
                'scale': 1.0,
                'mode': 'fan_avg',
                'distribution': 'uniform',
            },
        },
        'multihead_attention': {
            'dropout_rate': dropout_rate,
            'num_heads': num_heads,
            'output_dim': hidden_dim,
            'use_bias': True,
        },
        'poswise_feedforward': {
            'name': 'fnn',
            'layers': [
                {
                    'type': 'Dense',
                    'kwargs': {
                        'name': 'conv1',
                        'units': hidden_dim * 4,
                        'activation': 'relu',
                        'use_bias': True,
                    },
                },
                {
                    'type': 'Dropout',
                    'kwargs': {
                        'rate': dropout_rate,
                    },
                },
                {
                    'type': 'Dense',
                    'kwargs': {
                        'name': 'conv2',
                        'units': hidden_dim,
                        'use_bias': True,
                    },
                },
            ],
        },
    }

    decoder = copy.deepcopy(encoder)
    decoder['output_layer_bias'] = True
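
# forward() ties the decoder's output layer to the transposed input embedding,
# and 'output_layer_bias' adds a separate per-token bias on top. A minimal
# numpy sketch of what that projection computes (sizes here are illustrative,
# not the actual vocab or hidden dimensions):
_E = np.random.randn(50, 8).astype(np.float32)  # [vocab, hidden] embedding table
_h = np.random.randn(2, 8).astype(np.float32)   # [batch, hidden] decoder states
_b = np.zeros(50, dtype=np.float32)             # per-token bias (output_layer_bias)
_logits = _h @ _E.T + _b                        # [batch, vocab] scores, weights shared with _E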
params = {
    'model_dir': '../model/transformer',
    'export_dir': '../model/transformer_export',
    'vocab_path': '../vocab/char.txt',
    'max_len': 10,
    'embed_dim': config_model.hidden_dim,
    'beam_width': 5,
    'top_k': 3,
    'length_penalty': .6,
}
def serving_input_receiver_fn():
    words = tf.placeholder(tf.int32, [None, None], 'words')
    features = {'words': words}
    receiver_tensors = features
    return tf.estimator.export.ServingInputReceiver(features, receiver_tensors)
def get_vocab(f_path):
    word2idx = {}
    with open(f_path, encoding='utf-8') as f:
        for i, line in enumerate(f):
            line = line.rstrip('\n')
            word2idx[line] = i
    return word2idx
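
# A hedged usage sketch (sent2idx is a hypothetical helper, not part of the
# original script): turning a raw string into the integer ids the model
# consumes. Since vocab_size above is len(char2idx) + 1, we assume the extra
# index len(char2idx) is reserved for unknown characters.
def sent2idx(sent, char2idx):
    # map each character to its id, falling back to the assumed unknown id
    return [char2idx.get(c, len(char2idx)) for c in sent]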
params['char2idx'] = get_vocab(params['vocab_path'])
params['idx2char'] = {idx: char for char, idx in params['char2idx'].items()}
estimator = tf.estimator.Estimator(model_fn, params['model_dir'])
estimator.export_saved_model(params['export_dir'], serving_input_receiver_fn)
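
# Once exported, the SavedModel can be queried without rebuilding the
# Estimator. A minimal sketch using TF 1.x's tf.contrib.predictor:
# export_saved_model writes a timestamped subdirectory, so we load the most
# recent one. The output key 'output' is what Estimator assigns by default
# when `predictions` is a single tensor; verify it with saved_model_cli if in
# doubt.
from tensorflow.contrib import predictor

subdirs = [os.path.join(params['export_dir'], d)
           for d in os.listdir(params['export_dir'])
           if os.path.isdir(os.path.join(params['export_dir'], d))]
predict_fn = predictor.from_saved_model(max(subdirs))
demo = np.atleast_2d(sent2idx('你好', params['char2idx']))  # [1, seq_len] ids
result = predict_fn({'words': demo})
print(result['output'].shape)  # expected: [batch, max_len, top_k]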