#!/usr/bin/env python
# coding: utf-8
"""Small register machine with a single-CPU and a paired-CPU mode.

Instructions are lists of string tokens such as ``["set", "a", "1"]``.
``Proc`` runs one program in which ``snd``/``rcv`` play and recover a
frequency; ``SendingProc`` reinterprets the same two opcodes as message
passing between two paired processors.  When executed as a script the
module checks two small example programs and then solves the puzzle input
fetched with ``aocd.get_data(day=18, year=2017)``.
"""

# In[1]:

from collections import defaultdict, deque


class Jump(Exception):
    """Raised by an opcode implementation to move the program counter.

    ``offset`` is added to the processor's ``pos`` in place of the usual
    increment of one.  An offset of 0 therefore leaves the processor on the
    same instruction, which is how a blocked ``rcv`` waits.
    """

    def __init__(self, offset):
        self.offset = offset
        super().__init__(offset)


def _resolve(registers, operand, kind):
    """Turn a raw operand token into the argument passed to an opcode.

    ``kind`` is one character of the operand spec given to ``opcode``:
    ``"r"`` passes the register name through untouched, anything else
    yields an integer, either the literal itself or the current content of
    the named register.
    """
    if kind == "r":
        return operand
    try:
        return int(operand)
    except ValueError:
        # Not a literal, so it names a register (missing registers read as 0).
        return registers[operand]


class Opcode:
    """Callable wrapper around one instruction implementation.

    Instances are created by the ``opcode`` decorator inside a processor
    class body.  On class creation ``__set_name__`` registers the instance
    in ``owner.opcodes`` under the method name minus its ``op_`` prefix.
    """

    def __init__(self, func, operands):
        self.func = func
        self.operands = operands
        self.opcode = None  # filled in by __set_name__

    def __set_name__(self, owner, name):
        # "op_snd" -> "snd"; the owner class must provide an ``opcodes`` dict.
        self.opcode = name[3:]
        owner.opcodes[self.opcode] = self

    def __repr__(self):
        return f"<opcode {self.opcode}>"

    def __call__(self, cpu, *ops):
        """Execute the instruction on ``cpu`` and advance its ``pos``.

        Returns ``(opcode_name, result)``.  ``result`` is whatever the
        implementation returned, or ``None`` when it raised ``Jump``.
        """
        # Registers are taken from the cpu on every call; nothing is cached
        # on this object, which is shared by every instance of the class.
        args = [
            _resolve(cpu.registers, operand, kind)
            for operand, kind in zip(ops, self.operands)
        ]
        try:
            result = self.func(cpu, *args)
        except Jump as jump:
            cpu.pos += jump.offset
            return self.opcode, None
        cpu.pos += 1
        return self.opcode, result


def opcode(operands):
    """Decorator factory declaring a method as an instruction.

    ``operands`` holds one character per operand: ``"r"`` for a register
    name that is passed as-is, ``"v"`` for a value that is resolved to an
    integer before the implementation is called.
    """

    def decorator(f):
        return Opcode(f, operands)

    return decorator


class Proc:
    """Single processor where ``snd`` plays and ``rcv`` recovers a frequency."""

    opcodes = {}

    def __init__(self):
        self.reset()

    def reset(self):
        """Clear all registers, the last played frequency and the position."""
        self.registers = defaultdict(int)
        self.sound_freq = 0
        self.pos = 0

    def run(self, instructions):
        """Yield ``(opcode, result)`` for every executed instruction.

        The generator ends as soon as ``pos`` leaves the program in either
        direction, so a negative position is never treated as an index
        counted from the end and running past the last instruction does not
        raise ``IndexError``.
        """
        while 0 <= self.pos < len(instructions):
            name, *ops = instructions[self.pos]
            yield self.opcodes[name](self, *ops)

    def run_until_rcv(self, instructions):
        """Return the first value produced by a ``rcv`` instruction.

        Returns ``None`` when the program ends without any ``rcv``
        producing a value.
        """
        recovered = (
            val
            for op, val in self.run(instructions)
            if op == "rcv" and val is not None
        )
        return next(recovered, None)

    @opcode("v")
    def op_snd(self, x):
        self.sound_freq = x

    @opcode("rv")
    def op_set(self, x, y):
        self.registers[x] = y

    @opcode("rv")
    def op_add(self, x, y):
        self.registers[x] += y

    @opcode("rv")
    def op_mul(self, x, y):
        self.registers[x] *= y

    @opcode("rv")
    def op_mod(self, x, y):
        self.registers[x] %= y

    @opcode("r")
    def op_rcv(self, x):
        # Only a non-zero register recovers the last played frequency.
        if self.registers[x]:
            return self.sound_freq

    @opcode("vv")
    def op_jgz(self, x, y):
        if x > 0:
            raise Jump(y)


