#|default_exp foundation

#|export
from fastcore.imports import *
from fastcore.basics import *
from functools import lru_cache
from contextlib import contextmanager
from copy import copy
from configparser import ConfigParser
import random,pickle,inspect

#|hide
from fastcore.test import *
from nbdev.showdoc import *
from fastcore.nb_imports import *

#|export
@contextmanager
def working_directory(path):
    "Change working directory to `path` and return to previous on exit."
    prev_cwd = Path.cwd()
    os.chdir(path)
    # `finally` restores the previous directory even if the `with` body raises
    try: yield
    finally: os.chdir(prev_cwd)

#|export
def add_docs(cls, cls_doc=None, **docs):
    "Copy values from `docs` to `cls` docstrings, and confirm all public methods are documented"
    if cls_doc is not None: cls.__doc__ = cls_doc
    for k,v in docs.items():
        f = getattr(cls,k)
        if hasattr(f,'__func__'): f = f.__func__ # required for class methods
        f.__doc__ = v
    # List of public callables without docstring
    nodoc = [c for n,c in vars(cls).items() if callable(c)
             and not n.startswith('_') and c.__doc__ is None]
    # Raises `AssertionError` when a public callable or the class itself is undocumented
    assert not nodoc, f"Missing docs: {nodoc}"
    assert cls.__doc__ is not None, f"Missing class docs: {cls}"

class T:
    def foo(self): pass
    def bar(self): pass

add_docs(T, cls_doc="A docstring for the class.",
         foo="The foo method.",
         bar="The bar method.")

test_eq(T.__doc__, "A docstring for the class.")
test_eq(T.foo.__doc__, "The foo method.")
test_eq(T.bar.__doc__, "The bar method.")

class T:
    def foo(self): pass
    def bar(self): pass

# `bar` is left undocumented, so `add_docs` must fail
f=lambda: add_docs(T, "A docstring for the class.", foo="The foo method.")
test_fail(f, contains="Missing docs")

#|hide
class _T:
    def f(self): pass
    @classmethod
    def g(cls): pass
add_docs(_T, "a", f="f", g="g")

test_eq(_T.__doc__, "a")
test_eq(_T.f.__doc__, "f")
test_eq(_T.g.__doc__, "g")

#|export
def docs(cls):
    "Decorator version of `add_docs`, using `_docs` dict"
    add_docs(cls, **cls._docs)
    return cls

@docs
class _T:
    def f(self): pass
    def g(cls): pass

    _docs = dict(cls_doc="The class docstring",
                 f="The docstring for method f.",
                 g="A different docstring for method g.")


test_eq(_T.__doc__, "The class docstring")
test_eq(_T.f.__doc__, "The docstring for method f.")
test_eq(_T.g.__doc__, "A different docstring for method g.")

# A class docstring written in the class body makes the `cls_doc` entry unnecessary
@docs
class _T:
    "The class docstring"
    def f(self): pass
    _docs = dict(f="The docstring for method f.")

test_eq(_T.__doc__, "The class docstring")
test_eq(_T.f.__doc__, "The docstring for method f.")

show_doc(is_iter)

assert is_iter([1])
assert not is_iter(array(1))
assert is_iter(array([1,2]))
# A bare generator object is always truthy, so it has to be passed to `is_iter` to test anything
assert is_iter(o for o in range(3))

#|export
def coll_repr(c, max_n=10):
    "String repr of up to `max_n` items of (possibly lazy) collection `c`"
    # `islice` stops after `max_n` reprs, so the remaining items of `c` are never rendered
    return f'(#{len(c)}) [' + ','.join(itertools.islice(map(repr,c), max_n)) + (
        '...' if len(c)>max_n else '') + ']'

test_eq(coll_repr(range(1000)),    '(#1000) [0,1,2,3,4,5,6,7,8,9...]')
test_eq(coll_repr(range(1000), 5), '(#1000) [0,1,2,3,4...]')
test_eq(coll_repr(range(10),   5),   '(#10) [0,1,2,3,4...]')
test_eq(coll_repr(range(5),    5),    '(#5) [0,1,2,3,4]')

