#!/usr/bin/env python
# coding: utf-8

# # Pipeline generators
#
# This demonstrates manually elaborated Pipeline support for native (unmodified) `intbv` types. Drawbacks:
# * No automated bit width
# * No automated inference of ALU types
# * Limited pipeline delay tracking for signals

# In[1]:


from myirl import *
from myhdl import intbv

# Create VHDL module context with stdout output:
d = DummyVHDLModule()


# ## Delay tracking
#
# To enable latency accounting for any signal type, the following construct generates a Signal class derived from the given class argument that adds a `_latency` member.
# **Possibly ineffective for the time being**

# In[2]:


class PipelineTracker:
    """Mixin that adds a `_latency` attribute (initially 0) to an instance.

    All constructor arguments are forwarded unchanged to the next class
    in the method resolution order.
    """

    def __init__(self, *args, **kwargs):
        # Set the member first so that it exists even while the wrapped
        # base class initializer is running.
        self._latency = 0
        super().__init__(*args, **kwargs)


def pipelined(sig):
    """Return a new class deriving from `PipelineTracker` and `sig`.

    The generated class is named "Pipelined" followed by the name of
    `sig`. Every call creates a distinct class object.
    """
    base = sig
    return type("Pipelined" + base.__name__, (PipelineTracker, base), {})


# ## A Pipeline stage process (element)

# First, derive a PipelineProcess class from `Process`:

# In[3]:


class PipelineProcess(kernel.sensitivity.Process):
    """Process holding the statements of one pipeline stage.

    Parameters:
        func:  The user's pipeline function; only its `__name__` is used,
               to derive the name of this stage.
        clk:   Clock signal; the process is sensitive to it and uses
               `clk.POS` as its edge.
        logic: The statements of this stage (the list yielded by the
               pipeline function for this stage).
        stage: Zero-based stage index, used in the generated name.

    NOTE(review): the base class initializer is not invoked here; the
    attributes are set up by hand. Confirm against
    `kernel.sensitivity.Process` that nothing else is required.
    """

    def __init__(self, func, clk, logic, stage):
        def f():
            return logic

        self.func = f
        # Name the wrapper "<pipeline function name>_stage<N>"
        f.__name__ = func.__name__ + "_stage%d" % stage
        self.sensors = [clk]
        self.edge = clk.POS
        self.reset = None
        self.logic = LogicContext()
        self.logic += f()


# Then implement a Pipeline class for the `@pipeline` decorator magic:

# In[4]:


from myirl.kernel import sensitivity


class Pipeline(Generator):
    """Generator that turns a stage-yielding function into a pipeline.

    The wrapped function is expected to be a Python generator function
    that yields one list of statements per pipeline stage.

    Parameters:
        func:   The pipeline function.
        clk:    Clock signal for all stage processes and the delay queue.
        reset:  Reset signal, passed to the delay queue process.
        enable: Input that is wired to the first element of the enable
                delay queue.
        valid:  Output that is wired to the last element of the enable
                delay queue.
    """

    def __init__(self, func, clk, reset, enable, valid):
        self.clk = clk
        self.valid = valid
        self.reset = reset
        self.enable = enable
        # Number of stages; determined when the pipeline is called
        self.depth = 0
        super().__init__(func)

    def __call__(self, ctx):
        """Elaborate the pipeline stages and the enable delay queue.

        For each stage yielded by the pipeline function, a
        `PipelineProcess` is appended to `self.logic` and every signal
        reported by the statements' `get_drivers()` gets its `_latency`
        set to the one-based stage number.

        Parameters:
            ctx: Context the delay queue process is called with.

        Raises:
            TypeError: If `_latency` can not be set on a driven signal.
        """
        depth = 0
        for i, g in enumerate(self.func()):
            self.logic += [PipelineProcess(self.func, self.clk, g, i)]

            signals = {}
            for stmt in g:
                stmt.get_drivers(signals)

            for name, s in signals.items():
                try:
                    s._latency = i + 1
                except AttributeError as err:
                    # Keep the original error attached as the cause
                    raise TypeError(
                        "Signal %s must be of type PipelineSignal" % name
                    ) from err

            depth = i + 1

        self.depth = depth
        # One enable element per stage, plus one for the input
        n = depth + 1

        # Create the tracked signal class once instead of once per element
        enable_type = pipelined(Signal)
        en = [
            enable_type(bool(), name=self.func.__name__ + "_enable%d" % i)
            for i in range(n)
        ]

        @genprocess(self.clk, EDGE=self.clk.POS, RESET=self.reset)
        def delay_queue():
            # Each element takes the value of its predecessor
            for i in range(1, n):
                yield [en[i].set(en[i - 1])]

        # Important to call that process within the
        # actual context:
        delay_queue(ctx)

        self.logic += [
            delay_queue,
            self.valid.wireup(en[n - 1]),
            # Ugly hack: abuse en0 for reset
            en[0].wireup(self.enable),
        ]

    def collect_sources(self):
        """Return the merged `collect_sources()` results of all logic elements."""
        signals = {}
        for i in self.logic:
            signals.update(i.collect_sources())
        return signals

    def collect_drivers(self):
        """Return the merged `collect_drivers()` results of all logic elements."""
        signals = {}
        for i in self.logic:
            lsigs = i.collect_drivers()
            signals.update(lsigs)
        return signals


