Pipeline generators

This demonstrates manually elaborated Pipeline support for native (unmodified) intbv types. Drawbacks:

  • No automated bit width
  • No automated inference of ALU types
  • Limited pipeline delay tracking for signals
In [1]:
from myirl import *
from myhdl import intbv
# Create VHDL module context with stdout output:
d = DummyVHDLModule()

Delay tracking

To enable latency accounting for any signal type, the following construct generates a Signal class derived from the given class argument that adds a _latency member. May be not implemented in this branch.

In [2]:
class PipelineTracker:
    def __init__(self, *args, **kwargs):
        self._latency = 0
        super().__init__(*args, **kwargs)

def pipelined(sig):
    base = sig

    return type("Pipelined" + base.__name__, (PipelineTracker, base),  {})

A Pipeline stage process (element)

First, derive a PipelineProcess class from Process:

In [3]:
class PipelineProcess(kernel.sensitivity.Process):
    def __init__(self, func, clk, logic, stage):
        def f():
            return logic
    
        self.func = f
        f.__name__ = func.__name__ + "_stage%d" % stage
        self.sensors = [clk]
        self.edge = clk.POS
        self.reset = None
        self.logic = LogicContext()
        self.logic += f()

Then implement a Pipeline class for the @pipeline decorator magic:

In [4]:
from myirl.kernel import sensitivity

class Pipeline(Generator):
    def __init__(self, func, clk, reset, enable, valid):
        self.clk = clk
        self.valid = valid
        self.reset = reset
        self.enable = enable
        self.depth = 0
        super().__init__(func)

    def __call__(self, ctx):
        n = 0
        for i, g in enumerate(self.func()):
            self.logic += [
                PipelineProcess(self.func, self.clk, g, i)
            ]
            signals = {}
            for stmt in g:
                stmt.get_drivers(signals)
                for name, s in signals.items():
                    try:
                        s._latency = i + 1
                    except AttributeError:
                        raise TypeError("Signal %s must be of type PipelineSignal" % name)
            n += 1
        self.depth = n
        
        n += 1
        en = [ pipelined(Signal)(bool(), name = self.func.__name__ + "_enable%d" % i) for i in range(n) ]
                
        @genprocess(self.clk, EDGE=self.clk.POS, RESET=self.reset)
        def delay_queue():
            for i in range(1, n):
                yield [
                    en[i].set(en[i-1])            
                ]

        # Important to call that process within the
        # actual context:
        delay_queue(ctx)

        self.logic += [
            delay_queue,
            self.valid.wireup(en[n-1]), # Ugly hack: abuse en0 for reset
            en[0].wireup(self.enable)
        ]
            
    def collect_sources(self):
        signals = {}
        for i in self.logic:
            signals.update(i.collect_sources())
        
        return signals

    def collect_drivers(self):
        signals = {}
        for i in self.logic:
            lsigs = i.collect_drivers()
            signals.update(lsigs)
        
        return signals    
    
def pipeline(clk, reset, enable, valid, *kwargs):
    def pipeline_dec(func):
        return Pipeline(func, clk, reset, enable, valid)

    return pipeline_dec   

Now create a pipeline test:

In [5]:
from myirl.vector import VectorSig

@block
def dummy(clk: ClkSignal, reset : ResetSignal, en : Signal, din : Signal,
          dout: Signal.Output, valid : Signal.Output):
    
    PS = pipelined(Signal)
    
    a = PS(intbv()[7:], name = 'a')
 
    q = PS(intbv()[8:], name = 'q')
    p = PS(intbv()[5:], name = 'p')

    @pipeline(clk, reset, en, valid)
    def pipe():
        # u = Variable('u', intbv()[22:])
        v = Variable('v', intbv(5)[7:])
        # First stage
        yield [
            v.assign(din * base.ConstSig(2, 2)),
            a.set(v),
        ]
        
        # Second stage
        yield [
            q.set(a + 4)
        ]
        
        # Third stage
        yield [ p.set(q[6:1]) ] # value >> 1
        
    wires = [ dout.wireup(p) ]
        
    return locals()
In [6]:
from myirl.test.common_test import *
from myirl.simulation import *
from myirl.test.ghdl import GHDL
from yosys.simulator import CXXRTL

from simulation import sim

@sim.testbench(GHDL)
@block
def tb():
    clk = ClkSignal(name = 'clk')
    en, valid = [ Signal(bool()) for _ in range(2) ]
    data_in = Signal(intbv()[5:])
    data_out = Signal(intbv()[5:])
    reset = ResetSignal(ResetSignal.POS_ASYNC)

    inst = dummy(clk, reset, en, data_in, data_out, valid)
 
    osc = gen_osc(clk, CYCLE=5)
    
    @generator
    def seq():
        
        yield [
            reset.set(True), en.set(False),
            wait(2 * [clk.posedge]), reset.set(False),
            wait(4 * [clk.posedge])
        ]
        
        it = Iterator([0xa, 0x5, 0x2])
            
        yield [
            For(it)(
                data_in.set(it),
                wait('1 ns'),
                en.set(True),

                wait(clk.posedge),
            )
        ]
         
        yield [ wait(4 * [clk.posedge] ) ]
        
        yield [
            print_("data out:", data_out),
        ]
        yield [
            wait(4 * [clk.posedge])
        ]
        yield [ raise_(StopSimulation) ]
        
    return locals()
 
inst = tb()
inst.run(180, wavetrace= "/tmp/" + tb.name + ".vcd")
Creating sequential 'tb/seq' 
 Elaborating component dummy_s1_s1_s1_s5_s5_s1 
 Writing 'dummy' to file /tmp/myirl_top_tb_ss_285ld/dummy.vhdl 
 Elaborating component tb 
 Writing 'tb' to file /tmp/myirl_top_tb_ss_285ld/tb.vhdl 
 Creating library file /tmp/myirl_module_defs_tbwtsr54/module_defs.vhdl 
==== COSIM stdout ====

==== COSIM stderr ====

==== COSIM stdout ====
analyze /home/testing/.local/lib/python3.9/site-packages/myirl-0.0.0-py3.9-linux-x86_64.egg/myirl/targets/../test/vhdl/txt_util.vhdl
analyze /home/testing/.local/lib/python3.9/site-packages/myirl-0.0.0-py3.9-linux-x86_64.egg/myirl/targets/libmyirl.vhdl
analyze /tmp/myirl_top_tb_ss_285ld/dummy.vhdl
analyze /tmp/myirl_top_tb_ss_285ld/tb.vhdl
elaborate tb

==== COSIM stderr ====

==== COSIM stdout ====
data out: 0x04
/tmp/myirl_top_tb_ss_285ld/tb.vhdl:82:9:@165ns:(assertion failure): Stop Simulation
/tmp/tb:error: assertion failed
in process .tb(myirl).seq
/tmp/tb:error: simulation failed

==== COSIM stderr ====

Out[6]:
0
In [7]:
import wavedraw; import nbwavedrom
TB = tb.uut_name;

cfg = wavedraw.config(TB, 
      {
    'clk' : None, 'en' : None, 'reset' : None,
    'valid' : None, 'data_out[4:0]' : None, 'data_in[4:0]' : None, 
    'inst_dummy_0.q[7:0]' : None, 'inst_dummy_0.a[6:0]' : None,
    }
)

waveform = wavedraw.vcd2wave("/tmp/tb.vcd", TB + '.clk', None)
nbwavedrom.draw(waveform)
In [8]:
# !cat -n {f[0]}