This demonstrates manually elaborated Pipeline support for native (unmodified) intbv
types. Drawbacks:
from myirl import *
from myhdl import intbv
# Create a VHDL module context with stdout output.
# NOTE(review): `d` is not referenced again in the visible part of this file.
d = DummyVHDLModule()
To enable latency accounting for any signal type, the following construct generates a new signal class that derives from the class given as argument and adds a `_latency` member.
Note: this is possibly ineffective for the time being.
class PipelineTracker:
    """Mixin that equips an object with a ``_latency`` attribute, set to 0.

    It is meant to be listed ahead of the real base class; all construction
    arguments are forwarded unchanged to the next class in the MRO.
    """

    def __init__(self, *ctor_args, **ctor_kwargs):
        # Latency is initialized first, then the cooperating base class
        # gets to run its own constructor with the caller's arguments.
        setattr(self, "_latency", 0)
        super(PipelineTracker, self).__init__(*ctor_args, **ctor_kwargs)
def pipelined(sig):
    """Return a class derived from *sig* that also tracks pipeline latency.

    The generated class is named ``"Pipelined" + sig.__name__`` and lists
    ``PipelineTracker`` ahead of *sig* in its bases, so instances carry a
    ``_latency`` member.

    Args:
        sig: the class to extend (e.g. a signal class).

    Returns:
        The generated subclass, or *sig* itself when it already derives from
        ``PipelineTracker``.
    """
    # Stacking the mixin on a class that already contains it cannot produce
    # a consistent MRO and makes type() raise TypeError; such a class already
    # has the latency member, so hand it back unchanged.
    if issubclass(sig, PipelineTracker):
        return sig
    return type("Pipelined" + sig.__name__, (PipelineTracker, sig), {})
First, derive a `PipelineProcess` class from `Process`:
class PipelineProcess(kernel.sensitivity.Process):
    """Clocked process that carries the logic of a single pipeline stage."""

    def __init__(self, func, clk, logic, stage):
        """Set up the process for stage number *stage*.

        Args:
            func: the pipeline function; only its ``__name__`` is used here.
            clk: clock signal; becomes the only sensor of this process.
            logic: the statements of this stage.
            stage: stage index, appended to the process name.
        """
        # NOTE(review): the base class __init__ is not called here;
        # presumably intentional, confirm against kernel.sensitivity.Process.
        # Wrapper function that just hands back the stage logic.
        def f():
            return logic
        self.func = f
        # Name the wrapper "<func name>_stage<N>".
        f.__name__ = func.__name__ + "_stage%d" % stage
        self.sensors = [clk]
        self.edge = clk.POS
        # Stage processes have no reset.
        self.reset = None
        self.logic = LogicContext()
        self.logic += f()
Then implement a `Pipeline` class for the `@pipeline` decorator magic:
from myirl.kernel import sensitivity
class Pipeline(Generator):
    """Generator that elaborates a staged generator function as a pipeline.

    Every statement list yielded by the wrapped function becomes one
    ``PipelineProcess`` clocked by ``clk``. In addition, a chain of enable
    signals (one more than there are stages) is created: its first element
    is wired to ``enable``, its last element drives ``valid``.
    """

    def __init__(self, func, clk, reset, enable, valid):
        """Store the control signals and register *func* with the base class.

        Args:
            func: generator function yielding one statement list per stage.
            clk: clock signal for all stage processes and the enable chain.
            reset: reset signal, passed to the enable chain process only.
            enable: input signal wired to the first enable chain element.
            valid: output signal wired to the last enable chain element.
        """
        self.clk = clk
        self.valid = valid
        self.reset = reset
        self.enable = enable
        # Number of stages; determined when the pipeline is called.
        self.depth = 0
        super().__init__(func)

    def __call__(self, ctx):
        """Create the stage processes and the enable chain within *ctx*.

        Raises:
            TypeError: if a signal driven by a stage does not accept the
                ``_latency`` attribute.
        """
        depth = 0
        for i, g in enumerate(self.func()):
            self.logic += [
                PipelineProcess(self.func, self.clk, g, i)
            ]
            # Collect the signals driven by this stage and tag them with
            # the stage's latency (stage index + 1).
            signals = {}
            for stmt in g:
                stmt.get_drivers(signals)
            for name, s in signals.items():
                try:
                    s._latency = i + 1
                except AttributeError as err:
                    # Chain the cause so the original failure stays visible.
                    raise TypeError(
                        "Signal %s must be of type PipelineSignal" % name
                    ) from err
            depth = i + 1

        self.depth = depth
        # One enable signal per stage plus the input element en[0].
        n = depth + 1
        # Build the latency-tracking signal class once, not per signal.
        enable_type = pipelined(Signal)
        en = [
            enable_type(bool(), name=self.func.__name__ + "_enable%d" % i)
            for i in range(n)
        ]

        @genprocess(self.clk, EDGE=self.clk.POS, RESET=self.reset)
        def delay_queue():
            for i in range(1, n):
                yield [
                    en[i].set(en[i-1])
                ]

        # Important to call that process within the
        # actual context:
        delay_queue(ctx)
        self.logic += [
            delay_queue,
            self.valid.wireup(en[n-1]),  # Ugly hack: abuse en0 for reset
            en[0].wireup(self.enable)
        ]

    def collect_sources(self):
        """Return the merged ``collect_sources()`` results of all logic elements."""
        signals = {}
        for element in self.logic:
            signals.update(element.collect_sources())
        return signals

    def collect_drivers(self):
        """Return the merged ``collect_drivers()`` results of all logic elements."""
        signals = {}
        for element in self.logic:
            signals.update(element.collect_drivers())
        return signals
