
Replay Specific Messages in Azure Service Bus


In the first part of this proof-of-concept – How to replay messages from Azure Service Bus? – Part 1: The dead-letter queue, we explored how to replay messages that have entered the dead-letter queue. In this second part, we explore how to resend a specific message of our choice.

At the end of part one, we mentioned that one thing that is virtually impossible to accomplish using the default capabilities of Azure Service Bus is reprocessing a specific message based on some identifiers; let's say I want to reprocess order message X from client Y that is in failure. But that's not all. For example, when designing decoupled asynchronous integrations, we may want to resend specific messages that, from the integration's point of view, were executed successfully but failed from the end application's point of view. Let's look at this example:

  • Application A sends messages to a Service Bus Topic so that messages can be processed asynchronously and reliably.
    • The topic may have one or multiple subscriptions in order to distinguish messages and/or different subscribers.
  • Then we have a Logic App implementing, for example, the following process:
    • Polling messages from a specific Topic Subscription.
    • The process then implements the required business logic, in some cases, discarding unnecessary messages.
    • In the end, if it is a valid message, it will send a different message (in the format and with content expected by the end system) to a Service Bus Queue.
  • Application B then subscribes to these messages from the Queue, once again, in an asynchronous way.

Application sending messages to Service Bus Topic

In these scenarios, it is not easy to resubmit messages without asking the original system, in this case Application A, to resend them to the topic. But the same may apply to simpler scenarios like:

  • Application A sends messages to a Service Bus Topic so that messages can be processed asynchronously and reliably.
    • The topic may have one or multiple subscriptions in order to distinguish messages and/or different subscribers.
  • Then we have a Logic App implementing, for example, the following process:
    • Polling messages from a specific Topic Subscription.
    • The process then implements the required business logic and, in the end, synchronously calls an Application B web service or REST API.

Logic Apps polling messages from Service Bus Topic

Of course, in these scenarios, we can easily control the response from Application B and even implement try-catch blocks that allow us to resubmit failed messages back to the original topic. Once again, consider implementing additional logic to control how many times we resubmit a message back to the main topic or queue, to avoid infinite message loops. At some point, we need to give up on that message or store it in an external system to be analyzed.
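The retry-limit idea above can be sketched in the Logic Apps expression language. As an assumption on our side (the RetryCount property name and the limit of 3 are ours, not a built-in feature), each resubmitted message carries a custom RetryCount application property, and the catch branch only resends while it is under the limit:

```
less(int(coalesce(triggerBody()?['Properties']?['RetryCount'], '0')), 3)
```

When resending, the Send message action's Properties would increment the counter, for example with `@{add(int(coalesce(triggerBody()?['Properties']?['RetryCount'], '0')), 1)}`.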

How can we replay (resubmit/reprocess) a single message of our choice?

There are multiple ways you can accomplish this:

  • Designing our processes to be robust and re-entrant, for example, implementing try-catch mechanisms inside Logic Apps so that, if there is a failure, we re-insert the message back into the Service Bus.
    • But this solution will not handle messages that, from the point of integration, were executed successfully.

Logic Apps resending messages to Logic App

  • Using Azure Event Hubs to store the messages that arrive through the Service Bus for possibly later being re-processed.
  • And many others.

All of them will have advantages and disadvantages!

In this second part, we present a classic solution that we can almost call bulletproof: it works beautifully and has been implemented for many years in on-premises integrations. In old-fashioned on-premises solutions, we normally use a SQL Server or Oracle database to accomplish this task. I still believe that a database is the best option you will find, and it will provide all the extensibility you need (you can even put a simple portal on top to consume that information and resubmit the messages). But in this POC, we will "modernize" our solution by using a Storage Account. This is the overall solution:

  • The Logic App that subscribes to the messages from the Topic subscription will send a copy of the original message to a Storage Account.
  • Then, a business user on demand can resubmit a message based on some identifiers, like client name and order identifier.
    • The Logic App responsible for resubmitting the message will go fetch the message from the Storage Account and re-insert it on the original Topic.

resubmitting the message from the Storage Account

Of course, in a real case scenario, we should implement retention policies on the Storage Account.
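As a sketch of such a retention policy (the 30-day window and the container name are assumptions for this POC), an Azure Storage lifecycle management rule that deletes old message copies could look like this:

```json
{
  "rules": [
    {
      "enabled": true,
      "name": "purge-replayed-messages",
      "type": "Lifecycle",
      "definition": {
        "actions": {
          "baseBlob": {
            "delete": { "daysAfterModificationGreaterThan": 30 }
          }
        },
        "filters": {
          "blobTypes": [ "blockBlob" ],
          "prefixMatch": [ "servicebusmessagestoreplay/" ]
        }
      }
    }
  ]
}
```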

If you have not followed our previous proof of concept, here are the basic steps to work with Azure Service Bus. First and foremost, you need to create an Azure Service Bus namespace, and to do that you need to:

    • On the Portal Search bar, search for Azure Service Bus

    Search for Azure Service Bus in the Azure Portal

    • Click on it, and next click on + Create

    Create Azure Service Bus in the Azure Portal

    • Next, you will have some fields to populate:
      • Subscription
      • Resource Group
      • Namespace name
      • Location
      • Pricing tier: Regarding the pricing tier and for the sake of this document, we are using the Standard pricing tier to access the Topics.
    • Next, click on Review and Create and create the Azure Service Bus Namespace.

    Create the Azure Service Bus Namespace

    After creating the namespace, you can create Queues or Topics. Let's go with the latter.

    • So click on +Topic

    Create Queues or Topics inside the Azure Service Bus Namespace

    • Give a name to the Topic you want to create and click on Create.

    Naming the created topic

    • Next, click on Topics in the left menu to access the topic we just created.

    Access the created topic

    • Click on the name of the Topic to enter it.

    As previously explained, within the topics, you can establish subscriptions. A topic subscription is a named virtual queue that acts as a message destination for subscribers. Each subscription is associated with a specific topic and can have its own set of rules and filters for receiving messages. Clients who have subscribed to this particular topic will subsequently receive messages that pertain to this specific subscription.

    Next, proceed to:

    • Inside the previously created topic, click on the + Subscription button to initiate the creation of a new subscription.

    Initiate the creation of a new subscription

    For the purposes of this Proof of Concept (POC), we intentionally set a brief lifespan for the messages. This aligns with the testing scenario where messages may end up in the dead letter queue either due to message expiration or filter-related issues.

    Here are the steps to follow:

    • Provide a name for the subscription.
    • Set a short time to live for the messages (15 seconds).
    • Specify a brief lock duration (20 seconds).
    • Enable dead lettering upon message expiration.
    • Enable Move messages that cause filter evaluation exceptions to the dead-letter subqueue.

    Creation of a new subscription

  • Now, if you click on Subscriptions inside the topic in the left menu, you will see that your subscription has been created, so click on it.

Access the created subscription

  • Now, inside the subscription, you can add the filters you want applied to your messages, so that only the messages matching those filters are delivered to this subscription.
  • As you can see, you just need to click on +Add filter and write the SQL filter you want associated with your messages; these work like key pairs: a Key and a Value.
    • But in our case, we will not do that, since the default filter (1=1) accepts all the messages we send, regardless of Key and Value, and that is perfect to test this POC.

Add filter inside the subscription
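For reference, if we did want a real filter instead of the default 1=1, a Service Bus SQL filter over the custom properties we attach to our messages later in this POC could look like this (a sketch, adjust the property names to your own):

```
MsgType = 'SamplesMsg' AND ExtId LIKE 'sp-%'
```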

Now, you are ready to send a message to this topic. Just as we typically use Postman for HTTP requests, in cases like these we use Service Bus Explorer. Download it and extract the files:

Extract the downloaded files

  • This will open the Service Bus Explorer.

Open the Service Bus Explorer

What you have is a blank canvas, as you can see in the image, and to change that, you need to create a connection to your Azure Service Bus Topic. To do that, click on File – Connect. A new panel opens, and in the dropdown menu, choose Enter Connection String. But where is this connection string, and how do you get it?

  • Back in Azure Service Bus in the Azure Portal, access your topic, and click on Shared Access Policies on the left panel.

click on Shared Access Policies

  • Next, click on Add and give the SAS Policy a name. By default, the RootManageSharedAccessKey policy is basically the owner or master user. You will see that when the Manage option is selected, Send and Listen are automatically enabled.

Add and give the SAS Policy a name

  • Now, click on the Policy you just created, and you will see the Primary Connection String.

Click on the Primary Connection String

  • Copy that Primary Connection String and paste it into the panel we were working on in Service Bus Explorer.

Copy that Primary Connection String

  • Click OK on the Connect to a Service Bus Namespace panel inside Service Bus Explorer, and you should now be connected to your Topic.
  • Next, right-click on the topic and choose Send Messages.

Click on the topic and choose Send Messages

  • That will open a new window where you should be able to define the message payload and properties to be submitted to your topic.
    • Set the format as JSON, and in this case, let’s add some Key and Value pairs:
      • MsgType: SamplesMsg
      • ExtId: sp-1

Add some Key and Value pairs

Let’s send 3 different messages. To do so, change the content of the message (Id) and the value associated with the ExtId Key:

  • Message 1 Message Properties:
    • MsgType (string): SamplesMsg
    • ExtId(string): sp-1
  • Message 1 Message Format: JSON
  • Message 1 Message Text:
{
   "id":1,
   "data": "This is a message"
}
  • Message 2 Message Properties:
    • MsgType (string): SamplesMsg
    • ExtId(string): sp-2
  • Message 2 Message Format: JSON
  • Message 2 Message Text:
{
   "id":2,
   "data": "This is a message"
}
  • Message 3 Message Properties:
    • MsgType (string): SamplesMsg
    • ExtId(string): sp-3
  • Message 3 Message Format: JSON
  • Message 3 Message Text:
{
   "id":3,
   "data": "This is a message"
}

Going back into your Azure Service Bus Subscription, you will see that you now have 3 active messages.

Go back into your Azure Service Bus Subscription

Now, we will need a Logic App to consume those messages. To do that, you need:

  • Access the Azure Portal, search for Logic Apps in the search bar, and then click on it.

Search for the Logic Apps in the Azure Portal

  • We will create a Logic App Consumption, so give it a name relevant to you, choose your subscription, a resource group, and the location, then click on Review + create, and Create.
  • We now need to add a When a message is received in a topic subscription (auto-complete) Azure Service Bus trigger, and then configure the connection, providing the connection details with Azure Active Directory or by using, for example, the Logic App Managed Identity or a Connection String.

create a Logic App consumption

  • After the connection is established with the Namespace of your Azure Service Bus, you can now configure the Trigger.
  • So, we set the Topic name and the subscription name, and as for the subscription type, we want to deal with the active messages from our Topic subscription.
  • Then, you can define how often you want to poll the messages from there.

configure the Trigger in Logic App consumption

  • Then, for example, we can initialize a variable named DecodeContent of type String, and as the value, we dynamically choose the Content associated with the Service Bus trigger. We do this because the Content on the trigger comes Base64-encoded, and this is enough to decode it back into JSON.

Initialize a Variable named DecodeContent
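Behind that dynamic Content selection, the expression the designer produces should be similar to the following (ContentData is the Base64-encoded payload field shown later in the trigger outputs):

```
base64ToString(triggerBody()?['ContentData'])
```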

  • Next, we add a new action, this time a Parse JSON, and as the content, we use the DecodeContent variable we created.
  • Next, we need to create a JSON Schema matching the message we sent from Service Bus Explorer.
    • To do that, click on Use sample payload to generate schema, paste the following message, and click Done; the schema will be generated automatically.
{
   "id":1,
   "data": "This is a message"
}

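Generated from that sample payload, the Parse JSON schema should look similar to this:

```json
{
  "type": "object",
  "properties": {
    "id": { "type": "integer" },
    "data": { "type": "string" }
  }
}
```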
You can save your Logic App at this point if you have not saved it earlier.

Now, what we intend to do is store the messages that we receive in our topic subscription, and to do so, we need to create a Storage Account. For that, we need to:

  • Access the Azure Portal, search for Storage accounts in the search bar, and then click on it.

Search for the storage accounts in Azure Portal

  • And then click on Create.

Create an Azure Storage account

  • Next, you will need to populate some fields like your Subscription, Resource Group, and Storage account name; as for Performance, keep it on Standard. Next, click on Review, wait for validation, and click on Create.

Review and click on Create Azure Storage account

  • Now, after the deployment, click on Go to resource. You have an overview page that you can explore by yourself, but what we need is to give our user full control of the Storage Account; if we do not do this, we will get authorization-denied errors in our Logic App.
  • So, to give yourself full control over the Storage Account, click on Access Control (IAM), and next click on Add – Add role assignment.

Click on Add – Add role assignment

  • Choose the Privileged administrator roles tab and search for Owner. Click on it, and then click on Members.

click on Members and search for owner

  • Here, click on Select members, search for yourself inside your organization, click on Select, and next click on Review and Assign.

Next click on Review and Assign

Now, you have the privilege to work with the storage account freely. So next, what we need to do is to create a Container.

  • The container is where the messages from the Azure Service Bus will be stored by the Logic App. To do so, on the left side menu, click on Containers, and then click on + Container.

Click on Containers and create container

  • Choose the container name, and in the drop-down menu, choose Container and click on Create.

Choose the container name and create

  • We created our own and named it servicebusmessagestoreplay.

Created own servicebusmessagestoreplay

  • Now, if we click and open the container, it will be empty.

Click and open the container

But that is about to change, and to do that, we need to continue with our Logic App.

Until now, our Logic App looked like this:

Continue to the Logic App

We now need to implement the rest of the business logic. For that, we need to:

  • Add another action, this time from Azure Blob Storage, which connects directly to the Storage Accounts.

Search for Azure Blob Storage

  • Choose the action Create Blob (V2).

Create Azure Blob Storage

  • If you do not have a connection configured, you can use the Authentication type Azure AD Integrated (or another type of authentication), choose a name for your connection, and click Sign in. Enter your credentials, and the connection is made.

Choose a name for connection and click sign in

  • Next, you will need to configure the action:
    • Write your storage account name by clicking on Enter Custom Value in the Storage account name or blob endpoint property.

Enter Custom Value on the Storage account name

    • After this, you can assign the Folder path: click on the icon on the right and choose the container we created previously.

Assign the Folder path and choose the container

Now, we want our message names to look like this:

  • SamplesMsg-1-sp-1
  • SamplesMsg-2-sp-2
  • SamplesMsg-3-sp-3

Here is a snippet of the code from the trigger outputs. As you can see, the key and value are being passed in the message Properties, so we can use that to our advantage to create our message names in the desired format.

"body": {
    "ContentData": "ew0KICAgImlkIjoxLA0KICAgImRhdGEiOiAiVGhpcyBpcyBhIG1lc3NhZ2UiDQp9",
    "ContentType": "",
    "ContentTransferEncoding": "Base64",
    "Properties": {
        "MsgType": "SamplesMsg",
        "ExtId": "sp-1",
        "DeliveryCount": "1",
        "EnqueuedSequenceNumber": "58",
        "EnqueuedTimeUtc": "2023-09-21T10:27:22Z",
        "ExpiresAtUtc": "2023-10-05T10:27:22Z",
        "Label": "Service Bus Explorer",
        "LockedUntilUtc": "2023-09-21T10:28:22Z",
        "LockToken": "aeab8c7e-e223-434a-9ad4-fc6f715bb3b1",
        "MessageId": "084bd2fa-d8c1-4c29-b90e-3eebdf928322",
        "ScheduledEnqueueTimeUtc": "0001-01-01T00:00:00Z",
        "SequenceNumber": "56",
        "Size": "254",
        "State": "Active",
        "TimeToLive": "12096000000000"
    }
}

But we could also go to the content of the message and extract some meta information.

So what we did was this, which we will explain in detail below:

extract some meta information

So, as for the blob name and to follow this convention SamplesMsg-1-sp-1, we use two expressions:

triggerBody()?['Properties']?['MsgType']
triggerBody()?['Properties']?['ExtId']

In between these expressions, we used "-" to separate the values.
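Putting the pieces together, and assuming the Parse JSON action is named Parse_JSON (our name, adjust it to yours) so we can take the Id from the decoded message body, the complete Blob name expression could look like this:

```
concat(triggerBody()?['Properties']?['MsgType'], '-', string(body('Parse_JSON')?['id']), '-', triggerBody()?['Properties']?['ExtId'])
```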

As for the content, we dynamically select the Content of the message:

Dynamically select the Content of the message

The following image explains how dynamic content selection is made:

select the dynamic content

 

Now you can save your Logic App, and since we have 3 messages on standby in our Azure Service Bus Topic Subscription, they will be consumed, creating 3 blobs inside the Container we defined in our Logic App.

Create 3 blobs inside the Container

Until now, everything is okay and running smoothly, but now, the missing piece is to implement the part that allows us to replay (resubmit/reprocess) a specific message of our choice into the Azure Service Bus.

For that, we need to create another Logic App (of course, you can implement different types of solutions to accomplish this – we chose a Logic App). It will be another Logic App Consumption, so you already know which steps to take here. This Logic App will be responsible for replaying the messages into the Azure Service Bus.

  • We started our Logic App with a Request > When a HTTP request is received trigger. Next, we generated the JSON Schema from the following sample payload, which will be used as input for our Logic App.
    • Do this by clicking on Use sample payload to generate Schema, and next click Done.
{
   "MsgType":"SamplesMsg",
   "Id":1,
   "ExtId":"sp-1"
}
  • Next, we initialized a variable named MessageName, and the purpose of this variable is to create a name in the same format as we created it in the previous Logic App.
    • MsgType-Id-ExtId that should translate into SamplesMsg-1-sp-1

Initialize a variable named MessageName
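Since the HTTP trigger body follows the schema above, the value of the MessageName variable can be built with an expression similar to this:

```
concat(triggerBody()?['MsgType'], '-', string(triggerBody()?['Id']), '-', triggerBody()?['ExtId'])
```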

  • Next, we need to add an Azure Blob Storage > List Blobs (V2) action.

Add an Azure Blob Storage action

  • As the name suggests, this action will list all the blobs (messages) inside the container we created.

List all the blobs inside the container

  • Next, inside the For each, we added a Condition.
    • In this condition, we evaluate whether the DisplayName of the file inside the Container is equal to the name of the file that we send in the HTTP request; if that is true, then the message should be replayed into the Topic again.

Add a Condition
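In expression form, and assuming the loop kept the default name For_each, the condition compares each blob's DisplayName from List Blobs (V2) with our variable:

```
equals(items('For_each')?['DisplayName'], variables('MessageName'))
```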

  • On the True branch – meaning it is the file we want – we first need to add a new action from the Azure Blob Storage, this time a Get blob content using path (V2), where we dynamically select the Path.

Get blob content using path

  • After that, we need to send the message to the Service Bus, and for that, we use the Send message action from the Service Bus connector. Of course, we define the Content of the Send message action dynamically by using the File Content from the Get blob content using path (V2) action. We also added the Properties parameter, where we send a JSON with the Key and Value pairs.
{
  "ExtId": "http-inbound-ExtId",
  "Id": "http-inbound-Id",
  "Type": "http-inbound-MsgType"
}

send a JSON with the pairs of Key and Values
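The placeholder values above represent dynamic content from the HTTP trigger; written as expressions, the Properties JSON we send would look similar to this:

```json
{
  "ExtId": "@{triggerBody()?['ExtId']}",
  "Id": "@{triggerBody()?['Id']}",
  "Type": "@{triggerBody()?['MsgType']}"
}
```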

Save your Logic App and give it a try.

Of course, in a real-world solution, many things could be improved and optimized, but for a proof of concept, this is enough. I hope you have enjoyed part 2 of this POC, and we will see you in the next one!


This article was published on Nov 1, 2023.