test_eq(coll_repr(range(1000), max_n=5), '(#1000) [0,1,2,3,4...]')

#|export
def is_bool(x):
    "Check whether `x` is a bool or None"
    return isinstance(x,(bool,NoneType)) or risinstance('bool_', x)

#|export
def mask2idxs(mask):
    "Convert bool mask or index list to a list of int indices (a `slice` is returned unchanged)"
    if isinstance(mask,slice): return mask
    mask = list(mask)
    if len(mask)==0: return []
    # Only the first element decides whether `mask` is treated as a bool mask or as indices
    it = mask[0]
    if hasattr(it,'item'): it = it.item()
    if is_bool(it): return [i for i,m in enumerate(mask) if m]
    return [int(i) for i in mask]

test_eq(mask2idxs([False,True,False,True]), [1,3])
test_eq(mask2idxs(array([False,True,False,True])), [1,3])
test_eq(mask2idxs(array([1,2,3])), [1,2,3])

#|export
def cycle(o):
    "Like `itertools.cycle` except creates list of `None`s if `o` is empty"
    o = listify(o)
    return itertools.cycle(o) if o is not None and len(o) > 0 else itertools.cycle([None])

test_eq(itertools.islice(cycle([1,2,3]),5), [1,2,3,1,2])
test_eq(itertools.islice(cycle([]),3), [None]*3)
test_eq(itertools.islice(cycle(None),3), [None]*3)
test_eq(itertools.islice(cycle(1),3), [1,1,1])

#|export
def zip_cycle(x, *args):
    "Like `itertools.zip_longest` but `cycle`s through elements of all but first argument"
    # `zip` stops when `x` is exhausted; every other argument is wrapped in `cycle`
    return zip(x, *map(cycle,args))

test_eq(zip_cycle([1,2,3,4],list('abc')), [(1, 'a'), (2, 'b'), (3, 'c'), (4, 'a')])

#|export
def is_indexer(idx):
    "Test whether `idx` will index a single item in a list"
    # Objects without an `ndim` attribute default to 1, i.e. "not a single-item indexer"
    return isinstance(idx,int) or not getattr(idx,'ndim',1)

assert is_indexer(1)
assert is_indexer(np.array(1))

assert not is_indexer([1, 2])
assert not is_indexer(np.array([[1, 2], [3, 4]]))

#|export
class CollBase:
    "Base class for composing a list of `items`"
    def __init__(self, items): self.items = items
    def __len__(self): return len(self.items)
    # A `CollBase` used as an index is converted to a plain list before indexing `items`
    def __getitem__(self, k): return self.items[list(k) if isinstance(k,CollBase) else k]
    def __setitem__(self, k, v): self.items[list(k) if isinstance(k,CollBase) else k] = v
    def __delitem__(self, i): del(self.items[i])
    def __repr__(self): return self.items.__repr__()
    def __iter__(self): return self.items.__iter__()

class _T(CollBase): pass
l = _T([1,2,3,4,5])
test_eq(len(l), 5) # __len__
test_eq(l[-1], 5); test_eq(l[0], 1) #__getitem__
l[2] = 100; test_eq(l[2], 100)      # __set_item__
del l[0];   test_eq(len(l), 4)      # __delitem__
test_eq(str(l), '[2, 100, 4, 5]')   # __repr__

#|export
class _L_Meta(type):
    "Metaclass that returns `x` itself when a class is called with a single existing instance"
    def __call__(cls, x=None, *args, **kwargs):
        if not args and not kwargs and x is not None and isinstance(x,cls): return x
        return super().__call__(x, *args, **kwargs)

