Find duplicate records with Azure Data Factory (same source file)
Summary
In this post, I demonstrate how an Azure Data Factory (ADF) Mapping Data Flow can be used to filter out duplicate rows from a CSV file.
My CSV file contains 50,000 rows of Lead data. Many of these rows are duplicates, where a Lead has enquired about a product through various channels.
My duplicate rule is: if two Leads share the same email address, they are considered duplicates.
Reference Architecture
The final architecture for detecting duplicates
Source File
Here is a snippet of the source file:
Note that I have removed columns that are not relevant to the post.
Source
The source file is a simple CSV stored in Azure Storage (blob) and referenced via a Linked service.
I've left the settings at their defaults, but made sure that the schema was detected in the 'Projection' tab.
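In the data flow script, the source is defined as follows (the column names match my file; yours will differ):

```
source(output(
        {First name} as string,
        Surname as string,
        {Primary Phone Number} as string,
        Email as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> Source
```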
SurrogateKey
Add a Surrogate Key transformation. This assigns an incrementing key to every row, which will be used later by the Join transformation.
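In script form, this transformation generates an incrementing long column named key, starting at 1:

```
Source keyGenerate(output(key as long),
    startAt: 1L) ~> SurrogateKey
```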
SurrogateKey (new branch)
Create a new branch from the SurrogateKey transformation. Again, this is an important step: the two branches will be brought back together later by the Join.
Filter
This is an optional step. I added a Filter transformation to remove unwanted rows.
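In my case, the filter drops rows whose Primary Phone Number holds the placeholder value 'No Contact Number':

```
SurrogateKey filter(notEquals({Primary Phone Number}, 'No Contact Number')) ~> Filter
```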
Select
Add a Select transformation. This stream will form one side of the Join.
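The corresponding script line is minimal; I have kept the column mapping at its defaults:

```
Filter select(skipDuplicateMapInputs: true,
    skipDuplicateMapOutputs: true) ~> Select
```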
Aggregate
Here I group by the Email column, since Email is the column I am using to identify duplicates in my source file.
In addition to grouping, I also create an aggregate column called maxkey, which holds the highest surrogate key seen for each email address.
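In the data flow script, this is:

```
SurrogateKey aggregate(groupBy(Email),
    maxkey = max(key)) ~> Aggregate
```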
Join
Finally, join the Aggregate stream with the Select stream. An inner join on Email, together with the condition that maxkey equals key, keeps exactly one row (the last one, by surrogate key) per email address.
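The join condition in script form:

```
Aggregate, Select join(Aggregate@Email == Select@Email
    && maxkey == key,
    joinType:'inner',
    broadcast: 'auto') ~> Join
```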
Sink
I have kept the Sink settings at their defaults.
The only change is that I have selected output to a single file:
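The single-file output corresponds to these sink options in the script:

```
partitionFileNames:['output.csv'],
partitionBy('hash', 1)
```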
Result
The final result is a single output file with the duplicate rows removed and the filter criteria applied:
This is all that is required to remove duplicates.
Script
source(output(
        {First name} as string,
        Surname as string,
        {Primary Phone Number} as string,
        Email as string
    ),
    allowSchemaDrift: true,
    validateSchema: false) ~> Source
Source keyGenerate(output(key as long),
    startAt: 1L) ~> SurrogateKey
SurrogateKey filter(notEquals({Primary Phone Number}, 'No Contact Number')) ~> Filter
Filter select(skipDuplicateMapInputs: true,
    skipDuplicateMapOutputs: true) ~> Select
SurrogateKey aggregate(groupBy(Email),
    maxkey = max(key)) ~> Aggregate
Aggregate, Select join(Aggregate@Email == Select@Email
    && maxkey == key,
    joinType:'inner',
    broadcast: 'auto') ~> Join
Join sink(input(
        Column_1 as string,
        Column_2 as string,
        Column_3 as string,
        Column_4 as string
    ),
    allowSchemaDrift: true,
    validateSchema: false,
    partitionFileNames:['output.csv'],
    partitionBy('hash', 1),
    skipDuplicateMapInputs: true,
    skipDuplicateMapOutputs: true) ~> Sink