#!/usr/bin/env python
# coding: utf-8

# In[1]:


import re
import numpy as np
from scipy.sparse import csr_matrix
from sklearn.datasets import fetch_20newsgroups
from sklearn.feature_extraction.text import CountVectorizer as skCountVectorizer


# In[2]:


class CountVectorizer():

    def _analyze(self, doc):
        # Tokenize like scikit-learn's default analyzer: lowercase the document
        # and keep tokens of two or more word characters.
        token_pattern = re.compile(r"\b\w\w+\b")
        return token_pattern.findall(doc.lower())

    def _count_vocab(self, X, fixed_vocabulary):
        # Build per-document term counts directly in CSR form.
        # When fixed_vocabulary is False the vocabulary is grown on the fly;
        # otherwise self.vocabulary_ is reused and unseen terms are ignored.
        if fixed_vocabulary is False:
            vocabulary = {}
            vocabulary_cnt = 0
        else:
            vocabulary = self.vocabulary_
        values = []
        j_indices = []
        indptr = [0]
        for doc in X:
            feature_counter = {}
            for feature in self._analyze(doc):
                if fixed_vocabulary is False:
                    if feature not in vocabulary:
                        vocabulary[feature] = vocabulary_cnt
                        vocabulary_cnt += 1
                else:
                    if feature not in vocabulary:
                        continue
                feature_idx = vocabulary[feature]
                if feature_idx not in feature_counter:
                    feature_counter[feature_idx] = 1
                else:
                    feature_counter[feature_idx] += 1
            values.extend(feature_counter.values())
            j_indices.extend(feature_counter.keys())
            indptr.append(len(j_indices))
        Xt = csr_matrix((values, j_indices, indptr),
                        shape=(len(indptr) - 1, len(vocabulary)))
        return vocabulary, Xt

    def fit(self, X):
        vocabulary, Xt = self._count_vocab(X, fixed_vocabulary=False)
        # Remap feature indices so columns follow alphabetical term order,
        # matching scikit-learn's convention.
        sorted_features = sorted(vocabulary.items())
        for new_val, (term, old_val) in enumerate(sorted_features):
            vocabulary[term] = new_val
        self.vocabulary_ = vocabulary
        return self

    def transform(self, X):
        _, Xt = self._count_vocab(X, fixed_vocabulary=True)
        return Xt

    def get_feature_names(self):
        # Mirrors the list-returning get_feature_names() of scikit-learn < 1.2
        # (later replaced by get_feature_names_out).
        return sorted(self.vocabulary_.keys())


# In[3]:


# Check the implementation against scikit-learn on the 20 newsgroups data,
# for growing train/test subsets.
X = fetch_20newsgroups(remove=('headers', 'footers', 'quotes')).data
for subset in [10, 100, 1000]:
    X_train = X[:subset]
    X_test = X[subset: 2 * subset]
    vec1 = CountVectorizer().fit(X_train)
    vec2 = skCountVectorizer().fit(X_train)
    assert np.array_equal(vec1.get_feature_names(), vec2.get_feature_names())
    Xt1 = vec1.transform(X_train)
    Xt2 = vec2.transform(X_train)
    assert np.array_equal(Xt1.toarray(), Xt2.toarray())
    Xt1 = vec1.transform(X_test)
    Xt2 = vec2.transform(X_test)
    assert np.array_equal(Xt1.toarray(), Xt2.toarray())
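
# In[4]:


# A minimal illustrative sketch (separate from the comparison above): run the
# vectorizer on a hypothetical two-document toy corpus to show that columns
# follow alphabetical term order and each row holds per-document term counts.
# The names `toy_corpus` / `toy_vec` and the expected outputs in the comments
# are assumptions based on the tokenizer defined above (lowercased tokens of
# two or more word characters).
toy_corpus = [
    "the cat sat on the mat",
    "the dog sat on the log",
]
toy_vec = CountVectorizer().fit(toy_corpus)
print(toy_vec.get_feature_names())
# expected: ['cat', 'dog', 'log', 'mat', 'on', 'sat', 'the']
print(toy_vec.transform(toy_corpus).toarray())
# expected:
# [[1 0 0 1 1 1 2]
#  [0 1 1 0 1 1 2]]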