#|export
class L(GetAttr, CollBase, metaclass=_L_Meta):
    "Behaves like a list of `items` but can also index with list of indices or masks"
    _default='items'
    def __init__(self, items=None, *rest, use_list=False, match=None):
        # With `use_list=None` (what `_new` passes) array-like `items` are stored as-is, not listified
        if (use_list is not None) or not is_array(items):
            items = listify(items, *rest, use_list=use_list, match=match)
        super().__init__(items)

    @property
    def _xtra(self): return None
    def _new(self, items, *args, **kwargs): return type(self)(items, *args, use_list=None, **kwargs)
    def __getitem__(self, idx): return self._get(idx) if is_indexer(idx) else L(self._get(idx), use_list=None)
    def copy(self): return self._new(self.items.copy())

    def _get(self, i):
        # Single indices and slices go straight to `items` (through `.iloc` when `items` has one)
        if is_indexer(i) or isinstance(i,slice): return getattr(self.items,'iloc',self.items)[i]
        i = mask2idxs(i)
        return (self.items.iloc[list(i)] if hasattr(self.items,'iloc')
                else self.items.__array__()[(i,)] if hasattr(self.items,'__array__')
                else [self.items[i_] for i_ in i])

    def __setitem__(self, idx, o):
        "Set `idx` (can be list of indices, or mask, or int) items to `o` (which is broadcast if not iterable)"
        if isinstance(idx, int): self.items[idx] = o
        else:
            idx = idx if isinstance(idx,L) else listify(idx)
            # A bool mask selects the positions that are True, as `_get` does; without this
            # conversion each `True`/`False` would be used as the integer index 1/0
            if len(idx) and idx[0] is not None and is_bool(idx[0]): idx = mask2idxs(idx)
            if not is_iter(o): o = [o]*len(idx)
            for i,o_ in zip(idx,o): self.items[i] = o_

    def __eq__(self,b):
        if b is None: return False
        if risinstance('ndarray', b): return array_equal(b, self)
        # Strings, dicts and callables never compare equal to an `L`
        if isinstance(b, (str,dict)) or callable(b): return False
        return all_equal(b,self)

    def sorted(self, key=None, reverse=False): return self._new(sorted_ex(self, key=key, reverse=reverse))
    def __iter__(self): return iter(self.items.itertuples() if hasattr(self.items,'iloc') else self.items)
    def __contains__(self,b): return b in self.items
    def __reversed__(self): return self._new(reversed(self.items))
    def __invert__(self): return self._new(not i for i in self)
    def __repr__(self): return repr(self.items)
    def _repr_pretty_(self, p, cycle):
        p.text('...' if cycle else repr(self.items) if is_array(self.items) else coll_repr(self))
    def __mul__ (a,b): return a._new(a.items*b)
    def __add__ (a,b): return a._new(a.items+listify(b))
    def __radd__(a,b): return a._new(b)+a
    # NOTE(review): Python's in-place add hook is `__iadd__`; `__addi__` is never invoked by `+=`,
    # which therefore falls back to `__add__` and builds a new `L`. Left unchanged because
    # renaming it would make `+=` mutate every alias of the same `L` - confirm intent.
    def __addi__(a,b):
        a.items += list(b)
        return a

    @classmethod
    def split(cls, s, sep=None, maxsplit=-1): return cls(s.split(sep,maxsplit))

    @classmethod
    def range(cls, a, b=None, step=None): return cls(range_of(a, b=b, step=step))

    # Inside method bodies, names such as `zip`, `cycle` and `enumerate` resolve to the
    # module-level/builtin functions, not to the methods of the same name defined here
    def map(self, f, *args, **kwargs): return self._new(map_ex(self, f, *args, gen=False, **kwargs))
    def argwhere(self, f, negate=False, **kwargs): return self._new(argwhere(self, f, negate, **kwargs))
    def argfirst(self, f, negate=False):
        if negate: f = not_(f)
        return first(i for i,o in self.enumerate() if f(o))
    def filter(self, f=noop, negate=False, **kwargs):
        return self._new(filter_ex(self, f=f, negate=negate, gen=False, **kwargs))

    def enumerate(self): return L(enumerate(self))
    def renumerate(self): return L(renumerate(self))
    def unique(self, sort=False, bidir=False, start=None): return L(uniqueify(self, sort=sort, bidir=bidir, start=start))
    def val2idx(self): return val2idx(self)
    def cycle(self): return cycle(self)
    def map_dict(self, f=noop, *args, **kwargs): return {k:f(k, *args,**kwargs) for k in self}
    def map_first(self, f=noop, g=noop, *args, **kwargs):
        return first(self.map(f, *args, **kwargs), g)

    def itemgot(self, *idxs):
        # Each entry of `idxs` is applied in turn, so several entries index into nested items
        x = self
        for idx in idxs: x = x.map(itemgetter(idx))
        return x
    def attrgot(self, k, default=None):
        return self.map(lambda o: o.get(k,default) if isinstance(o, dict) else nested_attr(o,k,default))

    def starmap(self, f, *args, **kwargs): return self._new(itertools.starmap(partial(f,*args,**kwargs), self))
    def zip(self, cycled=False): return self._new((zip_cycle if cycled else zip)(*self))
    def zipwith(self, *rest, cycled=False): return self._new([self, *rest]).zip(cycled=cycled)
    def map_zip(self, f, *args, cycled=False, **kwargs): return self.zip(cycled=cycled).starmap(f, *args, **kwargs)
    def map_zipwith(self, f, *rest, cycled=False, **kwargs): return self.zipwith(*rest, cycled=cycled).starmap(f, **kwargs)
    def shuffle(self):
        # Shuffle a copy so that `self.items` keeps its order
        it = copy(self.items)
        random.shuffle(it)
        return self._new(it)

    def concat(self): return self._new(itertools.chain.from_iterable(self.map(L)))
    def reduce(self, f, initial=None): return reduce(f, self) if initial is None else reduce(f, self, initial)
    def sum(self): return self.reduce(operator.add, 0)
    def product(self): return self.reduce(operator.mul, 1)
    def setattrs(self, attr, val):
        # Plain loop: the call is made for its side effect only, so no result list is built
        for o in self: setattr(o,attr,val)

