import numpy as np
import pandas as pd
import holoviews as hv
from holoviews import opts, dim
import geoviews as gv
hv.extension('bokeh')
opts.defaults(opts.RGB(width=600, height=600))
diamonds = pd.read_csv('../../data/diamonds.csv')
As should be second nature for us now, we will look at this dataframe before we start doing anything.
diamonds.head()
Next we will display a static plot of 'carat' vs. 'price' as we did in the first exercise, alongside a BoxWhisker plot of the distributions.
scatter_opts = opts.Scatter(width=600, height=400, logy=True, tools=['box_select'],
                            color=dim('cut'), size=1.5, cmap='tab20c')
scatter = hv.Scatter(diamonds.sample(10000), 'carat', ['price', 'cut', 'clarity']).select(carat=(0, 3))
boxwhisker = hv.BoxWhisker(scatter, 'clarity', 'price')
scatter.opts(scatter_opts) + boxwhisker
By default, the BoxWhisker element here will statically display the whole distribution. But if you try out the "Box select" tool, you can select a subset of the Scatter points. Can we link the BoxWhisker plot to selections made on the Scatter plot, so that we can see distributions in that particular region of the data space? Yes, as long as we have these three things:
1. A stream that reports the selection made on the scatter object
2. A callback function that builds an element from the selected points
3. A DynamicMap that connects the stream to the callback
For step 1, we provide the scatter object as the source for a Selection1D stream that will provide the index of all the selected points:
selection = hv.streams.Selection1D(source=scatter)
For step 2, write a function that can accept the index values, select those values from the original dataset, and return the appropriate HoloViews element; something like:
def selection_boxwhisker(index):
    # Use the selected rows, or the full dataset while nothing is selected
    selected = scatter.iloc[index] if len(index) > 0 else scatter
    return ...  # some hv element built from the selection
Here selection_boxwhisker should return a BoxWhisker element for the selection, plotting 'price' against 'clarity'.
For step 3, define a DynamicMap using the selection stream and your custom callback, and lay it out next to the scatter object as above.
DynamicMap requires a callback function as its first argument, and streams should be supplied in a list as a keyword argument.
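To make the relationship between the three pieces concrete, here is a toy model in plain Python. The classes ToySelectionStream and ToyDynamicView and the price values are hypothetical stand-ins invented for illustration; they are not HoloViews objects and only mimic the general idea that a stream event triggers the callback with the latest index.

```python
# Toy stand-ins for the three pieces. These classes are hypothetical and
# are NOT HoloViews objects; they only sketch the flow of information.

class ToySelectionStream:
    """Holds the latest selected row positions and notifies subscribers."""

    def __init__(self):
        self.index = []            # nothing is selected initially
        self._subscribers = []

    def subscribe(self, fn):
        self._subscribers.append(fn)

    def event(self, index):
        """Record a new selection and tell every subscriber about it."""
        self.index = list(index)
        for fn in self._subscribers:
            fn()


class ToyDynamicView:
    """Re-runs a callback with the stream's current index on every event."""

    def __init__(self, callback, streams):
        self.callback = callback
        self.streams = streams
        for stream in streams:
            stream.subscribe(self.refresh)
        self.refresh()             # initial render

    def refresh(self):
        self.current = self.callback(index=self.streams[0].index)


prices = [326, 340, 2757, 18823, 554]   # made-up sample values


def selected_prices(index):
    # Fall back to the full dataset while nothing is selected
    return [prices[i] for i in index] if len(index) > 0 else prices


stream = ToySelectionStream()
view = ToyDynamicView(selected_prices, streams=[stream])
print(view.current)      # the full list, since nothing is selected yet
stream.event([2, 3])
print(view.current)      # only the two selected values
```

The callback never talks to the stream directly: it only receives the current index as a keyword argument, which is the same shape as the selection_boxwhisker function described above.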
selection = hv.streams.Selection1D(source=scatter)
def selection_boxwhisker(index):
    # Fall back to the full dataset while nothing is selected
    selected = scatter.iloc[index] if len(index) > 0 else scatter
    return hv.BoxWhisker(selected, 'clarity', 'price')
scatter + hv.DynamicMap(selection_boxwhisker, streams=[selection])
Exercise 1 used HoloViews streams to collect user interaction events (selections). Here, let's use them to view data sources that themselves are updating over time.
First, let's set up a (simulated) streaming data source in the form of taxi pickup locations. The code below splits the taxi dataset into hourly chunks, which will be emitted one by one to emulate a live, streaming data source.
import time
import colorcet
from itertools import cycle
from holoviews.operation.datashader import datashade
def taxi_trips_stream(source='../../data/nyc_taxi_wide.parq', frequency='H'):
    """Generate dataframes grouped by given frequency"""
    def get_group(resampler, key):
        # Return the rows for one time bin, or an empty dataframe if the bin has none.
        # The datetime index is kept, since later callbacks read the time range from it.
        try:
            df = resampler.get_group(key)
        except KeyError:
            df = pd.DataFrame()
        return df
    df = pd.read_parquet(source,
                         columns=['tpep_pickup_datetime', 'pickup_x', 'pickup_y', 'fare_amount'])
    df = df.set_index('tpep_pickup_datetime', drop=True)
    df = df.sort_index()
    r = df.resample(frequency)
    chunks = [get_group(r, g) for g in sorted(r.groups)]
    # Emit the chunks in order, starting over after the last one
    indices = cycle(range(len(chunks)))
    while True:
        yield chunks[next(indices)]
trips = taxi_trips_stream()
example = next(trips)
As usual let's start by inspecting the data, in this case the initial chunk emitted above:
example.head()
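The chunk-and-cycle pattern used by the generator above can be tried out without the taxi file. The following is a minimal standard-library sketch under stated assumptions: the helper names hourly_chunks and chunk_stream and the records are made up for illustration, and, unlike the pandas resampler, this version simply skips hours that contain no records.

```python
from datetime import datetime, timedelta
from itertools import cycle, groupby, islice


def hourly_chunks(records):
    """Split (timestamp, value) records into one list per clock hour."""
    ordered = sorted(records, key=lambda rec: rec[0])
    by_hour = groupby(
        ordered,
        key=lambda rec: rec[0].replace(minute=0, second=0, microsecond=0),
    )
    return [list(group) for _, group in by_hour]


def chunk_stream(records):
    """Yield the hourly chunks one at a time, restarting after the last one."""
    yield from cycle(hourly_chunks(records))


# Made-up records: one every 20 minutes, covering three clock hours
start = datetime(2015, 1, 1, 0, 0)
records = [(start + timedelta(minutes=20 * i), float(i)) for i in range(9)]

stream = chunk_stream(records)
first_four = list(islice(stream, 4))
print([len(chunk) for chunk in first_four])
print(first_four[3] == first_four[0])   # the fourth chunk wraps around to the first
```

Because the generator never ends, a consumer decides how many chunks to pull, for example with next() or islice() as shown here.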
To build our streaming visualization, first declare a map tile source for a background plot, and then make a Pipe stream initialized with the example chunk of data already emitted:
tiles = gv.WMTS('https://maps.wikimedia.org/osm-intl/{Z}/{X}/{Y}@2x.png')
pipe = hv.streams.Pipe(example)
Then you will need to define a callback to use when declaring a DynamicMap. This function will need to accept a chunk of data, then return a Points object displaying the 'pickup_x' and 'pickup_y' coordinates and a label indicating the time range being covered. Something like:
def hourly_points(data):
    label = '%s - %s' % (str(data.index.min()), str(data.index.max()))
    return ...  # some hv object using the given data
Finally, use that callback and the pipe stream to define a DynamicMap, apply the datashade operation to the DynamicMap, and then overlay the result on top of the tiles.
Warning: Do not display the DynamicMap without applying the datashade() operation, or you run the risk of freezing your browser.
The operation is applied by calling datashade(dynamicmap).
You should now see a map of New York City with the taxi trips on top. Run the next cell to send events to the Pipe and update the plot.
for i in range(100):
    time.sleep(0.05)
    pipe.send(next(trips))
pipe = hv.streams.Pipe(example)
tiles = gv.WMTS('https://maps.wikimedia.org/osm-intl/{Z}/{X}/{Y}@2x.png')
def hourly_points(data):
    label = '%s - %s' % (str(data.index.min()), str(data.index.max()))
    return hv.Points(data, ['pickup_x', 'pickup_y'], label=label)
points = hv.DynamicMap(hourly_points, streams=[pipe])
tiles * datashade(points).opts(opts.RGB(width=600, height=600))
In the previous exercise we used the Pipe stream, which emits just the latest chunk. That's a good way to monitor an ongoing stream, but often you'll instead want to accumulate data over time, showing the latest chunk combined with previous chunks. Here we will stream data using the Buffer stream, which accumulates data until its length is reached and from then on keeps only the most recent rows. We will start by defining some options, an example dataframe, and the Buffer stream with a length of 1,000,000:
opts.defaults(
    opts.Curve(width=800, height=400, color='black', line_width=1, framewise=True),
    opts.Scatter(color='red'))
from holoviews.operation.timeseries import resample, rolling_outlier_std
example = next(trips)[['fare_amount']]
buffer = hv.streams.Buffer(example, length=1000000)
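The difference between the two kinds of stream can be illustrated with a rough plain-Python analogy. The classes LatestOnly and BoundedAccumulator below are hypothetical names invented for this sketch; they are not how HoloViews implements Pipe or Buffer, and they work on lists of numbers rather than dataframes.

```python
from collections import deque


class LatestOnly:
    """Pipe-like behaviour: each send replaces whatever was held before."""

    def __init__(self, data):
        self.data = list(data)

    def send(self, chunk):
        self.data = list(chunk)


class BoundedAccumulator:
    """Buffer-like behaviour: keep appending rows and drop the oldest
    ones once `length` rows are held."""

    def __init__(self, data, length):
        self._rows = deque(data, maxlen=length)

    def send(self, chunk):
        self._rows.extend(chunk)

    @property
    def data(self):
        return list(self._rows)


pipe_like = LatestOnly([1, 2])
buffer_like = BoundedAccumulator([1, 2], length=5)
for chunk in ([3, 4], [5, 6, 7]):
    pipe_like.send(chunk)
    buffer_like.send(chunk)

print(pipe_like.data)     # [5, 6, 7]
print(buffer_like.data)   # [3, 4, 5, 6, 7]
```

With a length as large as 1,000,000, the accumulator in this exercise will hold many hourly chunks before any old rows are discarded.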
As before, you'll need to complete the callback function so it returns an element. In this case, we need a Curve plotting the 'fare_amount' against the 'tpep_pickup_datetime', starting something like:
def fare_curve(data):
    ...
Again as before, we need to define a DynamicMap that uses this callback in combination with a stream (buffer in this case). Here let's assign it to a variable rather than try to show it right away.
Next, apply the resample operation to the DynamicMap object, with rule='T' and function=np.sum, and then apply the rolling_outlier_std operation to the output of that. Finally, display an overlay of the resample output and the rolling_outlier_std output.
resample and rolling_outlier_std can be chained, e.g.:
resampled = resample(dmap)
outliers = rolling_outlier_std(resampled)
resampled * outliers
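To give a feel for what this two-stage chain computes, here is a standard-library sketch of summing values per minute and then flagging points that sit far from a rolling window. The functions sum_per_minute and rolling_outliers, their parameters, and the fare values are assumptions made for illustration; the outlier rule shown is one simple variant and is not claimed to match the exact computation performed by rolling_outlier_std.

```python
from collections import defaultdict
from datetime import datetime, timedelta
from statistics import mean, pstdev


def sum_per_minute(records):
    """Sum the values of (timestamp, value) records within each clock minute."""
    totals = defaultdict(float)
    for stamp, value in records:
        totals[stamp.replace(second=0, microsecond=0)] += value
    return sorted(totals.items())


def rolling_outliers(series, window=5, sigma=2.0):
    """Flag points more than `sigma` standard deviations away from the
    mean of the preceding `window` points."""
    flagged = []
    for i in range(window, len(series)):
        history = [value for _, value in series[i - window:i]]
        centre, spread = mean(history), pstdev(history)
        stamp, value = series[i]
        if spread > 0 and abs(value - centre) > sigma * spread:
            flagged.append((stamp, value))
    return flagged


# Made-up fares: two trips per minute for ten minutes
start = datetime(2015, 1, 1, 8, 0)
fares = []
for minute in range(10):
    base = 10.0 + (minute % 3)
    fares.append((start + timedelta(minutes=minute, seconds=5), base))
    fares.append((start + timedelta(minutes=minute, seconds=40), base))
fares[16] = (fares[16][0], 200.0)   # one unusually expensive trip in minute 8

minutely = sum_per_minute(fares)
print(len(minutely))                # one total per minute
print(rolling_outliers(minutely))   # only the minute containing the spike
```

As in the chained operations above, the second stage consumes the output of the first, so the outliers are detected on the per-minute totals rather than on individual trips.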
Now that you've displayed the plot, let's start sending some data to the buffer, which will accumulate up to 1,000,000 trips:
for i in range(100):
    time.sleep(0.1)
    buffer.send(next(trips)[['fare_amount']])
example = next(trips)[['fare_amount']]
buffer = hv.streams.Buffer(example, length=1000000)
def fare_curve(data):
    return hv.Curve(data, 'tpep_pickup_datetime', 'fare_amount')
fares = hv.DynamicMap(fare_curve, streams=[buffer])
minutely = resample(fares, rule='T', function=np.sum)
minutely * rolling_outlier_std(minutely, rolling_window=10)