%%writefile data/solar_events.csv
timestamp,event
2021-03-20 09:37:00,March Equinox
2021-06-21 03:32:00,June Solstice
2021-09-22 19:21:00,September Equinox
2021-12-21 15:59:00,December Solstice
Writing data/solar_events.csv
Pandas has the pandas.read_csv function to easily read CSV files into DataFrames. Beam has the beam.dataframe.io.read_csv function that emulates pandas.read_csv, but returns a deferred Beam DataFrame.
If you’re using Interactive Beam, you can use collect to bring a Beam DataFrame into local memory as a Pandas DataFrame.
import apache_beam as beam
import apache_beam.runners.interactive.interactive_beam as ib
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
pipeline = beam.Pipeline(InteractiveRunner())
# Create a deferred Beam DataFrame with the contents of our csv file.
beam_df = pipeline | 'Read CSV' >> beam.dataframe.io.read_csv('data/solar_events.csv')
# We can use `ib.collect` to view the contents of a Beam DataFrame.
ib.collect(beam_df)
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.9 interpreter.
| | timestamp | event |
|---|---|---|
| 0 | 2021-03-20 09:37:00 | March Equinox |
| 1 | 2021-06-21 03:32:00 | June Solstice |
| 2 | 2021-09-22 19:21:00 | September Equinox |
| 3 | 2021-12-21 15:59:00 | December Solstice |
If you have your data as a Beam DataFrame, you can convert it into a regular PCollection with to_pcollection.
Converting a Beam DataFrame in this way yields a PCollection with a schema. This allows us to easily access each property by attribute, for example element.event and element.timestamp.
Sometimes it's more convenient to convert the named tuples to Python dictionaries. We can do that with the _asdict method.
import apache_beam as beam
from apache_beam.dataframe import convert
with beam.Pipeline() as pipeline:
    beam_df = pipeline | 'Read CSV' >> beam.dataframe.io.read_csv('data/solar_events.csv')
    (
        # Convert the Beam DataFrame to a PCollection.
        convert.to_pcollection(beam_df)
        # We get named tuples, we can convert them to dictionaries like this.
        | 'To dictionaries' >> beam.Map(lambda x: dict(x._asdict()))
        # Print the elements in the PCollection.
        | 'Print' >> beam.Map(print)
    )
{'timestamp': '2021-03-20 09:37:00', 'event': 'March Equinox'}
{'timestamp': '2021-06-21 03:32:00', 'event': 'June Solstice'}
{'timestamp': '2021-09-22 19:21:00', 'event': 'September Equinox'}
{'timestamp': '2021-12-21 15:59:00', 'event': 'December Solstice'}
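To see what the _asdict step does in isolation, here is a minimal sketch that runs outside any pipeline. It uses a plain collections.namedtuple as a stand-in for one schema'd element; the SolarEvent type is hypothetical and exists only for this illustration.

```python
from collections import namedtuple

# Hypothetical stand-in for one schema'd element of the PCollection.
SolarEvent = namedtuple('SolarEvent', ['timestamp', 'event'])

element = SolarEvent(timestamp='2021-03-20 09:37:00', event='March Equinox')

# Fields are available by attribute, like element.event in a pipeline step.
event_name = element.event

# _asdict maps field names to values; dict() turns the result into a plain dict.
as_dictionary = dict(element._asdict())
print(as_dictionary)
```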
If you have your data as a Pandas DataFrame, you can convert it into a regular PCollection with to_pcollection.
Since Pandas DataFrames are not part of any Beam pipeline, we must provide the pipeline explicitly.
import pandas as pd
import apache_beam as beam
from apache_beam.dataframe import convert
with beam.Pipeline() as pipeline:
    df = pd.read_csv('data/solar_events.csv')
    (
        # Convert the Pandas DataFrame to a PCollection.
        convert.to_pcollection(df, pipeline=pipeline)
        # We get named tuples, we can convert them to dictionaries like this.
        | 'To dictionaries' >> beam.Map(lambda x: dict(x._asdict()))
        # Print the elements in the PCollection.
        | 'Print' >> beam.Map(print)
    )
{'timestamp': '2021-03-20 09:37:00', 'event': 'March Equinox'}
{'timestamp': '2021-06-21 03:32:00', 'event': 'June Solstice'}
{'timestamp': '2021-09-22 19:21:00', 'event': 'September Equinox'}
{'timestamp': '2021-12-21 15:59:00', 'event': 'December Solstice'}
If you have your data as a PCollection of Pandas DataFrames, you can flatten them into a PCollection of their individual rows with FlatMap.
ℹ️ If the DataFrames can vary widely in size (some with thousands of rows, others with only a handful), consider adding a Reshuffle. It redistributes the elements of the PCollection so that the workers receive a more balanced share of them.
import pandas as pd
import apache_beam as beam
with beam.Pipeline() as pipeline:
    (
        pipeline
        | 'Filename' >> beam.Create(['data/solar_events.csv'])
        # Read each file into a Pandas DataFrame. From here on each element
        # is a DataFrame, so we can do any Pandas operation on it.
        | 'Read CSV' >> beam.Map(pd.read_csv)
        # We yield each row of all the DataFrames into a PCollection of dictionaries.
        | 'To dictionaries' >> beam.FlatMap(lambda df: df.to_dict('records'))
        # Reshuffle to make sure parallelization is balanced.
        | 'Reshuffle' >> beam.Reshuffle()
        # Print the elements in the PCollection.
        | 'Print' >> beam.Map(print)
    )
{'timestamp': '2021-03-20 09:37:00', 'event': 'March Equinox'}
{'timestamp': '2021-06-21 03:32:00', 'event': 'June Solstice'}
{'timestamp': '2021-09-22 19:21:00', 'event': 'September Equinox'}
{'timestamp': '2021-12-21 15:59:00', 'event': 'December Solstice'}
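The 'To dictionaries' step relies on DataFrame.to_dict('records'), which returns a list with one dictionary per row. The following sketch shows that call with Pandas alone. It reads two rows of the same CSV content from an in-memory string (an assumption made here so the snippet does not depend on the file on disk).

```python
import io

import pandas as pd

# In-memory copy of the first rows of data/solar_events.csv (for illustration).
csv_text = (
    'timestamp,event\n'
    '2021-03-20 09:37:00,March Equinox\n'
    '2021-06-21 03:32:00,June Solstice\n'
)

df = pd.read_csv(io.StringIO(csv_text))

# One dictionary per row; FlatMap emits each item of this list as its own element.
records = df.to_dict('records')
for record in records:
    print(record)
```

Because FlatMap emits every item of the returned list separately, the resulting PCollection holds rows rather than whole DataFrames.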
If you have your data as a PCollection, you can convert it into a deferred Beam DataFrame with to_dataframe.
ℹ️ To convert a PCollection to a Beam DataFrame, each element must have a schema.
import csv
import apache_beam as beam
from apache_beam.dataframe import convert
with open('data/solar_events.csv') as f:
    solar_events = [dict(row) for row in csv.DictReader(f)]
with beam.Pipeline() as pipeline:
    pcoll = pipeline | 'Create data' >> beam.Create(solar_events)
    # Convert the PCollection into a Beam DataFrame
    beam_df = convert.to_dataframe(pcoll | 'To Rows' >> beam.Map(
        lambda x: beam.Row(
            timestamp=x['timestamp'],
            event=x['event'],
        )
    ))
    # Print the elements in the Beam DataFrame.
    (
        convert.to_pcollection(beam_df)
        | 'To dictionaries' >> beam.Map(lambda x: dict(x._asdict()))
        | 'Print' >> beam.Map(print)
    )
{'timestamp': '2021-03-20 09:37:00', 'event': 'March Equinox'}
{'timestamp': '2021-06-21 03:32:00', 'event': 'June Solstice'}
{'timestamp': '2021-09-22 19:21:00', 'event': 'September Equinox'}
{'timestamp': '2021-12-21 15:59:00', 'event': 'December Solstice'}
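For reference, solar_events in the example above is a plain list of dictionaries whose values are all strings, since csv.DictReader does no type conversion. The fields passed to beam.Row are therefore text. A minimal sketch, using an in-memory copy of the first rows instead of the file on disk (an assumption made only for this snippet):

```python
import csv
import io

# In-memory copy of the first rows of data/solar_events.csv (for illustration).
csv_text = (
    'timestamp,event\n'
    '2021-03-20 09:37:00,March Equinox\n'
    '2021-06-21 03:32:00,June Solstice\n'
)

# DictReader maps each row to the header names; every value stays a string.
solar_events = [dict(row) for row in csv.DictReader(io.StringIO(csv_text))]
print(solar_events)
```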
PCollections to Pandas DataFrames
If you have your data as a PCollection, you can convert it into an in-memory Pandas DataFrame via a side input.
ℹ️ It's recommended to only do this if you need to use a Pandas operation that is not supported in Beam DataFrames. Converting a PCollection into a Pandas DataFrame consolidates elements from potentially multiple workers into a single worker, which could create a performance bottleneck.
⚠️ Pandas DataFrames are in-memory data structures, so make sure all the elements in the PCollection fit into memory. If they don't fit into memory, consider yielding multiple DataFrame elements via FlatMap.
import csv
import pandas as pd
import apache_beam as beam
with open('data/solar_events.csv') as f:
    solar_events = [dict(row) for row in csv.DictReader(f)]
with beam.Pipeline() as pipeline:
    pcoll = pipeline | 'Create data' >> beam.Create(solar_events)
    (
        pipeline
        # Create a single element containing the entire PCollection.
        | 'Singleton' >> beam.Create([None])
        | 'As Pandas' >> beam.Map(
            lambda _, dict_iter: pd.DataFrame(dict_iter),
            dict_iter=beam.pvalue.AsIter(pcoll),
        )
        # Print the Pandas DataFrame.
        | 'Print' >> beam.Map(print)
    )
             timestamp              event
0  2021-03-20 09:37:00      March Equinox
1  2021-06-21 03:32:00      June Solstice
2  2021-09-22 19:21:00  September Equinox
3  2021-12-21 15:59:00  December Solstice
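The memory warning above suggests yielding multiple DataFrame elements when the data does not fit into one. As a rough sketch of that idea, the generator below groups rows into DataFrames of bounded size. The helper name to_dataframe_chunks and its chunk_size parameter are hypothetical and not part of Beam or Pandas. The sketch runs on a plain list and does not show the pipeline wiring.

```python
import itertools

import pandas as pd


def to_dataframe_chunks(rows, chunk_size):
    """Yield DataFrames of at most chunk_size rows each (hypothetical helper)."""
    iterator = iter(rows)
    while True:
        # Pull the next chunk_size rows without materializing the whole input.
        chunk = list(itertools.islice(iterator, chunk_size))
        if not chunk:
            return
        yield pd.DataFrame(chunk)


rows = [
    {'timestamp': '2021-03-20 09:37:00', 'event': 'March Equinox'},
    {'timestamp': '2021-06-21 03:32:00', 'event': 'June Solstice'},
    {'timestamp': '2021-09-22 19:21:00', 'event': 'September Equinox'},
    {'timestamp': '2021-12-21 15:59:00', 'event': 'December Solstice'},
]

chunks = list(to_dataframe_chunks(rows, chunk_size=3))
chunk_lengths = [len(chunk) for chunk in chunks]
print(chunk_lengths)
```

A generator like this could be passed to beam.FlatMap in place of the beam.Map lambda, so that each chunk becomes its own element. The chunks would still be built on a single worker, so this bounds the size of each DataFrame rather than removing the bottleneck.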