#|export
add_docs(L,
         __getitem__="Retrieve `idx` (can be list of indices, or mask, or int) items",
         range="Class Method: Same as `range`, but returns `L`. Can pass collection for `a`, to use `len(a)`",
         split="Class Method: Same as `str.split`, but returns an `L`",
         copy="Same as `list.copy`, but returns an `L`",
         sorted="New `L` sorted by `key`. If key is str use `attrgetter`; if int use `itemgetter`",
         unique="Unique items, in stable order",
         val2idx="Dict from value to index",
         filter="Create new `L` filtered by predicate `f`, passing `args` and `kwargs` to `f`",
         argwhere="Like `filter`, but return indices for matching items",
         argfirst="Return index of first matching item",
         map="Create new `L` with `f` applied to all `items`, passing `args` and `kwargs` to `f`",
         map_first="First element of `map_filter`",
         map_dict="Like `map`, but creates a dict from `items` to function results",
         starmap="Like `map`, but use `itertools.starmap`",
         itemgot="Create new `L` with item `idx` of all `items`",
         attrgot="Create new `L` with attr `k` (or value `k` for dicts) of all `items`.",
         cycle="Same as `itertools.cycle`",
         enumerate="Same as `enumerate`",
         renumerate="Same as `renumerate`",
         zip="Create new `L` with `zip(*items)`",
         zipwith="Create new `L` with `self` zip with each of `*rest`",
         map_zip="Combine `zip` and `starmap`",
         map_zipwith="Combine `zipwith` and `starmap`",
         concat="Concatenate all elements of list",
         shuffle="Same as `random.shuffle`, but not inplace",
         reduce="Wrapper for `functools.reduce`",
         sum="Sum of the items",
         product="Product of the items",
         setattrs="Call `setattr` on all items"
        )

#|export
#|hide
# Here we are fixing the signature of L. What happens is that the __call__ method on the MetaClass of L shadows the __init__
# giving the wrong signature (https://stackoverflow.com/questions/49740290/call-from-metaclass-shadows-signature-of-init).
def _f(items=None, *rest, use_list=False, match=None): ...
L.__signature__ = inspect.signature(_f)

#|export
Sequence.register(L);

from fastcore.utils import gt

d = dict(a=1,b=-5,d=6,e=9).items()
test_eq(L(d).itemgot(1).map(abs).filter(gt(4)).sum(), 20) # abs(-5) + abs(6) + abs(9) = 20; 1 was filtered out.

