#!/usr/bin/env python
# coding: utf-8

# # Observer Pattern
# 1. Behavioral
# 1. Also known as **Publish-Subscribe Pattern, Dependents Pattern**
# 1. Used to control operation of Objects
# 1. Used for Event Monitoring
# 1. One to Many relationship builder, when one object's state changes, all dependents are notified
#
# > **Example**
# > - Newspaper Subscription
# > - Channel Subscription
# > - Any kind of push notification service
# > - Mostly used in GUIs
#
# **Advantages**
# 1. Separation of Concern (Single Responsibility)
# 1. Interface Segregation
# 1. Open-Closed
# 1. Dependency Inversion
# 1. Encapsulate what varies
#
# > **MVC**
# > - Model View Controller
# > - Model - Subject/Publisher
# > - View - Observer
#
# Dashboard for Tech Support
# - KPI - Key Performance Indicators
#     - Open Tickets
#     - New Tickets in Last Hour
#     - Closed Tickets in Last Hour
# - Observer - Dashboard, Perhaps History Viewer, Or Forecaster
# - Publisher(Subject) - KPI Source

# In[1]:


from abc import ABCMeta, abstractmethod


# ### Interfaces
# 1. Context Managed - Lifecycle Method introduced, so that they clean up
#    themselves and avoid Dangling References

# In[2]:


class IObserver(metaclass=ABCMeta):
    """Interface for objects that receive notifications from an IPublisher.

    Observers are context managers: ``__enter__`` returns the observer and
    the concrete ``__exit__`` is where the observer releases its
    subscription.
    """

    @abstractmethod
    def update(self, value=None):
        """Handle a notification.

        ``IPublisher.notify`` calls ``update()`` with no argument when it has
        no message, and ``update(msg)`` otherwise, so implementations should
        accept an optional value.
        """

    def __enter__(self):
        return self

    @abstractmethod
    def __exit__(self, exc_type, exc_value, traceback):
        """Clean up on leaving the ``with`` block (e.g. detach from the publisher)."""


# In[3]:


class IPublisher(metaclass=ABCMeta):
    """Subject that keeps a set of IObserver objects and notifies them.

    Used as a context manager, the publisher drops all of its observers on
    exit.
    """

    # Immutable empty default. Every mutation below *rebinds* the attribute on
    # the instance instead of mutating in place, so each publisher owns its
    # own registry (a mutable class-level set would be shared by all
    # instances) and ``notify`` always iterates over a stable snapshot.
    _observers = frozenset()

    def attach(self, observer):
        """Register ``observer``; attaching the same observer twice is a no-op.

        Raises:
            TypeError: if ``observer`` is not an ``IObserver`` instance.
        """
        if not isinstance(observer, IObserver):
            raise TypeError('Observer not derived from IObserver')
        self._observers = self._observers | {observer}

    def detach(self, observer):
        """Unregister ``observer``; an observer that is not attached is ignored."""
        self._observers = self._observers - {observer}

    def notify(self, msg=None):
        """Call ``update`` on every attached observer.

        ``msg`` is forwarded as the single argument unless it is ``None``, in
        which case ``update()`` is called without arguments.
        """
        # The bound frozenset is never mutated, so observers may safely
        # attach/detach from inside their ``update`` callback.
        for observer in self._observers:
            if msg is None:
                observer.update()
            else:
                observer.update(msg)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Drop every reference to the observers held by this publisher only.
        self._observers = frozenset()


# ### Implementation

# In[4]:


class KPIs(IPublisher):
    """Publisher holding the three ticket KPIs; -1 means "not set yet"."""

    _open_tickets = -1
    _closed_tickets = -1
    _new_tickets = -1

    @property
    def open_tickets(self):
        """Current number of open tickets."""
        return self._open_tickets

    @property
    def closed_tickets(self):
        """Current number of closed tickets."""
        return self._closed_tickets

    @property
    def new_tickets(self):
        """Current number of new tickets."""
        return self._new_tickets

    def set_kpis(self, open_tickets, closed_tickets, new_tickets, msg=None):
        """Store the new KPI values, then notify all observers with ``msg``."""
        self._open_tickets = open_tickets
        self._closed_tickets = closed_tickets
        self._new_tickets = new_tickets
        self.notify(msg)


# In[5]:


