from typing import List, Tuple
import pnlp
from pnlp import Text, num_norm, cut_zhchar, MagicDict
from dataclasses import dataclass, field
from collections import Counter
from itertools import chain
import numpy as np
import pandas as pd
import ahocorasick
from Levenshtein import jaro
from pathlib import Path
from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle
from LAC import LAC
import paddle
import paddle.nn as nn
import paddle.optimizer as optim
from paddlenlp.data import Pad
import paddlenlp
from paddlenlp.transformers import SkepForSequenceClassification
from paddlenlp import Taskflow
ROOT = Path.cwd()
@dataclass
class Dataset:
file_path: Path
test_size: float = 0.2
def __post_init__(self):
self.df = pd.read_csv(self.file_path, sep="\t")
self.train, self.test = self.split()
def split(self):
return train_test_split(self.df, test_size=self.test_size, random_state=42)
@dataclass
class PreProcessor:
rules: List[str] = field(
default_factory=lambda: ['pic', 'lnk'])
def __post_init__(self):
self.clean_rule = Text(self.rules)
def clean(self, text: str) -> str:
return self.clean_rule.clean(text)
def normalize(self, text: str) -> str:
return text
def __call__(self, text: str) -> str:
return self.normalize(self.clean(text))
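A quick sanity check of the cleaner (a sketch; 'pic' and 'lnk' are pnlp's built-in patterns for image markup and links, so the exact matches depend on pnlp):
# Hypothetical usage: links/image markup should be stripped, normal text kept.
pp = PreProcessor()
print(pp("这家店不错 http://t.cn/xxx 推荐"))  # expect the URL to be removed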
@dataclass
class Tokenzier:
type: str = "word"
vocab_path: Path = ROOT / "vocab.txt"
max_len: int = 128
def __post_init__(self):
self.word_segmentor = LAC(mode="seg")
self.vocab = []
self.word2id = {}
if self.vocab_path.exists():
self.load_vocab(self.vocab_path)
def tokenize2word(self, text: str) -> List[str]:
return self.word_segmentor.run(text)
def tokenize2char(self, text: str) -> List[str]:
return cut_zhchar(text)
def tokenize(self, text: str) -> List[str]:
return getattr(self, "tokenize2" + self.type)(text)
def token2id(self, tokens: List[str]) -> List[int]:
res = []
for token in tokens:
id = self.word2id.get(token, 1)
res.append(id)
return res
def load_vocab(self, path: Path):
self.vocab = pnlp.read_lines(path)
self.word2id = dict(zip(self.vocab, range(len(self.vocab))))
def build_vocab(self, sents: List[str]):
count = Counter()
for sent in sents:
words = self.tokenize(sent)
count.update(words)
sort = sorted(count.items(), key=lambda x: x[1], reverse=True)
        vocab = [w for w, f in sort if f >= 5]
        self.vocab = ["<PAD>", "<UNK>"] + vocab
        self.word2id = {word: i for i, word in enumerate(self.vocab)}
        pnlp.write_file(self.vocab_path, self.vocab)
def __call__(self, texts: str) -> List[int]:
        if isinstance(texts, str):
texts = [texts]
res = []
for text in texts:
tokens = self.tokenize(text)
ids = self.token2id(tokens)
ids = ids[:self.max_len]
res.append(ids)
return res
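A minimal usage sketch of the tokenizer (word mode; build_vocab only needs to run once, when vocab.txt does not exist yet):
# Hypothetical usage of the tokenizer defined above.
tk = Tokenzier(type="word")
# tk.build_vocab(training_texts)  # run once to create vocab.txt; training_texts is a placeholder name
print(tk.tokenize("这部电影真好看"))  # LAC word segmentation, e.g. ['这部', '电影', '真', '好看']
print(tk("这部电影真好看"))  # a list of id lists; unknown words map to 1 (<UNK>)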
@dataclass
class DataLoader:
file_path: Path
pretrain: str = ""
test_size: float = 0.2
    rules: List[str] = field(default_factory=lambda: ['pic', 'lnk'])
token_type: str = "word"
vocab_path: Path = ROOT / "vocab.txt"
def __post_init__(self):
self.ds = Dataset(self.file_path, self.test_size)
self.pp = PreProcessor(self.rules)
if self.pretrain:
self.tk = paddlenlp.transformers.ErnieTokenizer.from_pretrained(self.pretrain)
else:
self.tk = Tokenzier(self.token_type, self.vocab_path)
if not self.vocab_path.exists():
self.tk.build_vocab(self.ds.train["text_a"])
else:
self.tk.load_vocab(self.vocab_path)
def padding(self, ids: List[List[int]]):
return Pad(pad_val=0)(ids)
    def token_label(self, type: str = "train"):
        data = getattr(self.ds, type)
        for item in data.itertuples(index=False):
            tokens = self.tk.tokenize(self.pp(item.text_a))
            yield tokens, item.label
    def ids_label(self, type: str = "train", batch_size: int = 64):
        data = getattr(self.ds, type)
        i = 0
        batch, labels = [], []
        for item in data.itertuples(index=False):
if self.pretrain:
ids = self.tk(self.pp(item.text_a))["input_ids"]
else:
ids = self.tk(self.pp(item.text_a))[0]
batch.append(ids)
labels.append(item.label)
i += 1
if i == batch_size:
yield self.padding(batch), np.array(labels)
batch, labels = [], []
i = 0
if batch:
yield self.padding(batch), np.array(labels)
dl = DataLoader(ROOT / "NLPCC14-SC/train.tsv", pretrain="ernie-1.0")
[2021-10-23 02:38:08,053] [ INFO] - Already cached /Users/Yam/.paddlenlp/models/ernie-1.0/vocab.txt
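A quick peek at one padded batch (a sketch; the second dimension depends on the longest text in the batch):
# Hypothetical check of a single batch from ids_label.
xb, yb = next(dl.ids_label("train", 4))
print(xb.shape, yb.shape)  # e.g. (4, max_len_in_batch) and (4,)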
@dataclass
class Model:
def evaluate(self, data: List[Tuple[List[str], int]]) -> float:
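        # Returns the error rate: the fraction of examples whose prediction differs from the label.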
error = 0
i = 0
res = []
for tokens, label in data:
pred = self.predict(tokens)
error += (pred != label)
i += 1
res.append(pred)
return error / i
@dataclass
class DictModel(Model):
dict_path: Path = ROOT / "dict"
top_n: int = 100
def __post_init__(self):
self.pos = pnlp.read_pickle(self.dict_path / "pos.pkl")
self.neg = pnlp.read_pickle(self.dict_path / "neg.pkl")
self.pos_sample = np.random.choice(self.pos, size=self.top_n, replace=False).tolist()
self.neg_sample = np.random.choice(self.neg, size=self.top_n, replace=False).tolist()
self.model = self.build_aho(self.pos, self.neg)
self.model.make_automaton()
def build_aho(self, pos: List[str], neg: List[str]):
aho = ahocorasick.Automaton()
for idx, key in enumerate(pos):
aho.add_word(key, (1, key))
for idx, key in enumerate(neg):
aho.add_word(key, (-1, key))
return aho
def search(self, text: str) -> int:
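        # Aho-Corasick scan: +1 for every positive-dictionary hit, -1 for every negative one.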
i = 0
for end_index, (val, original_value) in self.model.iter(text):
i += val
return i
def _match(self, sample: List[str], text: str) -> float:
res = 0.0
for v in sample:
res += jaro(text, v)
return res
def predict(self, data: List[str]) -> int:
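        # Dictionary vote first; if nothing matches, fall back to average Jaro similarity against the sampled positive/negative words.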
num = self.search(" ".join(data))
if num == 0:
text = "".join(data)
return int(self._match(self.pos_sample, text) > self._match(self.neg_sample, text))
else:
return int(num > 0)
dm = DictModel()
dm.evaluate(dl.token_label("test"))
0.50025
@dataclass
class NaiveBayes(Model):
def __post_init__(self):
self.pos_prob = {}
self.pos_prior = 0.5
self.neg_prob = {}
self.neg_prior = 0.5
    def _train(self, data: List[str]) -> Tuple[dict, int]:
        count = Counter(data)
        prob = {}
        length = len(data)
        for k, v in count.items():
            prob[k] = v / length
        return prob, length
def train(self, data: List[str]):
pos, neg, labels = [], [], []
for tokens, label in data:
labels.append(label)
if label == 1:
pos.extend(tokens)
else:
neg.extend(tokens)
        self.pos_prob, _ = self._train(pos)
        self.neg_prob, _ = self._train(neg)
        length = len(labels)  # priors come from the label counts, not from token counts
label_count = Counter(labels)
self.pos_prior = label_count[1] / length
self.neg_prior = label_count[0] / length
def predict(self, data: List[str]) -> int:
res = np.log(self.pos_prior / self.neg_prior)
for w in data:
res += np.log(self.pos_prob.get(w, 1) / self.neg_prob.get(w, 1))
        return int(res > 0)
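The decision rule is a log-odds sum: start from log(pos_prior / neg_prior) and add log(P(w|pos) / P(w|neg)) for every token; a positive total means the positive class. A tiny sketch with made-up probabilities (not from the real corpus):
# Toy illustration of the log-odds rule in NaiveBayes.predict (numbers are made up).
toy = NaiveBayes()
toy.pos_prior, toy.neg_prior = 0.5, 0.5
toy.pos_prob = {"好": 0.03, "差": 0.001}
toy.neg_prob = {"好": 0.005, "差": 0.02}
print(toy.predict(["好", "好"]))  # 2 * log(6) > 0 -> 1 (positive)
print(toy.predict(["差"]))  # log(0.05) < 0 -> 0 (negative)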
nb = NaiveBayes()
nb.train(dl.token_label("train"))
nb.evaluate(dl.token_label("test"))
0.718
class TextCNN(paddle.nn.Layer):
def __init__(self, config):
super(TextCNN, self).__init__()
        if config.pretrained:
            # Initialize the embedding from pretrained vectors when they are provided.
            self.embedding = nn.Embedding(
                num_embeddings=config.vocab_size,
                embedding_dim=config.embed_size,
                padding_idx=0,
                weight_attr=config.pretrained)
        else:
            self.embedding = nn.Embedding(
                num_embeddings=config.vocab_size,
                embedding_dim=config.embed_size,
                padding_idx=0)
self.convs = nn.LayerList(
[nn.Conv2D(1, config.num_filters, (kernel_size_, config.embed_size))
for kernel_size_ in config.filter_sizes])
self.dropout = nn.Dropout(config.dropout)
        self.linear = nn.Linear(len(config.filter_sizes) * config.num_filters, config.num_labels)
def forward(self, x):
embedding = self.embedding(x).unsqueeze(1)
convs = [nn.ReLU()(conv(embedding)).squeeze(3) for conv in self.convs]
pool_out = [nn.MaxPool1D(block.shape[2])(block).squeeze(2) for block in convs]
        pool_out = paddle.concat(pool_out, 1)
        pool_out = self.dropout(pool_out)  # regularize the concatenated features before the classifier
        logits = self.linear(pool_out)
return logits
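Before wiring in the real data, a quick shape smoke test (a sketch; the tiny config and the random input are made up):
# Hypothetical smoke test: embed -> conv -> max-pool-over-time -> concat -> linear.
toy_config = MagicDict({
    "vocab_size": 100, "embed_size": 16, "dropout": 0.1,
    "filter_sizes": [2, 3, 4], "num_filters": 8,
    "num_labels": 2, "pretrained": None})
toy_cnn = TextCNN(toy_config)
toy_x = paddle.randint(0, 100, shape=[2, 10])  # batch of 2, sequence length 10
print(toy_cnn(toy_x).shape)  # [2, 2]: one pair of logits per sample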
def train(model, dl):
optimizer = optim.Adam(parameters=model.parameters(), learning_rate=1e-3)
criterion = nn.CrossEntropyLoss()
for epoch in range(0, EPOCHS):
train_loss, test_loss = [], []
train_acc, test_acc = [], []
model.train()
for i, (x, y) in enumerate(dl.ids_label("train", BATCH_SIZE)):
x = paddle.Tensor(x)
y = paddle.Tensor(y).unsqueeze(1)
pred = model(x)
loss = criterion(pred, y)
train_loss.append(loss.item())
train_acc.append(paddle.metric.accuracy(pred, y).numpy())
loss.backward()
optimizer.step()
optimizer.clear_grad()
model.eval()
for i, (x, y) in enumerate(dl.ids_label("test", BATCH_SIZE)):
x = paddle.Tensor(x)
y = paddle.Tensor(y).unsqueeze(1)
pred = model(x)
test_loss.append(criterion(pred, y).item())
test_acc.append(paddle.metric.accuracy(pred, y).numpy())
print(
"Epoch: [{}/{}] TrainLoss/TestLoss: {:.4f}/{:.4f} TrainAcc/TestAcc: {:.4f}/{:.4f}".format(
epoch + 1, EPOCHS,
np.mean(train_loss), np.mean(test_loss),
np.mean(train_acc), np.mean(test_acc))
)
    paddle.save(model.state_dict(), "save/" + model.full_name() + ".pdparams")
    paddle.save(optimizer.state_dict(), "save/" + model.full_name() + "_Adam.pdparams")
config = MagicDict({
"vocab_size": len(dl.tk.vocab),
"embed_size": 128,
"dropout": 0.5,
"filter_sizes": [2,3,4],
"num_filters": 128,
"num_labels": 2,
"pretrained": None
})
EPOCHS = 5
BATCH_SIZE = 64
cnn = TextCNN(config)
dl = DataLoader(ROOT / "NLPCC14-SC/train.tsv", pretrain="")
train(cnn, dl)
Epoch: [1/5] TrainLoss/TestLoss: 0.5883/0.4185 TrainAcc/TestAcc: 0.6973/0.8269
Epoch: [2/5] TrainLoss/TestLoss: 0.4122/0.2714 TrainAcc/TestAcc: 0.8266/0.9054
Epoch: [3/5] TrainLoss/TestLoss: 0.2845/0.1615 TrainAcc/TestAcc: 0.8956/0.9541
Epoch: [4/5] TrainLoss/TestLoss: 0.1774/0.0900 TrainAcc/TestAcc: 0.9411/0.9796
Epoch: [5/5] TrainLoss/TestLoss: 0.1041/0.0511 TrainAcc/TestAcc: 0.9691/0.9885
Next, a pretrained model (ERNIE).
class Ernie(paddle.nn.Layer):
def __init__(self, config):
super(Ernie, self).__init__()
self.ernie_model = paddlenlp.transformers.ErnieModel.from_pretrained(config.pretrained)
self.linear = nn.Linear(config.hidden_size, config.num_labels)
def forward(self, x):
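        # pooled_output is the [CLS]-based sentence vector; classify on top of it.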
sequence_output, pooled_output = self.ernie_model(x)
logits = self.linear(pooled_output)
return logits
config = MagicDict({
"pretrained": "ernie-1.0",
"hidden_size": 768,
"num_labels": 2,
})
EPOCHS = 5
BATCH_SIZE = 64
ernie = Ernie(config)
dl = DataLoader(ROOT / "NLPCC14-SC/train.tsv", pretrain="ernie-1.0")
[2021-10-23 02:38:31,265] [ INFO] - Already cached /Users/Yam/.paddlenlp/models/ernie-1.0/ernie_v1_chn_base.pdparams
[2021-10-23 02:38:39,071] [ INFO] - Weights from pretrained model not used in ErnieModel: ['cls.predictions.layer_norm.weight', 'cls.predictions.decoder_bias', 'cls.predictions.transform.bias', 'cls.predictions.transform.weight', 'cls.predictions.layer_norm.bias']
[2021-10-23 02:38:39,418] [ INFO] - Already cached /Users/Yam/.paddlenlp/models/ernie-1.0/vocab.txt
for b in dl.ids_label("train", 2): break
x, y = b
ernie(paddle.Tensor(x))
Tensor(shape=[2, 2], dtype=float32, place=CPUPlace, stop_gradient=False, [[-1.29681790, 2.99378061], [-1.45288467, 0.72905755]])
# careful, this is heavy on your CPU
train(ernie, dl)
The SKEP model below outputs logits directly.
class Skep(paddle.nn.Layer):
def __init__(self):
super(Skep, self).__init__()
self.skep_model = SkepForSequenceClassification.from_pretrained(
pretrained_model_name_or_path="skep_ernie_1.0_large_ch", num_classes=2)
def forward(self, x):
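        # SkepForSequenceClassification already contains the classification head, so its output is the logits.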
logits = self.skep_model(x)
return logits
EPOCHS = 5
BATCH_SIZE = 64
skep = Skep()
dl = DataLoader(ROOT / "NLPCC14-SC/train.tsv", pretrain="ernie-1.0")
[2021-10-23 02:46:01,290] [ INFO] - Already cached /Users/Yam/.paddlenlp/models/skep_ernie_1.0_large_ch/skep_ernie_1.0_large_ch.pdparams
[2021-10-23 02:46:43,144] [ INFO] - Already cached /Users/Yam/.paddlenlp/models/ernie-1.0/vocab.txt
skep(paddle.Tensor(x))
Tensor(shape=[2, 2], dtype=float32, place=CPUPlace, stop_gradient=False, [[-0.11525318, -0.39649525], [ 0.05798459, -0.43128473]])
The Taskflow API below outputs the final result directly.
senta = Taskflow("sentiment_analysis")
[2021-10-23 02:59:48,427] [ INFO] - Converting to the inference model cost a little time.
[2021-10-23 02:59:54,543] [ INFO] - The inference model save in the path:/Users/Yam/.paddlenlp/taskflow/sentiment_analysis/bilstm/static/inference
senta("怀着十分激动的心情放映,可是看着看着发现,在放映完毕后,出现一集米老鼠的动画片")
[{'text': '怀着十分激动的心情放映,可是看着看着发现,在放映完毕后,出现一集米老鼠的动画片', 'label': 'negative', 'score': 0.6691399216651917}]