t = L(range(12))
test_eq(t, list(range(12)))
test_ne(t, list(range(11)))
t.reverse()
test_eq(t[0], 11)
t[3] = "h"
test_eq(t[3], "h")
t[3,5] = ("j","k")
test_eq(t[3,5], ["j","k"])
test_eq(t, L(t))
test_eq(L(L(1,2),[3,4]), ([1,2],[3,4]))
t

assert isinstance(t, Sequence)

import random

random.seed(0)
random.sample(t, 3)

#|hide
# test set items with L of collections
x = L([[1,2,3], [4,5], [6,7]])
x[0] = [1,2]
test_eq(x, L([[1,2], [4,5], [6,7]]))

# non-idiomatic None-ness check - avoid infinite recursion
some_var = L(['a', 'b'])
assert some_var != None, "L != None"

import pandas as pd

arr = np.arange(9).reshape(3,3)
t = L(arr, use_list=None)
test_eq(t[1,2], arr[[1,2]])

df = pd.DataFrame({'a':[1,2,3]})
t = L(df, use_list=None)
test_eq(t[1,2], L(pd.DataFrame({'a':[2,3]}, index=[1,2]), use_list=None))

t = L()
test_eq(t, [])
t.append(1)
test_eq(t, [1])
t += [3,2]
test_eq(t, [1,3,2])
t = t + [4]
test_eq(t, [1,3,2,4])
t = 5 + t
test_eq(t, [5,1,3,2,4])
test_eq(L(1,2,3), [1,2,3])
test_eq(L(1,2,3), L(1,2,3))
t = L(1)*5
t = t.map(operator.neg)
test_eq(t,[-1]*5)
test_eq(~L([True,False,False]), L([False,True,True]))
t = L(range(4))
test_eq(zip(t, L(1).cycle()), zip(range(4),(1,1,1,1)))
t = L.range(100)
test_shuffled(t,t.shuffle())

test_eq(L([]).sum(), 0)
test_eq(L([]).product(), 1)

# Extra positional and keyword arguments given to `map` are forwarded to the mapped function
def _f(x,a=0): return x+a
t = L(1)*5
test_eq(t.map(_f), t)
test_eq(t.map(_f,1), [2]*5)
test_eq(t.map(_f,a=2), [3]*5)

# Construction from different kinds of input
test_eq(L([1,2,3]),[1,2,3])
test_eq(L(L([1,2,3])),[1,2,3])
test_ne(L([1,2,3]),[1,2,])
test_eq(L('abc'),['abc'])
test_eq(L(range(0,3)),[0,1,2])
test_eq(L(o for o in range(0,3)),[0,1,2])
test_eq(L(array(0)),[array(0)])
test_eq(L([array(0),array(1)]),[array(0),array(1)])
test_eq(L(array([0.,1.1]))[0],array([0.,1.1]))
test_eq(L(array([0.,1.1]), use_list=True), [array(0.),array(1.1)]) # `use_list=True` to unwrap arrays/arrays

test_eq(L(1,match=[1,2,3]),[1,1,1])
test_eq(L([1,2],match=[2,3]),[1,2])
test_fail(lambda: L([1,2],match=[1,2,3]))

# Calling `L` on an existing `L` returns the very same object
test_is(L(t), t)

test_eq(L(['a', 'b']), ['a', 'b'])
test_ne(L(['a', 'b']), 'ab')
test_ne(L(['a', 'b']), {'a':1, 'b':2})

show_doc(L.__getitem__)

t = L(range(12))
test_eq(t[1,2], [1,2])                # implicit tuple
test_eq(t[[1,2]], [1,2])              # list
test_eq(t[:3], [0,1,2])               # slice
test_eq(t[[False]*11 + [True]], [11]) # mask
test_eq(t[array(3)], 3)

show_doc(L.__setitem__)

t[4,6] = 0
test_eq(t[4,6], [0,0])
t[4,6] = [1,2]
test_eq(t[4,6], [1,2])

show_doc(L.unique)

