#|hide
#|default_exp process
A notebook processor
#|export
from nbdev.config import *
from nbdev.maker import *
from nbdev.imports import *
from execnb.nbio import *
from fastcore.script import *
from fastcore.imports import *
from collections import defaultdict
#|hide
from fastcore.test import *
from pdb import set_trace
from importlib import reload
from fastcore import shutil
Special comments at the start of a cell can be used to provide information to nbdev
about how to process a cell, so we need to be able to find the location of these comments.
# Load a small notebook fixture from the tests directory (`minimal` is not referenced again in this section)
minimal = read_nb('..//tests/minimal.ipynb')
#|export
# from https://github.com/quarto-dev/quarto-cli/blob/main/src/resources/jupyter/notebook.py
# Maps a language name to its comment marker. A two-item list holds the opening and
# closing delimiters of a block comment. Languages not listed fall back to '#'.
langs = defaultdict(lambda: '#', {
    'r': "#", 'python': "#", 'julia': "#", 'scala': "//", 'matlab': "%", 'csharp': "//", 'fsharp': "//",
    'c': ["/*","*/"], 'css': ["/*","*/"], 'sas': ["*",";"], 'powershell': "#", 'bash': "#",
    'sql': "--", 'mysql': "--", 'psql': "--",
    'lua': "--", 'cpp': "//", 'cc': "//", 'stan': "#", 'octave': "#", 'fortran': "!", 'fortran95': "!",
    'awk': "#", 'gawk': "#", 'stata': "*",
    'java': "//", 'groovy': "//", 'sed': "#", 'perl': "#", 'ruby': "#", 'tikz': "%",
    'javascript': "//", 'js': "//", 'd3': "//", 'node': "//",
    'sass': "//", 'coffee': "#", 'go': "//", 'asy': "//", 'haskell': "--", 'dot': "//", 'apl': "⍝",
})
#|export
def nb_lang(nb):
    "Look up the kernel language in `nb`'s metadata, passing 'python' as the fallback value"
    attr_path = 'metadata.kernelspec.language'
    return nested_attr(nb, attr_path, 'python')
#|hide
# `nb_lang` reports 'python' for the minimal notebook and 'apl' for the APL notebook
test_eq(nb_lang(read_nb('..//tests/minimal.ipynb')), 'python')
test_eq(nb_lang(read_nb('..//tests/APL.ipynb')), 'apl')
#|export
def _dir_pre(lang=None):
    "Regex source for a directive prefix in `lang`: optional space, the comment marker, optional space, then `|`"
    marker = langs[lang]
    # Block-comment languages store `[open, close]`; a directive line starts with the opener
    if not isinstance(marker, str): marker = marker[0]
    # Escape the marker: `*` (stata, sas) is a regex metacharacter and would otherwise raise `re.error`
    return fr"\s*{re.escape(marker)}\s*\|"
def _quarto_re(lang=None):
    "Compiled pattern for the `<marker>| key:` prefix that marks a Quarto-style directive in `lang`"
    key_part = r'\s*[\w|-]+\s*:'
    return re.compile(_dir_pre(lang) + key_part)
#|hide
# A Quarto directive is `#|` followed by a single key (word characters, `|` or `-`) and a colon
assert _quarto_re().match('#|code-fold: show')
assert _quarto_re().match('#|hide: true')
assert not _quarto_re().match('#|code fold: show') #not a valid quarto directive
#|export
def _directive(s, lang='python'):
    "Split directive line `s` into `(name, args)`; returns `None` if nothing follows the prefix"
    # Canonical prefix for this language, e.g. `#|` for python or `//|` for javascript
    prefix = f"{langs[lang]}|"
    # Collapse variants such as `# |` or ` #|` to the canonical prefix
    s = re.sub('^'+_dir_pre(lang), prefix, s)
    # Make sure `key:value` splits into two tokens; the colon stays attached to the key
    s = s.replace(':', ': ')
    # Drop the prefix by its real length: a fixed `[2:]` left a stray `|` on the
    # directive name for two-character markers such as `//` and `--`
    toks = s.strip()[len(prefix):].strip().split()
    if not toks: return None
    direc,*args = toks
    return direc,args
