In this lab, we will be performing the following:
By the end of the lab, you will have learned how to copy data to Parquet files using data flows, perform data transformations using data flows, build dynamic and resilient data flows using parameterization and schema drift, and monitor data flows and pipelines.
In this recipe, we will convert a CSV file into the Parquet format using a Synapse data flow. We will be performing the following tasks to achieve this:
To get started, do the following:
Name the dataset transactiontablet1. Set Linked service to sparshadesynapse-WorkspaceDefaultStorage. Set File path to synapse/csv/transaction_table-t1.csv, the location to which we uploaded the CSV file. Check the First row as header checkbox.
How it works…
The data flow does a simple conversion of the CSV file to a Parquet file. The data flow is linked to a Synapse integration pipeline called Copy_Data_Flow. The Copy_Data_Flow pipeline uses a data flow activity that calls the Copy_CSV_to_Parquet data flow, which does the data transfer. While this task could also be done with a simple Copy activity (instead of a data flow), the key advantage of a data flow is being able to perform transformations while moving the data from the source to the sink.
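Under the hood, data flows run on Spark, so the transfer is conceptually equivalent to the short PySpark sketch below; the storage paths and session setup are illustrative assumptions, not part of the recipe.

```python
from pyspark.sql import SparkSession

# Illustrative only: the data flow generates and runs equivalent Spark code for you.
spark = SparkSession.builder.appName("copy_csv_to_parquet").getOrCreate()

# Read the CSV source with the First row as header option enabled.
csv_path = "abfss://synapse@<storage-account>.dfs.core.windows.net/csv/transaction_table-t1.csv"
df = spark.read.option("header", True).csv(csv_path)

# Write the same rows out as Parquet - the sink of the data flow.
df.write.mode("overwrite").parquet(
    "abfss://synapse@<storage-account>.dfs.core.windows.net/parquet/transaction_table-t1/"
)
```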
A common scenario in data engineering pipelines is combining two or more files based on a column, filtering by column, sorting the results, and storing them for querying. We will perform the following actions to achieve this:
In this recipe, we will perform the following:
The detailed steps to carry this out are as follows:
Enter transactiontablet1@transaction_date=="20210915" into the Filter on text box.
Select transactiontablet1@total_cost in the sort transformation's column dropdown and set Order to Descending.
How it works…
A join transformation helped us to combine the transaction_table-t1.csv and transaction_table-t2.csv files based on the tid column. The combined data from both files was filtered using the filter transformation and the transaction_table-t1.csv file’s transaction_date column was filtered for the date 20210915. The sort transformation sorted the filtered rows based on the transaction_table-t1.csv file’s total_cost column. A sink transformation was linked to a dataset in Parquet format, which meant that the sorted and filtered data from the CSV file was seamlessly converted and stored in Parquet format too.
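Since the data flow executes on Spark, the combine, filter, and sort steps are conceptually equivalent to the PySpark sketch below; the column names follow the recipe, while the paths and session setup are assumptions.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("join_filter_sort").getOrCreate()

# Source transformations: the two CSV files (paths are placeholders).
t1 = spark.read.option("header", True).csv("<path>/transaction_table-t1.csv")
t2 = spark.read.option("header", True).csv("<path>/transaction_table-t2.csv")

# Join on tid, keep only one day's transactions, and sort by total_cost descending.
combined = (
    t1.join(t2, on="tid", how="inner")
      .filter(t1["transaction_date"] == "20210915")
      .orderBy(t1["total_cost"].cast("double").desc())
)

# Sink transformation: store the result in Parquet format for querying.
combined.write.mode("overwrite").parquet("<path>/joined_transactions_parquet/")
```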
Data flows have plenty of transformation options available. You can explore more examples at https://docs.microsoft.com/en-us/azure/data-factory/data-flow-transformation-overview.
Azure Synapse Analytics provides a user-friendly interface out of the box for monitoring the pipeline and data flow runs. In this recipe, we will track a data flow execution and understand the transformation execution timings.
Perform the following steps to monitor data flows:
How it works…
The Azure Synapse Analytics out-of-the-box monitoring solution records details about every pipeline run, including the activities inside each pipeline. In this recipe, we saw that we could identify even a slow transformation inside a data flow activity in just a few clicks. The monitoring data is stored for 45 days by default and can be retained for longer by integrating the Synapse Analytics diagnostics data with Azure Log Analytics or an Azure Data Lake Storage account.
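If you prefer to pull the same run history programmatically rather than through Synapse Studio, a rough sketch with the azure-synapse-artifacts Python package might look like the following; the client, operation, and model names here are assumptions and may differ between SDK versions.

```python
from datetime import datetime, timedelta, timezone

from azure.identity import DefaultAzureCredential
from azure.synapse.artifacts import ArtifactsClient
from azure.synapse.artifacts.models import RunFilterParameters

# Hypothetical workspace endpoint - replace with your own.
client = ArtifactsClient(
    credential=DefaultAzureCredential(),
    endpoint="https://<workspace-name>.dev.azuresynapse.net",
)

# Query pipeline runs updated in the last 24 hours.
filters = RunFilterParameters(
    last_updated_after=datetime.now(timezone.utc) - timedelta(days=1),
    last_updated_before=datetime.now(timezone.utc),
)

for run in client.pipeline_run.query_pipeline_runs_by_workspace(filters).value:
    print(run.pipeline_name, run.run_id, run.status, run.duration_in_ms)
```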
Data flows, by default, create partitions behind the scenes to make transformation activities such as join, filter, and sort run faster. Partitions split large data into multiple smaller pieces so that the backend processes in the data flow can divide and conquer their tasks and finish the execution quickly.
In this recipe, we will take a slow-running data flow and adjust the partitioning to reduce the execution time.
Perform the following steps to optimize data flows:
Let’s make some changes to partitioning in the sort transformation. Click on the Edit transformation button at the top of the page:
Click on the sort1 transformation. Go to the Optimize section. Select Single partition (instead of the existing setting, Use current partitioning). Hit the Publish all button:
Go to DataFlow-Transformation, click on Add trigger, and click Trigger now.
Check the notification section (the bell icon) in the right-hand corner of the screen. The run completion will be indicated via a notification message. Click View pipeline run.
Click on the Dataflow1 activity details. Click on the Stages icon to get the transformation-level breakdown of the execution times, as we did previously:
- Notice that the total execution time has been reduced to 8 seconds.
- The sort and sink executions were reduced to 2.8 seconds, compared to the earlier duration of 20 seconds.
How it works…
Behind the scenes, Synapse Analytics data flows use tasks in Spark clusters to perform data flow transformations. When observing the partition statistics on the sink1 transformation in step 2 of the How to do it… section, we noticed that 1,009 rows were split across 200 partitions. Typically, we would like to have at least a few thousand rows per partition (or 10 MB to 100 MB in size). Having only 4 to 6 rows per partition makes any transformation slow, which is why the sort operation was slow as well. With that many partitions, the backend jobs spend most of their time creating partitions rather than processing the handful of rows inside them.
Switching the partition settings to Single partition via the sink transformation’s Optimize setting creates a single partition for all 1,009 rows, performs the sort activity in a single partition, and returns the results quickly. Had there been, say, a few hundred thousand or a few million rows for the sort activity, having multiple partitions would have been a better bet.
By default, each transformation in a data flow uses the Use current partitioning setting, which means the data flow estimates the optimal number of partitions for that transformation based on the size of the data. Setting a single partition, as done in this recipe, is not recommended for all scenarios; however, it is important to track the partition count for each transformation and make adjustments if required.
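The effect is easy to reproduce in plain Spark, which is what data flows run underneath; the DataFrame below is a stand-in for the recipe's 1,009 joined rows and the numbers in the comments are illustrative.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("partitioning_demo").getOrCreate()

# Stand-in for the ~1,009 joined rows from the recipe.
df = spark.range(0, 1009).withColumnRenamed("id", "total_cost")

# A wide operation such as sort shuffles the data into up to
# spark.sql.shuffle.partitions partitions (200 by default), which is how the
# recipe ended up with only 4-6 rows per partition.
sorted_default = df.orderBy(df["total_cost"].desc())
print(sorted_default.rdd.getNumPartitions())

# Collapsing to a single partition (the data flow's Single partition option)
# removes the per-partition overhead, which pays off for such a small dataset.
sorted_single = sorted_default.coalesce(1)
print(sorted_single.rdd.getNumPartitions())  # 1
```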
Adding parameters to data flows provides flexibility and agility for data flows while performing their transformations. In this recipe, we will create a data flow that accepts a parameter, filters the data based on the value passed in the parameter, and copies the data to Parquet files.
In this recipe, we will add a parameter to the Copy_CSV_to_Parquet data flow and make it copy selective rows based on the value passed to the parameter by the Copy_Data_Flow pipeline. The Copy_CSV_to_Parquet data flow currently copies all 300,000 rows present in the transaction_table-t1.csv file; we will add a filter based on the parameter so that fewer rows are copied.
How it works…
We added a parameter named sid to the Copy_CSV_to_Parquet data flow. We wrote an expression in the data flow's filter transformation that compared the sid parameter with the sid column, filtering the rows based on the condition that we defined. The data flow activity in the Copy_Data_Flow pipeline exposed the parameters present inside the data flow to the pipeline. We passed the value '10' from the Copy_Data_Flow pipeline to the data flow through that activity, and all the rows with a value of 10 in the sid column were selected and copied in Parquet format to the sink destination.
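Conceptually, the parameterized filter behaves like the small Spark sketch below, where the value supplied by the pipeline is simply a variable used in the filter predicate; the paths and session setup are assumptions.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("parameterized_filter").getOrCreate()

def copy_filtered(sid_value: str) -> None:
    """Copy only the rows whose sid column matches the supplied parameter value."""
    df = spark.read.option("header", True).csv("<path>/transaction_table-t1.csv")
    (
        df.filter(df["sid"] == sid_value)
          .write.mode("overwrite")
          .parquet(f"<path>/parquet/sid={sid_value}/")
    )

# The Copy_Data_Flow pipeline passes '10', so only rows with sid == 10 are copied.
copy_filtered("10")
```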
A common challenge in extraction, transformation, and load (ETL) projects is that when the schema changes at the source, the pipelines that read data from the source, transform it, and ingest it into the destination start to fail. Schema drift, a feature in data flows, addresses this problem by allowing us to dynamically define the column mapping in transformations. In this recipe, we will make some changes to the schema of a data source, use schema drift to detect the changes, and handle them gracefully without any manual intervention.
In this recipe, we will be using the Copy_CSV_to_Parquet data flow completed in the Copying data using a Synapse data flow recipe. We will perform the following tasks:
Perform the following steps to achieve this:
How it works…
Schema drift and rule-based mapping offer the resilience and flexibility for pipelines to accommodate almost any kind of change made at the data source. In this recipe, we used the Map drifted columns option and a derived column transformation in step 7 and step 8 to identify the unplanned, new columns arriving from the source. We used rule-based mapping to route the new columns to a separate sink (a cache sink for testing) and fixed rule mapping to ensure that the originally expected columns continued to flow to the Parquet destination. With this recipe, data engineers can ensure that their pipelines transfer data as expected while also tracking any new columns added at the source, making adjustments to their pipelines if required.
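To see what schema drift protects against, the Spark sketch below mimics the same idea by comparing the columns that actually arrive with the columns the pipeline expects and routing the drifted ones separately; the expected column list and paths are assumptions for illustration.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("schema_drift_demo").getOrCreate()

# Columns the pipeline was originally designed for (assumed for illustration).
expected_cols = ["tid", "sid", "transaction_date", "total_cost"]

df = spark.read.option("header", True).csv("<path>/transaction_table-t1.csv")

# Any column not in the expected list is a drifted column added at the source.
drifted_cols = [c for c in df.columns if c not in expected_cols]

# The originally expected columns keep flowing to the Parquet sink as before...
df.select(*[c for c in expected_cols if c in df.columns]) \
  .write.mode("overwrite").parquet("<path>/parquet/transaction_table-t1/")

# ...while drifted columns are surfaced for inspection (a stand-in for the cache sink).
if drifted_cols:
    df.select("tid", *drifted_cols).show(5)
```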