EIP – Content-Based Router (CBR) for bi-directional data synchronisation using Logic Apps

Summary

The Content-Based Router (CBR) is an enterprise integration pattern that routes a message to a particular endpoint based on the contents of that message. It can be used to keep two applications synchronised bi-directionally without causing a cascade of triggers.

Content-Based Router

Content-based routing works by using details from the message itself to route to the destination. The Receiver is left to determine if it wants to accept a Message or not.

For example, take the following RabbitMQ message:

curl -s -u USERNAME:USERPASSWORD -H "Accept: application/json" -H "Content-Type:application/json" -X POST -d'{
    "vhost": "/",
    "name": "amq.direct",
    "properties": {
        "delivery_mode": 2,
        "headers": {}
    },
    "routing_key": "QUEUE_NAME",
    "delivery_mode": "1",
    "payload":"ENCODED_JSON_PAYLOAD",
    "headers": {},
    "props": {},
    "payload_encoding": "string"
}' http://localhost:15672/api/exchanges/%2F/amq.direct/publish

The Recipient may decide to accept a message based on any of the properties above or on the contents of the payload itself.
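As a minimal sketch of that decision in Python: the message dictionary mirrors part of the publish body above, while the `accepts` function, the `entity` field and the JSON layout of the payload are hypothetical names chosen purely for illustration.

```python
import json

def accepts(message: dict, wanted_entity: str) -> bool:
    """Return True if this receiver wants the message.

    Assumes (hypothetically) that the payload is a JSON string with an
    'entity' field; the real payload layout is not given in this post.
    """
    # Property-based check: only accept persistent messages (delivery_mode 2).
    if message.get("properties", {}).get("delivery_mode") != 2:
        return False
    # Content-based check: decode the payload and inspect it.
    try:
        payload = json.loads(message["payload"])
    except (KeyError, TypeError, ValueError):
        return False
    return isinstance(payload, dict) and payload.get("entity") == wanted_entity

message = {
    "properties": {"delivery_mode": 2, "headers": {}},
    "routing_key": "QUEUE_NAME",
    "payload": json.dumps({"entity": "customer", "email": "a@example.com"}),
    "payload_encoding": "string",
}

print(accepts(message, "customer"))  # True
print(accepts(message, "invoice"))   # False
```

The receiver checks a message property first and only then looks inside the payload, so malformed payloads are rejected rather than raising an error.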

When to use this EIP?

If you are already using a messaging system that doesn’t support headers, use the content of the payload to route the message to its destination. In other situations it may not be possible to refactor an existing integration to route to a target system; content-based routing is a valid option there too.

Systems Context

Take two applications (Dynamics 365 CE and Dynamics 365 Finance & Operations) whose Customer records need to be kept synchronised. The applications are run by independent teams, and both teams work with Customer data. Without this design pattern the following challenge arises:

  1. A Customer record in Dynamics 365 CE is updated with a new email address.
  2. The update triggers Logic Apps to run. Logic Apps picks up the change any time a record has been updated.
  3. The update is inserted into Dynamics 365 Finance & Operations.
  4. Dynamics 365 Finance & Operations also has triggers enabled. When a record is updated or inserted there, a Logic App runs and pushes the change back to Dynamics 365 CE, which triggers step 2 again. The result is an infinite loop of triggers and updates.
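The loop can be reproduced with a few lines of Python. This is an illustrative simulation only: `sync_without_route` and its `max_runs` guard are hypothetical, and the guard exists solely so that the example terminates.

```python
def sync_without_route(max_runs: int = 6) -> list:
    """Simulate two systems whose update triggers feed each other."""
    runs = []
    source, target = "CE", "F&O"
    # Every write fires the trigger on the system that was written to,
    # so the roles swap on each run; max_runs is an artificial stop.
    while len(runs) < max_runs:
        runs.append(f"{source} -> {target}")
        source, target = target, source
    return runs

history = sync_without_route()
print(history)
```

Without the guard the list would grow indefinitely, which is the cascading trigger effect described in step 4.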

Message Router Design

A column called ‘Route’ is added to both the Source and Destination tables. The Route value identifies the application to which the message must be routed.

  1. Application A has an email column updated. When the email column is updated, the ‘Route’ column of the record is set to the destination (APP-B).
  2. Logic Apps are triggered on the update or modification of a record.
  3. Logic Apps uses the ‘Route’ column to determine which application to Route the message to.
  4. On update of the record in Application B, the ‘Route’ column is set to NULL.
  5. Logic Apps is triggered on the update or modification of the record in Application B. Logic Apps uses the Route column to determine that the destination does not exist, therefore the routine is terminated.
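The five steps above can be sketched as a small in-memory simulation. It is a sketch under stated assumptions rather than the Logic Apps implementation: the `tables` dictionary stands in for the two application tables, `on_record_modified` and `user_updates_email` are hypothetical helper names, and Python's `None` plays the role of the NULL route.

```python
from typing import Optional

# In-memory stand-ins for the two application tables, keyed by record ID.
tables = {
    "APP-A": {1: {"Email": "old@example.com", "Route": None}},
    "APP-B": {1: {"Email": "old@example.com", "Route": None}},
}
run_log = []

def on_record_modified(app: str, record_id: int) -> None:
    """Stand-in for the Logic App triggered by a modification in `app`."""
    record = tables[app][record_id]
    route: Optional[str] = record["Route"]
    if route is None:
        # Step 5: no destination, so the run stops here.
        run_log.append(f"{app}: terminated")
        return
    # Steps 3-4: copy the email to the destination and clear its Route.
    tables[route][record_id].update(Email=record["Email"], Route=None)
    run_log.append(f"{app}: routed to {route}")
    # That write is itself a modification, so the destination's trigger fires.
    on_record_modified(route, record_id)

def user_updates_email(app: str, destination: str, record_id: int, email: str) -> None:
    """Step 1: a user edit sets the new email and stamps the Route."""
    tables[app][record_id].update(Email=email, Route=destination)
    on_record_modified(app, record_id)  # Step 2: the trigger fires.

user_updates_email("APP-A", "APP-B", 1, "new@example.com")
print(run_log)  # ['APP-A: routed to APP-B', 'APP-B: terminated']
```

The second run finds no route and stops, so the update is propagated exactly once instead of bouncing between the two applications.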

Logic Apps Reference Architecture

Two Logic Apps are created, one for each endpoint (APP-A and APP-B). Each Logic App is triggered when a record is updated.

The example demonstrates a typical CBR using Microsoft Azure SQL. CBR can also be used to keep platforms such as Microsoft Dynamics 365 Customer Engagement and Microsoft Dynamics 365 Finance and Operations synchronised bi-directionally.

Note that the example demonstrated in this post uses only Logic Apps to maintain data synchronisation in a FIFO sequence. If high volumes of data changes are expected, a message broker should also be used.

APP-A

APP-A is triggered when an update to the email occurs from the APP-A endpoint.

When APP-A updates the record in APP-B, it sets the Route to NULL, because the record should not be routed on to another endpoint.

APP-B

APP-B is triggered when an update to the email occurs from the APP-B endpoint.

Record Updates

When the email column is updated from APP-A, the APP-A Logic App is triggered and updates the email column in APP-B. This, in turn, triggers the APP-B Logic App. However, the Route on the APP-B record is set to NULL, so that run is terminated without routing to any destination.

APP-A Run History

App-B Run History

Logic Apps Code Behind

App-A

{
    "definition": {
        "$schema": "https://schema.management.azure.com/providers/Microsoft.Logic/schemas/2016-06-01/workflowdefinition.json#",
        "actions": {
            "Switch": {
                "cases": {
                    "Case_2": {
                        "actions": {
                            "Update_row_(V2)": {
                                "inputs": {
                                    "body": {
                                        "Email": "@triggerBody()?['Email']",
                                        "Route": "NULL"
                                    },
                                    "host": {
                                        "connection": {
                                            "name": "@parameters('$connections')['sql_1']['connectionId']"
                                        }
                                    },
                                    "method": "patch",
                                    "path": "/v2/datasets/@{encodeURIComponent(encodeURIComponent('default'))},@{encodeURIComponent(encodeURIComponent('default'))}/tables/@{encodeURIComponent(encodeURIComponent('[dbo].[Persons]'))}/items/@{encodeURIComponent(encodeURIComponent(triggerBody()?['PersonID']))}"
                                },
                                "runAfter": {},
                                "type": "ApiConnection"
                            }
                        },
                        "case": "APP-B"
                    },
                    "Case_3": {
                        "actions": {
                            "Terminate": {
                                "inputs": {
                                    "runStatus": "Cancelled"
                                },
                                "runAfter": {},
                                "type": "Terminate"
                            }
                        },
                        "case": "NULL"
                    }
                },
                "default": {
                    "actions": {}
                },
                "expression": "@triggerBody()?['Route']",
                "runAfter": {},
                "type": "Switch"
            }
        },
        "contentVersion": "1.0.0.0",
        "outputs": {},
        "parameters": {
            "$connections": {
                "defaultValue": {},
                "type": "Object"
            }
        },
        "triggers": {
            "When_an_item_is_modified_(V2)": {
                "inputs": {
                    "host": {
                        "connection": {
                            "name": "@parameters('$connections')['sql']['connectionId']"
                        }
                    },
                    "method": "get",
                    "path": "/datasets/@{encodeURIComponent(encodeURIComponent('default'))},@{encodeURIComponent(encodeURIComponent('default'))}/tables/@{encodeURIComponent(encodeURIComponent('[dbo].[Persons]'))}/onupdateditems"
                },
                "recurrence": {
                    "frequency": "Second",
                    "interval": 10
                },
                "splitOn": "@triggerBody()?['value']",
                "type": "ApiConnection"
            }
        }
    },
    "parameters": {
        "$connections": {
            "value": {
                "sql": {
                    "connectionId": "/subscriptions/cc2f-4c71-9a31/resourceGroups/EXP-x21-DataTransformations/providers/Microsoft.Web/connections/sql-4",
                    "connectionName": "sql-4",
                    "id": "/subscriptions/03f9c629-cc2f-4c71-9a31-cd522f6b0856/providers/Microsoft.Web/locations/uksouth/managedApis/sql"
                },
                "sql_1": {
                    "connectionId": "/subscriptions/cc2f-4c71-9a316b0856/resourceGroups/EXP-x21-DataTransformations/providers/Microsoft.Web/connections/sql-6",
                    "connectionName": "sql-6",
                    "id": "/subscriptions/03f9c629-cc2f-4c71-9a31-cd522f6b0856/providers/Microsoft.Web/locations/uksouth/managedApis/sql"
                }
            }
        }
    }
}

App-B

{
    "definition": {
        "$schema": "https://schema.management.azure.com/providers/Microsoft.Logic/schemas/2016-06-01/workflowdefinition.json#",
        "actions": {
            "Switch": {
                "cases": {
                    "Case": {
                        "actions": {
                            "Update_row_(V2)": {
                                "inputs": {
                                    "body": {
                                        "Email": "@triggerBody()?['Email']",
                                        "Route": "NULL"
                                    },
                                    "host": {
                                        "connection": {
                                            "name": "@parameters('$connections')['sql_2']['connectionId']"
                                        }
                                    },
                                    "method": "patch",
                                    "path": "/v2/datasets/@{encodeURIComponent(encodeURIComponent('default'))},@{encodeURIComponent(encodeURIComponent('default'))}/tables/@{encodeURIComponent(encodeURIComponent('[dbo].[Persons]'))}/items/@{encodeURIComponent(encodeURIComponent(triggerBody()?['PersonID']))}"
                                },
                                "runAfter": {},
                                "type": "ApiConnection"
                            }
                        },
                        "case": "APP-A"
                    },
                    "Case_2": {
                        "actions": {
                            "Terminate": {
                                "inputs": {
                                    "runStatus": "Cancelled"
                                },
                                "runAfter": {},
                                "type": "Terminate"
                            }
                        },
                        "case": "NULL"
                    }
                },
                "default": {
                    "actions": {}
                },
                "expression": "@triggerBody()?['Route']",
                "runAfter": {},
                "type": "Switch"
            }
        },
        "contentVersion": "1.0.0.0",
        "outputs": {},
        "parameters": {
            "$connections": {
                "defaultValue": {},
                "type": "Object"
            }
        },
        "triggers": {
            "When_an_item_is_modified_(V2)": {
                "inputs": {
                    "host": {
                        "connection": {
                            "name": "@parameters('$connections')['sql']['connectionId']"
                        }
                    },
                    "method": "get",
                    "path": "/datasets/@{encodeURIComponent(encodeURIComponent('default'))},@{encodeURIComponent(encodeURIComponent('default'))}/tables/@{encodeURIComponent(encodeURIComponent('[dbo].[Persons]'))}/onupdateditems"
                },
                "recurrence": {
                    "frequency": "Second",
                    "interval": 10
                },
                "splitOn": "@triggerBody()?['value']",
                "type": "ApiConnection"
            }
        }
    },
    "parameters": {
        "$connections": {
            "value": {
                "sql": {
                    "connectionId": "/subscriptions/03f9c629-cc2f-4c71-9a31-cd522f6b0856/resourceGroups/EXP-Intergrations-POC/providers/Microsoft.Web/connections/sql",
                    "connectionName": "sql",
                    "id": "/subscriptions/cc2f-4c71-9a31/providers/Microsoft.Web/locations/uksouth/managedApis/sql"
                },
                "sql_2": {
                    "connectionId": "/subscriptions/03f9c629-cc2f-4c71-9a31-cd522f6b0856/resourceGroups/EXP-Intergrations-POC/providers/Microsoft.Web/connections/sql-2",
                    "connectionName": "sql-2",
                    "id": "/subscriptions/cc2f-4c71-9a31/providers/Microsoft.Web/locations/uksouth/managedApis/sql"
                }
            }
        }
    }
}

Considerations

The benefit of using CBR with the routing destination held in the record itself, as a field or column, is that the primary system and its subsystems remain de-coupled. The destination or route of the message is encoded within the message itself, which means that each record can have a different destination. It also means that in a basic message routing architecture such as the one documented in this post, the routing rules can change at any time without impacting the other components in the integration.

The message routing architecture described in this post does not consider high-availability scenarios. If the source system publishes a message with routing properties and that message is lost in transit, there is no failover or retry mechanism. The Logic Apps logs need to be investigated and the records must be reconciled manually.
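A reconciliation pass of that kind can be as simple as comparing the two sides by key. The sketch below is only an illustration: `find_mismatches` is a hypothetical helper, and the dictionaries stand in for the `PersonID` and `Email` columns read from each application.

```python
def find_mismatches(app_a: dict, app_b: dict) -> list:
    """Return the PersonIDs whose Email differs or is missing on one side."""
    ids = sorted(set(app_a) | set(app_b))
    return [pid for pid in ids if app_a.get(pid) != app_b.get(pid)]

# PersonID -> Email, as it might be exported from each application.
app_a_emails = {1: "new@example.com", 2: "same@example.com", 3: "only-a@example.com"}
app_b_emails = {1: "old@example.com", 2: "same@example.com"}

print(find_mismatches(app_a_emails, app_b_emails))  # [1, 3]
```

Record 1 has diverged and record 3 never arrived in APP-B, so both are reported for manual review.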

Message Brokers

In the example above, Logic Apps uses a column in the database to route messages to the appropriate destination. Reading from and writing to a database may incur a transaction cost. In high-performance systems where speed is required, message brokers should be used with custom properties that define the route.
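To illustrate the idea of carrying the route as a custom property, here is a toy in-memory broker in Python. `InMemoryBroker`, its `publish` method and the `route` property name are all hypothetical; a real broker client would expose its own API for message properties.

```python
from collections import defaultdict, deque

class InMemoryBroker:
    """Toy stand-in for a message broker; not a real client library."""

    def __init__(self) -> None:
        self.queues = defaultdict(deque)

    def publish(self, body: dict, properties: dict) -> bool:
        # The route travels with the message as a custom property,
        # so no database column has to be read or written.
        route = properties.get("route")
        if route is None:
            return False  # no destination: drop instead of routing
        self.queues[route].append(body)
        return True

broker = InMemoryBroker()
delivered = broker.publish({"PersonID": 1, "Email": "new@example.com"}, {"route": "APP-B"})
dropped = broker.publish({"PersonID": 1, "Email": "new@example.com"}, {})
print(delivered, dropped, len(broker.queues["APP-B"]))  # True False 1
```

A message without a route property plays the same role as a NULL Route column: it is not forwarded, which prevents the echo back to the source.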