from collections import defaultdict, deque
class Jump(Exception):
    """Signal raised by an opcode handler to redirect the program counter.

    The opcode wrapper catches this exception and adds ``offset`` to the
    CPU's ``pos`` instead of performing the usual single-step advance.
    """

    def __init__(self, offset):
        super().__init__(offset)
        # Relative distance to move the program counter (0 repeats the
        # current instruction).
        self.offset = offset
def opcode(operands):
    """Build a decorator that turns an ``op_<name>`` function into an opcode.

    ``operands`` is a string with one character per operand of the
    instruction.  ``"r"`` passes the operand through untouched (it names a
    register); any other character (this file uses ``"v"``) resolves the
    operand to a value: an integer literal if ``int()`` accepts it, otherwise
    the current contents of the register with that name.

    The decorated function is replaced by a callable object which, when the
    owning class body is created, registers itself in ``owner.opcodes`` under
    the function name with its first three characters (``op_``) removed.
    """

    def decorator(f):
        class Opcode:
            def __set_name__(self, owner, name):
                # Drop the "op_" prefix to obtain the mnemonic.
                mnemonic = name[3:]
                self.opcode = mnemonic
                owner.opcodes[mnemonic] = self

            def __repr__(self):
                return f"<opcode {self.opcode} {operands!r}>"

            def value(self, operand, type_):
                """Resolve one raw operand according to its type character."""
                if type_ != "r":
                    try:
                        return int(operand)
                    except ValueError:
                        # Not a literal, so treat it as a register name.
                        return self.registers[operand]
                return operand

            def __call__(self, cpu, *ops):
                """Run the handler on ``cpu`` and move its program counter.

                Returns ``(mnemonic, result)``; ``result`` is ``None`` when
                the handler raised ``Jump``.
                """
                # One Opcode instance serves every CPU of the class, so the
                # register file used by ``value`` is rebound on each call.
                self.registers = cpu.registers
                resolved = [
                    self.value(raw, kind) for raw, kind in zip(ops, operands)
                ]
                try:
                    result = f(cpu, *resolved)
                except Jump as jump:
                    cpu.pos += jump.offset
                    return self.opcode, None
                cpu.pos += 1
                return self.opcode, result

        return Opcode()

    return decorator
class Proc:
    """Interpreter for a small register-based instruction set.

    State consists of a register file (unset registers read as 0), the value
    most recently passed to ``snd`` and a program counter ``pos``.  Each
    ``op_<name>`` definition below is wrapped by ``@opcode`` and registered in
    ``opcodes`` under ``<name>``.
    """

    # Mnemonic -> opcode handler, populated by the ``@opcode`` decorator while
    # the class body is being created.
    opcodes = {}

    def __init__(self):
        self.reset()

    def reset(self):
        """Return the machine to its initial state."""
        self.registers = defaultdict(int)  # missing registers default to 0
        self.sound_freq = 0  # last value handed to ``snd``
        self.pos = 0  # index of the next instruction to execute

    def run(self, instructions):
        """Execute ``instructions`` step by step, starting at ``self.pos``.

        ``instructions`` is an indexable program whose items unpack as
        ``(mnemonic, *operands)``.  After every executed instruction the
        ``(mnemonic, result)`` pair returned by its handler is yielded.

        Raises:
            IndexError: when the program counter leaves the program, on
                either side.
            KeyError: when an instruction's mnemonic is not in ``opcodes``.
        """
        while True:
            if self.pos < 0:
                # Plain indexing would accept a negative counter and silently
                # run an instruction counted from the END of the program.
                # Fail the same way as running past the end does.
                raise IndexError(f"program counter {self.pos} out of range")
            name, *ops = instructions[self.pos]
            yield self.opcodes[name](self, *ops)

    def run_until_rcv(self, instructions):
        """Run until an ``rcv`` instruction produces a value; return it."""
        return next(
            val for op, val in self.run(instructions) if op == "rcv" and val is not None
        )

    @opcode("v")
    def op_snd(self, x):
        """Remember ``x`` as the most recent sound frequency."""
        self.sound_freq = x

    @opcode("rv")
    def op_set(self, x, y):
        """Store ``y`` in register ``x``."""
        self.registers[x] = y

    @opcode("rv")
    def op_add(self, x, y):
        """Add ``y`` to register ``x``."""
        self.registers[x] += y

    @opcode("rv")
    def op_mul(self, x, y):
        """Multiply register ``x`` by ``y``."""
        self.registers[x] *= y

    @opcode("rv")
    def op_mod(self, x, y):
        """Replace register ``x`` with its remainder modulo ``y``."""
        self.registers[x] %= y

    @opcode("r")
    def op_rcv(self, x):
        """Return the last sound frequency if register ``x`` is non-zero.

        Falls through (returning ``None``) when the register is zero.
        """
        if self.registers[x]:
            return self.sound_freq

    @opcode("vv")
    def op_jgz(self, x, y):
        """Jump by offset ``y`` when ``x`` is greater than zero."""
        if x > 0:
            raise Jump(y)
class SendingProc(Proc):
    """``Proc`` variant whose ``snd``/``rcv`` exchange values with a partner.

    ``snd`` appends to the partner's message queue and ``rcv`` pops from this
    CPU's own queue, staying on the same instruction while the queue is empty.
    """

    # Separate table: the ``snd``/``rcv`` overrides below register themselves
    # here and leave ``Proc.opcodes`` untouched.
    opcodes = Proc.opcodes.copy()

    def __init__(self, cpu_id):
        # ``reset`` (called from ``Proc.__init__``) reads ``cpu_id``, so it has
        # to be stored before delegating to the base constructor.
        self.cpu_id = cpu_id
        super().__init__()

    def set_pair(self, cpu):
        """Set ``cpu`` as the destination of this CPU's ``snd`` values."""
        self.paired = cpu

    def reset(self):
        """Reset base state, seed register ``p`` with the id, clear the queue."""
        super().reset()
        self.registers["p"] = self.cpu_id
        self.message_queue = deque()

    @opcode("v")
    def op_snd(self, x):
        """Append ``x`` to the partner's message queue."""
        partner_inbox = self.paired.message_queue
        partner_inbox.append(x)

    @opcode("r")
    def op_rcv(self, x):
        """Pop the oldest queued value into register ``x`` and return it.

        With an empty queue, raises ``Jump(0)`` so the program counter stays
        on this instruction and it is attempted again on the next step.
        """
        inbox = self.message_queue
        if inbox:
            received = inbox.popleft()
            self.registers[x] = received
            return received
        raise Jump(0)
def parallel_run(instructions):
    """Run two paired ``SendingProc`` CPUs in lockstep until they deadlock.

    CPUs 0 and 1 execute the same ``instructions``, one step each per round.
    The run stops once both report a ``rcv`` that produced nothing in the same
    round.

    Returns:
        The number of ``snd`` instructions executed by CPU 1.
    """
    first, second = SendingProc(0), SendingProc(1)
    first.set_pair(second)
    second.set_pair(first)

    # What a step yields when ``rcv`` found its queue empty.
    waiting = ("rcv", None)
    sent_by_second = 0
    for step0, step1 in zip(first.run(instructions), second.run(instructions)):
        if step1[0] == "snd":
            sent_by_second += 1
        if step0 == waiting and step1 == waiting:
            # Both CPUs are blocked on an empty queue: deadlock.
            return sent_by_second