%%writefile src/pipeline2.py
import argparse
import logging
import re
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
Overwriting src/pipeline2.py
In this example, the code first creates a PipelineOptions object. This object lets us set various options for our pipeline, such as the pipeline runner that will execute our pipeline and any runner-specific configuration required by the chosen runner. Although these options can be set programmatically, in this example they are built from command-line arguments, which is the more common approach.
You can specify a runner for executing your pipeline, such as the DataflowRunner or SparkRunner. If you omit specifying a runner, as in this example, your pipeline executes locally using the DirectRunner. In the next sections, we will specify the pipeline's runner.
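For reference, here is a minimal, hypothetical sketch of both ways to choose a runner; the flag values and the project/region placeholders are illustrative only:
from apache_beam.options.pipeline_options import PipelineOptions

# Option 1: pass the runner (and any runner-specific options) on the command line;
# parse_known_args below forwards unrecognized flags to PipelineOptions.
#   python src/pipeline2.py --input data/kinglear.txt --output output/wordcount --runner DirectRunner

# Option 2: set the options programmatically as keyword arguments (placeholder values).
options = PipelineOptions(
    runner='DirectRunner',       # or 'DataflowRunner', 'SparkRunner', ...
    # project='my-gcp-project',  # hypothetical value, needed by DataflowRunner
    # region='us-central1',      # hypothetical value, needed by DataflowRunner
)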
%%writefile -a src/pipeline2.py
def main(argv=None, save_main_session=True):
  """Main entry point; defines and runs the wordcount pipeline."""
  parser = argparse.ArgumentParser()
  parser.add_argument(
      '--input',
      dest='input',
      help='Input file to process.')
  parser.add_argument(
      '--output',
      dest='output',
      help='Output file to write results to.')
  known_args, pipeline_args = parser.parse_known_args(argv)

  # We use the save_main_session option because one or more DoFn's in this
  # workflow rely on global context (e.g., a module imported at module level).
  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
Appending to src/pipeline2.py
The next step is to create a Pipeline object with the options we’ve just constructed. The Pipeline object builds up the graph of transformations to be executed, associated with that particular pipeline.
%%writefile -a src/pipeline2.py
  with beam.Pipeline(options=pipeline_options) as p:
Appending to src/pipeline2.py
The MinimalWordCount pipeline contains several transforms to read data into the pipeline, manipulate or otherwise transform the data, and write out the results. Transforms can consist of an individual operation, or can contain multiple nested transforms (which is a composite transform).
Each transform takes some kind of input data and produces some output data. The input and output data is often represented by the SDK class PCollection. PCollection is a special class, provided by the Beam SDK, that you can use to represent a dataset of virtually any size, including unbounded datasets.
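As a tiny illustration (separate from pipeline2.py), a PCollection can also be created from a small in-memory list with beam.Create; the values here are made up:
import apache_beam as beam

with beam.Pipeline() as p:
  # A small, bounded PCollection built from an in-memory list.
  greetings = p | beam.Create(['hello', 'world'])
  greetings | beam.Map(print)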
The MinimalWordCount pipeline contains five transforms:
A text file Read transform is applied to the Pipeline object itself, and produces a PCollection as output. Each element in the output PCollection represents one line of text from the input file.
%%writefile -a src/pipeline2.py
    # Read the text file[pattern] into a PCollection.
    lines = p | ReadFromText(known_args.input)
Appending to src/pipeline2.py
This transform splits the lines in the PCollection<String>, where each element is an individual word in Shakespeare's collected texts. As an alternative, it would have been possible to use a ParDo transform that invokes a DoFn (defined in-line as an anonymous class) on each element that tokenizes the text lines into individual words. The input for this transform is the PCollection of text lines generated by the previous TextIO.Read transform. The ParDo transform outputs a new PCollection, where each element represents an individual word in the text.
The SDK-provided Count transform is a generic transform that takes a PCollection of any type, and returns a PCollection of key/value pairs. Each key represents a unique element from the input collection, and each value represents the number of times that key appeared in the input collection. In this pipeline, the input for Count is the PCollection of individual words generated by the previous ParDo, and the output is a PCollection of key/value pairs where each key represents a unique word in the text and the associated value is the occurrence count for each.
%%writefile -a src/pipeline2.py
    # Count the occurrences of each word.
    counts = (
        lines
        | 'Split' >> (
            beam.FlatMap(
                lambda x: re.findall(r'[A-Za-z\']+', x)).with_output_types(str))
        | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
        | 'GroupAndSum' >> beam.CombinePerKey(sum))
Appending to src/pipeline2.py
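As mentioned above, the tokenization could also be expressed as an explicit ParDo that invokes a DoFn, and the counting could use the SDK-provided Count combiner instead of the Map/CombinePerKey pair. A minimal sketch of both alternatives (not appended to pipeline2.py; ExtractWordsFn is a hypothetical name):
class ExtractWordsFn(beam.DoFn):
  def process(self, element):
    # Yield one output element per word found in the input line.
    for word in re.findall(r"[A-Za-z']+", element):
      yield word

# words = lines | 'SplitWithParDo' >> beam.ParDo(ExtractWordsFn())
# counts = words | 'CountWords' >> beam.combiners.Count.PerElement()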
The next transform formats each of the key/value pairs of unique words and occurrence counts into a printable string suitable for writing to an output file. The map transform is a higher-level composite transform that encapsulates a simple ParDo. For each element in the input PCollection, the map transform applies a function that produces exactly one output element.
%%writefile -a src/pipeline2.py
    # Format the counts into a PCollection of strings.
    def format_result(word_count):
      (word, count) = word_count
      return '%s: %s' % (word, count)

    output = counts | 'Format' >> beam.Map(format_result)
Appending to src/pipeline2.py
Finally, a text file Write transform takes the final PCollection of formatted Strings as input and writes each element to an output text file. Each element in the input PCollection represents one line of text in the resulting output file.
%%writefile -a src/pipeline2.py
    # Write the output using a "Write" transform that has side effects.
    # pylint: disable=expression-not-assigned
    output | WriteToText(known_args.output)
Appending to src/pipeline2.py
Note that the Write transform produces a trivial result value of type PDone, which in this case is ignored.
%%writefile -a src/pipeline2.py
if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  main()
Appending to src/pipeline2.py
!python src/pipeline2.py --input data/kinglear.txt --output output/wordcount
INFO:root:Missing pipeline option (runner). Executing pipeline using the default runner: DirectRunner.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.9 interpreter.
INFO:root:Default Python SDK image for environment is apache/beam_python3.9_sdk:2.38.0
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function annotate_downstream_side_inputs at 0x7ff4e16233a0> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function fix_side_input_pcoll_coders at 0x7ff4e16234c0> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function pack_combiners at 0x7ff4e16239d0> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function lift_combiners at 0x7ff4e1623a60> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function expand_sdf at 0x7ff4e1623c10> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function expand_gbk at 0x7ff4e1623ca0> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function sink_flattens at 0x7ff4e1623dc0> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function greedily_fuse at 0x7ff4e1623e50> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function read_to_impulse at 0x7ff4e1623ee0> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function impulse_to_input at 0x7ff4e1623f70> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function sort_stages at 0x7ff4e16241f0> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function setup_timer_mapping at 0x7ff4e1624160> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function populate_data_channel_coders at 0x7ff4e1624280> ====================
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 100
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Created Worker handler <apache_beam.runners.portability.fn_api_runner.worker_handlers.EmbeddedWorkerHandler object at 0x7ff4e16e1430> for environment ref_Environment_default_environment_1 (beam:env:embedded_python:v1, b'')
INFO:apache_beam.runners.portability.fn_api_runner.fn_runner:Running ((((ref_AppliedPTransform_ReadFromText-Read-Impulse_4)+(ref_AppliedPTransform_ReadFromText-Read-Map-lambda-at-iobase-py-898-_5))+(ReadFromText/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/PairWithRestriction))+(ReadFromText/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/SplitAndSizeRestriction))+(ref_PCollection_PCollection_2_split/Write)
INFO:apache_beam.runners.portability.fn_api_runner.fn_runner:Running (((((ref_PCollection_PCollection_2_split/Read)+(ReadFromText/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/Process))+(ref_AppliedPTransform_Split_8))+(ref_AppliedPTransform_PairWithOne_9))+(GroupAndSum/Precombine))+(GroupAndSum/Group/Write)
INFO:apache_beam.runners.portability.fn_api_runner.fn_runner:Running (((((ref_AppliedPTransform_WriteToText-Write-WriteImpl-DoOnce-Impulse_19)+(ref_AppliedPTransform_WriteToText-Write-WriteImpl-DoOnce-FlatMap-lambda-at-core-py-3229-_20))+(ref_AppliedPTransform_WriteToText-Write-WriteImpl-DoOnce-Map-decode-_22))+(ref_AppliedPTransform_WriteToText-Write-WriteImpl-InitializeWrite_23))+(ref_PCollection_PCollection_11/Write))+(ref_PCollection_PCollection_12/Write)
INFO:apache_beam.runners.portability.fn_api_runner.fn_runner:Running (((((((GroupAndSum/Group/Read)+(GroupAndSum/Merge))+(GroupAndSum/ExtractOutputs))+(ref_AppliedPTransform_Format_14))+(ref_AppliedPTransform_WriteToText-Write-WriteImpl-WindowInto-WindowIntoFn-_24))+(ref_AppliedPTransform_WriteToText-Write-WriteImpl-WriteBundles_25))+(ref_AppliedPTransform_WriteToText-Write-WriteImpl-Pair_26))+(WriteToText/Write/WriteImpl/GroupByKey/Write)
INFO:apache_beam.runners.portability.fn_api_runner.fn_runner:Running ((WriteToText/Write/WriteImpl/GroupByKey/Read)+(ref_AppliedPTransform_WriteToText-Write-WriteImpl-Extract_28))+(ref_PCollection_PCollection_17/Write)
INFO:apache_beam.runners.portability.fn_api_runner.fn_runner:Running ((ref_PCollection_PCollection_11/Read)+(ref_AppliedPTransform_WriteToText-Write-WriteImpl-PreFinalize_29))+(ref_PCollection_PCollection_18/Write)
INFO:apache_beam.runners.portability.fn_api_runner.fn_runner:Running (ref_PCollection_PCollection_11/Read)+(ref_AppliedPTransform_WriteToText-Write-WriteImpl-FinalizeWrite_30)
INFO:apache_beam.io.filebasedsink:Starting finalize_write threads with num_shards: 1 (skipped: 0), batches: 1, num_threads: 1
INFO:apache_beam.io.filebasedsink:Renamed 1 shards in 0.04 seconds.
!head output/wordcount-00000-of-00001
KING: 243
LEAR: 236
DRAMATIS: 1
PERSONAE: 1
king: 65
of: 447
Britain: 2
OF: 15
FRANCE: 10
DUKE: 3
import apache_beam as beam
import re

inputs_pattern = 'data/kinglear*'
outputs_prefix = 'output/pipe2'

# Running locally in the DirectRunner.
with beam.Pipeline() as pipeline:
  (
      pipeline
      | 'Read lines' >> beam.io.ReadFromText(inputs_pattern)
      | 'Find words' >> beam.FlatMap(lambda line: re.findall(r"[a-zA-Z']+", line))
      | 'Pair words with 1' >> beam.Map(lambda word: (word, 1))
      | 'Group and sum' >> beam.CombinePerKey(sum)
      | 'Format results' >> beam.Map(lambda word_count: str(word_count))
      | 'Write results' >> beam.io.WriteToText(outputs_prefix)
  )
# Sample the first 20 results, remember there are no ordering guarantees.
!head -n 20 {outputs_prefix}-00000-of-*
('KING', 243)
('LEAR', 236)
('DRAMATIS', 1)
('PERSONAE', 1)
('king', 65)
('of', 447)
('Britain', 2)
('OF', 15)
('FRANCE', 10)
('DUKE', 3)
('BURGUNDY', 8)
('CORNWALL', 63)
('ALBANY', 67)
('EARL', 2)
('KENT', 156)
('GLOUCESTER', 141)
('EDGAR', 126)
('son', 29)
('to', 438)
('Gloucester', 26)
Below is mostly the same code as above, but with comments explaining every line in more detail.
import apache_beam as beam
import re

inputs_pattern = 'data/kinglear*'
outputs_prefix = 'output/pipe2'

# Running locally in the DirectRunner.
with beam.Pipeline() as pipeline:
  # Store the word counts in a PCollection.
  # Each element is a tuple of (word, count) of types (str, int).
  word_counts = (
      # The input PCollection is an empty pipeline.
      pipeline
      # Read lines from a text file.
      | 'Read lines' >> beam.io.ReadFromText(inputs_pattern)
      # Element type: str - text line

      # Use a regular expression to iterate over all words in the line.
      # FlatMap will yield an element for every element in an iterable.
      | 'Find words' >> beam.FlatMap(lambda line: re.findall(r"[a-zA-Z']+", line))
      # Element type: str - word

      # Create key-value pairs where the value is 1, this way we can group by
      # the same word while adding those 1s and get the counts for every word.
      | 'Pair words with 1' >> beam.Map(lambda word: (word, 1))
      # Element type: (str, int) - key: word, value: 1

      # Group by key while combining the value using the sum() function.
      | 'Group and sum' >> beam.CombinePerKey(sum)
      # Element type: (str, int) - key: word, value: counts
  )

  # We can process a PCollection through other pipelines too.
  (
      # The input PCollection is the word_counts created from the previous step.
      word_counts
      # Format the results into a string so we can write them to a file.
      | 'Format results' >> beam.Map(lambda word_count: str(word_count))
      # Element type: str - text line

      # Finally, write the results to a file.
      | 'Write results' >> beam.io.WriteToText(outputs_prefix)
  )
# Sample the first 20 results, remember there are no ordering guarantees.
!head -n 20 {outputs_prefix}-00000-of-*
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.9 interpreter.
WARNING:apache_beam.io.filebasedsink:Deleting 1 existing files in target path matching: -*-of-%(num_shards)05d
('KING', 243)
('LEAR', 236)
('DRAMATIS', 1)
('PERSONAE', 1)
('king', 65)
('of', 447)
('Britain', 2)
('OF', 15)
('FRANCE', 10)
('DUKE', 3)
('BURGUNDY', 8)
('CORNWALL', 63)
('ALBANY', 67)
('EARL', 2)
('KENT', 156)
('GLOUCESTER', 141)
('EDGAR', 126)
('son', 29)
('to', 438)
('Gloucester', 26)
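The outputs above have no guaranteed order. If you wanted, say, the 20 most frequent words deterministically, one option is to combine with the SDK's Top transform before formatting; here is a minimal sketch building on the word_counts PCollection from the detailed example (the step name is made up):
top_words = (
    word_counts
    # Produces a single-element PCollection containing a list of the 20
    # (word, count) pairs with the highest counts.
    | 'Top 20 words' >> beam.combiners.Top.Of(20, key=lambda wc: wc[1])
)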