#|export
def _norm_quarto(s, lang='python'):
    "Normalize a Quarto directive so one space follows the colon; any other line is returned unchanged"
    m = _quarto_re(lang).match(s)
    if not m: return s
    # Slice off only the matched prefix. `sub` would also delete any later text that
    # happens to look like a directive, and needed a second compile of the pattern.
    # Strip just spaces/tabs so a value-less directive keeps its line terminator and
    # is not fused with the following line when the cell source is re-joined.
    value = s[m.end():].lstrip(' \t')
    return f'{m.group(0)} {value}'
#|hide
# A missing space after the colon is added; already-normalized and non-Quarto lines are unchanged
test_eq(_norm_quarto('#|foo:bar'), '#|foo: bar')
test_eq(_norm_quarto('#|foo: bar'), '#|foo: bar')
test_eq(_norm_quarto('#|not_quarto'), '#|not_quarto')
#|export
# Matches a cell magic such as `%%timeit` at the start of a line
_cell_mgc = re.compile(r"^\s*%%\w+")

def first_code_ln(code_list, re_pattern=None, lang='python'):
    "Index of the first line of `code_list` that is neither blank, a directive (`re_pattern`), nor a cell magic"
    if re_pattern is None: re_pattern = _dir_pre(lang)
    def _is_code(line):
        # Blank lines, directive lines and cell magics are all skipped
        if line.strip() == '': return False
        if re.match(re_pattern, line): return False
        return not _cell_mgc.match(line)
    return first(idx for idx,line in enumerate(code_list) if _is_code(line))
# Line 0 is blank and lines 1-3 are directives, so the first code line (`foo`) is at index 4
_tst = """
#|default_exp
#|export
#|hide_input
foo
"""
test_eq(first_code_ln(_tst.splitlines(True)), 4)
#|hide
# Cell magics (`%%name`) are skipped like directives when locating the first code line
_tst = """%%timeit
#|hide
#|export
foo
"""
test_eq(first_code_ln(_tst.splitlines(True)), 3)
# A line magic (single `%`) is not a cell magic, so it counts as the first line of code
_tst = """
#|hide
%line_magic
#|export
foo
"""
test_eq(first_code_ln(_tst.splitlines(True)),2)
#|export
def extract_directives(cell, remove=True, lang='python'):
    "Take leading comment directives from `cell.source`, remove `#|`, and split into a `{name: args}` dict"
    # Cells with no source have nothing to parse; the result is `None`, as before
    if not cell.source: return None
    ss = cell.source.splitlines(True)
    first_code = first_code_ln(ss, lang=lang)
    if not ss or first_code==0: return {}
    # No code line at all: the whole cell is preamble. Without this, `ss[None:]` is the
    # full source, so directives were never removed and kept lines were duplicated.
    if first_code is None: first_code = len(ss)
    pre,rest = ss[:first_code],ss[first_code:]
    if remove:
        # Leave Quarto directives and cell magic in place for later processing
        quarto = _quarto_re(lang)
        kept = [_norm_quarto(o, lang) for o in pre if quarto.match(o) or _cell_mgc.match(o)]
        cell['source'] = ''.join(kept + rest)
    # `_directive` gives `None` for lines with no tokens; `filter` drops those
    return dict(L(_directive(s, lang) for s in pre).filter())
