By the end of the lab, you will have learned how to configure Databricks, work with storage accounts, process data using Scala, store processed data in Delta Lake, and visualize the data in Power BI.
In this lab, we'll cover the following recipes:
In this recipe, we’ll learn how to configure the Azure Databricks environment by creating an Azure Databricks workspace, cluster, and cluster pools.
An Azure Databricks workspace is the starting point for writing solutions in Azure Databricks. A workspace is where you create clusters, write notebooks, schedule jobs, and manage the Azure Databricks environment.
An Azure Databricks workspace can be created in an Azure-managed virtual network or customer-managed virtual network. In this recipe, we will create a Databricks cluster in an Azure-managed network. Let’s get started:
NOTE
Cluster types: There are two types of cluster - Interactive and Automated. Interactive clusters are created manually by users so that they can interactively analyze the data while working on, or even developing, a data engineering solution. Automated clusters are created automatically when a job starts and are terminated as and when the job completes.
Cluster mode: There are two major cluster modes - Standard and High Concurrency. Standard cluster mode uses single-user clusters, optimized to run tasks one at a time. The High Concurrency cluster mode is optimized to run multiple tasks in parallel; however, it only supports R, Python, and SQL workloads, and doesn’t support Scala.
Cluster nodes: There are two types of cluster nodes - Worker and Driver. The Driver node is responsible for maintaining a notebook’s state information, interpreting the commands run from a notebook or a library, and coordinating with the Spark executors. The Worker nodes are the Spark executor nodes, which are responsible for distributed data processing.
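If you want to confirm which nodes make up a running cluster, you can inspect the SparkContext from any attached notebook. The following Scala snippet is a minimal sketch, not part of the recipe's required steps; it only uses standard Spark APIs:

// Inspect the nodes of the attached cluster from a notebook.
// getExecutorMemoryStatus returns one entry per JVM (the driver plus each executor),
// keyed by host:port, with (maximum memory, remaining memory) in bytes.
spark.sparkContext.getExecutorMemoryStatus.foreach { case (hostPort, (maxMem, freeMem)) =>
  println(s"$hostPort -> max: ${maxMem / 1024 / 1024} MB, free: ${freeMem / 1024 / 1024} MB")
}
// defaultParallelism reflects the total number of cores available on the worker nodes.
println(s"Default parallelism: ${spark.sparkContext.defaultParallelism}")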
Perform the following steps to create a Databricks cluster:
Azure Databricks pools reduce cluster startup and autoscaling time by keeping a set of idle, ready-to-use instances without the need for creating instances when required. To create Azure Databricks pools, execute the following steps:
We can attach multiple clusters to a pool. Whenever a cluster, such as dbcluster01, requires an instance, it will attempt to allocate one of the pool’s idle instances. If no idle instance is available, the pool expands by acquiring new instances, as long as the total number of instances stays under the pool’s maximum capacity.
Azure Key Vault is a useful service for storing keys and secrets that are used by various other services and applications. It is important to integrate Azure Key Vault with Databricks, as you could store the credentials of objects such as a SQL database or data lake inside the key vault. Once integrated, Databricks can reference the key vault, obtain the credentials, and access the database or data lake account. In this recipe, we will cover how you can integrate Databricks with Azure Key Vault.
You will receive confirmation that a secret scope called datalakekey has been successfully added. This completes the integration between Databricks and Azure Key Vault.
How it works…
Upon completion of the preceding steps, all users with access to the Databricks workspace will be able to extract secrets and keys from the key vault and use them in a Databricks notebook to perform the desired operations.
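For example, once the secret scope exists, secrets can be read from any notebook in the workspace using the dbutils.secrets utilities. The following Scala snippet is a small illustrative sketch; it assumes the datalakekey scope and the appsecret secret used later in this lab:

// List all secret scopes registered in the workspace
dbutils.secrets.listScopes().foreach(println)
// List the secrets available in the key vault-backed datalakekey scope
dbutils.secrets.list("datalakekey").foreach(println)
// Read a secret value; Databricks redacts the value if you try to print it in notebook output
val secretValue = dbutils.secrets.get(scope = "datalakekey", key = "appsecret")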
Behind the scenes, Databricks uses a service principal to access the key vault. As we create the scope in the Databricks portal, Azure will grant the relevant permissions required for the Databricks service principal on the key vault. You can verify as much using the following steps:
Accessing data from Azure Data Lake is one of the fundamental steps of performing data processing in Databricks. In this recipe, we will learn how to mount an Azure Data Lake container in Databricks using the Databricks service principal. We will use Azure Key Vault to store the Databricks service principal ID and the Databricks service principal secret that will be used to mount a data lake container in Databricks.
Mounting the container in Databricks will involve the following steps:
- Register an application called SparshDatabricks in Azure Active Directory and click on the Register button.
- On the data lake container, add a role assignment: search for the Storage Blob Data Contributor role, select it, and click Next, then select the SparshDatabricks application.
- Mount the container from a Databricks notebook using the code shown below.

The code extracts the application ID, application secret, and directory ID from the key vault using the dbutils.secrets.get function available in Scala. The dbutils.secrets.get function takes the scope name (provided in the Integrating Databricks with Azure Key Vault recipe) and the secret names provided in step 7 and step 8. The dbutils.fs.mount command has a parameter called source, which takes the URL of the data lake container to be mounted. The data lake container URL format is abfss://databricks@sparshstorage1.dfs.core.windows.net/:
// Retrieve the service principal credentials and the directory (tenant) ID from the key vault-backed scope
val appsecret = dbutils.secrets.get(scope="datalakekey",key="appsecret")
val ApplicationID = dbutils.secrets.get(scope="datalakekey",key="ApplicationID")
val DirectoryID = dbutils.secrets.get(scope="datalakekey",key="DirectoryID")
// Azure AD OAuth token endpoint for the tenant
val endpoint = "https://login.microsoftonline.com/" + DirectoryID + "/oauth2/token"
// OAuth configuration used by the ABFS driver to authenticate as the service principal
val configs = Map(
  "fs.azure.account.auth.type" -> "OAuth",
  "fs.azure.account.oauth.provider.type" -> "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
  "fs.azure.account.oauth2.client.id" -> ApplicationID,
  "fs.azure.account.oauth2.client.secret" -> appsecret,
  "fs.azure.account.oauth2.client.endpoint" -> endpoint)
// Optionally, you can add <directory-name> to the source URI of your mount point.
dbutils.fs.mount(
  source = "abfss://databricks@sparshstorage1.dfs.core.windows.net/",
  mountPoint = "/mnt/datalakestorage",
  extraConfigs = configs)
Upon running the script, the data lake container will be successfully mounted:
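To confirm that the mount succeeded, you can list the mounted path and the workspace's mount points. This is a quick optional check rather than part of the recipe itself:

// List the contents of the newly mounted data lake container
display(dbutils.fs.ls("/mnt/datalakestorage"))
// Show all mount points defined in the workspace, including /mnt/datalakestorage
display(dbutils.fs.mounts())
// If you ever need to remount (for example, after rotating the client secret), unmount first:
// dbutils.fs.unmount("/mnt/datalakestorage")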
How it works...
On Azure AD, we registered an application, which created a service principal. Service principals are entities that applications can use to authenticate themselves to Azure services. We granted the application permissions on the container to be accessed, which grants those permissions to the service principal, and we stored the service principal’s credentials (the application ID and secret) in Azure Key Vault so that they can be accessed securely. While mounting the data lake container, Databricks retrieves the application ID, directory ID, and secret from the key vault and uses the service principal’s security context to access the Azure Data Lake account. This makes for a very secure method of accessing a data lake account for the following reasons:
Databricks notebooks are the fundamental component in Databricks for performing data processing tasks. In this recipe, we will perform operations such as reading, filtering, and cleaning a Comma-Separated Values (CSV) file, and gain insights from it, using a Databricks notebook written in Scala.
The uploaded CSV file is available under /FileStore/shared_uploads/<loginname>. Execute the following code to read it into a DataFrame and clean it:

// Read the CSV file from DBFS, treating the first row as the header and inferring column types
val covid_raw_data = spark.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("/FileStore/shared_uploads/sprsag@gmail.com/covid_data.csv")
display(covid_raw_data)
// Count the raw rows, then remove duplicate rows and check the inferred schema
covid_raw_data.count()
val covid_remove_duplicates = covid_raw_data.dropDuplicates()
covid_remove_duplicates.printSchema()
// Keep only the columns needed for the analysis
val covid_selected_columns = covid_remove_duplicates.select("iso_code","location","continent","date","new_deaths_per_million","people_fully_vaccinated","population")
// Drop rows containing NULL values and count the remaining rows
val covid_clean_data = covid_selected_columns.na.drop()
covid_clean_data.count()
covid_clean_data.createOrReplaceTempView("covid_view")
%sql
SELECT iso_code, location, continent,
SUM(new_deaths_per_million) as death_sum,
MAX(people_fully_vaccinated * 100 / population) as percentage_vaccinated FROM covid_view
WHERE population > 1000000
GROUP BY iso_code,location,continent
ORDER BY death_sum desc
NOTE: Notebooks for this recipe are in the ./assets folder.
How it works…
DataFrames are the fundamental objects used to store runtime data during data processing in Databricks. DataFrames are in-memory objects and extremely well-optimized for performing advanced analytics operations.
A CSV file was loaded to the Databricks File System (DBFS) storage, which is the default local storage available when a Databricks workspace is created. We can perform the same activities in a data lake account too, by uploading the CSV file to the data lake container and mounting the data lake container, as explained in the Mounting an Azure Data Lake container in Databricks recipe.
After loading the data to a DataFrame, we were able to cleanse the data by performing operations such as removing unwanted columns, dropping duplicates, and deleting rows with NULL values easily using Spark functions. Finally, by creating a temporary view out of the DataFrame, we were able to analyze the DataFrame’s data using SQL queries and get visual insights using Databricks' visualization capabilities.
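The same aggregation can also be expressed directly against the DataFrame instead of the temporary view. The following Scala sketch mirrors the SQL query used above, reusing the covid_clean_data DataFrame created earlier:

import org.apache.spark.sql.functions.{col, max, sum}

// Equivalent of the SQL query: total deaths per million and peak vaccination percentage
// per country, restricted to countries with more than 1,000,000 people
val covid_summary = covid_clean_data
  .filter(col("population") > 1000000)
  .groupBy("iso_code", "location", "continent")
  .agg(
    sum("new_deaths_per_million").alias("death_sum"),
    max(col("people_fully_vaccinated") * 100 / col("population")).alias("percentage_vaccinated")
  )
  .orderBy(col("death_sum").desc)

display(covid_summary)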
Data processing can be performed using notebooks, but to operationalize it, we need to execute it at a specific scheduled time, depending upon the demands of the use case or problem statement. After a notebook has been created, you can schedule a notebook to be executed at a preferred frequency using job clusters. This recipe will demonstrate how you could schedule a notebook using job clusters.
Create a job and name it SampleJob. Select the imported SampleJob notebook. On the New Cluster configuration, click on the edit icon to set the configuration options. The job will create a cluster each time it runs, based on the configuration, and delete the cluster once the job is completed.

How it works…
The imported notebook was set to run at a specific schedule using the Databricks job scheduling capabilities. While scheduling the jobs, New Cluster was selected, instead of picking any cluster available in the workspace. Picking New Cluster implies a cluster will be created each time the job runs and will be destroyed once the job completes. This also means the jobs need to wait for an additional 2 minutes for the cluster to be created for each run.
Adding multiple notebooks to the same job via additional tasks allows us to reuse the job cluster created for the first notebook execution, and the second task needn’t wait for another cluster to be created. Usage of multiple tasks and the dependency option allows us to orchestrate complex data processing flows using Databricks notebooks.
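If you prefer to chain notebooks programmatically, in addition to defining multiple job tasks, one notebook can invoke another on the same cluster using dbutils.notebook.run. The sketch below is illustrative only; the notebook paths and the "success" return value are hypothetical and assume the called notebook finishes with dbutils.notebook.exit("success"):

// Run a hypothetical ingestion notebook; the second argument is a timeout in seconds,
// and the map passes widget parameters to the called notebook.
val ingestResult = dbutils.notebook.run("/Shared/IngestCovidData", 1800, Map("source" -> "datalake"))

// Run the hypothetical aggregation notebook only if ingestion reported success
// (the called notebook returns this value via dbutils.notebook.exit).
if (ingestResult == "success") {
  dbutils.notebook.run("/Shared/AggregateCovidData", 1800, Map[String, String]())
}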
Delta Lake databases are Atomicity, Consistency, Isolation, and Durability (ACID) property-compliant databases available in Databricks. Delta Lake tables are tables in Delta Lake databases that use Parquet files to store data and are highly optimized for performing analytic operations. Delta Lake tables can be used in a data processing notebook for storing preprocessed or processed data. The data stored in Delta Lake tables can be easily consumed in visualization tools such as Power BI.
In this recipe, we will create a Delta Lake database and Delta Lake table, load data from a CSV file, and perform additional operations such as UPDATE, DELETE, and MERGE on the table.
From the Databricks menu, click Create and create a new notebook.
Create a notebook called Covid-DeltaTables with Default Language set to SQL.
Add a cell to the notebook and execute the following command to create a Delta database called covid:
CREATE DATABASE covid
Execute the following command to read the covid-data.csv file to a temporary view. Please note that the path depends on the location where covid-data.csv was uploaded and ensure to provide the correct path for your environment:
CREATE TEMPORARY VIEW covid_data
USING CSV
OPTIONS (path "/FileStore/shared_uploads/sprsag@gmail.com/covid_data.csv", header "true", mode "FAILFAST")
Execute the following command to create a Delta table using the temporary view. USING DELTA in the CREATE TABLE command indicates that it’s a Delta table being created. The location specifies where the table is stored. If you have mounted a data lake container (as we did in the Mounting an Azure Data Lake container in Databricks recipe), you can use the data lake mount point to store the Delta Lake table in your Azure Data Lake account. In this example, we use the default storage provided by Databricks, which comes with each Databricks workspace. The table name is provided in the databasename.tablename format (covid.covid_data_delta) to ensure it belongs to the Delta Lake database created. To insert the data from the view into the new table, the table is created using the CREATE TABLE command, followed by the AS keyword, followed by the SELECT statement against the view:

CREATE OR REPLACE TABLE covid.covid_data_delta
USING DELTA
LOCATION '/FileStore/shared_uploads/sprsag@gmail.com/covid_data_delta'
AS
SELECT iso_code,location,continent,date,new_deaths_per_million,people_fully_vaccinated,population FROM covid_data

Go to the Databricks menu, click Data, and then click on the covid database. You will notice that a covid_data_delta table has been created.

Go back to the notebook we were working on. Add a new cell and delete some rows using the following command. The DELETE command will delete around 57,000 rows. We can delete, select, and insert data just as we would in any other commercial database:

DELETE FROM covid.covid_data_delta WHERE population IS NULL OR people_fully_vaccinated IS NULL OR new_deaths_per_million IS NULL OR location IS NULL

Add a new cell and execute the following command to delete all the rows from the table. Then run a select count(*) query against the table, which will return 0 if all the rows have been deleted:

DELETE FROM covid.covid_data_delta;
SELECT count(*) FROM covid.covid_data_delta;

Delta tables have the ability to time travel, which allows us to read older versions of the table using the VERSION AS OF clause:

SELECT * FROM covid.covid_data_delta VERSION AS OF 0;

RESTORE TABLE can restore the table to an older version. Add a cell and execute the following command to recover all the rows that existed before the deletion:

RESTORE TABLE covid.covid_data_delta TO VERSION AS OF 0;

Let’s perform an UPDATE statement, followed by a DELETE statement. Add two cells, paste the following commands into them, and execute them in sequence:

UPDATE covid.covid_data_delta SET population = population * 1.2 WHERE continent = 'Asia';
DELETE FROM covid.covid_data_delta WHERE continent = 'Europe';

If we want to revert these two operations (the UPDATE and the DELETE), we can use the MERGE statement together with the Delta table’s row versioning capabilities. A single MERGE statement can both undo the update and reinsert the deleted rows. This can be achieved using the following code:

MERGE INTO covid.covid_data_delta source
USING covid.covid_data_delta TIMESTAMP AS OF "2023-02-11 11:12:00" target
ON source.location = target.location and source.date = target.date
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED
THEN INSERT *

The MERGE command takes the covid_data_delta table (aliased as source) as the table to be updated or inserted into. Instead of specifying version numbers, we can also use timestamps to obtain older versions of the table; covid_data_delta TIMESTAMP AS OF "2023-02-11 11:12:00" takes the version of the table as of February 11, 2023, 11:12:00 UTC. WHEN MATCHED THEN UPDATE SET * updates all the columns in the table when the condition specified in the ON clause matches. When the condition doesn’t match, the rows are inserted from the older version into the current version of the table. The output, as expected, shows that the rows deleted in the earlier step were successfully reinserted.

How it works…

Delta tables offer advanced capabilities for processing data, such as support for UPDATE, DELETE, and MERGE statements. MERGE statements and the versioning capabilities of Delta tables are very powerful in ETL scenarios, where we need to perform UPSERT (update if it matches, insert if it doesn’t) operations against various tables. These data modification and row versioning capabilities are made possible because Delta tables record every change to the table in a transaction log stored in JSON format. The transaction log files are located in the same location where the table was created, in a subfolder called _delta_log. By default, the log files are retained for 30 days, which can be controlled using the delta.logRetentionDuration table property. The ability to read older versions is also controlled by the delta.logRetentionDuration property.
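To find the version number or timestamp to use with VERSION AS OF, TIMESTAMP AS OF, or RESTORE TABLE, you can inspect the table's transaction history. The following Scala sketch runs the standard Delta Lake DESCRIBE HISTORY command through Spark SQL; you can equally run it in a SQL cell:

// Each committed operation (WRITE, DELETE, UPDATE, MERGE, RESTORE) appears as a row
// in the history, along with its version number and commit timestamp.
val history = spark.sql("DESCRIBE HISTORY covid.covid_data_delta")
display(history.select("version", "timestamp", "operation"))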