def pipeline(clk, reset, enable, valid, *args, **kwargs):
    """Decorator factory that wraps a staged generator function in a Pipeline.

    Args:
        clk: clock signal of the pipeline.
        reset: reset signal of the pipeline.
        enable: input enable signal.
        valid: output valid signal.
        *args: extra positional arguments; accepted and ignored, as before.
        **kwargs: extra keyword options; accepted and ignored. The previous
            signature spelled the var-positional parameter ``*kwargs``, which
            made keyword options fail with a TypeError.

    Returns:
        A decorator taking the generator function and returning a
        ``Pipeline`` built from it and the four signals.
    """
    def pipeline_dec(func):
        return Pipeline(func, clk, reset, enable, valid)

    return pipeline_dec
Now create a pipeline test:
from myirl.vector import VectorSig
# Example design: a three-stage pipeline from `din` to `dout`, built with the
# @pipeline decorator. `valid` is driven by the pipeline's enable chain.
@block
def dummy(clk: ClkSignal, reset : ResetSignal, en : Signal, din : Signal,
          dout: Signal.Output, valid : Signal.Output):
    # Signal class extended by the `_latency` member
    PS = pipelined(Signal)
    # Stage registers: 7, 8 and 5 bits wide
    a = PS(intbv()[7:], name = 'a')
    q = PS(intbv()[8:], name = 'q')
    p = PS(intbv()[5:], name = 'p')
    @pipeline(clk, reset, en, valid)
    def pipe():
        # u = Variable('u', intbv()[22:])
        v = Variable('v', intbv(5)[7:])
        # First stage: multiply the input by the constant 2 via variable `v`
        yield [
            v.assign(din * base.ConstSig(2, 2)),
            a.set(v),
        ]
        # Second stage: add 4
        yield [
            q.set(a + 4)
        ]
        # Third stage: take the slice q[6:1] (bits 5..1)
        yield [ p.set(q[6:1]) ] # value >> 1
    # Connect the last stage register to the output port
    wires = [ dout.wireup(p) ]
    # NOTE(review): presumably @block picks up the created objects from the
    # returned locals, so local names matter here; confirm.
    return locals()
from myirl.test.common_test import *
from myirl.simulation import *
# Test bench: instantiates `dummy`, drives three input values through the
# pipeline, prints the output once and then stops the simulation.
@block
def tb():
    clk = ClkSignal(name = 'clk')
    en, valid = [ Signal(bool()) for _ in range(2) ]
    data_in = Signal(intbv()[5:])
    data_out = Signal(intbv()[5:])
    reset = ResetSignal(ResetSignal.POS_ASYNC)
    # Unit under test
    inst = dummy(clk, reset, en, data_in, data_out, valid)
    # Clock generator
    osc = gen_osc(clk, CYCLE=5)
    @generator
    def seq():
        # Assert reset with enable low for two clock edges, release it,
        # then wait four more edges
        yield [
            reset.set(True), en.set(False),
            wait(2 * [clk.posedge]), reset.set(False),
            wait(4 * [clk.posedge])
        ]
        it = Iterator([0xa, 0x5, 0x2])
        # Apply one input value per clock edge; `en` is set on the first
        # pass and not cleared again in this sequence
        yield [
            For(it)(
                data_in.set(it),
                wait('1 ns'),
                en.set(True),
                wait(clk.posedge),
            )
        ]
        yield [ wait(4 * [clk.posedge] ) ]
        yield [
            print_("data out:", data_out),
        ]
        yield [
            wait(4 * [clk.posedge])
        ]
        # Ends the run; shows up as an assertion failure "Stop Simulation"
        # in the simulator log
        yield [ raise_(StopSimulation) ]
    # NOTE(review): presumably @block picks up the created objects from the
    # returned locals, so local names matter here; confirm.
    return locals()
def test(vcdfile='/tmp/tb_pipe.vcd'):
    """Elaborate the test bench to VHDL and run it with GHDL.

    Args:
        vcdfile: path of the VCD trace handed to ``run_ghdl``. Defaults to
            the location that was previously hard-coded.

    Returns:
        The value returned by ``inst.elab()`` for the VHDL target.
    """
    inst = tb()
    files = inst.elab(targets.VHDL, elab_all = True)
    run_ghdl(files, inst, debug = True, vcdfile=vcdfile)
    return files
