#!/usr/bin/env python
# coding: utf-8

# In[1]:


import re

import numpy as np
from scipy.sparse import csr_matrix, diags
from sklearn.datasets import fetch_20newsgroups
from sklearn.feature_extraction.text import TfidfVectorizer as skTfidfVectorizer


# In[2]:


class TfidfVectorizer:
    def _analyze(self, doc):
        # Mirror scikit-learn's default analyzer: lowercase, then keep
        # tokens of two or more word characters.
        token_pattern = re.compile(r"\b\w\w+\b")
        return token_pattern.findall(doc.lower())

    def _count_vocab(self, X, fixed_vocabulary):
        # Build the document-term count matrix directly in CSR form.
        # During fit (fixed_vocabulary=False) the vocabulary grows as new
        # terms appear; during transform, unseen terms are skipped.
        if fixed_vocabulary is False:
            vocabulary = {}
            vocabulary_cnt = 0
        else:
            vocabulary = self.vocabulary_
        values = []
        j_indices = []
        indptr = [0]
        for doc in X:
            feature_counter = {}
            for feature in self._analyze(doc):
                if fixed_vocabulary is False:
                    if feature not in vocabulary:
                        vocabulary[feature] = vocabulary_cnt
                        vocabulary_cnt += 1
                else:
                    if feature not in vocabulary:
                        continue
                feature_idx = vocabulary[feature]
                if feature_idx not in feature_counter:
                    feature_counter[feature_idx] = 1
                else:
                    feature_counter[feature_idx] += 1
            values.extend(feature_counter.values())
            j_indices.extend(feature_counter.keys())
            indptr.append(len(j_indices))
        Xt = csr_matrix((values, j_indices, indptr),
                        shape=(len(indptr) - 1, len(vocabulary)))
        return vocabulary, Xt

    def fit(self, X):
        vocabulary, Xt = self._count_vocab(X, fixed_vocabulary=False)
        # Re-index the features alphabetically so the column order matches
        # scikit-learn's, then remap the CSR column indices accordingly.
        sorted_features = sorted(vocabulary.items())
        map_index = np.zeros(len(sorted_features), dtype=int)
        for new_val, (term, old_val) in enumerate(sorted_features):
            vocabulary[term] = new_val
            map_index[old_val] = new_val
        Xt.indices = map_index[Xt.indices]
        self.vocabulary_ = vocabulary
        # Smoothed inverse document frequency, matching scikit-learn's
        # default smooth_idf=True: idf = log((1 + n) / (1 + df)) + 1.
        # Each column of the count matrix has one stored entry per document
        # containing the term, so bincount over the column indices gives df.
        df = np.bincount(Xt.indices, minlength=Xt.shape[1]) + 1
        n_samples = Xt.shape[0] + 1
        self.idf_ = np.log(n_samples / df) + 1
        self._idf_diag = diags(self.idf_, shape=(Xt.shape[1], Xt.shape[1]),
                               format='csr')
        return self

    def transform(self, X):
        _, Xt = self._count_vocab(X, fixed_vocabulary=True)
        # Scale each column by its idf weight; for scipy sparse matrices,
        # * is matrix multiplication.
        return Xt * self._idf_diag

    def get_feature_names(self):
        return sorted(self.vocabulary_.keys())


# In[3]:


# Check this implementation against scikit-learn on growing subsets of the
# 20 newsgroups corpus, both on the training documents and on held-out ones.
X = fetch_20newsgroups(remove=('headers', 'footers', 'quotes')).data
for subset in [10, 100, 1000]:
    X_train = X[:subset]
    X_test = X[subset: 2 * subset]
    vec1 = TfidfVectorizer().fit(X_train)
    # scikit-learn applies l2 normalization by default; disable it so the
    # raw tf-idf weights are comparable
    vec2 = skTfidfVectorizer(norm=None).fit(X_train)
    # note: get_feature_names() was removed in scikit-learn 1.2; newer
    # versions expose get_feature_names_out() instead
    assert np.array_equal(vec1.get_feature_names(), vec2.get_feature_names())
    Xt1 = vec1.transform(X_train)
    Xt2 = vec2.transform(X_train)
    assert np.allclose(Xt1.toarray(), Xt2.toarray())
    Xt1 = vec1.transform(X_test)
    Xt2 = vec2.transform(X_test)
    assert np.allclose(Xt1.toarray(), Xt2.toarray())
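

# In[4]:


# A minimal sketch of the (data, indices, indptr) CSR construction that
# _count_vocab relies on, using a hand-made two-document example (the
# numbers below are illustrative assumptions, not drawn from the corpus).
# Row i of the matrix is described by j_indices[indptr[i]:indptr[i + 1]]
# and the matching slice of values.
values = [2, 1, 1]
j_indices = [0, 2, 1]
indptr = [0, 2, 3]  # row 0 owns entries [0:2], row 1 owns [2:3]
m = csr_matrix((values, j_indices, indptr), shape=(2, 3))
print(m.toarray())  # [[2 0 1]
                    #  [0 1 0]]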
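

# In[5]:


# A small hand-check of the smoothed idf formula on a toy corpus (the three
# documents below are illustrative assumptions). 'cat' occurs in 2 of the
# 3 documents, so its idf should be log((1 + 3) / (1 + 2)) + 1.
toy = ["the cat sat", "the cat ran", "dogs ran"]
vec = TfidfVectorizer().fit(toy)
print(vec.get_feature_names())  # ['cat', 'dogs', 'ran', 'sat', 'the']
assert np.isclose(vec.idf_[0], np.log(4 / 3) + 1)
print(vec.transform(toy).toarray())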
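

# In[6]:


# Sketch of recovering scikit-learn's default output: the only setting
# changed above was norm=None, so l2-normalizing this implementation's
# rows should reproduce skTfidfVectorizer with its defaults (reusing the
# last X_train left over from the loop above).
from sklearn.preprocessing import normalize

Xt_ours = normalize(TfidfVectorizer().fit(X_train).transform(X_train))
Xt_sk = skTfidfVectorizer().fit(X_train).transform(X_train)
assert np.allclose(Xt_ours.toarray(), Xt_sk.toarray())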