class SendingProc(Proc):
    """Processor whose ``snd``/``rcv`` exchange values with a paired one."""

    # Own copy, so the overrides below do not replace Proc's snd/rcv.
    opcodes = Proc.opcodes.copy()

    def __init__(self, cpu_id):
        self.cpu_id = cpu_id
        super().__init__()

    def set_pair(self, cpu):
        """Set the processor whose queue receives this one's ``snd`` values."""
        self.paired = cpu

    def reset(self):
        """Reset like ``Proc`` and empty the queue; register ``p`` gets the id."""
        super().reset()
        self.message_queue = deque()
        self.registers["p"] = self.cpu_id

    @opcode("v")
    def op_snd(self, x):
        self.paired.message_queue.append(x)

    @opcode("r")
    def op_rcv(self, x):
        if not self.message_queue:
            # Nothing to read yet: stay on this instruction and retry later.
            raise Jump(0)
        value = self.message_queue.popleft()
        self.registers[x] = value
        return value


# Step results meaning a processor cannot make progress on its own:
# waiting on an empty queue, or having run off the program.
_BLOCKED = ("rcv", None)
_HALTED = (None, None)
_STALLED = (_BLOCKED, _HALTED)


def parallel_run(instructions):
    """Run two paired processors in lockstep; count sends by processor 1.

    Execution stops once neither processor can progress: each one is either
    waiting in ``rcv`` on an empty queue or has left the program.  A
    processor that has left the program stays stalled while the other keeps
    running.  Returns the number of ``snd`` instructions processor 1
    executed.
    """
    cpu0 = SendingProc(0)
    cpu1 = SendingProc(1)
    cpu0.set_pair(cpu1)
    cpu1.set_pair(cpu0)

    run0 = cpu0.run(instructions)
    run1 = cpu1.run(instructions)
    sendcount = 0
    while True:
        # An exhausted generator keeps returning the default on later calls.
        step0 = next(run0, _HALTED)
        step1 = next(run1, _HALTED)
        if step1[0] == "snd":
            sendcount += 1
        if step0 in _STALLED and step1 in _STALLED:
            return sendcount


def _parse(text):
    """Split a program listing into one token list per line."""
    return [instr.split() for instr in text.splitlines()]


def _check_examples():
    """Assert the two small example programs give the expected answers."""
    # In[2]:
    proc = Proc()
    test_instr = _parse(
        """\
set a 1
add a 2
mul a a
mod a 5
snd a
set a 0
rcv a
jgz a -1
set a 1
jgz a -2
"""
    )
    assert proc.run_until_rcv(test_instr) == 4

    # In[3]:
    test_instr = _parse(
        """\
snd 1
snd 2
snd p
rcv a
rcv b
rcv c
rcv d
"""
    )
    assert parallel_run(test_instr) == 3


def main():
    """Check the examples, fetch the puzzle input and print both answers."""
    _check_examples()

    # In[4]:
    # Third-party and network-backed, so it is imported only when the
    # script actually runs rather than whenever the module is imported.
    import aocd

    data = aocd.get_data(day=18, year=2017)
    instructions = [line.split() for line in data.splitlines()]

    # In[5]:
    proc = Proc()
    print("Part 1:", proc.run_until_rcv(instructions))

    # In[6]:
    print("Part 2:", parallel_run(instructions))


if __name__ == "__main__":
    main()