#!/usr/bin/env python # coding: utf-8 # In[1]: import numpy as np from scipy.special import expit from scipy.optimize import minimize from copy import deepcopy from sklearn.base import BaseEstimator, ClassifierMixin from sklearn.datasets import load_iris from sklearn.ensemble import BaggingClassifier as skBaggingClassifier #from sklearn.linear_model import LogisticRegression # In[2]: def accuracy_score(y_true, y_pred): return np.mean(y_true == y_pred) # In[3]: class LogisticRegression(BaseEstimator, ClassifierMixin): def __init__(self, C=1.0): self.C = C def _encode(self, y): classes = np.unique(y) y_train = np.full((y.shape[0], len(classes)), -1) for i, c in enumerate(classes): y_train[y == c, i] = 1 if len(classes) == 2: y_train = y_train[:, 1].reshape(-1, 1) return classes, y_train @staticmethod def _cost_grad(w, X, y, alpha): def _log_logistic(x): if x > 0: return -np.log(1 + np.exp(-x)) else: return x - np.log(1 + np.exp(x)) yz = y * (np.dot(X, w[:-1]) + w[-1]) cost = -np.sum(np.vectorize(_log_logistic)(yz)) + 0.5 * alpha * np.dot(w[:-1], w[:-1]) grad = np.zeros(len(w)) t = (expit(yz) - 1) * y grad[:-1] = np.dot(X.T, t) + alpha * w[:-1] grad[-1] = np.sum(t) return cost, grad def _solve_lbfgs(self, X, y): result = np.zeros((y.shape[1], X.shape[1] + 1)) for i in range(y.shape[1]): cur_y = y[:, i] w0 = np.zeros(X.shape[1] + 1) res = minimize(fun=self._cost_grad, jac=True, x0=w0, args=(X, cur_y, 1 / self.C), method='L-BFGS-B') result[i] = res.x return result[:, :-1], result[:, -1] def fit(self, X, y): self.classes_, y_train = self._encode(y) self.coef_, self.intercept_ = self._solve_lbfgs(X, y_train) return self def decision_function(self, X): scores = np.dot(X, self.coef_.T) + self.intercept_ if scores.shape[1] == 1: return scores.ravel() else: return scores def predict(self, X): scores = self.decision_function(X) if len(scores.shape) == 1: indices = (scores > 0).astype(int) else: indices = np.argmax(scores, axis=1) return self.classes_[indices] def predict_proba(self, X): scores = self.decision_function(X) prob = expit(scores) if len(scores.shape) == 1: prob = np.vstack((1 - prob, prob)).T else: prob /= np.sum(prob, axis=1)[:, np.newaxis] return prob # In[4]: class BaggingClassifier(): def __init__(self, base_estimator, n_estimators, oob_score, random_state): self.base_estimator = base_estimator self.n_estimators = n_estimators self.oob_score = oob_score self.random_state = random_state def fit(self, X, y): self.classes_, y_train = np.unique(y, return_inverse=True) self.n_classes_ = len(self.classes_) MAX_INT = np.iinfo(np.int32).max rng = np.random.RandomState(self.random_state) self._seeds = rng.randint(MAX_INT, size=self.n_estimators) self.estimators_ = [] self.estimators_samples_ = [] for i in range(self.n_estimators): est = deepcopy(self.base_estimator) rng = np.random.RandomState(self._seeds[i]) sample_indices = rng.randint(0, X.shape[0], X.shape[0]) self.estimators_samples_.append(sample_indices) est.fit(X[sample_indices], y_train[sample_indices]) self.estimators_.append(est) if self.oob_score: self._set_oob_score(X, y_train) return self def _set_oob_score(self, X, y): predictions = np.zeros((X.shape[0], self.n_classes_)) for i in range(self.n_estimators): mask = np.ones(X.shape[0], dtype=bool) mask[self.estimators_samples_[i]] = False predictions[mask] += self.estimators_[i].predict_proba(X[mask]) self.oob_decision_function_ = predictions / np.sum(predictions, axis=1)[:, np.newaxis] self.oob_score_ = accuracy_score(y, np.argmax(predictions, axis=1)) def predict_proba(self, X): proba = np.zeros((X.shape[0], self.n_classes_)) for i in range(self.n_estimators): proba += self.estimators_[i].predict_proba(X) proba /= self.n_estimators return proba def predict(self, X): proba = self.predict_proba(X) return self.classes_[np.argmax(proba, axis=1)] # In[5]: X, y = load_iris(return_X_y=True) clf1 = BaggingClassifier(base_estimator=LogisticRegression(), n_estimators=100, oob_score=True, random_state=0).fit(X, y) clf2 = skBaggingClassifier(base_estimator=LogisticRegression(), n_estimators=100, oob_score=True, random_state=0).fit(X, y) assert np.allclose(clf1._seeds, clf2._seeds) assert np.array_equal(clf1.estimators_samples_, clf2.estimators_samples_) for i in range(clf1.n_estimators): assert np.allclose(clf1.estimators_[i].coef_, clf2.estimators_[i].coef_) assert np.allclose(clf1.estimators_[i].intercept_, clf2.estimators_[i].intercept_) assert np.allclose(clf1.oob_decision_function_, clf2.oob_decision_function_) assert np.allclose(clf1.oob_score_, clf2.oob_score_) prob1 = clf1.predict_proba(X) prob2 = clf2.predict_proba(X) assert np.allclose(prob1, prob2) pred1 = clf1.predict(X) pred2 = clf2.predict(X) assert np.array_equal(pred1, pred2)