Comment directives start with `#|`, followed by whitespace-delimited tokens, which `extract_directives` extracts from the start of a cell, up until the first line containing something other than a directive, a cell magic, or whitespace. The extracted lines are removed from the source, except for Quarto directives (`#|key: value`) and cell magics, which are left in place.
# Directives come back as {name: args}. Quarto-style `key:` directives keep the colon
# in their name and stay in the source (with a space after the colon); the rest are removed.
exp = AttrDict(source = """#|export module
#|eval:false
#| hide
# | foo bar
# |woo: baz
1+2
#bar""")
test_eq(extract_directives(exp), {'export':['module'], 'hide':[], 'eval:': ['false'], 'foo': ['bar'], 'woo:': ['baz']})
test_eq(exp.source, '#|eval: false\n# |woo: baz\n1+2\n#bar')
#|hide
# With lang='apl' the comment marker is `⍝`, so the `# |woo: baz` line is not a
# directive: it counts as code and ends the directive block.
exp = AttrDict(source = """
⍝|hide
⍝| foo: bar
# |woo: baz
1+2
⍝bar""")
test_eq(extract_directives(exp, lang='apl'), {'hide': [], 'foo:': ['bar']})
#|export
def opt_set(var, newval):
    "Return `newval` when it is truthy, otherwise fall back to `var`"
    return newval or var
#|export
def instantiate(x, **kwargs):
    "Call `x` with `kwargs` if it is a class; anything else is returned untouched"
    if not isinstance(x, type): return x
    return x(**kwargs)
def _mk_procs(procs, nb):
    "Wrap `procs` in an `L` and pass each item through `instantiate` with `nb=nb`"
    proc_list = L(procs)
    return proc_list.map(instantiate, nb=nb)
#|export
def _is_direc(f):
    "Whether `f` is named like a directive handler, i.e. its `__name__` ends with `_`"
    # Objects without a `__name__` (e.g. processor instances) fall back to '-', which is not a directive.
    # `endswith` also copes with an empty name, where indexing `[-1]` raised `IndexError`.
    return getattr(f, '__name__', '-').endswith('_')
#|export
class NBProcessor:
    "Process cells and nbdev comments in a notebook"
    def __init__(self, path=None, procs=None, nb=None, debug=False, rm_directives=True, process=False):
        # An explicit `nb` takes precedence; otherwise the notebook is read from `path`
        self.nb = read_nb(path) if nb is None else nb
        self.lang = nb_lang(self.nb)
        # Parse every cell's leading directives up front; processors read them from `cell.directives_`
        for cell in self.nb.cells: cell.directives_ = extract_directives(cell, remove=rm_directives, lang=self.lang)
        self.procs = _mk_procs(procs, nb=self.nb)
        self.debug,self.rm_directives = debug,rm_directives
        if process: self.process()

    def _process_cell(self, proc, cell):
        "Apply `proc` to `cell`: first any matching directive handlers, then `proc` itself if it is a plain callable"
        if not hasattr(cell,'source'): return
        if cell.cell_type=='code' and cell.directives_:
            # Option 1: `proc` is directive name with `_` suffix
            f = getattr(proc, '__name__', '-').rstrip('_')
            if f in cell.directives_: self._process_comment(proc, cell, f)
            # Option 2: `proc` contains a method named `_{directive}_`
            for cmd in cell.directives_:
                f = getattr(proc, f'_{cmd}_', None)
                if f: self._process_comment(f, cell, cmd)
        # Processors act on the cell in place; the return value was only ever bound to a
        # local name, so it is not used here
        if callable(proc) and not _is_direc(proc): proc(cell)

    def _process_comment(self, proc, cell, cmd):
        "Call directive handler `proc` with `cell` and the arguments recorded for directive `cmd`"
        args = cell.directives_[cmd]
        # Report the handler itself; this line previously printed an undefined name and
        # raised `NameError` whenever `debug` was on
        if self.debug: print(cmd, args, proc)
        return proc(cell, *args)

    def _proc(self, proc):
        "Run one processor over every cell, then drop removed cells and renumber the rest"
        if hasattr(proc,'begin'): proc.begin()
        for cell in self.nb.cells: self._process_cell(proc, cell)
        if hasattr(proc,'end'): proc.end()
        # A processor removes a cell by setting its `source` to `None`
        self.nb.cells = [c for c in self.nb.cells if c and getattr(c,'source',None) is not None]
        for i,cell in enumerate(self.nb.cells): cell.idx_ = i

    def process(self):
        "Process all cells with all processors"
        for proc in self.procs: self._proc(proc)