def pipeline(clk, reset, enable, valid, *kwargs):
    """Decorator factory wrapping a stage-yielding function in a `Pipeline`.

    Parameters:
        clk, reset, enable, valid: Passed on to the `Pipeline` constructor.
        *kwargs: Despite its name, this collects extra *positional*
                 arguments. They are accepted and ignored.

    Returns:
        A decorator that takes the pipeline function and returns the
        `Pipeline` instance created for it.
    """
    def pipeline_dec(func):
        return Pipeline(func, clk, reset, enable, valid)

    return pipeline_dec


# Now create a pipeline test:

# In[5]:


from myirl.vector import VectorSig


# Example unit: a three stage pipeline from `din` to `dout`.
@block
def dummy(clk: ClkSignal,
          reset: ResetSignal,
          en: Signal,
          din: Signal,
          dout: Signal.Output,
          valid: Signal.Output):

    # Signal class with the additional `_latency` member
    PS = pipelined(Signal)

    a = PS(intbv()[7:], name = 'a')
    q = PS(intbv()[8:], name = 'q')
    p = PS(intbv()[5:], name = 'p')

    @pipeline(clk, reset, en, valid)
    def pipe():
        # u = Variable('u', intbv()[22:])
        v = Variable('v', intbv(5)[7:])

        # First stage
        yield [
            v.assign(din * base.ConstSig(2, 2)),
            a.set(v),
        ]
        # Second stage
        yield [
            q.set(a + 4)
        ]
        # Third stage
        yield [
            p.set(q[6:1])   # value >> 1
        ]

    wires = [
        dout.wireup(p)
    ]

    # All local objects are handed back to the @block decorator
    return locals()


from myirl.test.common_test import *
from myirl.simulation import *


# Test bench: instantiates `dummy`, feeds three data words and prints
# the output.
@block
def tb():
    clk = ClkSignal(name = 'clk')
    en, valid = [ Signal(bool()) for _ in range(2) ]
    data_in = Signal(intbv()[5:])
    data_out = Signal(intbv()[5:])
    reset = ResetSignal(ResetSignal.POS_ASYNC)

    inst = dummy(clk, reset, en, data_in, data_out, valid)

    osc = gen_osc(clk, CYCLE=5)

    @generator
    def seq():
        # Assert reset with enable low, then release reset and wait
        yield [
            reset.set(True), en.set(False),
            wait(2 * [clk.posedge]),
            reset.set(False),
            wait(4 * [clk.posedge])
        ]

        it = Iterator([0xa, 0x5, 0x2])

        # Apply each value of the iterator to the data input
        yield [
            For(it)(
                data_in.set(it),
                wait('1 ns'),
                en.set(True),
                wait(clk.posedge),
            )
        ]

        yield [ wait(4 * [clk.posedge] ) ]

        yield [
            print_("data out:", data_out),
        ]

        yield [ wait(4 * [clk.posedge]) ]

        # Terminate the simulation
        yield [
            raise_(StopSimulation)
        ]

    return locals()


def test():
    """Elaborate the test bench to VHDL, run it with GHDL, return the files.

    A VCD trace is requested at '/tmp/tb_pipe.vcd'.
    """
    inst = tb()
    files = inst.elab(targets.VHDL, elab_all = True)
    run_ghdl(files, inst, debug = True, vcdfile='/tmp/tb_pipe.vcd')
    return files


f = test()


# In[6]:


import wavedraw
import nbwavedrom

TB = tb.name

# Traces to display
cfg = {
    'tb.clk' : None,
    'tb.en' : None,
    'tb.reset' : None,
    'tb.valid' : None,
    'tb.data_out[4:0]' : None,
    'tb.data_in[4:0]' : None,
    'tb.inst_dummy_0.q[7:0]' : None,
    'tb.inst_dummy_0.a[6:0]' : None,
}

waveform = wavedraw.vcd2wave("/tmp/tb_pipe.vcd", TB + '.clk', cfg)

nbwavedrom.draw(waveform)


# In[7]:


# Notebook only: list the first generated file with line numbers
get_ipython().system('cat -n {f[0]}')