#!/usr/bin/env python # coding: utf-8 # In[3]: import os import pandas as pd import torch from torch import nn, optim from torch.utils.data import DataLoader, Dataset from transformers import AutoTokenizer, AutoModel, get_linear_schedule_with_warmup from sklearn.metrics import accuracy_score, f1_score, classification_report from sklearn.model_selection import StratifiedKFold, train_test_split import random import numpy as np import statistics # In[4]: #Mudanças principais: #1400 #Modelo Bertimbau Large: Alterado o model_name para 'neuralmind/bert-large-portuguese-cased'. #LR= 3e-5. #Descongelamento das camadas: Parametrizamos o número de camadas finais do BERT a descongelar, via unfreeze_layers. Por exemplo, se definirmos unfreeze_layers=8, descongelamos as últimas 8 camadas. #Outros otimizadores e LR Schedulers: Mantemos o AdamW como otimizador principal, mas agora adicionamos um scheduler (get_linear_schedule_with_warmup do transformers) para ajustar a taxa de aprendizado durante o treino. Caso queira testar outro otimizador, basta substituir a linha do optimizador. Também deixamos comentado outro exemplo (SGD) para referência. #Para testar diferentes taxas de aprendizado, basta alterar learning_rate no código. #Para testar diferentes números de camadas a descongelar, altere unfreeze_layers. #4 #processo de treinamento e avaliação várias vezes (uma para cada fold). #diminuindo épocas ou early stopping, se necessário. #O early stopping agora é feito com base no conjunto de validação interno a cada fold. #Esse processo é mais demorado, pois treinaremos o modelo K vezes. #Ajuste parâmetros (como número de épocas, taxa de aprendizado, etc.) conforme necessário. 
# In[5]:
# Seed everything for reproducibility.
seed = 42
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(seed)


# In[6]:
# General configuration.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Usando dispositivo: {device}')

model_name = 'neuralmind/bert-large-portuguese-cased'
learning_rate = 3e-5
unfreeze_layers = 4      # number of final BERT encoder layers to fine-tune
nclasses = 2
nepochs = 5
batch_size = 8
batch_status = 32        # log training loss every `batch_status` batches
early_stop = 2           # epochs without val-F1 improvement before stopping
max_length = 360
write_path = 'model_cv'

if not os.path.exists(write_path):
    os.makedirs(write_path)


# In[7]:
# Load the data.
data = pd.read_csv("DATAFRAME1400.csv")


# In[8]:
class CustomDataset(Dataset):
    """Tokenizes the 'text' column on the fly and pairs it with the 'contra' label."""

    def __init__(self, data, tokenizer, max_length):
        self.data = data.reset_index(drop=True)
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        text = self.data.iloc[idx]['text']
        label = self.data.iloc[idx]['contra']
        inputs = self.tokenizer(text, return_tensors='pt', padding='max_length',
                                truncation=True, max_length=self.max_length)
        # squeeze(0) drops the batch dimension added by return_tensors='pt';
        # the DataLoader re-adds it when batching.
        return {key: val.squeeze(0) for key, val in inputs.items()}, torch.tensor(label)


# In[9]:
class CustomBERTModel(nn.Module):
    """BERT encoder with a dropout + linear classification head.

    All BERT weights are frozen except the last ``unfreeze_layers`` encoder
    layers; the classifier head is always trainable.
    """

    def __init__(self, model_name, nclasses, unfreeze_layers):
        super(CustomBERTModel, self).__init__()
        self.bert = AutoModel.from_pretrained(model_name)
        self.dropout = nn.Dropout(0.3)
        self.classifier = nn.Linear(self.bert.config.hidden_size, nclasses)

        # Freeze everything first...
        for param in self.bert.parameters():
            param.requires_grad = False
        # ...then unfreeze only the last `unfreeze_layers` encoder layers.
        if unfreeze_layers > 0:
            for param in self.bert.encoder.layer[-unfreeze_layers:].parameters():
                param.requires_grad = True

    def forward(self, input_ids, attention_mask, token_type_ids=None):
        outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask,
                            token_type_ids=token_type_ids)
        pooled_output = outputs.pooler_output  # pooled [CLS] representation
        dropped_out = self.dropout(pooled_output)
        logits = self.classifier(dropped_out)
        return logits


def evaluate(model, dataloader):
    """Run inference over `dataloader`.

    Returns (weighted F1, accuracy, (y_real, y_pred)).
    """
    model.eval()
    y_real, y_pred = [], []
    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs = {key: val.to(device) for key, val in inputs.items()}
            labels = labels.to(device)
            logits = model(**inputs)
            pred_labels = torch.argmax(logits, 1)
            y_real.extend(labels.cpu().tolist())
            y_pred.extend(pred_labels.cpu().tolist())
    f1 = f1_score(y_real, y_pred, average='weighted')
    acc = accuracy_score(y_real, y_pred)
    return f1, acc, (y_real, y_pred)


