How to use Azure Service Bus in your .NET Core Application
Intro
Azure Service Bus is a fully managed message queuing service in Azure that enables reliable and scalable communication between distributed applications. It provides support for both queuing and publish-subscribe messaging patterns, and offers features such as filtering, dead-lettering, and auto-forwarding to help you build robust and flexible applications.
For a helpful introduction to creating and managing a Service Bus namespace with queues and topics in Azure, visit this topic.
Add the required NuGet packages in your application
When using Azure Service Bus in a .NET Core application, you can take advantage of the various Azure Service Bus client libraries available for .NET. These libraries provide a simple and easy-to-use API for sending and receiving messages, creating and managing queues and topics, and configuring other settings such as shared access policies and message properties.
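Make sure the NuGet packages used by the examples in this article are referenced in your project:
- Azure.Messaging.ServiceBus, which provides ServiceBusClient, ServiceBusSender and ServiceBusProcessor
- Newtonsoft.Json, which provides JsonConvert, used here to serialize messages to JSON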
Send a single message to a Service Bus Queue
To send a single message to a Service Bus queue, you need a client connected to the Service Bus namespace and a sender bound to the target queue. The following async method is a working example:
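using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Newtonsoft.Json;

// Member of a class that holds the _connectionString and _queueName fields.
public async Task SendMessageAsync(object message)
{
    // Use AMQP over WebSockets as the transport.
    var clientOptions = new ServiceBusClientOptions()
    {
        TransportType = ServiceBusTransportType.AmqpWebSockets
    };
    await using var client = new ServiceBusClient(_connectionString, clientOptions);
    await using var sender = client.CreateSender(_queueName);

    // Serialize the payload to JSON and send it to the queue.
    var serviceBusMessage = new ServiceBusMessage(JsonConvert.SerializeObject(message));
    await sender.SendMessageAsync(serviceBusMessage);
}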
Here's a breakdown of what's happening in the code:
- The SendMessageAsync method takes an object as a parameter, which represents the message that will be sent to the queue.
- A new ServiceBusClientOptions object is created with the TransportType property set to ServiceBusTransportType.AmqpWebSockets. This selects the transport the client uses to communicate with Azure Service Bus: AMQP over WebSockets, which runs over port 443 and is useful when the standard AMQP ports are blocked by a firewall.
- A new ServiceBusClient object is created from the _connectionString field and the clientOptions object. The ServiceBusClient class is responsible for managing connections to the Azure Service Bus service.
- A new ServiceBusSender object is created by calling the CreateSender method of the ServiceBusClient object. The _queueName field is passed as a parameter and specifies the queue to send the message to. The ServiceBusSender class is responsible for sending messages to that queue.
- A new ServiceBusMessage object is created from the message parameter, serialized to JSON with the JsonConvert.SerializeObject method.
- The SendMessageAsync method of the ServiceBusSender object is called with the serviceBusMessage object as a parameter. This sends the message to the specified queue.
- The ServiceBusClient and ServiceBusSender objects are declared with await using, so they are disposed asynchronously when the method exits and their underlying resources are released.
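The method above opens and closes a connection on every call. Service Bus clients and senders are safe to reuse for the lifetime of an application, so when many messages are sent it can be worth creating them once. The following is a minimal sketch of that idea, not a definitive implementation; the QueueSender class name and its constructor parameters are assumptions made for illustration.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Newtonsoft.Json;

// Hypothetical wrapper that creates the client and sender once and reuses them.
public sealed class QueueSender : IAsyncDisposable
{
    private readonly ServiceBusClient _client;
    private readonly ServiceBusSender _sender;

    public QueueSender(string connectionString, string queueName)
    {
        var clientOptions = new ServiceBusClientOptions
        {
            TransportType = ServiceBusTransportType.AmqpWebSockets
        };
        _client = new ServiceBusClient(connectionString, clientOptions);
        _sender = _client.CreateSender(queueName);
    }

    // Serializes the payload to JSON and sends it over the shared sender.
    public Task SendMessageAsync(object message)
    {
        var serviceBusMessage = new ServiceBusMessage(JsonConvert.SerializeObject(message))
        {
            ContentType = "application/json"
        };
        return _sender.SendMessageAsync(serviceBusMessage);
    }

    // Releases the sender first, then the client that owns the connection.
    public async ValueTask DisposeAsync()
    {
        await _sender.DisposeAsync();
        await _client.DisposeAsync();
    }
}
```

An instance of this class would typically be created once at startup (for example, registered as a singleton) and disposed when the application shuts down.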
Send messages in batches to a Service Bus Queue
To send multiple messages to a Service Bus queue in a single call, the messages first need to be collected in a batch, as shown in the following example:
public async Task SendMessagesAsync(IEnumerable<object> messages)
{
    var clientOptions = new ServiceBusClientOptions()
    {
        TransportType = ServiceBusTransportType.AmqpWebSockets
    };
    await using var client = new ServiceBusClient(_connectionString, clientOptions);
    await using var sender = client.CreateSender(_queueName);

    // The batch enforces the maximum size allowed for a single send operation.
    using var messageBatch = await sender.CreateMessageBatchAsync();
    foreach (var message in messages)
    {
        var serviceBusMessage = new ServiceBusMessage(JsonConvert.SerializeObject(message));

        // TryAddMessage returns false when the message does not fit in the batch.
        if (!messageBatch.TryAddMessage(serviceBusMessage))
        {
            throw new InvalidOperationException("The message does not fit in the current batch.");
        }
    }

    // Send all collected messages in a single call.
    await sender.SendMessagesAsync(messageBatch);
}
The main difference between the two methods is that the first method sends a single message to a Service Bus queue, while the second method sends a batch of messages to a Service Bus queue.
The second method uses CreateMessageBatchAsync() to create a batch, then iterates over the IEnumerable<object> and adds each message to the batch using TryAddMessage(). TryAddMessage() returns false when a message no longer fits within the size limit of the batch; in this example that case is treated as an error and an exception is thrown. Once all messages have been added, the batch is sent using SendMessagesAsync(). This is more efficient than sending each message individually, as it reduces the number of round trips to the Service Bus.
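When the number of messages is not known in advance, throwing as soon as the batch is full may be too strict. One possible alternative, sketched below under assumptions, is to send the full batch and continue with a new one. The BatchSendingSketch class and SendInBatchesAsync method are hypothetical names; the sender is assumed to be created as in the examples above.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Newtonsoft.Json;

public static class BatchSendingSketch
{
    // Sends all messages, starting a new batch whenever the current one is full.
    public static async Task SendInBatchesAsync(ServiceBusSender sender, IEnumerable<object> messages)
    {
        ServiceBusMessageBatch batch = await sender.CreateMessageBatchAsync();
        try
        {
            foreach (var message in messages)
            {
                var serviceBusMessage = new ServiceBusMessage(JsonConvert.SerializeObject(message));
                if (batch.TryAddMessage(serviceBusMessage))
                {
                    continue;
                }

                // An empty batch that rejects a message means the message alone is too large.
                if (batch.Count == 0)
                {
                    throw new InvalidOperationException("The message is too large to fit in an empty batch.");
                }

                // The batch is full: send it, then retry the message in a fresh batch.
                await sender.SendMessagesAsync(batch);
                batch.Dispose();
                batch = await sender.CreateMessageBatchAsync();

                if (!batch.TryAddMessage(serviceBusMessage))
                {
                    throw new InvalidOperationException("The message is too large to fit in an empty batch.");
                }
            }

            // Send whatever remains in the last, partially filled batch.
            if (batch.Count > 0)
            {
                await sender.SendMessagesAsync(batch);
            }
        }
        finally
        {
            batch.Dispose();
        }
    }
}
```

Note that each batch is sent separately, so with this approach some batches may already have been delivered when a later one fails.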
Receive Messages from a Service Bus Queue - Setup
To receive messages from a Service Bus queue in .NET Core, you can set up a processor that actively listens for incoming messages on the queue, as in this example:
var clientOptions = new ServiceBusClientOptions()
{
    TransportType = ServiceBusTransportType.AmqpWebSockets
};
// 'await using' disposes both objects, and stops processing, when the enclosing scope ends.
await using var client = new ServiceBusClient(_connectionString, clientOptions);
await using var processor = client.CreateProcessor(_queueName, new ServiceBusProcessorOptions());

// Register the handlers before starting the processor.
processor.ProcessMessageAsync += MessageHandler;
processor.ProcessErrorAsync += ErrorHandler;

await processor.StartProcessingAsync();
The code above sets up a ServiceBusProcessor to receive and process messages from an Azure Service Bus queue.
It creates a new ServiceBusClient and ServiceBusProcessor with the given _connectionString and _queueName, and registers the MessageHandler and ErrorHandler methods to be called when a message is received or when an error occurs.
Finally, it calls StartProcessingAsync() to start receiving messages from the queue and processing them using the MessageHandler method. Because both objects are declared with await using, they are disposed when the enclosing scope ends, which also stops processing, so this code must live in a scope that stays alive for as long as messages should be received.
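One way to keep the processor alive independently of a method scope is to hold the client and processor as fields of a long-lived object. The sketch below illustrates this under assumptions: the QueueListener class, its StartAsync and StopAsync methods and the console output in the handlers are hypothetical and only stand in for real application logic.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

// Hypothetical long-lived listener that owns the client and the processor.
public sealed class QueueListener : IAsyncDisposable
{
    private readonly ServiceBusClient _client;
    private readonly ServiceBusProcessor _processor;

    public QueueListener(string connectionString, string queueName)
    {
        var clientOptions = new ServiceBusClientOptions
        {
            TransportType = ServiceBusTransportType.AmqpWebSockets
        };
        _client = new ServiceBusClient(connectionString, clientOptions);
        _processor = _client.CreateProcessor(queueName, new ServiceBusProcessorOptions());
        _processor.ProcessMessageAsync += MessageHandler;
        _processor.ProcessErrorAsync += ErrorHandler;
    }

    // Starts receiving; messages keep arriving until StopAsync or DisposeAsync is called.
    public Task StartAsync() => _processor.StartProcessingAsync();

    // Stops receiving; the processor can be started again later.
    public Task StopAsync() => _processor.StopProcessingAsync();

    private static async Task MessageHandler(ProcessMessageEventArgs args)
    {
        // Placeholder processing: print the message body.
        Console.WriteLine(args.Message.Body.ToString());
        await args.CompleteMessageAsync(args.Message);
    }

    private static Task ErrorHandler(ProcessErrorEventArgs args)
    {
        // Placeholder error handling: print the exception.
        Console.Error.WriteLine(args.Exception);
        return Task.CompletedTask;
    }

    public async ValueTask DisposeAsync()
    {
        await _processor.DisposeAsync();
        await _client.DisposeAsync();
    }
}
```

The application would create the listener at startup, call StartAsync(), and dispose it during shutdown.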
Receive Messages from a Service Bus Queue - Handling Messages
The processor needs two handler methods: one that is called for each message received from the queue, and one that is called when an error occurs. The following example implements both:
private async Task MessageHandler(ProcessMessageEventArgs args)
{
    var serializedMessage = args.Message.Body.ToString();
    // Do something with the message (e.g. computing, custom logging)
    await args.CompleteMessageAsync(args.Message);
}

private Task ErrorHandler(ProcessErrorEventArgs args)
{
    var exception = args.Exception;
    // Log the exception or trigger alerts
    return Task.CompletedTask;
}
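The MessageHandler method reads the message payload, performs some action on it (e.g. computing, custom logging), and then calls CompleteMessageAsync() to complete the message and remove it from the queue. With the default ServiceBusProcessorOptions, the processor also completes a message automatically when the handler returns without throwing (AutoCompleteMessages defaults to true), so the explicit call is strictly required only when that option is disabled. The ErrorHandler method is the place to log exceptions raised during processing or to trigger alerts.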
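Since the sending examples serialize messages to JSON, the handler will usually deserialize the body back into a type. The sketch below shows one possible approach, assuming a hypothetical OrderCreated payload type and OrderHandlers class: a body that cannot be parsed is moved to the dead-letter queue instead of being retried, and valid messages are completed explicitly.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Newtonsoft.Json;

// Hypothetical payload type, used only for illustration.
public sealed class OrderCreated
{
    public int OrderId { get; set; }
}

public static class OrderHandlers
{
    public static async Task MessageHandler(ProcessMessageEventArgs args)
    {
        OrderCreated order;
        try
        {
            order = JsonConvert.DeserializeObject<OrderCreated>(args.Message.Body.ToString());
        }
        catch (JsonException ex)
        {
            // Malformed JSON will not succeed on retry, so dead-letter it with a reason.
            await args.DeadLetterMessageAsync(args.Message, "InvalidPayload", ex.Message);
            return;
        }

        if (order == null)
        {
            // A body such as "null" deserializes to a null reference.
            await args.DeadLetterMessageAsync(args.Message, "EmptyPayload", "The message body was empty.");
            return;
        }

        // Placeholder for real processing.
        Console.WriteLine($"Received order {order.OrderId}");

        // Settle the message so it is removed from the queue.
        await args.CompleteMessageAsync(args.Message);
    }
}
```

Dead-lettered messages are kept in the queue's dead-letter subqueue, where they can be inspected later without blocking the processing of other messages.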
Conclusions
In conclusion, Azure Service Bus is a powerful messaging service that can help you build robust and scalable distributed applications. Its support for queuing and publish-subscribe messaging patterns, filtering, and other features make it a versatile choice for many different scenarios. With the Azure Service Bus client libraries for .NET, it is easy to integrate Azure Service Bus into your .NET Core application and start using it to send and receive messages.