from google.colab import drive
drive.mount('/content/gdrive')
import os
os.chdir('/content/gdrive/My Drive/finch/tensorflow2/text_classification/imdb/main')
!pip install transformers
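# This notebook distils a fine-tuned RoBERTa classifier (the teacher) into a
# TF-IDF + linear model (the student) on IMDB sentiment, then ensembles the
# student's distilled predictions with a plain logistic regression baseline.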
from transformers import RobertaTokenizer, TFRobertaModel
from sklearn.linear_model import LinearRegression, LogisticRegression
from sklearn.feature_extraction.text import CountVectorizer, TfidfTransformer
import tensorflow as tf
import numpy as np
print("TensorFlow Version", tf.__version__)
print('GPU Enabled:', tf.test.is_gpu_available())
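# Hyper-parameters. The "bt" training files are presumably back-translation-
# augmented IMDB data; max_len caps the RoBERTa input at 300 sub-word tokens.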
params = {
'train_paths': [
'../data/train_bt_part1.txt',
'../data/train_bt_part2.txt',
'../data/train_bt_part3.txt',
'../data/train_bt_part4.txt',
'../data/train_bt_part5.txt',
'../data/train_bt_part6.txt',
],
'test_paths': [
'../data/test.txt',
],
'pretrain_path': 'roberta-base',
'batch_size': 32,
'max_len': 300,
}
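# RoBERTa's tokenizer is case-sensitive, and the special tokens (<s>, </s>)
# are added manually in the data generator below, so no extra kwargs are needed.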
tokenizer = RobertaTokenizer.from_pretrained(params['pretrain_path'])
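# Stream (token_ids, label) pairs for the teacher. Over-long reviews are
# truncated by keeping the first and last max_len/2 tokens (head + tail),
# which preserves both the opening and the closing of the review.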
def bert_data_generator(f_paths, params):
for f_path in f_paths:
with open(f_path) as f:
print('Reading', f_path)
for line in f:
line = line.rstrip()
                label, text = line.split('\t', 1)
text = ['<s>'] + tokenizer.tokenize(text) + ['</s>']
if len(text) > params['max_len']:
_max_len = params['max_len'] // 2
text = text[:_max_len] + text[-_max_len:]
                # (RoBERTa has no segment / token-type ids, so none are produced)
                text = tokenizer.convert_tokens_to_ids(text)
y = int(label)
yield text, y
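# Batch the generator output with padding. Pad id 1 is RoBERTa's <pad> token;
# the -1 pad for labels is never used because labels are scalars.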
def bert_dataset(params):
_shapes = ([None], ())
_types = (tf.int32, tf.int32)
_pads = (1, -1)
ds = tf.data.Dataset.from_generator(
lambda: bert_data_generator(params['train_paths'], params),
output_shapes = _shapes,
output_types = _types,)
ds = ds.padded_batch(params['batch_size'], _shapes, _pads)
ds = ds.prefetch(tf.data.experimental.AUTOTUNE)
return ds
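# Plain-text generator for the TF-IDF student: yields (raw_text, label).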
def lr_data_generator(f_paths):
for f_path in f_paths:
with open(f_path) as f:
print('Reading', f_path)
for line in f:
line = line.rstrip()
                label, text = line.split('\t', 1)
y = int(label)
yield text, y
def get_data(f_paths):
x, y = [], []
for text, label in lr_data_generator(f_paths):
x.append(text)
y.append(label)
return x, y
# sanity check: inspect the token ids produced for the first training example
text, _ = next(bert_data_generator(params['train_paths'], params))
print(text)
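# Teacher: pre-trained RoBERTa with a small MLP head. The [1] index in call()
# selects the pooled <s> representation from the RoBERTa outputs.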
class RobertaFinetune(tf.keras.Model):
def __init__(self, params):
super(RobertaFinetune, self).__init__()
self.bert = TFRobertaModel.from_pretrained(params['pretrain_path'],
trainable = True)
self.drop_1 = tf.keras.layers.Dropout(.1)
self.fc = tf.keras.layers.Dense(300, tf.nn.swish, name='down_stream/fc')
self.drop_2 = tf.keras.layers.Dropout(.1)
self.out = tf.keras.layers.Dense(2, name='down_stream/out')
def call(self, bert_inputs, training):
bert_inputs = [tf.cast(inp, tf.int32) for inp in bert_inputs]
x = self.bert(bert_inputs, training=training)[1]
x = self.drop_1(x, training=training)
x = self.fc(x)
x = self.drop_2(x, training=training)
x = self.out(x)
return x
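# Student features: binary unigram/bigram counts re-weighted with TF-IDF.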
x_train, y_train = get_data(params['train_paths'])
x_test, y_test = get_data(params['test_paths'])
count_model = CountVectorizer(binary=True, ngram_range=(1,2))
count_model.fit(x_train)
tfidf_model = TfidfTransformer()
tfidf_model.fit(count_model.transform(x_train))
X_train_tfidf = tfidf_model.transform(count_model.transform(x_train))
X_test_tfidf = tfidf_model.transform(count_model.transform(x_test))
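# Restore the teacher weights, assumed to come from a separate fine-tuning run;
# only inference is needed here.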
model = RobertaFinetune(params)
model.build([[None, None], [None, None]])
model.load_weights('../model/roberta_finetune')
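# Run the teacher over the training set. bert_dataset reads the same files in
# the same order as get_data, so teacher_preds rows line up with x_train.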
teacher_preds = []
for (text, _) in bert_dataset(params=params):
masks = tf.cast(tf.math.not_equal(text, 1), tf.int32)
logits = model([text, masks], training=False)
teacher_preds.append(tf.nn.softmax(logits))
teacher_preds = tf.concat(teacher_preds, axis=0)
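# Distillation: a multi-output linear regression maps TF-IDF features onto the
# teacher's soft probabilities; its output is averaged with a logistic
# regression fit on the hard labels before taking the argmax.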
linear_reg = LinearRegression()
logistic_reg = LogisticRegression(solver='lbfgs')
y_proba = logistic_reg.fit(X_train_tfidf, y_train).predict_proba(X_test_tfidf)
student_preds = linear_reg.fit(X_train_tfidf, teacher_preds.numpy()).predict(X_test_tfidf)
y_pred = np.argmax((student_preds + y_proba) / 2, axis=1)
final_acc = (y_pred == np.array(y_test)).mean()
print("final testing accuracy: {:.3f}".format(final_acc))