This demonstrates manually elaborated Pipeline support for native (unmodified) intbv
types. Drawbacks:
from myirl import *
from myhdl import intbv
# Create VHDL module context with stdout output:
d = DummyVHDLModule()
To enable latency accounting for any signal type, the following construct generates a Signal class derived from the given class argument that adds a _latency
member.
May be not implemented in this branch.
class PipelineTracker:
def __init__(self, *args, **kwargs):
self._latency = 0
super().__init__(*args, **kwargs)
def pipelined(sig):
base = sig
return type("Pipelined" + base.__name__, (PipelineTracker, base), {})
First, derive a PipelineProcess class from Process
:
class PipelineProcess(kernel.sensitivity.Process):
def __init__(self, func, clk, logic, stage):
def f():
return logic
self.func = f
f.__name__ = func.__name__ + "_stage%d" % stage
self.sensors = [clk]
self.edge = clk.POS
self.reset = None
self.logic = LogicContext()
self.logic += f()
Then implement a Pipeline class for the @pipeline
decorator magic:
from myirl.kernel import sensitivity
class Pipeline(Generator):
def __init__(self, func, clk, reset, enable, valid):
self.clk = clk
self.valid = valid
self.reset = reset
self.enable = enable
self.depth = 0
super().__init__(func)
def __call__(self, ctx):
n = 0
for i, g in enumerate(self.func()):
self.logic += [
PipelineProcess(self.func, self.clk, g, i)
]
signals = {}
for stmt in g:
stmt.get_drivers(signals)
for name, s in signals.items():
try:
s._latency = i + 1
except AttributeError:
raise TypeError("Signal %s must be of type PipelineSignal" % name)
n += 1
self.depth = n
n += 1
en = [ pipelined(Signal)(bool(), name = self.func.__name__ + "_enable%d" % i) for i in range(n) ]
@genprocess(self.clk, EDGE=self.clk.POS, RESET=self.reset)
def delay_queue():
for i in range(1, n):
yield [
en[i].set(en[i-1])
]
# Important to call that process within the
# actual context:
delay_queue(ctx)
self.logic += [
delay_queue,
self.valid.wireup(en[n-1]), # Ugly hack: abuse en0 for reset
en[0].wireup(self.enable)
]
def collect_sources(self):
signals = {}
for i in self.logic:
signals.update(i.collect_sources())
return signals
def collect_drivers(self):
signals = {}
for i in self.logic:
lsigs = i.collect_drivers()
signals.update(lsigs)
return signals
def pipeline(clk, reset, enable, valid, *kwargs):
def pipeline_dec(func):
return Pipeline(func, clk, reset, enable, valid)
return pipeline_dec
Now create a pipeline test:
from myirl.vector import VectorSig
@block
def dummy(clk: ClkSignal, reset : ResetSignal, en : Signal, din : Signal,
dout: Signal.Output, valid : Signal.Output):
PS = pipelined(Signal)
a = PS(intbv()[7:], name = 'a')
q = PS(intbv()[8:], name = 'q')
p = PS(intbv()[5:], name = 'p')
@pipeline(clk, reset, en, valid)
def pipe():
# u = Variable('u', intbv()[22:])
v = Variable('v', intbv(5)[7:])
# First stage
yield [
v.assign(din * base.ConstSig(2, 2)),
a.set(v),
]
# Second stage
yield [
q.set(a + 4)
]
# Third stage
yield [ p.set(q[6:1]) ] # value >> 1
wires = [ dout.wireup(p) ]
return locals()
from myirl.test.common_test import *
from myirl.simulation import *
from myirl.test.ghdl import GHDL
from yosys.simulator import CXXRTL
from simulation import sim
@sim.testbench(GHDL)
@block
def tb():
clk = ClkSignal(name = 'clk')
en, valid = [ Signal(bool()) for _ in range(2) ]
data_in = Signal(intbv()[5:])
data_out = Signal(intbv()[5:])
reset = ResetSignal(ResetSignal.POS_ASYNC)
inst = dummy(clk, reset, en, data_in, data_out, valid)
osc = gen_osc(clk, CYCLE=5)
@generator
def seq():
yield [
reset.set(True), en.set(False),
wait(2 * [clk.posedge]), reset.set(False),
wait(4 * [clk.posedge])
]
it = Iterator([0xa, 0x5, 0x2])
yield [
For(it)(
data_in.set(it),
wait('1 ns'),
en.set(True),
wait(clk.posedge),
)
]
yield [ wait(4 * [clk.posedge] ) ]
yield [
print_("data out:", data_out),
]
yield [
wait(4 * [clk.posedge])
]
yield [ raise_(StopSimulation) ]
return locals()
inst = tb()
inst.run(180, wavetrace= "/tmp/" + tb.name + ".vcd")
FALLBACK: UNHANDLED ROOT CLASS <class 'myirl.test.ghdl.GHDLTestbench'>, create new context Writing 'dummy' to file /tmp/dummy.vhdl Writing 'tb' to file /tmp/tb.vhdl Creating library file /tmp/module_defs.vhdl ==== COSIM stdout ==== analyze /home/testing/.local/lib/python3.9/site-packages/myirl-0.0.0-py3.9-linux-x86_64.egg/myirl/targets/../test/vhdl/txt_util.vhdl analyze /home/testing/.local/lib/python3.9/site-packages/myirl-0.0.0-py3.9-linux-x86_64.egg/myirl/targets/libmyirl.vhdl analyze /tmp/dummy.vhdl analyze /tmp/tb.vhdl elaborate tb ==== COSIM stdout ==== data out: 0x04 simulation stopped @165ns
0
import wavedraw; import nbwavedrom
TB = tb.uut_name;
cfg = wavedraw.config(TB,
{
'clk' : None, 'en' : None, 'reset' : None,
'valid' : None, 'data_out[4:0]' : None, 'data_in[4:0]' : None,
'inst_dummy_0.q[7:0]' : None, 'inst_dummy_0.a[6:0]' : None,
}
)
waveform = wavedraw.vcd2wave("/tmp/tb.vcd", TB + '.clk', None)
nbwavedrom.draw(waveform)
# !cat -n {f[0]}