class CurrentKPIs(IObserver):
    """Observer that copies the publisher's KPIs and prints them as "Current"."""

    open_tickets = -1
    closed_tickets = -1
    new_tickets = -1

    def __init__(self, kpis):
        """Remember the publisher ``kpis`` and attach to it immediately."""
        self._kpis = kpis
        kpis.attach(self)

    def update(self, msg=None):
        """Pull the latest values from the publisher and display them."""
        self.open_tickets = self._kpis.open_tickets
        self.closed_tickets = self._kpis.closed_tickets
        self.new_tickets = self._kpis.new_tickets
        self.display(msg)

    def display(self, msg=None):
        """Print the stored KPI values together with ``msg``."""
        print(f'Current KPIs ({msg}):\nOpen: {self.open_tickets}'
              f'\nClosed: {self.closed_tickets}\nNew: {self.new_tickets}\n')

    def __exit__(self, exc_type, exc_value, traceback):
        # Detach on leaving the ``with`` block so the publisher keeps no
        # reference to this observer.
        self._kpis.detach(self)


# In[6]:


class ForecastKPIs(IObserver):
    """Observer that copies the publisher's KPIs and prints them as "Forecast"."""

    open_tickets = -1
    closed_tickets = -1
    new_tickets = -1

    def __init__(self, kpis):
        """Remember the publisher ``kpis`` and attach to it immediately."""
        self._kpis = kpis
        kpis.attach(self)

    def update(self, msg=None):
        """Pull the latest values from the publisher and display them."""
        self.open_tickets = self._kpis.open_tickets
        self.closed_tickets = self._kpis.closed_tickets
        self.new_tickets = self._kpis.new_tickets
        self.display(msg)

    def display(self, msg=None):
        """Print the stored KPI values together with ``msg``.

        ``msg`` defaults to ``None`` to match ``CurrentKPIs.display``.
        """
        print(f'Forecast KPIs ({msg}):\nOpen: {self.open_tickets}'
              f'\nClosed: {self.closed_tickets}\nNew: {self.new_tickets}\n')

    def __exit__(self, exc_type, exc_value, traceback):
        # Detach on leaving the ``with`` block so the publisher keeps no
        # reference to this observer.
        self._kpis.detach(self)


# ### Driver Program

# In[7]:


with KPIs() as kpis:
    with CurrentKPIs(kpis) as currKPIs, ForecastKPIs(kpis):
        kpis.set_kpis(10, 20, 30, 'Good Performance')
        kpis.detach(currKPIs)
        print("=========================\nAfter Detaching\n=========================\n")
        kpis.set_kpis(1, 2, 3, 'Critical Performance')


# ### After Context Manager Exit
# 1. No one is notified
# 1. All the references have been removed from memory

# In[8]:


kpis.set_kpis(100, 120, 160)
# No more notifications fired


# # Variation

# In[9]:


class IObservable(metaclass=ABCMeta):
    """Observable whose observers are plain callables taking one value.

    Used as a context manager, the observable drops all of its subscribers on
    exit.
    """

    # Immutable empty default, rebound per instance on every change (see the
    # methods below) so that observables never share subscribers and ``emit``
    # iterates over a stable snapshot.
    _observers = frozenset()

    def subscribe(self, observer):
        """Register the callable ``observer``; duplicates are ignored.

        Raises:
            TypeError: if ``observer`` is not callable (it could never be
                invoked by ``emit``).
        """
        if not callable(observer):
            raise TypeError('Observer is not callable')
        self._observers = self._observers | {observer}

    def unsubscribe(self, observer):
        """Remove ``observer``; an observer that is not subscribed is ignored."""
        self._observers = self._observers - {observer}

    def emit(self, val):
        """Call every subscribed observer with ``val``."""
        for observer in self._observers:
            observer(val)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Drop every subscriber held by this observable only.
        self._observers = frozenset()


# In[10]:


class NewKPIs(IObservable):
    """Observable KPI source that emits ``(open, closed, new)`` tuples."""

    _open_tickets = -1
    _closed_tickets = -1
    _new_tickets = -1

    def set_kpis(self, open_tickets, closed_tickets, new_tickets):
        """Store the new KPI values and emit them as a 3-tuple."""
        self._open_tickets = open_tickets
        self._closed_tickets = closed_tickets
        self._new_tickets = new_tickets
        self.emit((self._open_tickets, self._closed_tickets, self._new_tickets))


# In[11]:


def currKPI(val):
    """Print an ``(open, closed, new)`` tuple as the current KPIs."""
    x, y, z = val
    print(f'Current KPIs:\nOpen: {x}\nClosed: {y}\nNew: {z}\n')


def foreKPI(val):
    """Print an ``(open, closed, new)`` tuple as the forecast KPIs."""
    x, y, z = val
    print(f'Forecast KPIs:\nOpen: {x}\nClosed: {y}\nNew: {z}\n')


# In[12]:


with NewKPIs() as newKPIs:
    newKPIs.subscribe(currKPI)
    newKPIs.subscribe(foreKPI)
    newKPIs.set_kpis(1, 2, 3)
    newKPIs.unsubscribe(currKPI)
    print("=========================\nAfter Detaching\n=========================\n")
    newKPIs.set_kpis(10, 20, 30)