test_eq(L(4,1,2,3,4,4).unique(), [4,1,2,3])

show_doc(L.val2idx)

test_eq(L(1,2,3).val2idx(), {3:2,1:0,2:1})

show_doc(L.filter)

# `t` is still range(12) with positions 4 and 6 overwritten by 1 and 2 (assigned above)
list(t)

test_eq(t.filter(lambda o:o<5), [0,1,2,3,1,2])
test_eq(t.filter(lambda o:o<5, negate=True), [5,7,8,9,10,11])

show_doc(L.argwhere)

test_eq(t.argwhere(lambda o:o<5), [0,1,2,3,4,6])

show_doc(L.argfirst)

test_eq(t.argfirst(lambda o:o>4), 5)
test_eq(t.argfirst(lambda o:o>4,negate=True),0)

show_doc(L.map)

test_eq(L.range(4).map(operator.neg), [0,-1,-2,-3])

test_eq(L.range(4).map('#{}#'), ['#0#','#1#','#2#','#3#'])

test_eq(L.range(4).map(list('abcd')), list('abcd'))

def f(a=None,b=None): return b
test_eq(L.range(4).map(f, b=arg0), range(4))

show_doc(L.map_dict)

test_eq(L(range(1,5)).map_dict(), {1:1, 2:2, 3:3, 4:4})
test_eq(L(range(1,5)).map_dict(operator.neg), {1:-1, 2:-2, 3:-3, 4:-4})

show_doc(L.zip)

t = L([[1,2,3],'abc'])
test_eq(t.zip(), [(1, 'a'),(2, 'b'),(3, 'c')])

t = L([[1,2,3,4],['a','b','c']])
test_eq(t.zip(cycled=True ), [(1, 'a'),(2, 'b'),(3, 'c'),(4, 'a')])
test_eq(t.zip(cycled=False), [(1, 'a'),(2, 'b'),(3, 'c')])

show_doc(L.map_zip)

t = L([1,2,3],[2,3,4])
test_eq(t.map_zip(operator.mul), [2,6,12])

show_doc(L.zipwith)

b = [[0],[1],[2,2]]
t = L([1,2,3]).zipwith(b)
test_eq(t, [(1,[0]), (2,[1]), (3,[2,2])])

show_doc(L.map_zipwith)

test_eq(L(1,2,3).map_zipwith(operator.mul, [2,3,4]), [2,6,12])

show_doc(L.itemgot)

test_eq(t.itemgot(1), b)

show_doc(L.attrgot)

# Example when items are not a dict
a = [SimpleNamespace(a=3,b=4),SimpleNamespace(a=1,b=2)]
test_eq(L(a).attrgot('b'), [4,2])

#Example of when items are a dict
b =[{'id': 15, 'name': 'nbdev'}, {'id': 17, 'name': 'fastcore'}]
test_eq(L(b).attrgot('id'), [15, 17])

show_doc(L.sorted)

test_eq(L(a).sorted('a').attrgot('b'), [2,4])

show_doc(L.split)

test_eq(L.split('a b c'), list('abc'))

show_doc(L.range)

test_eq_type(L.range([1,1,1]), L(range(3)))
test_eq_type(L.range(5,2,2), L(range(5,2,2)))

show_doc(L.concat)

test_eq(L([0,1,2,3],4,L(5,6)).concat(), range(7))

show_doc(L.copy)

t = L([0,1,2,3],4,L(5,6)).copy()
test_eq(t.concat(), range(7))

show_doc(L.map_first)

t = L(0,1,2,3)
test_eq(t.map_first(lambda o:o*2 if o>2 else None), 6)

show_doc(L.setattrs)

t = L(SimpleNamespace(),SimpleNamespace())
t.setattrs('foo', 'bar')
test_eq(t.attrgot('foo'), ['bar','bar'])

#|export
def save_config_file(file, d, **kwargs):
    "Write settings dict to a new config file, or overwrite the existing one."
    config = ConfigParser(**kwargs)
    config['DEFAULT'] = d
    # `with` closes (and flushes) the handle deterministically; utf8 matches the
    # encoding that `read_config_file` and `Config` use when reading the file back
    with open(file, 'w', encoding='utf8') as f: config.write(f)

