from google.colab import drive
drive.mount('/content/gdrive')
import os
os.chdir('/content/gdrive/My Drive/finch/tensorflow2/text_classification/clue/main')
!pip install transformers
from transformers import BertTokenizer, TFBertModel
from sklearn.metrics import classification_report
import os
import json
import time
import logging
import pprint
import numpy as np
import tensorflow as tf
import tensorflow_addons as tfa
print("TensorFlow Version", tf.__version__)
print('GPU Enabled:', tf.test.is_gpu_available())
def get_vocab(f_path):
    word2idx = {}
    with open(f_path) as f:
        for i, line in enumerate(f):
            line = line.rstrip()
            word2idx[line] = i
    return word2idx
params = {
    'pretrain_path': 'bert-base-chinese',
    'train_path': '../data/train.txt',
    'test_path': '../data/test.txt',
    'batch_size': 16,
    'buffer_size': 31728,
    'init_lr': 1e-5,
    'max_lr': 4e-5,
    'label_smooth': .2,
    'n_epochs': 12,
    'num_patience': 5,
}
params['label2idx'] = get_vocab('../vocab/label.txt')
tokenizer = BertTokenizer.from_pretrained(params['pretrain_path'],
                                          lowercase = True,
                                          add_special_tokens = True)
# stream data from text files
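# Each line of train.txt / test.txt is assumed to be a JSON object of the form
#   {"label": "<label string>", "content": "<raw text>"}
# where the label string matches an entry in ../vocab/label.txt loaded above.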
def data_generator(f_path, params):
    with open(f_path) as f:
        print('Reading', f_path)
        for line in f:
            line = json.loads(line.rstrip())
            text, label = line['content'], line['label']
            text = list(text)
            text = ['[CLS]'] + text + ['[SEP]']
            text = tokenizer.convert_tokens_to_ids(text)
            seg = [0] * len(text)
            label = params['label2idx'][label]
            yield (text, seg), int(label)
def dataset(is_training, params):
    _shapes = (([None], [None]), ())
    _types = ((tf.int32, tf.int32), tf.int32)
    _pads = ((0, 0), -1)

    if is_training:
        ds = tf.data.Dataset.from_generator(
            lambda: data_generator(params['train_path'], params),
            output_shapes = _shapes,
            output_types = _types,)
        ds = ds.shuffle(params['buffer_size'])
        ds = ds.padded_batch(params['batch_size'], _shapes, _pads)
        ds = ds.prefetch(tf.data.experimental.AUTOTUNE)
    else:
        ds = tf.data.Dataset.from_generator(
            lambda: data_generator(params['test_path'], params),
            output_shapes = _shapes,
            output_types = _types,)
        ds = ds.padded_batch(params['batch_size'], _shapes, _pads)
        ds = ds.prefetch(tf.data.experimental.AUTOTUNE)

    return ds
# input stream ids check
(text, seg), _ = next(data_generator(params['train_path'], params))
print(text)
print(seg)
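# Optional, illustrative check of the batched pipeline as well: pull one padded test
# batch and print its shapes (the variable names below are local to this sketch only).
(b_text, b_seg), b_labels = next(iter(dataset(is_training=False, params=params)))
print(b_text.shape, b_seg.shape, b_labels.shape)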
class BertFinetune(tf.keras.Model):
    def __init__(self, params):
        super(BertFinetune, self).__init__()
        self.bert = TFBertModel.from_pretrained(params['pretrain_path'],
                                                trainable = True)
        self.drop_1 = tf.keras.layers.Dropout(.1)
        self.fc = tf.keras.layers.Dense(300, tf.nn.swish, name='down_stream/fc')
        self.drop_2 = tf.keras.layers.Dropout(.1)
        self.out = tf.keras.layers.Dense(len(params['label2idx']), name='down_stream/out')

    def call(self, bert_inputs, training):
        # bert_inputs: [input_ids, attention_mask, token_type_ids]
        bert_inputs = [tf.cast(inp, tf.int32) for inp in bert_inputs]
        x = self.bert(bert_inputs, training=training)
        x = x[1]  # pooled [CLS] representation
        x = self.drop_1(x, training=training)
        x = self.fc(x)
        x = self.drop_2(x, training=training)
        x = self.out(x)
        return x
model = BertFinetune(params)
model.build([[None, None], [None, None], [None, None]])
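# Note: the three [None, None] shapes correspond to the (input_ids, attention_mask,
# token_type_ids) triple fed to the model in the loops below; the attention mask is
# built on the fly with tf.sign(text), since padded positions carry token id 0.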
step_size = 2 * params['buffer_size'] // params['batch_size']
decay_lr = tfa.optimizers.Triangular2CyclicalLearningRate(
    initial_learning_rate = params['init_lr'],
    maximal_learning_rate = params['max_lr'],
    step_size = step_size,)
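# Optional sketch: the Triangular2 schedule cycles the learning rate between init_lr
# and max_lr over 2 * step_size steps, halving the amplitude each cycle. The probe
# steps below are arbitrary and only illustrate the shape of the schedule.
for probe_step in [0, step_size // 2, step_size, 2 * step_size]:
    print('step', probe_step, '-> lr', float(decay_lr(probe_step)))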
optim = tf.optimizers.Adam(params['init_lr'])
global_step = 0
best_acc = .0
count = 0
t0 = time.time()
logger = logging.getLogger('tensorflow')
logger.setLevel(logging.INFO)
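# Training loop: the cyclical learning rate is assigned to the optimizer every step,
# gradients are clipped to a global norm of 5, and training stops early once the test
# accuracy has not improved for `num_patience` consecutive epochs.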
for _ in range(params['n_epochs']):
    # TRAINING
    for ((text, seg), labels) in dataset(is_training=True, params=params):
        with tf.GradientTape() as tape:
            logits = model([text, tf.sign(text), seg], training=True)
            loss = tf.compat.v1.losses.softmax_cross_entropy(
                tf.one_hot(labels, len(params['label2idx']), dtype=tf.float32),
                logits = logits,
                label_smoothing = params['label_smooth'],)

        optim.lr.assign(decay_lr(global_step))
        grads = tape.gradient(loss, model.trainable_variables)
        grads, _ = tf.clip_by_global_norm(grads, 5.)
        optim.apply_gradients(zip(grads, model.trainable_variables))

        if global_step % 100 == 0:
            logger.info("Step {} | Loss: {:.4f} | Spent: {:.1f} secs | LR: {:.6f}".format(
                global_step, loss.numpy().item(), time.time()-t0, optim.lr.numpy().item()))
            t0 = time.time()
        global_step += 1

    # EVALUATION
    m = tf.keras.metrics.Accuracy()
    intent_true = []
    intent_pred = []
    for ((text, seg), labels) in dataset(is_training=False, params=params):
        logits = model([text, tf.sign(text), seg], training=False)
        y_intent = tf.argmax(logits, -1)
        m.update_state(y_true=labels, y_pred=y_intent)
        intent_true += labels.numpy().flatten().tolist()
        intent_pred += y_intent.numpy().flatten().tolist()

    acc = m.result().numpy()
    logger.info("Evaluation: Testing Accuracy: {:.3f}".format(acc))
    logger.info('\n'+classification_report(y_true = intent_true,
                                           y_pred = intent_pred,
                                           labels = list(params['label2idx'].values()),
                                           target_names = list(params['label2idx'].keys()),
                                           digits=3))

    if acc > best_acc:
        best_acc = acc
        model.save_weights('../model/bert_finetune')
        count = 0
    else:
        count += 1
    logger.info("Best Accuracy: {:.3f}".format(best_acc))

    if count == params['num_patience']:
        print(params['num_patience'], "epochs without improvement on the best accuracy, stopping training")
        break