Use Azure Data Factory to find duplicate records (two source files)
Summary
I’ve read many sources online that illustrate how to create a flow that detects duplicate records. Some of these duplicate detection patterns are quite complex, and most are dated. Mapping Data Flows in Azure Data Factory include a transformation called Exists, which can either keep the records that match a condition (the duplicates) or omit those duplicates from a record set.
As of November 2019, detecting duplicate records has become easier.
Putting it together
I’ve uploaded the source files to my public Azure DevOps repo. You can grab both copies here: https://dev.azure.com/eax360/_git/Global%20Catalogue%20Public%20Projects?path=%2Fadf&version=GBmaster
Create the data sets
Create your datasets. I have three CSV datasets:
- Source A – 100 Sales Records (This Month).csv
- Source B – 100 Sales Records (This Month) Duplicates.csv
- dupefolder – this is the Sink data store.
Create an ‘Exists’ transformation
The Data Flow is as follows: SourceA and SourceB feed the Exists1 transformation, and its output is written to sink1.
Creating the Exists transformation requires three items:
- Left & Right Streams
- Exists Type (Exists or Doesn’t Exist)
- Exists condition
For my Exists condition, I have selected the Order ID column. This means that each record in Source A (the left stream) is checked against Source B (the right stream), and it counts as a duplicate if its Order ID also appears there.
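To make the semantics of that condition concrete, here is a minimal Python sketch of an exists check on Order ID. It is only an illustration, not how Data Factory implements the transformation: the function name `exists` and the sample rows are made up, and the real files have many more columns. The behaviour is similar to a SQL WHERE EXISTS, so the output only ever contains rows from the left stream.

```python
from typing import Dict, Iterable, List

Row = Dict[str, str]


def exists(left: Iterable[Row], right: Iterable[Row], key: str) -> List[Row]:
    """Keep each left row whose key value also appears in the right stream."""
    right_keys = {row[key] for row in right}
    return [row for row in left if row[key] in right_keys]


# Made-up sample rows, not taken from the real CSV files.
source_a = [
    {"Order ID": "1001", "Country": "Chad"},
    {"Order ID": "1002", "Country": "Peru"},
    {"Order ID": "1003", "Country": "Fiji"},
]
source_b = [
    {"Order ID": "1002", "Country": "Peru"},
    {"Order ID": "1003", "Country": "Fiji"},
    {"Order ID": "1003", "Country": "Fiji"},
]

duplicates = exists(source_a, source_b, "Order ID")
print([row["Order ID"] for row in duplicates])  # ['1002', '1003']
```

Note that order 1003 appears twice in the right stream but only once in the result, because a left row is either kept or dropped and is never multiplied.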
Finally, add a Sink (optional)
Add the Sink if you need to save the data for later reuse. If you have used the sample files from my repository, the output contains only the Source A records whose Order ID also appears in Source B.
That’s all that is required to detect duplicates.
If you want to show all other records that are NOT duplicates, then toggle the switch ‘Exists type’ to ‘Doesn’t exist’.
The output then contains only the Source A records whose Order ID has no match in Source B.
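The effect of the toggle can be sketched in Python as an extra flag on the same kind of check. This is an assumption-laden illustration rather than Data Factory’s implementation: the `exists` function and the sample rows are hypothetical, and the flag is named `negate` here only because the data flow script in the template below carries a `negate:false` argument for the ‘Exists’ setting.

```python
from typing import Dict, List

Row = Dict[str, str]


def exists(left: List[Row], right: List[Row], key: str, negate: bool = False) -> List[Row]:
    """Keep matching left rows, or the non-matching ones when negate is True."""
    right_keys = {row[key] for row in right}
    return [row for row in left if (row[key] in right_keys) != negate]


# Made-up sample rows, not taken from the real CSV files.
source_a = [{"Order ID": "1001"}, {"Order ID": "1002"}, {"Order ID": "1003"}]
source_b = [{"Order ID": "1002"}, {"Order ID": "1003"}]

duplicates = exists(source_a, source_b, "Order ID")
uniques = exists(source_a, source_b, "Order ID", negate=True)

print([row["Order ID"] for row in uniques])  # ['1001']
# Every Source A row lands in exactly one of the two outputs.
print(len(duplicates) + len(uniques) == len(source_a))  # True
```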
ARM JSON Template
{
"$schema": "http://schema.management.azure.com/schemas/2015-01-01/deploymentTemplate.json#",
"contentVersion": "1.0.0.0",
"parameters": {
"factoryName": {
"type": "string",
"metadata": "Data Factory name",
"defaultValue": "AS-ADF-dupe001"
},
"AzureBlobStorage1_connectionString": {
"type": "secureString",
"metadata": "Secure string for 'connectionString' of 'AzureBlobStorage1'"
}
},
"variables": {
"factoryId": "[concat('Microsoft.DataFactory/factories/', parameters('factoryName'))]"
},
"resources": [
{
"name": "[concat(parameters('factoryName'), '/AzureBlobStorage1')]",
"type": "Microsoft.DataFactory/factories/linkedServices",
"apiVersion": "2018-06-01",
"properties": {
"annotations": [],
"type": "AzureBlobStorage",
"typeProperties": {
"connectionString": "[parameters('AzureBlobStorage1_connectionString')]"
}
},
"dependsOn": []
},
{
"name": "[concat(parameters('factoryName'), '/FindDuplicates')]",
"type": "Microsoft.DataFactory/factories/dataflows",
"apiVersion": "2018-06-01",
"properties": {
"description": "Find Duplicates in Two sources",
"type": "MappingDataFlow",
"typeProperties": {
"sources": [
{
"dataset": {
"referenceName": "SourceA",
"type": "DatasetReference"
},
"name": "SourceA",
"typeProperties": {}
},
{
"dataset": {
"referenceName": "SourceB",
"type": "DatasetReference"
},
"name": "SourceB",
"typeProperties": {}
}
],
"sinks": [
{
"dataset": {
"referenceName": "dupefolder",
"type": "DatasetReference"
},
"name": "sink1"
}
],
"transformations": [
{
"name": "Exists1"
}
],
"script": "\n\nsource(output(\n\t\tRegion as string,\n\t\tCountry as string,\n\t\t{Item Type} as string,\n\t\t{Sales Channel} as string,\n\t\t{Order Priority} as string,\n\t\t{Order Date} as string,\n\t\t{Order ID} as string,\n\t\t{Ship Date} as string,\n\t\t{Units Sold} as string,\n\t\t{Unit Price} as string,\n\t\t{Unit Cost} as string,\n\t\t{Total Revenue} as string,\n\t\t{Total Cost} as string,\n\t\t{Total Profit} as string\n\t),\n\tallowSchemaDrift: true,\n\tvalidateSchema: false) ~> SourceA\nsource(output(\n\t\tRegion as string,\n\t\tCountry as string,\n\t\t{Item Type} as string,\n\t\t{Sales Channel} as string,\n\t\t{Order Priority} as string,\n\t\t{Order Date} as string,\n\t\t{Order ID} as string,\n\t\t{Ship Date} as string,\n\t\t{Units Sold} as string,\n\t\t{Unit Price} as string,\n\t\t{Unit Cost} as string,\n\t\t{Total Revenue} as string,\n\t\t{Total Cost} as string,\n\t\t{Total Profit} as string\n\t),\n\tallowSchemaDrift: true,\n\tvalidateSchema: false) ~> SourceB\nSourceA, SourceB exists(SourceA@{Order ID} == SourceB@{Order ID},\n\tnegate:false,\n\tbroadcast: 'none')~> Exists1\nExists1 sink(allowSchemaDrift: true,\n\tvalidateSchema: false) ~> sink1"
}
},
"dependsOn": [
"[concat(variables('factoryId'), '/datasets/SourceA')]",
"[concat(variables('factoryId'), '/datasets/SourceB')]",
"[concat(variables('factoryId'), '/datasets/dupefolder')]"
]
},
{
"name": "[concat(parameters('factoryName'), '/dupefolder')]",
"type": "Microsoft.DataFactory/factories/datasets",
"apiVersion": "2018-06-01",
"properties": {
"linkedServiceName": {
"referenceName": "AzureBlobStorage1",
"type": "LinkedServiceReference"
},
"annotations": [],
"type": "DelimitedText",
"typeProperties": {
"location": {
"type": "AzureBlobStorageLocation",
"container": "flat-file-system"
},
"columnDelimiter": ",",
"escapeChar": "\\",
"firstRowAsHeader": true,
"quoteChar": "\""
},
"schema": []
},
"dependsOn": [
"[concat(variables('factoryId'), '/linkedServices/AzureBlobStorage1')]"
]
},
{
"name": "[concat(parameters('factoryName'), '/SourceA')]",
"type": "Microsoft.DataFactory/factories/datasets",
"apiVersion": "2018-06-01",
"properties": {
"linkedServiceName": {
"referenceName": "AzureBlobStorage1",
"type": "LinkedServiceReference"
},
"annotations": [],
"type": "DelimitedText",
"typeProperties": {
"location": {
"type": "AzureBlobStorageLocation",
"fileName": "100 Sales Records (This Month).csv",
"container": "flat-file-system"
},
"columnDelimiter": ",",
"escapeChar": "\\",
"firstRowAsHeader": true,
"quoteChar": "\""
},
"schema": [
{
"name": "Region",
"type": "String"
},
{
"name": "Country",
"type": "String"
},
{
"name": "Item Type",
"type": "String"
},
{
"name": "Sales Channel",
"type": "String"
},
{
"name": "Order Priority",
"type": "String"
},
{
"name": "Order Date",
"type": "String"
},
{
"name": "Order ID",
"type": "String"
},
{
"name": "Ship Date",
"type": "String"
},
{
"name": "Units Sold",
"type": "String"
},
{
"name": "Unit Price",
"type": "String"
},
{
"name": "Unit Cost",
"type": "String"
},
{
"name": "Total Revenue",
"type": "String"
},
{
"name": "Total Cost",
"type": "String"
},
{
"name": "Total Profit",
"type": "String"
}
]
},
"dependsOn": [
"[concat(variables('factoryId'), '/linkedServices/AzureBlobStorage1')]"
]
},
{
"name": "[concat(parameters('factoryName'), '/SourceB')]",
"type": "Microsoft.DataFactory/factories/datasets",
"apiVersion": "2018-06-01",
"properties": {
"linkedServiceName": {
"referenceName": "AzureBlobStorage1",
"type": "LinkedServiceReference"
},
"annotations": [],
"type": "DelimitedText",
"typeProperties": {
"location": {
"type": "AzureBlobStorageLocation",
"fileName": "100 Sales Records (This Month) Duplicates.csv",
"container": "flat-file-system"
},
"columnDelimiter": ",",
"escapeChar": "\\",
"firstRowAsHeader": true,
"quoteChar": "\""
},
"schema": [
{
"name": "Region",
"type": "String"
},
{
"name": "Country",
"type": "String"
},
{
"name": "Item Type",
"type": "String"
},
{
"name": "Sales Channel",
"type": "String"
},
{
"name": "Order Priority",
"type": "String"
},
{
"name": "Order Date",
"type": "String"
},
{
"name": "Order ID",
"type": "String"
},
{
"name": "Ship Date",
"type": "String"
},
{
"name": "Units Sold",
"type": "String"
},
{
"name": "Unit Price",
"type": "String"
},
{
"name": "Unit Cost",
"type": "String"
},
{
"name": "Total Revenue",
"type": "String"
},
{
"name": "Total Cost",
"type": "String"
},
{
"name": "Total Profit",
"type": "String"
}
]
},
"dependsOn": [
"[concat(variables('factoryId'), '/linkedServices/AzureBlobStorage1')]"
]
}
]
}
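The data flow script inside the template is stored as a single escaped string, which makes it hard to read. As a convenience, the following Python sketch pulls the script out of a template and prints it with real line breaks. The helper name `dataflow_scripts` is hypothetical, and `TEMPLATE` is a trimmed-down stand-in for the exported template above; in practice you would load the full JSON file instead.

```python
import json

# Trimmed-down stand-in for the exported template (assumption: only the
# fields needed for this example are kept).
TEMPLATE = r'''
{
  "resources": [
    {
      "name": "[concat(parameters('factoryName'), '/FindDuplicates')]",
      "type": "Microsoft.DataFactory/factories/dataflows",
      "properties": {
        "typeProperties": {
          "script": "SourceA, SourceB exists(SourceA@{Order ID} == SourceB@{Order ID},\n\tnegate:false,\n\tbroadcast: 'none')~> Exists1"
        }
      }
    }
  ]
}
'''

DATAFLOW_TYPE = "Microsoft.DataFactory/factories/dataflows"


def dataflow_scripts(template: dict) -> dict:
    """Map each data flow resource name to its decoded script text."""
    scripts = {}
    for resource in template.get("resources", []):
        if resource.get("type") == DATAFLOW_TYPE:
            scripts[resource["name"]] = resource["properties"]["typeProperties"]["script"]
    return scripts


scripts = dataflow_scripts(json.loads(TEMPLATE))
for name, script in scripts.items():
    print(name)
    # json.loads has already turned \n and \t into real characters.
    print(script.expandtabs(4))
```

Printed this way, the `exists(...)` call with its condition, `negate` and `broadcast` arguments is much easier to compare against the settings chosen in the designer.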