#|hide
#|default_exp process
A notebook processor
#|export
from nbdev.read import *
from nbdev.maker import *
from nbdev.imports import *
from execnb.nbio import *
from fastcore.script import *
from fastcore.imports import *
from fastcore.xtras import *
from collections import defaultdict
from pprint import pformat
from inspect import signature,Parameter
import ast,contextlib,copy
from collections import defaultdict
#|hide
from fastcore.test import *
from pdb import set_trace
from importlib import reload
from fastcore import shutil
Special comments at the start of a cell can be used to provide information to nbdev
about how to process a cell, so we need to be able to find the location of these comments.
minimal = read_nb('../tests/minimal.ipynb')
#|export
#from https://github.com/quarto-dev/quarto-cli/blob/main/src/resources/jupyter/notebook.py
langs = defaultdict(lambda: '#',
r = "#",
python = "#",
julia = "#",
scala = "//",
matlab = "%",
csharp = "//",
fsharp = "//",
c = ["/*", "*/"],
css = ["/*", "*/"],
sas = ["*", ";"],
powershell = "#",
bash = "#",
sql = "--",
mysql = "--",
psql = "--",
lua = "--",
cpp = "//",
cc = "//",
stan = "#",
octave = "#",
fortran = "!",
fortran95 = "!",
awk = "#",
gawk = "#",
stata = "*",
java = "//",
groovy = "//",
sed = "#",
perl = "#",
ruby = "#",
tikz = "%",
javascript = "//",
js = "//",
d3 = "//",
node = "//",
sass = "//",
coffee = "#",
go = "//",
asy = "//",
haskell = "--",
dot = "//",
apl = "⍝"
)
#|export
def nb_lang(nb): return nested_attr(nb, 'metadata.kernelspec.language', 'python')
#|hide
test_eq(nb_lang(read_nb('../tests/minimal.ipynb')), 'python')
test_eq(nb_lang(read_nb('../tests/APL.ipynb')), 'apl')
#|export
def _dir_pre(lang=None): return fr"\s*{langs[lang]}\s*\|"
def _quarto_re(lang=None): return re.compile(_dir_pre(lang) + r'\s*\w+\s*:')
#|export
def _directive(s, lang='python'):
s = re.sub('^'+_dir_pre(lang), f"{langs[lang]}|", s)
if ':' in s: s = s.replace(':', ': ')
s = (s.strip()[2:]).strip().split()
if not s: return None
direc,*args = s
return direc,args
#|export
def _norm_quarto(s, lang='python'):
"normalize quarto directives so they have a space after the colon"
m = _quarto_re(lang).match(s)
return m.group(0) + ' ' + _quarto_re(lang).sub('', s).lstrip() if m else s
#|hide
test_eq(_norm_quarto('#|foo:bar'), '#|foo: bar')
test_eq(_norm_quarto('#|foo: bar'), '#|foo: bar')
test_eq(_norm_quarto('#|not_quarto'), '#|not_quarto')
#|export
def first_code_ln(code_list, re_pattern=None, lang='python'):
if re_pattern is None: re_pattern = _dir_pre(lang)
"get first line number where code occurs, where `code_list` is a list of code"
return first(i for i,o in enumerate(code_list) if o.strip() != '' and not re.match(re_pattern, o))
_tst = """
#|default_exp
#|export
#|hide_input
foo
"""
test_eq(first_code_ln(_tst.splitlines(True)), 4)
#|export
def extract_directives(cell, remove=True, lang='python'):
"Take leading comment directives from lines of code in `ss`, remove `#|`, and split"
if cell.source:
ss = cell.source.splitlines(True)
first_code = first_code_ln(ss, lang=lang)
if not ss or first_code==0: return {}
pre = ss[:first_code]
if remove:
# Leave Quarto directives in place for later processing
cell['source'] = ''.join([_norm_quarto(o, lang) for o in pre if _quarto_re(lang).match(o)] + ss[first_code:])
return dict(L(_directive(s, lang) for s in pre).filter())
Comment directives start with #|
, followed by whitespace delimited tokens, which extract_directives
extracts from the start of a cell, up until a blank line or a line containing something other than comments. The extracted lines are removed from the source.
exp = AttrDict(source = """#|export module
#|eval:false
#| hide
# | foo bar
# |woo: baz
1+2
#bar""")
test_eq(extract_directives(exp), {'export':['module'], 'hide':[], 'eval:': ['false'], 'foo': ['bar'], 'woo:': ['baz']})
test_eq(exp.source, '#|eval: false\n# |woo: baz\n1+2\n#bar')
#|hide
exp = AttrDict(source = """
⍝|hide
⍝| foo: bar
# |woo: baz
1+2
⍝bar""")
test_eq(extract_directives(exp, lang='apl'), {'hide': [], 'foo:': ['bar']})
#|export
def opt_set(var, newval):
"newval if newval else var"
return newval if newval else var
#|export
def instantiate(x, **kwargs):
"Instantiate `x` if it's a type"
return x(**kwargs) if isinstance(x,type) else x
def _mk_procs(procs, nb): return L(procs).map(instantiate, nb=nb)
#|export
def _is_direc(f): return getattr(f, '__name__', '-')[-1]=='_'
#|export
class NBProcessor:
"Process cells and nbdev comments in a notebook"
def __init__(self, path=None, procs=None, preprocs=None, postprocs=None, nb=None, debug=False, rm_directives=True, process=False):
self.nb = read_nb(path) if nb is None else nb
self.lang = nb_lang(self.nb)
for cell in self.nb.cells: cell.directives_ = extract_directives(cell, remove=rm_directives, lang=self.lang)
self.procs,self.preprocs,self.postprocs = map_ex((procs,preprocs,postprocs), _mk_procs, nb=self.nb)
self.debug,self.rm_directives = debug,rm_directives
if process: self.process()
def _process_cell(self, cell):
self.cell = cell
cell.nb = self.nb
for proc in self.procs:
if not hasattr(cell,'source'): return
if not hasattr(cell, 'directives_'):
cell.nb=None
raise Exception(cell)
if cell.cell_type=='code' and cell.directives_:
for cmd,args in cell.directives_.items():
self._process_comment(proc, cell, cmd, args)
if not hasattr(cell,'source'): return
if callable(proc) and not _is_direc(proc):
cell = opt_set(cell, proc(cell))
# try: cell = opt_set(cell, proc(cell))
# except:
# cell.nb = None
# raise Exception((proc,cell))
cell.nb = None
def _process_comment(self, proc, cell, cmd, args):
if _is_direc(proc) and getattr(proc, '__name__', '-')[:-1]==cmd: f=proc
else: f = getattr(proc, f'_{cmd}_', None)
if not f: return
if self.debug: print(cmd, args, f)
return f(self, cell, *args)
def process(self):
"Process all cells with `process_cell`"
for proc in self.preprocs:
self.nb = opt_set(self.nb, proc(self.nb))
for i,cell in enumerate(self.nb.cells): cell.idx_ = i
for cell in self.nb.cells: self._process_cell(cell)
for proc in self.postprocs: self.nb = opt_set(self.nb, proc(self.nb))
self.nb.cells = [c for c in self.nb.cells if c and getattr(c,'source',None) is not None]
Cell processors can be callables (e.g regular functions), in which case they are called for every cell:
everything_fn = '../tests/01_everything.ipynb'
def print_execs(cell):
if 'exec' in cell.source: print(cell.source)
NBProcessor(everything_fn, print_execs).process()
exec("o_y=1") exec("p_y=1") _all_ = [o_y, 'p_y']
Comment directives are put in a cell attribute directive_
as a dictionary keyed by directive name:
def printme_func(cell):
if cell.directives_ and 'printme' in cell.directives_: print(cell.directives_['printme'])
NBProcessor(everything_fn, printme_func).process()
['testing']
However, a more convenient way to handle comment directives is to use a class as a processor, and include a method in your class with the same name as your directive, surrounded by underscores:
class _PrintExample:
def _printme_(self, nbp, cell, to_print): print(to_print)
NBProcessor(everything_fn, _PrintExample()).process()
testing
In the case that your processor supports just one comment directive, you can just use a regular function, with the same name as your directive, but with an underscore appended -- here printme_
is identical to _PrintExample
above:
def printme_(nbp, cell, to_print): print(to_print)
NBProcessor(everything_fn, printme_).process()
testing
#|eval: false
basic_export_nb2('01_read.ipynb', 'read')
basic_export_nb2('02_maker.ipynb', 'maker')
basic_export_nb2('03_process.ipynb', 'process')
g = exec_new('import nbdev.process')
assert hasattr(g['nbdev'].process, 'NBProcessor')