# Run the test bench once and keep the value returned by test().
f = test()
Creating sequential 'tb/seq'
Elaborating component dummy_s1_s1_s1_s5_s5_s1
Writing 'dummy' to file /tmp/myirl_top_tb_xi8eget9/dummy.vhdl
Elaborating component tb
Writing 'tb' to file /tmp/myirl_top_tb_xi8eget9/tb.vhdl
Creating library file /tmp/myirl_module_defs_7qwupqxl/module_defs.vhdl
==== COSIM stdout ====
==== COSIM stderr ====
==== COSIM stdout ====
analyze /home/testing/.local/lib/python3.9/site-packages/myirl-0.0.0-py3.9-linux-x86_64.egg/myirl/targets/../test/vhdl/txt_util.vhdl
analyze /home/testing/.local/lib/python3.9/site-packages/myirl-0.0.0-py3.9-linux-x86_64.egg/myirl/targets/libmyirl.vhdl
analyze /tmp/myirl_top_tb_xi8eget9/dummy.vhdl
analyze /tmp/myirl_top_tb_xi8eget9/tb.vhdl
elaborate tb
==== COSIM stderr ====
==== COSIM stdout ====
data out: 0x04
/tmp/myirl_top_tb_xi8eget9/tb.vhdl:82:9:@165ns:(assertion failure): Stop Simulation
/tmp/tb:error: assertion failed in process .tb(myirl).seq
/tmp/tb:error: simulation failed
==== COSIM stderr ====
import wavedraw
import nbwavedrom

# Render selected traces of the simulation's VCD file as a waveform diagram.
TB = tb.name
# Traces to display, in display order; every entry maps to None.
cfg = dict.fromkeys((
    'tb.clk',
    'tb.en',
    'tb.reset',
    'tb.valid',
    'tb.data_out[4:0]',
    'tb.data_in[4:0]',
    'tb.inst_dummy_0.q[7:0]',
    'tb.inst_dummy_0.a[6:0]',
))
waveform = wavedraw.vcd2wave("/tmp/tb_pipe.vcd", TB + '.clk', cfg)
nbwavedrom.draw(waveform)
tb.clk tb.en tb.reset tb.valid tb.data_out[4:0] tb.data_in[4:0] tb.inst_dummy_0.q[7:0] tb.inst_dummy_0.a[6:0]
!cat -n {f[0]}
     1  -- File generated from source:
     2  --     /tmp/ipykernel_41059/1097782439.py
     3  -- (c) 2016-2021 section5.ch
     4  -- Modifications may be lost, edit the source file instead.
     5
     6  library IEEE;
     7  use IEEE.std_logic_1164.all;
     8  use IEEE.numeric_std.all;
     9
    10  library work;
    11
    12  use work.txt_util.all;
    13  use work.myirl_conversion.all;
    14
    15  entity dummy is
    16      port (
    17          clk : in std_ulogic;
    18          reset : in std_ulogic;
    19          en : in std_ulogic;
    20          din : in unsigned(4 downto 0);
    21          dout : out unsigned(4 downto 0);
    22          valid : out std_ulogic
    23      );
    24  end entity dummy;
    25
    26  architecture MyIRL of dummy is
    27      -- Local type declarations
    28      -- Signal declarations
    29      signal a : unsigned(6 downto 0);
    30      signal q : unsigned(7 downto 0);
    31      signal p : unsigned(4 downto 0);
    32      signal pipe_enable1 : std_ulogic;
    33      signal pipe_enable2 : std_ulogic;
    34      signal pipe_enable3 : std_ulogic;
    35      signal pipe_enable0 : std_ulogic;
    36  begin
    37
    38  pipe_stage0:
    39      process(clk)
    40          variable v : unsigned(6 downto 0);
    41      begin
    42          if rising_edge(clk) then
    43              v := (din * "10");
    44              a <= v;
    45          end if;
    46      end process;
    47
    48  pipe_stage1:
    49      process(clk)
    50      begin
    51          if rising_edge(clk) then
    52              q <= (resize((a), 8) + x"04");
    53          end if;
    54      end process;
    55
    56  pipe_stage2:
    57      process(clk)
    58      begin
    59          if rising_edge(clk) then
    60              p <= q(6-1 downto 1);
    61          end if;
    62      end process;
    63
    64  delay_queue:
    65      process(clk, reset)
    66      begin
    67          if reset = '1' then
    68              pipe_enable1 <= '0';
    69              pipe_enable2 <= '0';
    70              pipe_enable3 <= '0';
    71          elsif rising_edge(clk) then
    72              pipe_enable1 <= pipe_enable0;
    73              pipe_enable2 <= pipe_enable1;
    74              pipe_enable3 <= pipe_enable2;
    75          end if;
    76      end process;
    77      valid <= pipe_enable3;
    78      pipe_enable0 <= en;
    79      dout <= p;
    80  end architecture MyIRL;
    81