#|export
def read_config_file(file, **kwargs):
    "Read config `file` (as utf8) and return its `DEFAULT` section; `kwargs` are passed to `ConfigParser`"
    config = ConfigParser(**kwargs)
    config.read(file, encoding='utf8')
    return config['DEFAULT']

_d = dict(user='fastai', lib_name='fastcore', some_path='test', some_bool=True, some_num=3)
try:
    save_config_file('tmp.ini', _d)
    res = read_config_file('tmp.ini')
finally: os.unlink('tmp.ini')
dict(res)

#|export
class Config:
    "Reading and writing `ConfigParser` ini files"
    def __init__(self, cfg_path, cfg_name, create=None, save=True, extra_files=None, types=None):
        self.types = types or {}
        cfg_path = Path(cfg_path).expanduser().absolute()
        self.config_path,self.config_file = cfg_path,cfg_path/cfg_name
        self._cfg = ConfigParser()
        self.d = self._cfg['DEFAULT']
        # `extra_files` are read first and the main config file last, so its values take precedence
        found = [Path(o) for o in self._cfg.read(L(extra_files)+[self.config_file], encoding='utf8')]
        # Fall back to the `create` defaults only when the main config file could not be read
        if self.config_file not in found and create is not None:
            self._cfg.read_dict({'DEFAULT':create})
            if save:
                cfg_path.mkdir(exist_ok=True, parents=True)
                save_config_file(self.config_file, create)

    def __repr__(self): return repr(dict(self._cfg.items('DEFAULT', raw=True)))
    def __setitem__(self,k,v): self.d[k] = str(v)
    def __contains__(self,k): return k in self.d
    def save(self): save_config_file(self.config_file,self.d)
    # The `k=='d'` guard stops `__getattr__` from recursing when `self.d` has not been set yet
    def __getattr__(self,k): return stop(AttributeError(k)) if k=='d' or k not in self.d else self.get(k)
    def __getitem__(self,k): return stop(IndexError(k)) if k not in self.d else self.get(k)

    def get(self,k,default=None):
        v = self.d.get(k, default)
        if v is None: return None
        # Convert according to `types`: no entry -> str, `bool` -> `str2bool`,
        # `Path` -> relative to the config directory, anything else -> `typ(v)`
        typ = self.types.get(k, None)
        if typ==bool: return str2bool(v)
        if not typ: return str(v)
        if typ==Path: return self.config_path/v
        return typ(v)

    def path(self,k,default=None):
        v = self.get(k, default)
        return v if v is None else self.config_path/v

save_config_file('../tmp.ini', _d)

try: cfg = Config('..', 'tmp.ini')
finally: os.unlink('../tmp.ini')
cfg

try: cfg = Config('..', 'tmp.ini', create=_d)
finally: os.unlink('../tmp.ini')
cfg

# With `save=False` the defaults are loaded in memory only and no file is written
cfg = Config('..', 'tmp.ini', create=_d, save=False)
test_eq(cfg.user,'fastai')
assert not Path('../tmp.ini').exists()

test_eq(cfg.user,'fastai')
test_eq(cfg['some_path'], 'test')
test_eq(cfg.get('foo','bar'),'bar')

with tempfile.TemporaryDirectory() as d:
    a = Config(d, 'a.ini', {'a':0,'b':0})
    b = Config(d, 'b.ini', {'a':1,'c':0})
    c = Config(d, 'c.ini', {'a':2,'d':0}, extra_files=[a.config_file,b.config_file])
    test_eq(c.d, {'a':'2','b':'0','c':'0','d':'0'})

_types = dict(some_path=Path, some_bool=bool, some_num=int)
cfg = Config('..', 'tmp.ini', create=_d, save=False, types=_types)

test_eq(cfg.user,'fastai')
test_eq(cfg['some_path'].resolve(), (Path('..')/'test').resolve())
test_eq(cfg.get('some_num'), 3)

#|hide
import nbdev; nbdev.nbdev_export()