Find duplicate records with Azure Data Factory (same source file)

Summary

In this post, I demonstrate how an Azure Data Factory (ADF) Mapping Data Flow can be used to filter out duplicate rows from a CSV file.

My CSV file contains 50,000 rows of Lead data. Many of these rows are duplicates, where a Lead has enquired about a product through various channels.

My duplicate rule is: if two Leads share the same email address, they can be considered duplicates.

Reference Architecture

The final architecture for detecting duplicates

Source File

Here is a snippet of the source file:

Note that I have removed columns that are not relevant to the post.
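To make the duplicate rule concrete, here is an invented illustration with the same four columns (these rows are hypothetical, not taken from the real file):

First name | Surname | Primary Phone Number | Email
Jane       | Doe     | 01234 567890         | jane.doe@example.com
Jane       | Doe     | No Contact Number    | jane.doe@example.com
John       | Smith   | 09876 543210         | john.smith@example.com

The first two rows count as duplicates because they share an email address; the flow below keeps only one of them.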

Source

The source file is a simple CSV stored in Azure Storage (blob) and referenced via a Linked service.

I’ve kept the default settings, but made sure that the schema was detected in the ‘Projection’ tab.
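The corresponding declaration in the data flow script (reproduced in full at the end of the post):

source(output(
		{First name} as string,
		Surname as string,
		{Primary Phone Number} as string,
		Email as string
	),
	allowSchemaDrift: true,
	validateSchema: false) ~> Source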

SurrogateKey

Add a Surrogate Key transformation. It appends an incrementing key column to every row, and this key is an important ingredient in the Join later on.
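In the script, this generates a long column named key, starting at 1:

Source keyGenerate(output(key as long),
	startAt: 1L) ~> SurrogateKey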

SurrogateKey (new branch)

Create a New Branch from the SurrogateKey transformation. This duplicates the stream so the same keyed rows can flow down two paths: one directly into the Aggregate, and one through the Filter and Select. Again, this is an important step when joining the two streams back together.

Filter

This is an optional step. I added a Filter to remove unwanted rows; in my file these are rows where the Primary Phone Number column holds the placeholder value 'No Contact Number'.
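The script for the filter:

SurrogateKey filter(notEquals({Primary Phone Number}, 'No Contact Number')) ~> Filter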

Select

Add a Select transformation. It passes the filtered columns through unchanged and provides the named stream that one side of the Join will reference.
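In script form, the Select reads from the Filter:

Filter select(skipDuplicateMapInputs: true,
	skipDuplicateMapOutputs: true) ~> Select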

Aggregate

Here I group by the Email column. Email is the column I use to identify duplicates in my source file.

In addition to grouping, I also create an aggregate called maxkey, defined as max(key). For each email address it records the highest surrogate key, i.e. the last row in the file with that address.
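The script for the aggregate; note that it reads from the unfiltered SurrogateKey branch:

SurrogateKey aggregate(groupBy(Email),
	maxkey = max(key)) ~> Aggregate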

Join

Finally, join the Aggregate stream with the Select stream. It is an inner join on Email with the additional condition maxkey == key, so for each email address only the single row whose surrogate key matches the group maximum survives.
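The script for the join:

Aggregate, Select join(Aggregate@Email == Select@Email
	&& maxkey == key,
	joinType: 'inner',
	broadcast: 'auto') ~> Join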

Sink

I have kept the Sink settings at their defaults.

The only difference here is that I have selected to output to a single file:
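In the script, writing to a single file corresponds to these two sink settings:

partitionFileNames: ['output.csv'],
partitionBy('hash', 1)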

Result

The final result is a single file with the duplicates removed and the filter criteria applied:

This is all that is required to remove duplicates.

Script

source(output(
		{First name} as string,
		Surname as string,
		{Primary Phone Number} as string,
		Email as string
	),
	allowSchemaDrift: true,
	validateSchema: false) ~> Source
Source keyGenerate(output(key as long),
	startAt: 1L) ~> SurrogateKey
SurrogateKey filter(notEquals({Primary Phone Number}, 'No Contact Number')) ~> Filter
Filter select(skipDuplicateMapInputs: true,
	skipDuplicateMapOutputs: true) ~> Select
SurrogateKey aggregate(groupBy(Email),
	maxkey = max(key)) ~> Aggregate
Aggregate, Select join(Aggregate@Email == Select@Email
	&& maxkey == key,
	joinType: 'inner',
	broadcast: 'auto') ~> Join
Join sink(input(
		Column_1 as string,
		Column_2 as string,
		Column_3 as string,
		Column_4 as string
	),
	allowSchemaDrift: true,
	validateSchema: false,
	partitionFileNames: ['output.csv'],
	partitionBy('hash', 1),
	skipDuplicateMapInputs: true,
	skipDuplicateMapOutputs: true) ~> Sink