#!/usr/bin/env python
# coding: utf-8

# # Interactive Particle Swarm Optimisation Dashboard from Scratch in Python
# > Swarm Intelligence from social interaction
#
# - toc: false
# - badges: true
# - comments: true
# - image: images/copied_from_nb/my_icons/static_pso.png
# - categories: [jupyter, optimisation, visualisation]

# # Why are we here?

# I'm hoping it's to read about **Swarm Intelligence!** I'm also hoping you're interested in the **_interactive dashboard_** side of things too, so we can play with it at the end.
#
# If that sounds like too much and you just want to play with it now, **you can use it on PyViz examples here: https://particle-swarms.pyviz.demo.anaconda.com**
#
# We're going to build the dashboard using some of [Anaconda](https://www.anaconda.com/)'s [HoloViz](https://holoviz.org/) tools (HoloViews, Panel and Bokeh) to **get the result from the tweet below.**

# > twitter: https://twitter.com/_ScottCondron/status/1293605204013846530

# ## Finding the "just right" Goldilocks Zone using Swarm Intelligence

# Say you're building a house and you want to maximise the number of rooms you can fit in your plot of land, maybe saying that all rooms have to be a certain size or bigger. That's the kind of thing that optimisation algorithms are useful for.
#
# Optimisation methods like **Particle Swarm Optimisation** are used when you want to find the _best_/_optimum_ for some system / problem. **You could just try every possible input**, but that might take a while, so smarter people than me have invented better ways.

# ## No death
#
# This is going to be pretty similar to my [Genetic Algorithm blog post](https://www.scottcondron.com/jupyter/optimisation/visualisation/2020/07/20/interactive-genetic-algorithm-dashboard-from-scratch-in-python.html) except this time there will be a lot less death. You won’t necessarily need to have read that blog post, but I will be referring back to it once or twice, so you may want to go back and read that first.
#
# ## Make it interactive because

# Let's build a dashboard in which you can **control parameters of Particle Swarm Optimisation, click a target and see the little dots flock towards it**. Like an interactive, 2D version of this plot on Wikipedia.

# ![pso_gif.gif](https://www.scottcondron.com/images/copied_from_nb/my_icons/pso_gif.gif "Source: Wikipedia")

# # Swarm Intelligence

# ## Wait, why no death?
#
# A genetic algorithm is based on genetic evolution where, for each generation, there is survival-of-the-fittest-style, well... death. In the case of Particle Swarm Optimisation, **the same population persists throughout** because we want the particles to **remember where they were when they were at their fittest**. Like looking back at yourself on your wedding day or after a health kick. **Each particle's position is a potential solution to your problem, so they're all trying to find the _best position_ together.** :heart_eyes:
#
# ## Adding velocity to the mix
#
# In the case of the Genetic Algorithm, each member of the population was just a few numbers (their X and Y position), the parameters that you’re trying to optimise. In this case each particle will not just have an X and Y position, **it also has a velocity**. We also need a way to know how to improve the particles in our swarm...

# ### Closer (smaller distance) is better

# The same as with the Genetic Algorithm, we'll need to **find the fittest member of the population using Euclidean distance** / mean squared error (which particle is closest to the target).
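# As a quick sanity check (a minimal sketch, separate from the dashboard code), here's why it doesn't matter whether we rank particles by Euclidean distance or by mean squared error: in 2D the MSE is just the squared distance divided by two, so both pick the same fittest particle. The names `mse_fitness`, `euclidean_distance`, `target` and `candidates` are made up for this illustration.

```python
import numpy as np

def mse_fitness(position, target):
    """Mean squared error between a position and the target (lower is fitter)."""
    return float(((np.asarray(position) - np.asarray(target)) ** 2).mean(axis=0))

def euclidean_distance(position, target):
    """Straight-line distance between a position and the target."""
    return float(np.linalg.norm(np.asarray(position) - np.asarray(target)))

# Hypothetical target and three candidate particle positions
target = np.array([0.5, 0.5])
candidates = np.array([[0.1, 0.9], [0.45, 0.6], [0.8, 0.8]])

mse = [mse_fitness(c, target) for c in candidates]
dist = [euclidean_distance(c, target) for c in candidates]

# Index of the fittest candidate under each measure
fittest_by_mse = int(np.argmin(mse))
fittest_by_dist = int(np.argmin(dist))
```

# Both measures agree on the fittest particle, so we can use the MSE and skip the square root.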
# In[1]:


#collapse-hide
def mean_squared_error(y_true, y_pred):
    return ((y_true - y_pred)**2).mean(axis=0)

target_x, target_y = 0,0
def problem(soln):
    global target_x #using globals so we can link this to the click event later
    global target_y
    return mean_squared_error(soln, [target_x, target_y])

def assess_fitness(individual, problem):
    "Determines the fitness of an individual using the given problem"
    return problem(individual)


# ## Nostalgic by design

# **Each member is going to keep track of their fittest position**; this can help them if they explore a worse direction, or want to tell other particles (but we'll get to that later). They also keep an ID so that we can colour them across iterations.
#
# ## A portrait of a particle
#
# The big red blob is one particle which has an X and Y position, a velocity and is constantly reminiscing about its fittest position.

# ![pso.png](https://www.scottcondron.com/images/copied_from_nb/my_icons/particle.png "A portrait of a particle")

# Here's that in code (before we add any of the update logic).

# In[2]:


#collapse-hide
import numpy as np
import pandas as pd
import random
from holoviews import opts, dim
import holoviews as hv
import panel as pn
from holoviews.streams import Stream
hv.extension('bokeh', logo=False)


# In[3]:


#collapse-show
class Particle:
    def __init__(self, problem, velocity, position, index):
        self.problem = problem
        self.velocity = velocity
        self.position = position
        # Copy so the remembered best doesn't share memory with the live position
        self.fittest_position = position.copy()
        self.id = index


# ## Create a "swarm" of them

# For each particle, we want their position and velocity.

# In[4]:


#collapse-show
swarm_size = 50
vector_length = 2 # x & y
swarm = [Particle(problem, np.random.uniform(-2, 2, vector_length), np.random.rand(vector_length), i)
         for i in range(swarm_size)]


# We also convert their velocity into angle and magnitude for the little arrows in the visualisation.
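# As a rough, standalone illustration of that conversion (an assumption-laden sketch, not the dashboard's own helper), the function below turns a 2D velocity into a magnitude and an angle measured from the positive x-axis. `velocity_to_polar` is a hypothetical name; for non-zero velocities it gives the same direction as the `to_angle` helper used for the plot, up to a multiple of 2π.

```python
import numpy as np

def velocity_to_polar(velocity):
    """Return (magnitude, angle) for a 2D velocity; angle is in radians from the +x axis."""
    vx, vy = velocity
    magnitude = float(np.hypot(vx, vy))
    # arctan2 handles all four quadrants, and arctan2(0, 0) is 0,
    # so a particle with zero velocity doesn't produce NaN
    angle = float(np.arctan2(vy, vx))
    return magnitude, angle

# A particle moving diagonally up and to the right
mag, angle = velocity_to_polar(np.array([1.0, 1.0]))

# A particle that isn't moving at all
still_mag, still_angle = velocity_to_polar(np.zeros(2))
```

# Working on the raw components (rather than dividing by the magnitude first) matters here because the optimiser starts every particle with zero velocity.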
# Here's what our swarm looks like:

# In[5]:


#collapse-hide
def to_angle(vector):
    x = vector[0]
    y = vector[1]
    mag = np.sqrt(x**2 + y**2)
    # arctan2 is scale-invariant, so we don't divide by mag (which would give NaN for zero velocity)
    angle = (np.pi/2.) - np.arctan2(x, y)
    return mag, angle

def get_vectorfield_data(swarm):
    '''Returns (xs, ys, angles, mags, ids)'''
    xs, ys, angles, mags, ids = [], [], [], [], []
    for particle in swarm:
        xs.append(particle.position[0])
        ys.append(particle.position[1])
        mag, angle = to_angle(particle.velocity)
        mags.append(mag)
        angles.append(angle)
        ids.append(particle.id)
    return xs, ys, angles, mags, ids

vect_data = get_vectorfield_data(swarm)
vectorfield = hv.VectorField(vect_data, vdims=['Angle', 'Magnitude', 'Index'])

# [x, y, id] for all particles
particles = [np.array([vect_data[0], vect_data[1], vect_data[4]]) for i, particle in enumerate(swarm)]
points = hv.Points(particles, vdims=['Index'])
layout = vectorfield * points
layout.opts(
    opts.VectorField(color='Index', cmap='tab20c', magnitude=dim('Magnitude').norm()*10, pivot='tail'),
    opts.Points(color='Index', cmap='tab20c', size=5)
)

pn.Column(layout.opts(width=500, height=500))


# ![static_pso.png](https://www.scottcondron.com/images/copied_from_nb/my_icons/static_pso.png "Particles with an arrow showing their velocity.")

# > Note: We initialised the particles with a velocity for visualising but we'll initialise them with zero velocity when it comes to actually optimising.

# # Updating
#
# Okay, so we have a population of particles, **each with a position, velocity and fittest position**, but how can we update this population to find our _optimum_ spot?
#
# Each particle could just move in the direction that they think the optimum spot is. But if they overshoot it or get lost, thankfully they remember their best position so they can use that a little bit too.
#
# ## Particles' social lives
#
# Seems pretty inefficient for a bunch of these particles to all be trying the same thing without sharing any information with each other.
# In PSO, **they can get the "fittest position" from some other members of the population when they’re updating (called the social component)**.
#
# They choose a few other particles and say “hey I’m looking for this red marker, any chance you’ve seen it?“ and the other particles reply “No but here is where I was when I was closest to it.“ Thrilling conversations.

# > Note: Interesting side note: PSO was introduced by James Kennedy and Russell Eberhart in 1995 after they discovered its optimisation properties while trying to build a social simulator.

# ## Too much social interaction
#
# A quick way to get stuck with a bad solution to a complex problem is to only listen to one suggestion and follow it. This is what happens in particle swarm optimisation when every particle communicates with all of the other particles during its update step (called the **global component**).

# ### Update code

# Here's the code for the Particle to update itself at each iteration.

# In[6]:


#collapse-show
def update(self, fittest_informant, global_fittest, follow_current, follow_personal_best,
           follow_social_best, follow_global_best, scale_update_step):
    """ Updates the velocity and position of the particle using the PSO update algorithm"""
    self.position += self.velocity * scale_update_step
    cognitive = random.uniform(0, follow_personal_best)
    social = random.uniform(0, follow_social_best)
    glob = random.uniform(0, follow_global_best)
    self.velocity = (follow_current * self.velocity
                    + cognitive * (self.fittest_position - self.position)
                    + social * (fittest_informant.fittest_position - self.position)
                    + glob * (global_fittest.fittest_position - self.position))
    current_fitness = self.assess_fitness()
    # Check for None before comparing, otherwise the comparison itself would fail
    if (self.previous_fitness is not None and current_fitness < self.previous_fitness):
        # Copy, because `self.position` is modified in place by `+=` above
        self.fittest_position = self.position.copy()
        # Kept inside the `if` so it tracks the best fitness seen so far
        self.previous_fitness = current_fitness


# > Note: We are using a variant of the PSO algorithm introduced in 1995, with a social component as well as global.
# Also, we sample uniformly between 0 and the given update parameter before updating each part of the equation.

# There are various values used to determine how to update the current velocity (as described above).
# - `follow_current` is how much to use the particle's current velocity.
# - `cognitive` is how much to use the particle's personal fittest position.
# - `social` is how much to use the fittest position of a smaller subset of the population.
# - `glob` (global) is how much to use the fittest position of the fittest particle in the population.
#
# These are applied to the difference between the particle's current position and a "fit" other position (either its own fittest position or another particle's fittest position).

# ### Particle Class

# Here is the Particle class with the `update` and `assess_fitness` methods added in.

# In[7]:


#collapse-hide
class Particle:
    """
    A Particle used in PSO.

    Attributes
    ----------
    problem : function
        The function to minimise
    velocity : nparray
        The current velocity of the particle
    position : nparray
        The current position of the particle, used as the solution for the problem given
    id : int
        The unique id of the particle

    Public Methods
    --------------
    assess_fitness()
        Determines the fitness of the particle using the given problem
    update(fittest_informant, global_fittest, follow_current, follow_personal_best,
           follow_social_best, follow_global_best, scale_update_step)
        Updates the velocity and position of the particle using the PSO update algorithm
    """

    def __init__(self, problem, velocity, position, index):
        self.velocity = velocity
        self.position = position
        # Copy so the remembered best doesn't share memory with the live position
        self.fittest_position = position.copy()
        self.problem = problem
        self.id = index
        self.previous_fitness = 1e7

    def assess_fitness(self):
        """Determines the fitness of the particle using the given problem"""
        return assess_fitness(self.position, self.problem)

    def update(self, fittest_informant, global_fittest, follow_current, follow_personal_best,
               follow_social_best, follow_global_best, scale_update_step):
        """
        Updates the velocity and position of the particle using the PSO update algorithm"""
        self.position += self.velocity * scale_update_step
        cognitive = random.uniform(0, follow_personal_best)
        social = random.uniform(0, follow_social_best)
        glob = random.uniform(0, follow_global_best)
        self.velocity = (follow_current * self.velocity
                        + cognitive * (self.fittest_position - self.position)
                        + social * (fittest_informant.fittest_position - self.position)
                        + glob * (global_fittest.fittest_position - self.position))
        current_fitness = self.assess_fitness()
        if (current_fitness < self.previous_fitness):
            # Copy, because `self.position` is modified in place by `+=` above
            self.fittest_position = self.position.copy()
            # Kept inside the `if` so it tracks the best fitness seen so far
            self.previous_fitness = current_fitness


# ## Find the fittest Particle in the swarm

# We use this `find_current_best` function to keep track of our current fittest `Particle`, and to find the best among a selected few "informant" `Particle`s for the social component.

# In[8]:


#collapse-show
def find_current_best(swarm, problem):
    """Evaluates a given swarm and returns the fittest particle based on their best previous position
    This can be sped up to only loop over swarm once, but because this is a tutorial, 3 lines is nicer.
    """
    fitnesses = [assess_fitness(x.fittest_position, problem) for x in swarm]
    best_value = min(fitnesses)
    best_index = fitnesses.index(best_value)
    return swarm[best_index]


# ## PSO Class

# This is just a wrapper which updates all the particles and keeps track of the current fittest.

# > Note: One thing to note is that we randomly sample the swarm to get the "informants" for the `social` update in each particle. There are many different topologies that can be chosen for this part of the algorithm, but we're keeping it simple here.

# In[9]:


#collapse-hide
class PSO:
    """
    An implementation of Particle Swarm Optimisation, pioneered by Kennedy, Eberhart and Shi.

    The swarm consists of Particles with 2 fixed length vectors; velocity and position.
    Position is initialised with a uniform distribution between 0 and 1.
    Velocity is initialised with zeros.
    Each particle has a given number of informants which are randomly chosen at each iteration.

    Attributes
    ----------
    swarm_size : int
        The size of the swarm
    vector_length : int
        The dimensions of the problem, should be the same used when creating the problem object
    num_informants: int
        The number of informants used for social component in particle velocity update

    Public Methods
    --------------
    improve(follow_current, follow_personal_best, follow_social_best, follow_global_best, scale_update_step)
        Update each particle in the swarm and updates the global fitness
    update_swarm(follow_current, follow_personal_best, follow_social_best, follow_global_best, scale_update_step)
        Updates each particle, randomly choosing informants for each particle's update.
    update_global_fittest()
        Updates the `global_fittest` variable to be the current fittest Particle in the swarm.
    """

    def __init__(self, problem, swarm_size, vector_length, num_informants=2):
        self.swarm_size = swarm_size
        self.num_informants = num_informants
        self.problem = problem
        self.swarm = [Particle(self.problem, np.zeros(vector_length), np.random.rand(vector_length), i)
                      for i in range(swarm_size)]
        self.global_fittest = np.random.choice(self.swarm, 1)[0]

    def update_swarm(self, follow_current, follow_personal_best, follow_social_best, follow_global_best, scale_update_step):
        """Update each particle in the swarm"""
        for particle in self.swarm:
            informants = np.random.choice(self.swarm, self.num_informants)
            if particle not in informants:
                # np.append returns a new array, so the result has to be assigned back
                informants = np.append(informants, particle)
            fittest_informant = find_current_best(informants, self.problem)
            particle.update(fittest_informant,
                            self.global_fittest,
                            follow_current,
                            follow_personal_best,
                            follow_social_best,
                            follow_global_best,
                            scale_update_step)

    def update_global_fittest(self):
        fittest = find_current_best(self.swarm, self.problem)
        global_fittest_fitness = self.global_fittest.assess_fitness()
        if (fittest.assess_fitness() < global_fittest_fitness):
            self.global_fittest = fittest

    def improve(self, follow_current, follow_personal_best, follow_social_best, follow_global_best, scale_update_step):
        """Improves the population for one iteration."""
        self.update_swarm(follow_current, follow_personal_best, follow_social_best, follow_global_best, scale_update_step)
        self.update_global_fittest()

size = 25
vector_length = 2
num_informants = 2
pso = PSO(problem, size, vector_length, num_informants)


# # Interaction

# We're using [Panel](https://panel.holoviz.org/) (a library from Anaconda) for the sliders and buttons. Because there are a _lot_ of settings for PSO, we'll leave an escape hatch for people in the form of a `reset_button` which will set the sliders to their defaults.

# ## Sliders and defaults

# In[10]:


#collapse-hide
default_pop_size = 25
default_time = 3
default_num_informants = 6

population_size_slider = pn.widgets.IntSlider(name='Population Size', start=10, end=50, value=default_pop_size)
time_slider = pn.widgets.IntSlider(name='Time Evolving (s)', start=0, end=15, value=default_time)
num_informants_slider = pn.widgets.IntSlider(name='Number of Informants', start=0, end=20, value=default_num_informants)

default_current = 0.7
default_personal_best = 2.0
default_social_best = 0.9
default_global_best = 0.0
default_scale_update_step = 0.7

follow_current_slider = pn.widgets.FloatSlider(name='Follow Current', start=0.0, end=5, value=default_current)
follow_personal_best_slider = pn.widgets.FloatSlider(name='Follow Personal Best', start=0, end=5, value=default_personal_best)
follow_social_best_slider = pn.widgets.FloatSlider(name='Follow Social Best', start=0.0, end=5, value=default_social_best)
follow_global_best_slider = pn.widgets.FloatSlider(name='Follow Global Best', start=0.0, end=1, value=default_global_best)
scale_update_step_slider = pn.widgets.FloatSlider(name='Scale Update Step', start=0.0, end=1, value=default_scale_update_step)

reset_params_button = pn.widgets.Button(name='Reset Parameters', width=50)

def reset_event(event):
    # The defaults are only read here, so no `global` declarations are needed
    follow_current_slider.value, follow_personal_best_slider.value = default_current, default_personal_best
    follow_social_best_slider.value, follow_global_best_slider.value = default_social_best, default_global_best
    scale_update_step_slider.value, population_size_slider.value = default_scale_update_step, default_pop_size
    time_slider.value, num_informants_slider.value = default_time, default_num_informants

reset_params_button.on_click(reset_event)


# ## Set the Target
#
# For the "click to set target" interaction, we'll use a `HoloViews` [DynamicMap](https://holoviews.org/reference/containers/bokeh/DynamicMap.html). It sounds complicated but, put simply, it links a stream with a callback function. In this case the stream we're using is a `hv.streams.SingleTap`, which will trigger the `tap_event` callback function with the x and y position of the tap when a tap happens. A `hv.Points` object is returned which can be displayed later.

# In[11]:


#collapse-show
def tap_event(x,y):
    global target_x
    global target_y
    if x is not None:
        target_x, target_y = x,y
    return hv.Points((x,y,1), label='Target').opts(color='r', marker='^', size=15)

target_x, target_y = 0.5, 0.5
tap_stream = hv.streams.SingleTap(transient=True, x=target_x, y=target_y)
target_tap = hv.DynamicMap(tap_event, streams=[tap_stream])


# ## Create button events

# Now for the best part, animating the Particles. This time our callback will return our swarm visualised using `hv.Points` for the particle points, `hv.VectorField` for the velocity arrows, and `hv.Points` to circle the fittest particle.
#
# We're going to use a `HoloViews` [DynamicMap](https://holoviews.org/reference/containers/bokeh/DynamicMap.html) again.
# This time, the stream that we link to the callback has no parameters, so we can trigger it with our buttons. `run_button` creates a new population and uses DynamicMap's `periodic` method to keep updating it for a given period of time (set with a slider from above). If there's anything there you'd like explained more, [feel free to reach out to me on Twitter](https://twitter.com/_ScottCondron).

# In[12]:


#collapse-show
def update():
    pso.improve(follow_current_slider.value, follow_personal_best_slider.value,
                follow_social_best_slider.value, follow_global_best_slider.value,
                scale_update_step_slider.value)
    vect_data = get_vectorfield_data(pso.swarm)
    vectorfield = hv.VectorField(vect_data, vdims=['Angle', 'Magnitude', 'Index'])
    particles = [np.array([vect_data[0], vect_data[1], vect_data[4]]) for i, particle in enumerate(swarm)]
    scatter = hv.Points(particles, vdims=['Index'], group='Particles')
    fittest = hv.Points((pso.global_fittest.fittest_position[0], pso.global_fittest.fittest_position[1],1), label='Current Fittest')
    layout = vectorfield * scatter * fittest
    layout.opts(
        opts.Points(color='b', fill_alpha=0.1, line_width=1, size=10),
        opts.VectorField(color='Index', cmap='tab20c', magnitude=dim('Magnitude').norm()*10, pivot='tail'),
        opts.Points('Particles', color='Index', cmap='tab20c', size=5, xlim=(0,1), ylim=(0,1))
    )
    return layout

particles = hv.DynamicMap(update, streams=[Stream.define('Next')()])

run_button = pn.widgets.Button(name='\u25b6 Begin Improving', width=50)

def b(event):
    global pso
    size = population_size_slider.value
    vector_length = 2
    num_informants = num_informants_slider.value
    pso_fitnesses = []
    pso = PSO(problem, size, vector_length, num_informants)
    particles.periodic(0.005, timeout=time_slider.value)

run_button.on_click(b)


# ## New Population Button

# We'll also add a button which can step through the update process or reset the population.
# We do this by hooking up other buttons that pass the streams of the `particles` DynamicMap (`particles.streams`) to `hv.streams.Stream.trigger`.

# In[13]:


#collapse-show
def new_pop_event(event):
    global pso
    size = population_size_slider.value
    num_informants = num_informants_slider.value
    pso = PSO(problem, size, vector_length=2, num_informants=num_informants)
    hv.streams.Stream.trigger(particles.streams)

new_pop_button = pn.widgets.Button(name='New Population', width=50)
new_pop_button.on_click(new_pop_event)

def next_gen_event(event):
    hv.streams.Stream.trigger(particles.streams)

next_generation_button = pn.widgets.Button(name='Next Generation', width=50)
next_generation_button.on_click(next_gen_event)


# ## Layout everything together

# In[15]:


#collapse-show
instructions = pn.pane.Markdown('''
# Particle Swarm Optimisation Dashboard
## Instructions:
1. **Click on the plot to place the target.**
2. Click '\u25b6 Begin Improving' button to begin improving for the time on the Time Evolving slider.
3. Experiment with the sliders
''')

dashboard = pn.Column(instructions,
                      pn.Row((particles*target_tap).opts(width=600, height=600),
                             pn.Column(
                                 pn.Row(run_button, pn.Spacer(width=50), new_pop_button),
                                 next_generation_button,
                                 time_slider,
                                 num_informants_slider,
                                 population_size_slider,
                                 follow_current_slider,
                                 follow_personal_best_slider,
                                 follow_social_best_slider,
                                 follow_global_best_slider,
                                 scale_update_step_slider,
                                 reset_params_button)))

dashboard


# Here's a gif of the final result! Click to set a target, set the parameters with the sliders and click the 'Begin Improving' button to **see the particles swarm!**

# ![pso.gif](https://www.scottcondron.com/images/copied_from_nb/my_icons/pso.gif "Wahoo!! The final result!")

# # Conclusion

# Particle Swarm Optimisation is a really interesting algorithm which was built while trying to build a simplified model of social interactions. The original aim was to create an algorithm in which the particles would behave like flocking birds.
# Here's a link if you'd like to [read the original paper](http://ai.unibo.it/sites/ai.unibo.it/files/u11/pso.pdf).
#
# We've built PSO from the ground up and have seen how **Swarm Intelligence** emerges!
#
# We've also looked at [Anaconda](https://www.anaconda.com/)'s [HoloViz](https://holoviz.org/) tools (HoloViews, Panel and Bokeh). Using these, **we built an interactive dashboard which shows all the particles updating!**
#
# I personally love learning about these kinds of algorithms and finding ways to interact with them visually. I'd love to hear from you. What do you think about these nature-inspired algorithms? Did you learn a bit about creating interactive visualisations in Python by reading this article?
#
# If so, feel free to [share it](https://ctt.ac/1Uv0f), and you’re also more than welcome to contact me (via [Twitter](https://twitter.com/_ScottCondron)) if you have any questions, comments, or feedback.
#
# And finally, **here's the dashboard deployed on PyViz examples: https://particle-swarms.pyviz.demo.anaconda.com**
#
# **Thanks a lot to the team at Anaconda** for their great tools and for deploying this :heart:!
#
# **Thanks for reading!** :rocket: