In this notebook we are going to explore the inner workings of PySpark and also see how PySpark integrates with Google Colaboratory.
We will use the PySpark distribution provided by pip and the Spark engine bundled with it. The bundled Spark has its limitations: it runs as a single Java virtual machine in pseudo-distributed mode and does not allow for much customization, but it is practical for learning purposes.
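To make "single JVM in local mode" concrete, here is a minimal sketch (runnable once the pyspark library is installed below) that starts a session and inspects the master URL and the default parallelism; the explicit local[*] master is an assumption matching the bundled engine's usual default.
from pyspark.sql import SparkSession
# start (or reuse) a Spark session on the local, single-JVM engine
spark = SparkSession.builder.master("local[*]").appName("LocalModeCheck").getOrCreate()
sc = spark.sparkContext
print(sc.master)              # 'local[*]': local mode, using all available cores
print(sc.defaultParallelism)  # how many partitions Spark uses by default
spark.stop()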
This tutorial aims to offer a rich and engaging learning experience, not just by presenting solutions but also by elucidating the thought processes and methodologies employed in constructing these solutions.
⚠️ Warning: this notebook contains a blocking cell (in "How to input commands in the pyspark
shell") so if you wish to run all cells in the notebook (from the menu Runtime/Run all) this cell needs to be unblocked (by stopping it) or commented out.
“PySpark is the Python API for Apache Spark.”
(https://spark.apache.org/docs/latest/api/python/index.html)
The name PySpark is composed of Python $+$ Spark; this will help you remember how to write "PySpark" with the correct capitalization.
First of all, we need to install the pyspark library using pip.
Note: this demo depends on the pip
distribution of PySpark and might not work with the conda
distribution.
Also be aware of the warning in the PyPI project page https://pypi.org/project/pyspark/:
“This packaging is currently experimental and may change in future versions (although we will do our best to keep compatibility).”
!pip install pyspark
Requirement already satisfied: pyspark in /opt/hostedtoolcache/Python/3.8.18/x64/lib/python3.8/site-packages (3.4.0) Requirement already satisfied: py4j==0.10.9.7 in /opt/hostedtoolcache/Python/3.8.18/x64/lib/python3.8/site-packages (from pyspark) (0.10.9.7)
output library if in Colab
# true if running on Google Colab
import sys
IN_COLAB = 'google.colab' in sys.modules
if IN_COLAB:
    from google.colab import output
PySpark requires Java 8 or later. Java should be in your system PATH, or the JAVA_HOME environment variable should point to a Java installation.
Java 11 is already available on Colab but in other contexts it might need to be installed.
import os
import subprocess
import shutil

# set the JAVA_HOME environment variable (install Java if necessary)

def is_java_installed():
    java_path = shutil.which("java")
    if java_path is None:
        # no java executable on the PATH
        return False
    os.environ['JAVA_HOME'] = os.path.realpath(java_path).split('/bin')[0]
    return os.environ['JAVA_HOME']

def install_java():
    # Uncomment and modify the desired version
    # java_version = 'openjdk-11-jre-headless'
    # java_version = 'default-jre'
    # java_version = 'openjdk-17-jre-headless'
    # java_version = 'openjdk-18-jre-headless'
    java_version = 'openjdk-19-jre-headless'
    print(f"Java not found. Installing {java_version} ... (this might take a while)")
    try:
        cmd = f"apt install -y {java_version}"
        subprocess_output = subprocess.run(cmd, shell=True, check=True,
                                           stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
        stdout_result = subprocess_output.stdout
        # Process the results as needed
        print("Done installing Java {}".format(java_version))
        os.environ['JAVA_HOME'] = os.path.realpath(shutil.which("java")).split('/bin')[0]
        print("JAVA_HOME is {}".format(os.environ['JAVA_HOME']))
    except subprocess.CalledProcessError as e:
        # Handle the error if the command returns a non-zero exit code
        print("Command failed with return code {}".format(e.returncode))
        print("stdout: {}".format(e.stdout))

# Install Java if not available
if is_java_installed():
    print("Java is already installed: {}".format(os.environ['JAVA_HOME']))
else:
    print("Installing Java")
    install_java()
Java is already installed: /usr/lib/jvm/temurin-11-jdk-amd64
HelloWorld.py is a simple app that creates a Spark session (named spark), prints "Hello, World!", and stops the session. This is a self-contained application (see https://spark.apache.org/docs/latest/quick-start.html#self-contained-applications).
%%writefile HelloWorld.py
"""HelloWorld.py"""
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Hello World").getOrCreate()
print("Hello, World!")
spark.stop()
Writing HelloWorld.py
Applications are submitted to the Spark engine with the spark-submit script.
!spark-submit HelloWorld.py
24/10/13 09:43:49 INFO SparkContext: Running Spark version 3.4.0 24/10/13 09:43:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 24/10/13 09:43:49 INFO ResourceUtils: ============================================================== 24/10/13 09:43:49 INFO ResourceUtils: No custom resources configured for spark.driver. 24/10/13 09:43:49 INFO ResourceUtils: ============================================================== 24/10/13 09:43:49 INFO SparkContext: Submitted application: Hello World 24/10/13 09:43:49 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0) 24/10/13 09:43:49 INFO ResourceProfile: Limiting resource is cpu 24/10/13 09:43:49 INFO ResourceProfileManager: Added ResourceProfile id: 0 24/10/13 09:43:49 INFO SecurityManager: Changing view acls to: runner 24/10/13 09:43:49 INFO SecurityManager: Changing modify acls to: runner 24/10/13 09:43:49 INFO SecurityManager: Changing view acls groups to: 24/10/13 09:43:49 INFO SecurityManager: Changing modify acls groups to: 24/10/13 09:43:49 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: runner; groups with view permissions: EMPTY; users with modify permissions: runner; groups with modify permissions: EMPTY 24/10/13 09:43:50 INFO Utils: Successfully started service 'sparkDriver' on port 42101. 24/10/13 09:43:50 INFO SparkEnv: Registering MapOutputTracker 24/10/13 09:43:50 INFO SparkEnv: Registering BlockManagerMaster 24/10/13 09:43:50 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information 24/10/13 09:43:50 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up 24/10/13 09:43:50 INFO SparkEnv: Registering BlockManagerMasterHeartbeat 24/10/13 09:43:50 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-58b92ad4-108f-4ca6-a993-9f7b655cdc5c 24/10/13 09:43:50 INFO MemoryStore: MemoryStore started with capacity 434.4 MiB 24/10/13 09:43:50 INFO SparkEnv: Registering OutputCommitCoordinator 24/10/13 09:43:50 INFO JettyUtils: Start Jetty 0.0.0.0:4040 for SparkUI 24/10/13 09:43:50 INFO Utils: Successfully started service 'SparkUI' on port 4040. 24/10/13 09:43:50 INFO Executor: Starting executor ID driver on host fv-az837-567.ymagj3ilz1se1nskuisabcixse.ex.internal.cloudapp.net 24/10/13 09:43:50 INFO Executor: Starting executor with user classpath (userClassPathFirst = false): '' 24/10/13 09:43:50 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 36187. 
24/10/13 09:43:50 INFO NettyBlockTransferService: Server created on fv-az837-567.ymagj3ilz1se1nskuisabcixse.ex.internal.cloudapp.net:36187 24/10/13 09:43:50 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy 24/10/13 09:43:50 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, fv-az837-567.ymagj3ilz1se1nskuisabcixse.ex.internal.cloudapp.net, 36187, None) 24/10/13 09:43:50 INFO BlockManagerMasterEndpoint: Registering block manager fv-az837-567.ymagj3ilz1se1nskuisabcixse.ex.internal.cloudapp.net:36187 with 434.4 MiB RAM, BlockManagerId(driver, fv-az837-567.ymagj3ilz1se1nskuisabcixse.ex.internal.cloudapp.net, 36187, None) 24/10/13 09:43:50 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, fv-az837-567.ymagj3ilz1se1nskuisabcixse.ex.internal.cloudapp.net, 36187, None) 24/10/13 09:43:50 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, fv-az837-567.ymagj3ilz1se1nskuisabcixse.ex.internal.cloudapp.net, 36187, None) Hello, World! 24/10/13 09:43:50 INFO SparkContext: SparkContext is stopping with exitCode 0. 24/10/13 09:43:50 INFO SparkUI: Stopped Spark web UI at http://fv-az837-567.ymagj3ilz1se1nskuisabcixse.ex.internal.cloudapp.net:4040 24/10/13 09:43:50 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 24/10/13 09:43:50 INFO MemoryStore: MemoryStore cleared 24/10/13 09:43:50 INFO BlockManager: BlockManager stopped 24/10/13 09:43:50 INFO BlockManagerMaster: BlockManagerMaster stopped 24/10/13 09:43:50 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped! 24/10/13 09:43:50 INFO SparkContext: Successfully stopped SparkContext 24/10/13 09:43:51 INFO ShutdownHookManager: Shutdown hook called 24/10/13 09:43:51 INFO ShutdownHookManager: Deleting directory /tmp/spark-e6fb6755-f9bd-4891-9898-7ea8dfd671b5 24/10/13 09:43:51 INFO ShutdownHookManager: Deleting directory /tmp/spark-e6fb6755-f9bd-4891-9898-7ea8dfd671b5/pyspark-ae1c4f59-e550-4b28-a58b-bd154e28c307 24/10/13 09:43:51 INFO ShutdownHookManager: Deleting directory /tmp/spark-b2f9a4e7-e54d-4c14-9a0a-c83719242b6e
You should be able to see "Hello, World!" somewhere in between all the log messages.
The logging messages produced by Spark are generated by the log4j service. We will see later (in "How to set logging level") how to configure the logging level.
For now, let us redirect all log messages to a file log.txt in order to see just the output of the app. Note that log messages are sent by default to the standard error stream.
!spark-submit HelloWorld.py 2>log.txt
Hello, World!
You can look at the log file log.txt
separately.
!cat log.txt
24/10/13 09:43:53 INFO SparkContext: Running Spark version 3.4.0 24/10/13 09:43:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 24/10/13 09:43:53 INFO ResourceUtils: ============================================================== 24/10/13 09:43:53 INFO ResourceUtils: No custom resources configured for spark.driver. 24/10/13 09:43:53 INFO ResourceUtils: ============================================================== 24/10/13 09:43:53 INFO SparkContext: Submitted application: Hello World 24/10/13 09:43:53 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0) 24/10/13 09:43:53 INFO ResourceProfile: Limiting resource is cpu 24/10/13 09:43:53 INFO ResourceProfileManager: Added ResourceProfile id: 0 24/10/13 09:43:53 INFO SecurityManager: Changing view acls to: runner 24/10/13 09:43:53 INFO SecurityManager: Changing modify acls to: runner 24/10/13 09:43:53 INFO SecurityManager: Changing view acls groups to: 24/10/13 09:43:53 INFO SecurityManager: Changing modify acls groups to: 24/10/13 09:43:53 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: runner; groups with view permissions: EMPTY; users with modify permissions: runner; groups with modify permissions: EMPTY 24/10/13 09:43:54 INFO Utils: Successfully started service 'sparkDriver' on port 44261. 24/10/13 09:43:54 INFO SparkEnv: Registering MapOutputTracker 24/10/13 09:43:54 INFO SparkEnv: Registering BlockManagerMaster 24/10/13 09:43:54 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information 24/10/13 09:43:54 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up 24/10/13 09:43:54 INFO SparkEnv: Registering BlockManagerMasterHeartbeat 24/10/13 09:43:54 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-8dadfb36-27fd-458b-af5a-8d5e03efffe5 24/10/13 09:43:54 INFO MemoryStore: MemoryStore started with capacity 434.4 MiB 24/10/13 09:43:54 INFO SparkEnv: Registering OutputCommitCoordinator 24/10/13 09:43:54 INFO JettyUtils: Start Jetty 0.0.0.0:4040 for SparkUI 24/10/13 09:43:54 INFO Utils: Successfully started service 'SparkUI' on port 4040. 24/10/13 09:43:54 INFO Executor: Starting executor ID driver on host fv-az837-567.ymagj3ilz1se1nskuisabcixse.ex.internal.cloudapp.net 24/10/13 09:43:54 INFO Executor: Starting executor with user classpath (userClassPathFirst = false): '' 24/10/13 09:43:54 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 34707. 
24/10/13 09:43:54 INFO NettyBlockTransferService: Server created on fv-az837-567.ymagj3ilz1se1nskuisabcixse.ex.internal.cloudapp.net:34707 24/10/13 09:43:54 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy 24/10/13 09:43:54 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, fv-az837-567.ymagj3ilz1se1nskuisabcixse.ex.internal.cloudapp.net, 34707, None) 24/10/13 09:43:54 INFO BlockManagerMasterEndpoint: Registering block manager fv-az837-567.ymagj3ilz1se1nskuisabcixse.ex.internal.cloudapp.net:34707 with 434.4 MiB RAM, BlockManagerId(driver, fv-az837-567.ymagj3ilz1se1nskuisabcixse.ex.internal.cloudapp.net, 34707, None) 24/10/13 09:43:54 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, fv-az837-567.ymagj3ilz1se1nskuisabcixse.ex.internal.cloudapp.net, 34707, None) 24/10/13 09:43:54 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, fv-az837-567.ymagj3ilz1se1nskuisabcixse.ex.internal.cloudapp.net, 34707, None) 24/10/13 09:43:54 INFO SparkContext: SparkContext is stopping with exitCode 0. 24/10/13 09:43:54 INFO SparkUI: Stopped Spark web UI at http://fv-az837-567.ymagj3ilz1se1nskuisabcixse.ex.internal.cloudapp.net:4040 24/10/13 09:43:54 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 24/10/13 09:43:54 INFO MemoryStore: MemoryStore cleared 24/10/13 09:43:54 INFO BlockManager: BlockManager stopped 24/10/13 09:43:54 INFO BlockManagerMaster: BlockManagerMaster stopped 24/10/13 09:43:54 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped! 24/10/13 09:43:54 INFO SparkContext: Successfully stopped SparkContext 24/10/13 09:43:55 INFO ShutdownHookManager: Shutdown hook called 24/10/13 09:43:55 INFO ShutdownHookManager: Deleting directory /tmp/spark-28058f53-6a6a-4627-8e9c-6dfb0935727a 24/10/13 09:43:55 INFO ShutdownHookManager: Deleting directory /tmp/spark-28058f53-6a6a-4627-8e9c-6dfb0935727a/pyspark-406dbad3-9656-44ec-b9b5-85d149ee4cf7 24/10/13 09:43:55 INFO ShutdownHookManager: Deleting directory /tmp/spark-37ef6775-3c58-436d-9de2-8fdeb334da39
You might have noticed that it took a while for the app to start. The reason for this delay is that in order to run the app a Java Virtual Machine (JVM) had to be started and the Spark engine launched on the JVM.
From the timestamps in the log messages you can estimate how long the app took to run.
%time !spark-submit HelloWorld.py 2>log.txt
Hello, World! CPU times: user 56.4 ms, sys: 1.93 ms, total: 58.3 ms Wall time: 3.86 s
To see how long the app actually took you can look at the History server.
There are several demo apps that come with the PySpark distribution. Let us find out where they are located.
First, let us see where PySpark was installed.
!pip show pyspark
Name: pyspark Version: 3.4.0 Summary: Apache Spark Python API Home-page: https://github.com/apache/spark/tree/master/python Author: Spark Developers Author-email: dev@spark.apache.org License: http://www.apache.org/licenses/LICENSE-2.0 Location: /opt/hostedtoolcache/Python/3.8.18/x64/lib/python3.8/site-packages Requires: py4j Required-by:
The Location field above shows where PySpark is installed (on Google Colab this is typically /usr/local/lib/python3.10/dist-packages). The folder we are looking for is named examples.
We could have also used find_spark_home.py to find PySpark's location.
!find_spark_home.py
/opt/hostedtoolcache/Python/3.8.18/x64/lib/python3.8/site-packages/pyspark
# Run the script and capture its output
result = subprocess.run(["find_spark_home.py"], capture_output=True, text=True)
# Print or use the captured output
print("Output of find_spark_home.py:", result.stdout)
# set SPARK_HOME environment variable
os.environ['SPARK_HOME'] = result.stdout.strip()
Output of find_spark_home.py: /opt/hostedtoolcache/Python/3.8.18/x64/lib/python3.8/site-packages/pyspark
Now the environment variable SPARK_HOME
is set.
os.environ['SPARK_HOME']
'/opt/hostedtoolcache/Python/3.8.18/x64/lib/python3.8/site-packages/pyspark'
List the contents of the pyspark installation folder.
!ls -p $SPARK_HOME
__init__.py install.py sbin/ __pycache__/ instrumentation_utils.py serializers.py _globals.py jars/ shell.py _typing.pyi java_gateway.py shuffle.py accumulators.py join.py sql/ bin/ licenses/ statcounter.py broadcast.py ml/ status.py cloudpickle/ mllib/ storagelevel.py conf.py pandas/ streaming/ context.py profiler.py taskcontext.py daemon.py py.typed traceback_utils.py data/ python/ util.py errors/ rdd.py version.py examples/ rddsampler.py worker.py files.py resource/ find_spark_home.py resultiterable.py
The examples
folder is the one we are interested in.
Install the tree
command for a better visualization of folders.
!apt install tree
E: Could not open lock file /var/lib/dpkg/lock-frontend - open (13: Permission denied) E: Unable to acquire the dpkg frontend lock (/var/lib/dpkg/lock-frontend), are you root?
The PySpark distribution comes with Python examples only. They are under src/main/python
.
For examples in other programming languages (Java, Scala, R) you need to download the Apache Spark distribution (see https://github.com/apache/spark/tree/master/examples/src/main).
examples directory structure
!tree -d $SPARK_HOME/examples
/opt/hostedtoolcache/Python/3.8.18/x64/lib/python3.8/site-packages/pyspark/examples └── src └── main └── python ├── __pycache__ ├── ml │ └── __pycache__ ├── mllib │ └── __pycache__ ├── sql │ ├── __pycache__ │ └── streaming │ └── __pycache__ └── streaming └── __pycache__ 14 directories
Contents at level $1$
!tree -L 1 $SPARK_HOME/examples/src/main/python
/opt/hostedtoolcache/Python/3.8.18/x64/lib/python3.8/site-packages/pyspark/examples/src/main/python ├── __init__.py ├── __pycache__ ├── als.py ├── avro_inputformat.py ├── kmeans.py ├── logistic_regression.py ├── ml ├── mllib ├── pagerank.py ├── parquet_inputformat.py ├── pi.py ├── sort.py ├── sql ├── status_api_demo.py ├── streaming ├── transitive_closure.py └── wordcount.py 5 directories, 12 files
The PySpark distribution comes with a lot of examples.
!tree -I "__pycache__" $SPARK_HOME/examples
/opt/hostedtoolcache/Python/3.8.18/x64/lib/python3.8/site-packages/pyspark/examples └── src └── main └── python ├── __init__.py ├── als.py ├── avro_inputformat.py ├── kmeans.py ├── logistic_regression.py ├── ml │ ├── aft_survival_regression.py │ ├── als_example.py │ ├── binarizer_example.py │ ├── bisecting_k_means_example.py │ ├── bucketed_random_projection_lsh_example.py │ ├── bucketizer_example.py │ ├── chi_square_test_example.py │ ├── chisq_selector_example.py │ ├── correlation_example.py │ ├── count_vectorizer_example.py │ ├── cross_validator.py │ ├── dataframe_example.py │ ├── dct_example.py │ ├── decision_tree_classification_example.py │ ├── decision_tree_regression_example.py │ ├── elementwise_product_example.py │ ├── estimator_transformer_param_example.py │ ├── feature_hasher_example.py │ ├── fm_classifier_example.py │ ├── fm_regressor_example.py │ ├── fpgrowth_example.py │ ├── gaussian_mixture_example.py │ ├── generalized_linear_regression_example.py │ ├── gradient_boosted_tree_classifier_example.py │ ├── gradient_boosted_tree_regressor_example.py │ ├── imputer_example.py │ ├── index_to_string_example.py │ ├── interaction_example.py │ ├── isotonic_regression_example.py │ ├── kmeans_example.py │ ├── lda_example.py │ ├── linear_regression_with_elastic_net.py │ ├── linearsvc.py │ ├── logistic_regression_summary_example.py │ ├── logistic_regression_with_elastic_net.py │ ├── max_abs_scaler_example.py │ ├── min_hash_lsh_example.py │ ├── min_max_scaler_example.py │ ├── multiclass_logistic_regression_with_elastic_net.py │ ├── multilayer_perceptron_classification.py │ ├── n_gram_example.py │ ├── naive_bayes_example.py │ ├── normalizer_example.py │ ├── one_vs_rest_example.py │ ├── onehot_encoder_example.py │ ├── pca_example.py │ ├── pipeline_example.py │ ├── polynomial_expansion_example.py │ ├── power_iteration_clustering_example.py │ ├── prefixspan_example.py │ ├── quantile_discretizer_example.py │ ├── random_forest_classifier_example.py │ ├── random_forest_regressor_example.py │ ├── rformula_example.py │ ├── robust_scaler_example.py │ ├── sql_transformer.py │ ├── standard_scaler_example.py │ ├── stopwords_remover_example.py │ ├── string_indexer_example.py │ ├── summarizer_example.py │ ├── tf_idf_example.py │ ├── tokenizer_example.py │ ├── train_validation_split.py │ ├── univariate_feature_selector_example.py │ ├── variance_threshold_selector_example.py │ ├── vector_assembler_example.py │ ├── vector_indexer_example.py │ ├── vector_size_hint_example.py │ ├── vector_slicer_example.py │ └── word2vec_example.py ├── mllib │ ├── __init__.py │ ├── binary_classification_metrics_example.py │ ├── bisecting_k_means_example.py │ ├── correlations.py │ ├── correlations_example.py │ ├── decision_tree_classification_example.py │ ├── decision_tree_regression_example.py │ ├── elementwise_product_example.py │ ├── fpgrowth_example.py │ ├── gaussian_mixture_example.py │ ├── gaussian_mixture_model.py │ ├── gradient_boosting_classification_example.py │ ├── gradient_boosting_regression_example.py │ ├── hypothesis_testing_example.py │ ├── hypothesis_testing_kolmogorov_smirnov_test_example.py │ ├── isotonic_regression_example.py │ ├── k_means_example.py │ ├── kernel_density_estimation_example.py │ ├── kmeans.py │ ├── latent_dirichlet_allocation_example.py │ ├── linear_regression_with_sgd_example.py │ ├── logistic_regression.py │ ├── logistic_regression_with_lbfgs_example.py │ ├── multi_class_metrics_example.py │ ├── multi_label_metrics_example.py │ ├── naive_bayes_example.py │ ├── normalizer_example.py │ ├── 
pca_rowmatrix_example.py │ ├── power_iteration_clustering_example.py │ ├── random_forest_classification_example.py │ ├── random_forest_regression_example.py │ ├── random_rdd_generation.py │ ├── ranking_metrics_example.py │ ├── recommendation_example.py │ ├── regression_metrics_example.py │ ├── sampled_rdds.py │ ├── standard_scaler_example.py │ ├── stratified_sampling_example.py │ ├── streaming_k_means_example.py │ ├── streaming_linear_regression_example.py │ ├── summary_statistics_example.py │ ├── svd_example.py │ ├── svm_with_sgd_example.py │ ├── tf_idf_example.py │ ├── word2vec.py │ └── word2vec_example.py ├── pagerank.py ├── parquet_inputformat.py ├── pi.py ├── sort.py ├── sql │ ├── __init__.py │ ├── arrow.py │ ├── basic.py │ ├── datasource.py │ ├── hive.py │ └── streaming │ ├── structured_kafka_wordcount.py │ ├── structured_network_wordcount.py │ ├── structured_network_wordcount_session_window.py │ ├── structured_network_wordcount_windowed.py │ └── structured_sessionization.py ├── status_api_demo.py ├── streaming │ ├── __init__.py │ ├── hdfs_wordcount.py │ ├── network_wordcount.py │ ├── network_wordjoinsentiments.py │ ├── queue_stream.py │ ├── recoverable_network_wordcount.py │ ├── sql_network_wordcount.py │ └── stateful_network_wordcount.py ├── transitive_closure.py └── wordcount.py 8 directories, 146 files
Let us look at the classic example for Spark: word count. This is about counting words (in this case space-separated tokens) in a file.
!cat $SPARK_HOME/examples/src/main/python/wordcount.py
# # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # import sys from operator import add from pyspark.sql import SparkSession if __name__ == "__main__": if len(sys.argv) != 2: print("Usage: wordcount <file>", file=sys.stderr) sys.exit(-1) spark = SparkSession\ .builder\ .appName("PythonWordCount")\ .getOrCreate() lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0]) counts = lines.flatMap(lambda x: x.split(' ')) \ .map(lambda x: (x, 1)) \ .reduceByKey(add) output = counts.collect() for (word, count) in output: print("%s: %i" % (word, count)) spark.stop()
The app reads from an input file (sys.argv[1]
). The input should be a text file in no particular format.
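To see what the flatMap / map / reduceByKey chain computes without reading a whole book, here is a minimal sketch of the same pipeline on a couple of in-memory lines (the sample sentences are made up for illustration):
from operator import add
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("TinyWordCount").getOrCreate()
sc = spark.sparkContext
# a tiny in-memory "file": one string per line
lines = sc.parallelize(["to be or not to be", "to see or not to see"])
counts = (lines.flatMap(lambda x: x.split(' '))   # split each line into words
               .map(lambda x: (x, 1))             # emit a (word, 1) pair per word
               .reduceByKey(add))                 # sum the 1s per distinct word
print(counts.collect())  # e.g. [('to', 4), ('be', 2), ('or', 2), ('not', 2), ('see', 2)]
spark.stop()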
There are some datasets included in the distribution that are used for other examples but no nice text file for word counting.
!tree $SPARK_HOME/data
/opt/hostedtoolcache/Python/3.8.18/x64/lib/python3.8/site-packages/pyspark/data ├── graphx │ ├── followers.txt │ └── users.txt ├── mllib │ ├── als │ │ ├── sample_movielens_ratings.txt │ │ └── test.data │ ├── gmm_data.txt │ ├── images │ │ ├── license.txt │ │ └── origin │ │ ├── kittens │ │ │ └── not-image.txt │ │ └── license.txt │ ├── kmeans_data.txt │ ├── pagerank_data.txt │ ├── pic_data.txt │ ├── ridge-data │ │ └── lpsa.data │ ├── sample_binary_classification_data.txt │ ├── sample_fpgrowth.txt │ ├── sample_isotonic_regression_libsvm_data.txt │ ├── sample_kmeans_data.txt │ ├── sample_lda_data.txt │ ├── sample_lda_libsvm_data.txt │ ├── sample_libsvm_data.txt │ ├── sample_linear_regression_data.txt │ ├── sample_movielens_data.txt │ ├── sample_multiclass_classification_data.txt │ ├── sample_svm_data.txt │ └── streaming_kmeans_data_test.txt └── streaming └── AFINN-111.txt 8 directories, 25 files
Let us download the book "Don Quixote" from the Gutenberg collection.
!wget https://www.gutenberg.org/cache/epub/996/pg996.txt -O don_quixote.txt
--2024-10-13 09:44:01-- https://www.gutenberg.org/cache/epub/996/pg996.txt Resolving www.gutenberg.org (www.gutenberg.org)... 152.19.134.47, 2610:28:3090:3000:0:bad:cafe:47 Connecting to www.gutenberg.org (www.gutenberg.org)|152.19.134.47|:443... connected. HTTP request sent, awaiting response... 200 OK Length: 2391721 (2.3M) [text/plain] Saving to: ‘don_quixote.txt’ don_quixote.txt 100%[===================>] 2.28M 10.2MB/s in 0.2s 2024-10-13 09:44:01 (10.2 MB/s) - ‘don_quixote.txt’ saved [2391721/2391721]
Copy wordcount.py
to the current directory to avoid typing long paths.
!cp $SPARK_HOME/examples/src/main/python/wordcount.py ./
Submit the application to Spark, redirecting standard error to err.txt and standard output to out.txt.
!spark-submit wordcount.py don_quixote.txt 1>out.txt 2>err.txt
Let us look at the result.
The output (in out.txt) is an unsorted list of <word>: <frequency count> entries.
!head out.txt
The: 846 Project: 80 Gutenberg: 23 eBook: 4 of: 12866 Don: 2541 Quixote: 1012 : 8384 This: 97 ebook: 2
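The word counts come back unsorted because reduceByKey makes no ordering guarantees. If you wanted the most frequent tokens first, a small variation of the example (a sketch, not part of the distribution) could use takeOrdered instead of collecting everything:
from operator import add
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("TopWords").getOrCreate()
lines = spark.read.text("don_quixote.txt").rdd.map(lambda r: r[0])
counts = (lines.flatMap(lambda x: x.split(' '))
               .map(lambda x: (x, 1))
               .reduceByKey(add))
# take the 10 most frequent tokens, ordered by descending count
for word, count in counts.takeOrdered(10, key=lambda wc: -wc[1]):
    print("%s: %i" % (word, count))
spark.stop()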
!head err.txt
24/10/13 09:44:04 INFO SparkContext: Running Spark version 3.4.0 24/10/13 09:44:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 24/10/13 09:44:04 INFO ResourceUtils: ============================================================== 24/10/13 09:44:04 INFO ResourceUtils: No custom resources configured for spark.driver. 24/10/13 09:44:04 INFO ResourceUtils: ============================================================== 24/10/13 09:44:04 INFO SparkContext: Submitted application: PythonWordCount 24/10/13 09:44:04 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0) 24/10/13 09:44:04 INFO ResourceProfile: Limiting resource is cpu 24/10/13 09:44:04 INFO ResourceProfileManager: Added ResourceProfile id: 0 24/10/13 09:44:04 INFO SecurityManager: Changing view acls to: runner
Spark uses log4j for logging and valid log levels are: "trace", "debug", "info", "warn", "error", and "fatal" (see https://logging.apache.org/log4j/log4j-2.2/manual/configuration.html).
There are several ways to configure the log level of your PySpark app. We are going to add a log4j2.properties
file in the conf
directory with the desired settings.
Note: configuration templates are not included in the PySpark distribution, but they can be found at https://github.com/apache/spark/tree/master/conf. The conf directory also needs to be created under PySpark's home, i.e. $SPARK_HOME/conf (with mkdir $(find_spark_home.py)/conf).
log4j2.properties file
%%bash
# create conf directory
# with the option -p mkdir won't complain if the folder already exists
mkdir -p $SPARK_HOME/conf
# populate log4j2.properties file
FILE=$SPARK_HOME/conf/log4j2.properties
# read about heredocs: https://tldp.org/LDP/abs/html/here-docs.html
cat> $FILE <<🙂
status = warn
appender.console.type = Console
appender.console.name = STDOUT
appender.console.target = SYSTEM_ERR
rootLogger.level = warn
rootLogger.appenderRef.stdout.ref = STDOUT
# formatting
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
🙂
This is the content of the newly created log4j2.properties
file:
!cat $SPARK_HOME/conf/log4j2.properties
status = warn appender.console.type = Console appender.console.name = STDOUT appender.console.target = SYSTEM_ERR rootLogger.level = warn rootLogger.appenderRef.stdout.ref = STDOUT # formatting appender.console.layout.type = PatternLayout appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
Submit the app with the new logging configuration and show the first 10 lines of standard output and standard error.
%%bash
spark-submit wordcount.py don_quixote.txt 1>out.txt 2>err.txt
head out.txt err.txt
==> out.txt <== The: 846 Project: 80 Gutenberg: 23 eBook: 4 of: 12866 Don: 2541 Quixote: 1012 : 8384 This: 97 ebook: 2 ==> err.txt <== 2024-10-13 09:44:13 WARN NativeCodeLoader:60 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
As you can observe, only log messages with a severity level of "warn" or higher are currently being dispatched by the logging service.
log4j2.properties.template
The Apache Spark distribution includes an extended log4j2.properties.template
that offers advanced flexibility in customizing logging. This template allows you to finely control logging levels tailored to specific use cases or components.
Download the log4j2.properties.template
from the Spark distribution and save it in the conf
directory.
!curl -L https://raw.githubusercontent.com/apache/spark/master/conf/log4j2.properties.template -o $SPARK_HOME/conf/log4j2.properties.template
% Total % Received % Xferd Average Speed Time Time Time Current Dload Upload Total Spent Left Speed 100 2914 100 2914 0 0 11758 0 --:--:-- --:--:-- --:--:-- 11797
!ls $SPARK_HOME/conf
log4j2.properties log4j2.properties.template
Note: as long as the file has the extension .template it will be ignored by Spark. The only configuration file name accepted by the logging service is log4j2.properties for log4j2 (or log4j.properties for the old log4j).
Let us look inside log4j2.properties.template
!cat $SPARK_HOME/conf/log4j2.properties.template
# # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # Set everything to be logged to the console rootLogger.level = info rootLogger.appenderRef.stdout.ref = console appender.console.type = Console appender.console.name = console appender.console.target = SYSTEM_ERR appender.console.layout.type = JsonTemplateLayout appender.console.layout.eventTemplateUri = classpath:org/apache/spark/SparkLayout.json # Set the default spark-shell/spark-sql log level to WARN. When running the # spark-shell/spark-sql, the log level for these classes is used to overwrite # the root logger's log level, so that the user can have different defaults # for the shell and regular Spark apps. logger.repl.name = org.apache.spark.repl.Main logger.repl.level = warn logger.thriftserver.name = org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver logger.thriftserver.level = warn # Settings to quiet third party logs that are too verbose logger.jetty1.name = org.sparkproject.jetty logger.jetty1.level = warn logger.jetty2.name = org.sparkproject.jetty.util.component.AbstractLifeCycle logger.jetty2.level = error logger.replexprTyper.name = org.apache.spark.repl.SparkIMain$exprTyper logger.replexprTyper.level = info logger.replSparkILoopInterpreter.name = org.apache.spark.repl.SparkILoop$SparkILoopInterpreter logger.replSparkILoopInterpreter.level = info logger.parquet1.name = org.apache.parquet logger.parquet1.level = error logger.parquet2.name = parquet logger.parquet2.level = error # SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support logger.RetryingHMSHandler.name = org.apache.hadoop.hive.metastore.RetryingHMSHandler logger.RetryingHMSHandler.level = fatal logger.FunctionRegistry.name = org.apache.hadoop.hive.ql.exec.FunctionRegistry logger.FunctionRegistry.level = error # For deploying Spark ThriftServer # SPARK-34128: Suppress undesirable TTransportException warnings involved in THRIFT-4805 appender.console.filter.1.type = RegexFilter appender.console.filter.1.regex = .*Thrift error occurred during processing of message.* appender.console.filter.1.onMatch = deny appender.console.filter.1.onMismatch = neutral
Configuring Log4j can be a non-trivial task. For detailed guidance, refer to the official Log4j configuration documentation at https://logging.apache.org/log4j/2.x/manual/configuration.html. To see the effect of a different level, let us now switch the root logger from "warn" to "debug".
%%bash
# sed -i changes in-place
sed -i 's/rootLogger.level = warn/rootLogger.level = debug/' $SPARK_HOME/conf/log4j2.properties
cat $SPARK_HOME/conf/log4j2.properties
status = warn appender.console.type = Console appender.console.name = STDOUT appender.console.target = SYSTEM_ERR rootLogger.level = debug rootLogger.appenderRef.stdout.ref = STDOUT # formatting appender.console.layout.type = PatternLayout appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
%%bash
spark-submit wordcount.py don_quixote.txt 1>out.txt 2>err.txt
head out.txt err.txt
==> out.txt <== The: 846 Project: 80 Gutenberg: 23 eBook: 4 of: 12866 Don: 2541 Quixote: 1012 : 8384 This: 97 ebook: 2 ==> err.txt <== 2024-10-13 09:44:23 INFO SparkContext:60 - Running Spark version 3.4.0 2024-10-13 09:44:23 DEBUG MutableMetricsFactory:43 - field org.apache.hadoop.metrics2.lib.MutableRate org.apache.hadoop.security.UserGroupInformation$UgiMetrics.getGroups with annotation @org.apache.hadoop.metrics2.annotation.Metric(sampleName="Ops", always=false, valueName="Time", about="", interval=10, type=DEFAULT, value={"GetGroups"}) 2024-10-13 09:44:23 DEBUG MutableMetricsFactory:43 - field org.apache.hadoop.metrics2.lib.MutableRate org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginFailure with annotation @org.apache.hadoop.metrics2.annotation.Metric(sampleName="Ops", always=false, valueName="Time", about="", interval=10, type=DEFAULT, value={"Rate of failed kerberos logins and latency (milliseconds)"}) 2024-10-13 09:44:23 DEBUG MutableMetricsFactory:43 - field org.apache.hadoop.metrics2.lib.MutableRate org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginSuccess with annotation @org.apache.hadoop.metrics2.annotation.Metric(sampleName="Ops", always=false, valueName="Time", about="", interval=10, type=DEFAULT, value={"Rate of successful kerberos logins and latency (milliseconds)"}) 2024-10-13 09:44:23 DEBUG MutableMetricsFactory:43 - field private org.apache.hadoop.metrics2.lib.MutableGaugeInt org.apache.hadoop.security.UserGroupInformation$UgiMetrics.renewalFailures with annotation @org.apache.hadoop.metrics2.annotation.Metric(sampleName="Ops", always=false, valueName="Time", about="", interval=10, type=DEFAULT, value={"Renewal failures since last successful login"}) 2024-10-13 09:44:23 DEBUG MutableMetricsFactory:43 - field private org.apache.hadoop.metrics2.lib.MutableGaugeLong org.apache.hadoop.security.UserGroupInformation$UgiMetrics.renewalFailuresTotal with annotation @org.apache.hadoop.metrics2.annotation.Metric(sampleName="Ops", always=false, valueName="Time", about="", interval=10, type=DEFAULT, value={"Renewal failures since startup"}) 2024-10-13 09:44:23 DEBUG MetricsSystemImpl:231 - UgiMetrics, User and group related metrics 2024-10-13 09:44:23 DEBUG SecurityUtil:124 - Setting hadoop.security.token.service.use_ip to true 2024-10-13 09:44:23 DEBUG Groups:449 - Creating new Groups object 2024-10-13 09:44:23 DEBUG NativeCodeLoader:44 - Trying to load the custom-built native-hadoop library...
Let us run the Wordcount app with Python instead of spark-submit
.
%%bash
python wordcount.py don_quixote.txt 1>out.txt 2>err.txt
head out.txt err.txt
==> out.txt <== The: 846 Project: 80 Gutenberg: 23 eBook: 4 of: 12866 Don: 2541 Quixote: 1012 : 8384 This: 97 ebook: 2 ==> err.txt <== Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 2024-10-13 09:44:32 WARN NativeCodeLoader:60 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
As you can see, the Log4j2 configuration was ignored.
The reason is that spark-submit takes care of setting all the environment variables used by Spark.
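If you do want some control over verbosity when launching with plain python, a minimal sketch is to set the level programmatically on the Spark context; note that this only affects messages emitted after the context exists, so the startup lines still use the default settings.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("LogLevelDemo").getOrCreate()
# valid levels include "DEBUG", "INFO", "WARN", "ERROR"
spark.sparkContext.setLogLevel("ERROR")
# ... from here on only ERROR (and more severe) messages are logged ...
spark.stop()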
spark-submit
spark-submit is a shell script that calls the Java class org.apache.spark.deploy.SparkSubmit through the spark-class script.
!cat $(which spark-submit)
#!/usr/bin/env bash # # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # if [ -z "${SPARK_HOME}" ]; then source "$(dirname "$0")"/find-spark-home fi # disable randomized hash for string in Python 3.3+ export PYTHONHASHSEED=0 exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
Let us look at spark-class
.
!cat $SPARK_HOME/bin/spark-class
#!/usr/bin/env bash # # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # if [ -z "${SPARK_HOME}" ]; then source "$(dirname "$0")"/find-spark-home fi . "${SPARK_HOME}"/bin/load-spark-env.sh # Find the java binary if [ -n "${JAVA_HOME}" ]; then RUNNER="${JAVA_HOME}/bin/java" else if [ "$(command -v java)" ]; then RUNNER="java" else echo "JAVA_HOME is not set" >&2 exit 1 fi fi # Find Spark jars. if [ -d "${SPARK_HOME}/jars" ]; then SPARK_JARS_DIR="${SPARK_HOME}/jars" else SPARK_JARS_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION/jars" fi if [ ! -d "$SPARK_JARS_DIR" ] && [ -z "$SPARK_TESTING$SPARK_SQL_TESTING" ]; then echo "Failed to find Spark jars directory ($SPARK_JARS_DIR)." 1>&2 echo "You need to build Spark with the target \"package\" before running this program." 1>&2 exit 1 else LAUNCH_CLASSPATH="$SPARK_JARS_DIR/*" fi # Add the launcher build dir to the classpath if requested. if [ -n "$SPARK_PREPEND_CLASSES" ]; then LAUNCH_CLASSPATH="${SPARK_HOME}/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH" fi # For tests if [[ -n "$SPARK_TESTING" ]]; then unset YARN_CONF_DIR unset HADOOP_CONF_DIR fi # The launcher library will print arguments separated by a NULL character, to allow arguments with # characters that would be otherwise interpreted by the shell. Read that in a while loop, populating # an array that will be used to exec the final command. # # The exit code of the launcher is appended to the output, so the parent shell removes it from the # command array and checks the value to see if the launcher succeeded. build_command() { "$RUNNER" -Xmx128m $SPARK_LAUNCHER_OPTS -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@" printf "%d\0" $? } # Turn off posix mode since it does not allow process substitution set +o posix CMD=() DELIM=$'\n' CMD_START_FLAG="false" while IFS= read -d "$DELIM" -r _ARG; do ARG=${_ARG//$'\r'} if [ "$CMD_START_FLAG" == "true" ]; then CMD+=("$ARG") else if [ "$ARG" == $'\0' ]; then # After NULL character is consumed, change the delimiter and consume command string. DELIM='' CMD_START_FLAG="true" elif [ "$ARG" != "" ]; then echo "$ARG" fi fi done < <(build_command "$@") COUNT=${#CMD[@]} LAST=$((COUNT - 1)) LAUNCHER_EXIT_CODE=${CMD[$LAST]} # Certain JVM failures result in errors being printed to stdout (instead of stderr), which causes # the code that parses the output of the launcher to get confused. In those cases, check if the # exit code is an integer, and if it's not, handle it as a special error case. if ! [[ $LAUNCHER_EXIT_CODE =~ ^[0-9]+$ ]]; then echo "${CMD[@]}" | head -n-1 1>&2 exit 1 fi if [ $LAUNCHER_EXIT_CODE != 0 ]; then exit $LAUNCHER_EXIT_CODE fi CMD=("${CMD[@]:0:$LAST}") exec "${CMD[@]}"
spark-submit
Your app will run as a regular Python script, but if you want to be able to configure how it interacts with the Spark engine it is recommended to use spark-submit.
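For example, settings that you would pass to spark-submit as --master or --conf options can instead be set on the session builder when launching with plain python. Here is a minimal sketch; the local[2] master, the shuffle-partition value, and the app.py name are placeholders for illustration.
from pyspark.sql import SparkSession

# roughly what  spark-submit --master "local[2]" --conf spark.sql.shuffle.partitions=8 app.py  would configure
spark = (SparkSession.builder
         .appName("ConfiguredApp")
         .master("local[2]")                            # use two local cores
         .config("spark.sql.shuffle.partitions", "8")   # arbitrary example setting
         .getOrCreate())
print(spark.conf.get("spark.sql.shuffle.partitions"))   # -> '8'
spark.stop()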
What is the pyspark shell?
PySpark provides an interactive shell where you can run your PySpark code. The shell takes care of creating a Spark context (named sc) and a Spark session (named spark).
Running a PySpark shell is useful for quick experiments on the command-line as it provides an interactive "REPL" but it does not make much sense on Google Colab where one already has another type of interactivity provided by the underlying Jupyter notebook.
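In a notebook you can create the same two objects yourself instead of relying on the shell; a minimal sketch of what the pyspark shell prepares for you:
from pyspark.sql import SparkSession

# the shell would normally create these two objects on startup
spark = SparkSession.builder.appName("NotebookSession").getOrCreate()
sc = spark.sparkContext
print(spark.version, sc.master)
# ... interactive work goes here ...
spark.stop()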
How to input commands in the pyspark shell
If you start the PySpark shell in a Colab notebook cell, you can input your commands (one line at a time) by typing them in the interactive box and hitting return.
We are going to input the following lines in the PySpark shell:
from operator import add
lines = spark.read.text('don_quixote.txt').rdd.map(lambda r: r[0])
counts = lines.flatMap(lambda x: x.split(' ')).map(lambda x: (x, 1)) .reduceByKey(add)
output = counts.collect()
for (word, count) in output[:10]: # print just first 10 lines of output
    print("%s: %i" % (word, count))
To exit the PySpark shell either enter
quit()
or interrupt the cell execution.
# uncomment the next line for running the interactive PySpark shell
#!pyspark
If you want to run PySpark interactively in a Jupyter environment, you can install the PySpark kernel.
My unpretentious prompt for ChatGPT:
“Write something about real-time versus batch jobs in the context of Hadoop Spark. Big Data jobs are inherently batch jobs because they run much longer than the time required for meaningful interaction. On the other hand, people want to interact with Spark interactively (jupyter notebooks, pyspark shell) so there's this "clash of cultures" between real-time vs. interactive”
To which ChatGPT 3.5 replied:
# @title
from IPython.core.display import HTML
HTML("""
<div style="background-color:rgb(16, 163, 127,.2);border:2px solid rgb(16, 163, 127,.3);padding:3px;">
<svg fill="none" xmlns="http://www.w3.org/2000/svg" viewBox="0 0 320 320" style="width:32px;height:32px;">
<g fill="currentColor">
<path d="m297.06 130.97c7.26-21.79 4.76-45.66-6.85-65.48-17.46-30.4-52.56-46.04-86.84-38.68-15.25-17.18-37.16-26.95-60.13-26.81-35.04-.08-66.13 22.48-76.91 55.82-22.51 4.61-41.94 18.7-53.31 38.67-17.59 30.32-13.58 68.54 9.92 94.54-7.26 21.79-4.76 45.66 6.85 65.48 17.46 30.4 52.56 46.04 86.84 38.68 15.24 17.18 37.16 26.95 60.13 26.8 35.06.09 66.16-22.49 76.94-55.86 22.51-4.61 41.94-18.7 53.31-38.67 17.57-30.32 13.55-68.51-9.94-94.51zm-120.28 168.11c-14.03.02-27.62-4.89-38.39-13.88.49-.26 1.34-.73 1.89-1.07l63.72-36.8c3.26-1.85 5.26-5.32 5.24-9.07v-89.83l26.93 15.55c.29.14.48.42.52.74v74.39c-.04 33.08-26.83 59.9-59.91 59.97zm-128.84-55.03c-7.03-12.14-9.56-26.37-7.15-40.18.47.28 1.3.79 1.89 1.13l63.72 36.8c3.23 1.89 7.23 1.89 10.47 0l77.79-44.92v31.1c.02.32-.13.63-.38.83l-64.41 37.19c-28.69 16.52-65.33 6.7-81.92-21.95zm-16.77-139.09c7-12.16 18.05-21.46 31.21-26.29 0 .55-.03 1.52-.03 2.2v73.61c-.02 3.74 1.98 7.21 5.23 9.06l77.79 44.91-26.93 15.55c-.27.18-.61.21-.91.08l-64.42-37.22c-28.63-16.58-38.45-53.21-21.95-81.89zm221.26 51.49-77.79-44.92 26.93-15.54c.27-.18.61-.21.91-.08l64.42 37.19c28.68 16.57 38.51 53.26 21.94 81.94-7.01 12.14-18.05 21.44-31.2 26.28v-75.81c.03-3.74-1.96-7.2-5.2-9.06zm26.8-40.34c-.47-.29-1.3-.79-1.89-1.13l-63.72-36.8c-3.23-1.89-7.23-1.89-10.47 0l-77.79 44.92v-31.1c-.02-.32.13-.63.38-.83l64.41-37.16c28.69-16.55 65.37-6.7 81.91 22 6.99 12.12 9.52 26.31 7.15 40.1zm-168.51 55.43-26.94-15.55c-.29-.14-.48-.42-.52-.74v-74.39c.02-33.12 26.89-59.96 60.01-59.94 14.01 0 27.57 4.92 38.34 13.88-.49.26-1.33.73-1.89 1.07l-63.72 36.8c-3.26 1.85-5.26 5.31-5.24 9.06l-.04 89.79zm14.63-31.54 34.65-20.01 34.65 20v40.01l-34.65 20-34.65-20z"></path>
</svg>
Real-time versus batch processing is a fundamental consideration in the context of Hadoop and Spark, especially when dealing with Big Data. These two paradigms have different use cases and approaches, and they often come into conflict when it comes to balancing the need for real-time responsiveness and interactive exploration of data.
<h3>Batch Processing:</h3>
<p>
Batch processing is the traditional way of handling data in large-scale analytics and data processing tasks. In this approach, data is collected, stored, and processed in discrete chunks or batches. Hadoop MapReduce, for example, is a batch processing framework. In this paradigm, data is collected over a period, and then a job is triggered to process the accumulated data in one go.
<p>
Advantages of batch processing in the context of Big Data include:
<p>
<ol>
<li><h4>Efficiency:</h4> Batch processing is highly efficient for processing large volumes of data, as it allows for optimizations and resource allocation over the entire dataset.
<li><h4>Scalability:</h4> Batch jobs can be scaled horizontally to handle massive amounts of data by adding more computing resources.
<li><h4>Robustness:</h4> Batch jobs are fault-tolerant; if a node fails during processing, the job can be re-executed without loss of data.
</ol>
<p>
However, there are some limitations with batch processing, particularly in the context of Spark:
<ul>
<li><h4>Latency:</h4> Batch processing is not suitable for real-time or interactive scenarios. It takes time to accumulate and process data, making it unsuitable for applications where low-latency responses are required.
<li><h4>Iterative Analysis:</h4> In interactive data exploration and iterative analysis, users need quick feedback, which is not feasible in a batch processing paradigm.
</ul>
<h3>Real-time Processing:</h3>
Real-time processing, on the other hand, aims to provide immediate responses to data as it arrives. Apache Spark Streaming is an example of a real-time processing framework. In this paradigm, data is processed incrementally as it's ingested, allowing for low-latency responses and real-time analytics.
<p>
Advantages of real-time processing include:
<p>
<ol>
<li><h4>Low Latency:</h4> Real-time processing is suitable for applications where low-latency responses are essential, such as fraud detection, recommendation systems, and monitoring.
<li><h4>Interactive Exploration:</h4> It enables interactive exploration of data, making it suitable for use in Jupyter notebooks or Spark shells where data scientists and analysts want to explore data interactively.
</ol>
<p>
However, there are trade-offs with real-time processing:
<ul>
<li><h4>Scalability and Efficiency:</h4> Real-time processing may be less efficient for processing large historical datasets. It focuses on real-time events and may not be the best choice for big, batch-oriented data.
<li><h4>Complexity:</h4> Real-time systems are often more complex to set up and manage compared to batch processing.
</ul>
The "clash of cultures" between real-time and interactive data processing in the context of Spark can be mitigated by using hybrid approaches. For example, you can use Spark Streaming for real-time processing and periodically store processed data for batch analysis. This allows you to balance low-latency interactions with the need for in-depth batch analysis.
<p>
In summary, the choice between real-time and batch processing in the context of Hadoop Spark depends on your specific use case and requirements. While Big Data jobs may often lean towards batch processing due to their data volume, it's crucial to consider the need for real-time responsiveness and interactive exploration and find a suitable balance between the two paradigms to meet the demands of modern data analytics.
</div>
""")
Me:
“One should keep in mind that basically all Hadoop Spark jobs are batch jobs. Interactive tools make sense for small jobs with a limited amount of data. Another topic is interactive monitoring of jobs (Web UIs).”
# @title
from IPython.core.display import HTML
HTML("""
<div style="background-color:rgb(16, 163, 127,.2);border:2px solid rgb(16, 163, 127,.3);padding:3px;">
<svg fill="none" xmlns="http://www.w3.org/2000/svg" viewBox="0 0 320 320" style="width:32px;height:32px;">
<g fill="currentColor">
<path d="m297.06 130.97c7.26-21.79 4.76-45.66-6.85-65.48-17.46-30.4-52.56-46.04-86.84-38.68-15.25-17.18-37.16-26.95-60.13-26.81-35.04-.08-66.13 22.48-76.91 55.82-22.51 4.61-41.94 18.7-53.31 38.67-17.59 30.32-13.58 68.54 9.92 94.54-7.26 21.79-4.76 45.66 6.85 65.48 17.46 30.4 52.56 46.04 86.84 38.68 15.24 17.18 37.16 26.95 60.13 26.8 35.06.09 66.16-22.49 76.94-55.86 22.51-4.61 41.94-18.7 53.31-38.67 17.57-30.32 13.55-68.51-9.94-94.51zm-120.28 168.11c-14.03.02-27.62-4.89-38.39-13.88.49-.26 1.34-.73 1.89-1.07l63.72-36.8c3.26-1.85 5.26-5.32 5.24-9.07v-89.83l26.93 15.55c.29.14.48.42.52.74v74.39c-.04 33.08-26.83 59.9-59.91 59.97zm-128.84-55.03c-7.03-12.14-9.56-26.37-7.15-40.18.47.28 1.3.79 1.89 1.13l63.72 36.8c3.23 1.89 7.23 1.89 10.47 0l77.79-44.92v31.1c.02.32-.13.63-.38.83l-64.41 37.19c-28.69 16.52-65.33 6.7-81.92-21.95zm-16.77-139.09c7-12.16 18.05-21.46 31.21-26.29 0 .55-.03 1.52-.03 2.2v73.61c-.02 3.74 1.98 7.21 5.23 9.06l77.79 44.91-26.93 15.55c-.27.18-.61.21-.91.08l-64.42-37.22c-28.63-16.58-38.45-53.21-21.95-81.89zm221.26 51.49-77.79-44.92 26.93-15.54c.27-.18.61-.21.91-.08l64.42 37.19c28.68 16.57 38.51 53.26 21.94 81.94-7.01 12.14-18.05 21.44-31.2 26.28v-75.81c.03-3.74-1.96-7.2-5.2-9.06zm26.8-40.34c-.47-.29-1.3-.79-1.89-1.13l-63.72-36.8c-3.23-1.89-7.23-1.89-10.47 0l-77.79 44.92v-31.1c-.02-.32.13-.63.38-.83l64.41-37.16c28.69-16.55 65.37-6.7 81.91 22 6.99 12.12 9.52 26.31 7.15 40.1zm-168.51 55.43-26.94-15.55c-.29-.14-.48-.42-.52-.74v-74.39c.02-33.12 26.89-59.96 60.01-59.94 14.01 0 27.57 4.92 38.34 13.88-.49.26-1.33.73-1.89 1.07l-63.72 36.8c-3.26 1.85-5.26 5.31-5.24 9.06l-.04 89.79zm14.63-31.54 34.65-20.01 34.65 20v40.01l-34.65 20-34.65-20z"></path>
</svg>
You make a valid point that the fundamental nature of Hadoop Spark jobs is indeed batch processing. While Spark can process data in near real-time using components like Spark Streaming and Structured Streaming, the core Spark processing engine itself operates on data in batches. This batch-oriented nature is rooted in Spark's lineage to the Hadoop MapReduce framework.
<p>
Interactive tools, such as Jupyter notebooks and Spark shells, are most effective for exploratory data analysis and iterative development when working with relatively small subsets of data. These tools allow data scientists and analysts to interactively explore and manipulate data without the need for long-running batch jobs. However, they might not be well-suited for large-scale data processing and analysis.
<p>
Interactive monitoring of Spark jobs is essential for understanding how batch jobs are progressing and diagnosing any issues that may arise during their execution. The Spark Web UIs provide valuable insights into the status of Spark applications, jobs, stages, and tasks. These monitoring tools help users track the progress of their batch jobs, identify bottlenecks, optimize resource utilization, and troubleshoot performance problems.
<p>
Some key aspects of interactive monitoring in Spark include:
<ol>
<li><h4>Spark Application UI:</h4>
This web interface provides an overview of the entire Spark application, including information on job progress, task details, and resource utilization.
<li><h4>Spark Job and Stage UIs:</h4>
These interfaces allow users to drill down into specific jobs and stages within an application, providing details on task execution and performance metrics.
<li><h4>Streaming UI:</h4>
When working with Spark Streaming or Structured Streaming, this UI provides insights into real-time data processing, showing statistics on processing rates and latencies.
<li><h4>Event Logs:</h4>
Spark event logs can be invaluable for post-mortem analysis, and they can be accessed through the Web UIs.
</ol>
Interactive monitoring is crucial for maintaining the health and performance of Spark applications, especially in larger, more complex batch jobs. It allows users to identify issues and bottlenecks as they arise and make adjustments or optimizations in real-time.
<p>
In summary, it's important to recognize that while Spark jobs are primarily batch-oriented, interactive tools and monitoring capabilities play a significant role in facilitating the development, debugging, and exploration of data. The choice between batch and real-time processing should be based on the specific requirements of your use case, while also considering the need for interactive tools and monitoring to effectively manage and troubleshoot Spark applications.
</div>
""")
if IN_COLAB:
    from google.colab import output
    output.serve_kernel_port_as_window(4040, path='/jobs/index.html')
The Web UI is only available while a job is running, so if you click on the link above now you will get an error message.
Start a PySpark job and reload the page. We are going to run the pi.py example with $100$ partitions.
But first, set the log level to "warn".
%%bash
# sed -i changes in-place
sed -i 's/rootLogger.level =.*/rootLogger.level = warn/' $SPARK_HOME/conf/log4j2.properties
cat $SPARK_HOME/conf/log4j2.properties
status = warn
appender.console.type = Console
appender.console.name = STDOUT
appender.console.target = SYSTEM_ERR
rootLogger.level = warn
rootLogger.appenderRef.stdout.ref = STDOUT
# formatting
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
!cp $SPARK_HOME/examples/src/main/python/pi.py ./
!spark-submit pi.py 100
2024-10-13 09:44:41 WARN NativeCodeLoader:60 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Pi is roughly 3.154120
if IN_COLAB:
    output.serve_kernel_port_as_window(4040, path='/jobs/index.html')
Now in the Web UI you can visualize the progress of your job (but only while it's running!).
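A run of pi.py with $100$ partitions finishes within seconds, which leaves little time to reload the UI. As a sketch (the partition count of 500 and the variable name background_job are arbitrary choices, not part of the original notebook), a larger run can be launched in the background with the standard subprocess module so that the Web UI stays up while you browse it.
# Sketch: launch a longer pi.py run without blocking the notebook, so the
# Application UI on port 4040 remains available while the job is running.
import subprocess

background_job = subprocess.Popen(
    ["spark-submit", "pi.py", "500"],
    stdout=subprocess.DEVNULL,
    stderr=subprocess.DEVNULL,
)
# ... reload the Web UI link now ...
# background_job.wait()  # uncomment to wait for the job to finish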
The History Server is useful for reviewing jobs after they have completed.
The server can be started with the script start-history-server.sh located in the sbin directory of the PySpark installation folder (see also Viewing After the Fact in the Apache Spark documentation).
The History Server requires the Spark property spark.eventLog.enabled to be set to true and spark.eventLog.dir to point to the directory where the event logs are written (by default file:/tmp/spark-events on the local filesystem).
The History Server runs as a background process, so starting it will not block the notebook.
!$SPARK_HOME/sbin/start-history-server.sh |tee start_history_server.log
starting org.apache.spark.deploy.history.HistoryServer, logging to /opt/hostedtoolcache/Python/3.8.18/x64/lib/python3.8/site-packages/pyspark/logs/spark-runner-org.apache.spark.deploy.history.HistoryServer-1-fv-az837-567.out
Extract the History Server's log filename and have a look at the log.
%%bash --no-raise-error
HISTORY_SERVER_LOG=$(cut -d' ' -f5 start_history_server.log)
echo $HISTORY_SERVER_LOG
cat $HISTORY_SERVER_LOG
/opt/hostedtoolcache/Python/3.8.18/x64/lib/python3.8/site-packages/pyspark/logs/spark-runner-org.apache.spark.deploy.history.HistoryServer-1-fv-az837-567.out
Spark Command: /usr/lib/jvm/temurin-11-jdk-amd64/bin/java -cp /opt/hostedtoolcache/Python/3.8.18/x64/lib/python3.8/site-packages/pyspark/conf/:/opt/hostedtoolcache/Python/3.8.18/x64/lib/python3.8/site-packages/pyspark/jars/* -Xmx1g org.apache.spark.deploy.history.HistoryServer
========================================
2024-10-13 09:44:51 WARN NativeCodeLoader:60 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
You should see a line that looks like this:
Bound HistoryServer to 0.0.0.0, and started at http://eb8ea65339ee:18080
This means that the History Server can be reached on port $18080$ of the local host.
Let us serve port 18080 using Google Colab's output library:
if IN_COLAB:
    p = 18080
    output.serve_kernel_port_as_window(p)
/tmp/spark-events¶
The log file also contains the line:
Exception in thread "main" java.io.FileNotFoundException: Log directory specified does not exist: file:/tmp/spark-events
This prevented the server from starting, which is why the above link https://localhost:18080/ has no content.
Since the missing folder /tmp/spark-events caused the History Server to crash, let us create it.
!mkdir /tmp/spark-events
mkdir: cannot create directory ‘/tmp/spark-events’: File exists
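Here mkdir complains because the directory already exists in this environment. If you want this step to be idempotent, the -p option (the same one used later when creating the conf folder) makes mkdir succeed silently in that case:
# -p: do not complain if /tmp/spark-events already exists
!mkdir -p /tmp/spark-events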
And now restart the server.
%%bash --no-raise-error
# first stop it in case it's already running
$SPARK_HOME/sbin/stop-history-server.sh
$SPARK_HOME/sbin/start-history-server.sh |tee start_history_server.log
HISTORY_SERVER_LOG=$(awk '{print $NF}' start_history_server.log)
echo "History Server log:" $HISTORY_SERVER_LOG
cat $HISTORY_SERVER_LOG
stopping org.apache.spark.deploy.history.HistoryServer
starting org.apache.spark.deploy.history.HistoryServer, logging to /opt/hostedtoolcache/Python/3.8.18/x64/lib/python3.8/site-packages/pyspark/logs/spark-runner-org.apache.spark.deploy.history.HistoryServer-1-fv-az837-567.out
History Server log: /opt/hostedtoolcache/Python/3.8.18/x64/lib/python3.8/site-packages/pyspark/logs/spark-runner-org.apache.spark.deploy.history.HistoryServer-1-fv-az837-567.out
Spark Command: /usr/lib/jvm/temurin-11-jdk-amd64/bin/java -cp /opt/hostedtoolcache/Python/3.8.18/x64/lib/python3.8/site-packages/pyspark/conf/:/opt/hostedtoolcache/Python/3.8.18/x64/lib/python3.8/site-packages/pyspark/jars/* -Xmx1g org.apache.spark.deploy.history.HistoryServer
========================================
2024-10-13 09:44:54 WARN NativeCodeLoader:60 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Link for the History Server Web UI:
if IN_COLAB:
    output.serve_kernel_port_as_window(18080)
This looks better now. If you click on the link https://localhost:18080/ above, you should get something like:
Let us submit an app to fill Spark's event log directory.
%%bash
spark-submit wordcount.py don_quixote.txt 1>out.txt 2>err.txt
head out.txt err.txt
==> out.txt <==
The: 846
Project: 80
Gutenberg: 23
eBook: 4
of: 12866
Don: 2541
Quixote: 1012
: 8384
This: 97
ebook: 2

==> err.txt <==
2024-10-13 09:44:57 WARN NativeCodeLoader:60 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
There are still a couple of adjustments to be made before the application history becomes visible.
The following two lines need to be added to spark-defaults.conf (see Viewing After the Fact):
spark.eventLog.enabled true
spark.eventLog.dir file:/tmp/spark-events
%%bash
# create conf directory
# with the option -p mkdir won't complain if the folder already exists
mkdir -p $SPARK_HOME/conf
# populate the spark-defaults.conf file
FILE=$SPARK_HOME/conf/spark-defaults.conf
# see https://spark.apache.org/docs/latest/monitoring.html#viewing-after-the-fact
cat> $FILE <<🙂
spark.eventLog.enabled true
spark.eventLog.dir file:/tmp/spark-events
🙂
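As an aside, the same two properties can also be supplied per application on the spark-submit command line with --conf, which avoids editing spark-defaults.conf (command-line flags take precedence over the defaults file). A minimal sketch, reusing the wordcount example from above:
%%bash
# Sketch: enable event logging for a single run via --conf flags
spark-submit \
  --conf spark.eventLog.enabled=true \
  --conf spark.eventLog.dir=file:/tmp/spark-events \
  wordcount.py don_quixote.txt 1>out.txt 2>err.txt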
Restart the server and re-run the app. The app should now write its event logs to the directory (file:/tmp/spark-events) that is read by the History Server.
%%bash --no-raise-error
# first stop it in case it's already running
$SPARK_HOME/sbin/stop-history-server.sh
$SPARK_HOME/sbin/start-history-server.sh |tee start_history_server.log
HISTORY_SERVER_LOG=$(awk '{print $NF}' start_history_server.log)
echo "History Server log:" $HISTORY_SERVER_LOG
spark-submit wordcount.py don_quixote.txt 1>out.txt 2>err.txt
head out.txt err.txt
stopping org.apache.spark.deploy.history.HistoryServer
starting org.apache.spark.deploy.history.HistoryServer, logging to /opt/hostedtoolcache/Python/3.8.18/x64/lib/python3.8/site-packages/pyspark/logs/spark-runner-org.apache.spark.deploy.history.HistoryServer-1-fv-az837-567.out
History Server log: /opt/hostedtoolcache/Python/3.8.18/x64/lib/python3.8/site-packages/pyspark/logs/spark-runner-org.apache.spark.deploy.history.HistoryServer-1-fv-az837-567.out
==> out.txt <==
The: 846
Project: 80
Gutenberg: 23
eBook: 4
of: 12866
Don: 2541
Quixote: 1012
: 8384
This: 97
ebook: 2

==> err.txt <==
2024-10-13 09:45:09 WARN NativeCodeLoader:60 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
In the browser tab or window where the History Server is open you should now see the wordcount application listed (after reloading the page).
If you click on the App ID you get more details on the Wordcount app (you might have to click the link twice if an error message shows up).
You can look at the steps of your app represented as a Directed Acyclic Graph (DAG).
The "Environment" tab contains all the environment details of your app.
The cleanup steps below will allow you to re-run the whole notebook from the beginning, starting from a clean PySpark installation (without Log4j2 configuration files and without a running History Server).
conf folder¶
In order to reset the Colab environment to the initial state while keeping the PySpark installation, we are going to remove the conf folder.
!rm -rf $SPARK_HOME/conf
!$SPARK_HOME/sbin/stop-history-server.sh
stopping org.apache.spark.deploy.history.HistoryServer