# Cross-validation: stratified K-fold over the full dataset.
k = 5
skf = StratifiedKFold(n_splits=k, shuffle=True, random_state=seed)
X = data.index.values
y = data['contra'].values

f1_scores = []
acc_scores = []

tokenizer = AutoTokenizer.from_pretrained(model_name, do_lower_case=False)

fold_num = 1
for train_val_idx, test_idx in skf.split(X, y):
    print(f"\n=== Fold {fold_num}/{k} ===")

    # Held-out test fold.
    test_data = data.iloc[test_idx]
    # Split the remaining data into train / validation (stratified 90/10);
    # early stopping is driven by this inner validation set.
    train_val_data = data.iloc[train_val_idx]
    train_data, val_data = train_test_split(train_val_data, test_size=0.1,
                                            random_state=seed,
                                            stratify=train_val_data['contra'])

    # Datasets and dataloaders.
    train_dataset = CustomDataset(train_data, tokenizer, max_length)
    val_dataset = CustomDataset(val_data, tokenizer, max_length)
    test_dataset = CustomDataset(test_data, tokenizer, max_length)

    traindata = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    valdata = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
    testdata = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    model = CustomBERTModel(model_name, nclasses, unfreeze_layers).to(device)
    # Only optimize the parameters left trainable (last layers + classifier head).
    optimizer = optim.AdamW(filter(lambda p: p.requires_grad, model.parameters()),
                            lr=learning_rate)
    loss_fn = nn.CrossEntropyLoss()

    # Linear warmup (10% of steps) then linear decay of the learning rate.
    total_steps = len(traindata) * nepochs
    scheduler = get_linear_schedule_with_warmup(
        optimizer,
        num_warmup_steps=int(0.1 * total_steps),
        num_training_steps=total_steps)

    # FIX: start from -inf (was 0) so the best checkpoint is always written at
    # least once; with 0, a fold whose validation F1 stayed at 0.0 would never
    # save a model and torch.load below would raise FileNotFoundError.
    max_f1, repeat = float('-inf'), 0
    best_model_path = os.path.join(write_path, f'best_model_fold{fold_num}.pth')

    for epoch in range(nepochs):
        model.train()
        losses = []
        for batch_idx, (inputs, labels) in enumerate(traindata):
            inputs = {key: val.to(device) for key, val in inputs.items()}
            labels = labels.to(device)

            logits = model(**inputs)
            loss = loss_fn(logits, labels)
            losses.append(float(loss))

            # Backprop
            loss.backward()
            optimizer.step()
            scheduler.step()
            optimizer.zero_grad()

            if (batch_idx + 1) % batch_status == 0:
                print(f'Epoch: {epoch} [{batch_idx + 1}/{len(traindata)}]\tLoss: {loss:.6f}')

        f1_val, acc_val, _ = evaluate(model, valdata)
        print(f'Epoch {epoch} - Val F1: {f1_val:.4f}, Val Accuracy: {acc_val:.4f}')

        # Checkpoint on validation-F1 improvement; count stale epochs otherwise.
        if f1_val > max_f1:
            torch.save(model.state_dict(), best_model_path)
            max_f1 = f1_val
            repeat = 0
            print('Novo melhor modelo salvo.')
        else:
            repeat += 1

        if repeat == early_stop:
            print('Early stopping atingido.')
            break

    # Evaluate the best checkpoint on the held-out test fold.
    state_dict = torch.load(best_model_path, weights_only=True)
    model.load_state_dict(state_dict)
    f1_test, acc_test, (y_real, y_pred) = evaluate(model, testdata)

    print("Desempenho no conjunto de teste desta dobra:")
    print(classification_report(y_real, y_pred, target_names=['0', '1']))
    print(f"F1 (teste): {f1_test:.4f}, Accuracy (teste): {acc_test:.4f}")

    f1_scores.append(f1_test)
    acc_scores.append(acc_test)
    fold_num += 1

# Cross-validation averages (population std-dev across the k folds).
print("\n=== Resultados Médios da Validação Cruzada ===")
print(f"F1 médio: {statistics.mean(f1_scores):.4f} (+/- {statistics.pstdev(f1_scores):.4f})")
print(f"Acurácia média: {statistics.mean(acc_scores):.4f} (+/- {statistics.pstdev(acc_scores):.4f})")