Cell processors can be callables (e.g. regular functions), in which case they are called for every cell (set a cell's source to `None` to remove the cell):
everything_fn = '..//tests/01_everything.ipynb'

def print_execs(cell):
    "Print the source of any cell whose source contains 'exec'"
    src = cell.source
    if 'exec' in src: print(src)

NBProcessor(everything_fn, print_execs).process()
--- title: Foo execute: echo: false --- exec("o_y=1") exec("p_y=1") _all_ = [o_y, 'p_y']
Comment directives are put in a cell attribute `directives_` as a dictionary keyed by directive name:
def printme_func(cell):
    "Print the arguments of the `printme` directive for cells that carry one"
    direcs = cell.directives_
    if not direcs or 'printme' not in direcs: return
    print(direcs['printme'])

NBProcessor(everything_fn, printme_func).process()
['testing']
However, a more convenient way to handle comment directives is to use a class as a processor, and include a method in your class with the same name as your directive, surrounded by underscores:
class _PrintExample:
    "Example processor whose `_printme_` method handles the `printme` directive"
    def _printme_(self, cell, to_print):
        print(to_print)

NBProcessor(everything_fn, _PrintExample()).process()
testing
In the case that your processor supports just one comment directive, you can just use a regular function, with the same name as your directive, but with an underscore appended -- here `printme_` is identical to `_PrintExample` above:
def printme_(cell, to_print):
    "Handle the `printme` directive by printing its argument"
    print(to_print)

NBProcessor(everything_fn, printme_).process()
testing
# Run the class-based processor again for comparison with the function-based `printme_` above
NBProcessor(everything_fn, _PrintExample()).process()
testing
#|export
class Processor:
    "Base class for processors"
    def __init__(self, nb):
        # Keep the notebook so subclasses can reach it as `self.nb`
        self.nb = nb

    def cell(self, cell):
        "Per-cell hook; does nothing unless overridden"
        return None

    def __call__(self, cell):
        "Calling the processor forwards to `cell`"
        result = self.cell(cell)
        return result
For more complex behavior, inherit from `Processor`, and override one or more of `begin()` (called before any cells are processed), `cell()` (called for each cell), and `end()` (called after all cells are processed). You can also include comment directives (such as the `_printme_` example above) in these subclasses. Subclasses will automatically have access to `self.nb`, containing the processed notebook.
class CountCellProcessor(Processor):
    "Example processor that prints the first cell's source and counts code cells"
    def begin(self):
        # Runs before any cell is processed: show the first cell and reset the counter
        print(f"First cell:\n{self.nb.cells[0].source}")
        self.count=0

    def cell(self, cell):
        # Only code cells are counted
        if cell.cell_type!='code': return
        self.count += 1

    def end(self):
        # Runs after all cells are processed: report the total
        print(f"* There were {self.count} code cells")

NBProcessor(everything_fn, CountCellProcessor).process()
First cell: --- title: Foo execute: echo: false --- * There were 26 code cells
#|hide
from nbdev.maker import _basic_export_nb2
#|eval: false
#|hide
# Presumably re-exports each notebook to the named module (helper comes from `nbdev.maker`; confirm),
# then checks that `nbdev.process` can be imported and exposes `NBProcessor`.
# NOTE(review): the preceding cell imports `_basic_export_nb2`, but the calls below use
# `basic_export_nb2`; confirm which name `nbdev.maker` actually provides.
basic_export_nb2('01_read.ipynb', 'read')
basic_export_nb2('02_maker.ipynb', 'maker')
basic_export_nb2('03_process.ipynb', 'process')
g = exec_new('import nbdev.process')
assert hasattr(g['nbdev'].process, 'NBProcessor')