Lately, I’ve been working on a project where we need to decouple our service bus code from any specific message broker implementation in order to meet current and future technology requirements. We’ve been trying to keep the technology stack small and currently have two implementations of Message Queue Services that our systems target. During our analysis we found that a standard exists for message queue and service bus messaging: the Advanced Message Queuing Protocol (AMQP). Several implementations exist for this standard. For our .NET implementation we’ll use AMQP.Net Lite to talk to these brokers so that we stay implementation independent.
We already have our project running with ActiveMQ, ActiveMQ Artemis, and RabbitMQ. Here is what it takes to make it run with Azure Service Bus.
We’ll use C# .NET Core 3.1 for this example.
Let’s do it!
Prerequisites
.NET Core 3.1 SDK installed.
Editor (I used Visual Studio Code)
Create Project
Open a command prompt and create a directory by typing:
mkdir azureservicebus.amqp.console
Next, create a project by moving to the created directory and typing:
cd azureservicebus.amqp.console
dotnet new console
Create appSettings.json File
Create an appSettings.json file in the directory containing the project (remember to replace the values with your own Service Bus settings):
{
  "ServiceBus": {
    "NamespaceUrl": "[The namespace url (e.g. codeityourself.servicebus.windows.net)]",
    "PolicyName": "[The policy name]",
    "Key": "[The SAS key]",
    "QueueName": "[The queue name]"
  }
}
You will find your Service Bus settings in the Azure portal; the policy name and key are listed under the namespace’s Shared access policies.
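The policy name, key, and namespace URL are later combined into a single AMQP address. SAS keys are Base64 strings and often contain characters such as +, / and =, which have a special meaning inside a URI, so the credentials should be URL-encoded first. The following is a minimal sketch of that step; the class name AmqpAddressBuilder and the sample values are placeholders made up for illustration, not part of the project:

```csharp
using System;
using System.Net;

public static class AmqpAddressBuilder
{
    // Builds an address of the form amqps://{policy}:{key}@{host}/
    // with the policy name and key URL-encoded.
    public static string Build(string policyName, string key, string namespaceHost)
    {
        var user = WebUtility.UrlEncode(policyName);
        var password = WebUtility.UrlEncode(key);
        return $"amqps://{user}:{password}@{namespaceHost}/";
    }

    public static void Main()
    {
        // Placeholder values only; these are not real credentials.
        var address = Build(
            "RootManageSharedAccessKey",
            "abc+def/ghi=",
            "example.servicebus.windows.net");
        Console.WriteLine(address);
    }
}
```

Without the encoding, a / in the key would be read as the start of the path and the address would fail to parse or authenticate.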
Edit azureservicebus.amqp.console.csproj File
Edit the azureservicebus.amqp.console.csproj project file to add the AMQPNetLite dependencies, and also specify that the appSettings.json file must be copied to the output directory (the ItemGroup with CopyToOutputDirectory) so it becomes available to the application once you’ve built it. You can use the following code:
<Project Sdk="Microsoft.NET.Sdk">
  <PropertyGroup>
    <OutputType>Exe</OutputType>
    <TargetFramework>netcoreapp3.1</TargetFramework>
    <PackageId>azureservicebus.amqp.console</PackageId>
    <Authors>ChristianAndersen.dk</Authors>
    <Product>azureservicebus.amqp.console</Product>
    <PackageLicenseExpression>MIT</PackageLicenseExpression>
    <AssemblyVersion>1.0.0.1</AssemblyVersion>
    <FileVersion>1.0.0.1</FileVersion>
  </PropertyGroup>
  <ItemGroup>
    <PackageReference Include="AMQPNetLite" Version="2.3.0" />
    <PackageReference Include="Microsoft.Extensions.Configuration" Version="3.1.2" />
    <PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="3.1.2" />
    <PackageReference Include="System.Net.Http" Version="4.3.4" />
    <PackageReference Include="System.Runtime.Serialization.Xml" Version="4.3.0" />
    <PackageReference Include="System.Xml.XmlDocument" Version="4.3.0" />
  </ItemGroup>
  <ItemGroup>
    <None Update="appSettings.json">
      <CopyToOutputDirectory>Always</CopyToOutputDirectory>
    </None>
  </ItemGroup>
</Project>
Restore Packages
Next, we need to restore the packages. We can do this by typing:
dotnet restore
Edit Program.cs File
Edit the Program.cs file to this:
using System;
using System.IO;
using System.Net;
using System.Xml;
using Amqp;
using Amqp.Framing;
using Microsoft.Extensions.Configuration;

namespace azureservicebus.amqp.console
{
    class Program
    {
        public static void Main(string[] args)
        {
            var builder = new ConfigurationBuilder().AddJsonFile("appSettings.json", optional: false, reloadOnChange: true);
            var configuration = builder.Build();

            // Azure Service Bus SAS policy name and key (URL-encoded so they are safe inside the address)
            var policyName = WebUtility.UrlEncode(configuration["ServiceBus:PolicyName"]);
            var key = WebUtility.UrlEncode(configuration["ServiceBus:Key"]);
            // Azure Service Bus namespace
            var namespaceUrl = configuration["ServiceBus:NamespaceUrl"];
            // Create the AMQP connection string
            var connectionString = $"amqps://{policyName}:{key}@{namespaceUrl}/";
            // Create the AMQP connection
            var connection = new Connection(new Address(connectionString));
            // Create the AMQP session
            var amqpSession = new Session(connection);

            // Give a name to the sender
            var senderSubscriptionId = "christiandersen.amqp.sender";
            // Give a name to the receiver
            var receiverSubscriptionId = "christiandersen.amqp.receiver";
            // Azure Service Bus queue name
            var queueName = configuration["ServiceBus:QueueName"];
            // Address the links attach to (here, the name of the queue)
            var topic = queueName;

            // Create the AMQP sender
            var sender = new SenderLink(amqpSession, senderSubscriptionId, topic);
            for (var i = 0; i < 10; i++)
            {
                // Create the message
                var message = new Message($"Received message {i}");
                // Add a message id
                message.Properties = new Properties() { MessageId = Guid.NewGuid().ToString() };
                // Add some message properties
                message.ApplicationProperties = new ApplicationProperties();
                message.ApplicationProperties["Message.Type.FullName"] = typeof(string).FullName;
                // Send the message
                sender.Send(message);
            }

            // Create the AMQP consumer
            var consumer = new ReceiverLink(amqpSession, receiverSubscriptionId, topic);
            // Start listening with a link credit of 5
            consumer.Start(5, OnMessageCallback);

            // Wait for a key to close the program
            Console.Read();
            // Close the connection (and with it the session and links) before exiting
            connection.Close();
        }

        /// <summary>
        /// Method that will be called every time a message is received.
        /// </summary>
        static void OnMessageCallback(IReceiverLink receiver, Amqp.Message message)
        {
            try
            {
                // You can read the custom property
                var messageType = message.ApplicationProperties["Message.Type.FullName"];
                // Variable to hold the body of the message
                string body = string.Empty;
                // Get the body
                var rawBody = message.Body;
                // If the body is byte[], assume it was sent as a BrokeredMessage
                // and deserialize it using an XmlDictionaryReader
                if (rawBody is byte[] bytes)
                {
                    using (var reader = XmlDictionaryReader.CreateBinaryReader(
                        new MemoryStream(bytes),
                        null,
                        XmlDictionaryReaderQuotas.Max))
                    {
                        var doc = new XmlDocument();
                        doc.Load(reader);
                        body = doc.InnerText;
                    }
                }
                else // Assume the body is a string
                {
                    body = rawBody?.ToString() ?? string.Empty;
                }
                // Write the body to the console
                Console.WriteLine(body);
                // Accept the message
                receiver.Accept(message);
            }
            catch (Exception ex)
            {
                // Reject the message so it is not redelivered to this receiver
                receiver.Reject(message);
                Console.WriteLine(ex);
            }
        }
    }
}
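The byte[] branch in the callback exists because clients that send a BrokeredMessage serialize the body with a DataContractSerializer over a binary XML writer, so the receiver sees binary XML rather than a plain string. The sketch below tries to show that round trip in isolation, without a broker. The class name BodyDecoder and the method EncodeLikeBrokeredMessage are hypothetical helpers made up for this illustration, and the encoding side is an assumption about what such a sender produces:

```csharp
using System;
using System.IO;
using System.Runtime.Serialization;
using System.Xml;

public static class BodyDecoder
{
    // Assumption: mimics how a BrokeredMessage-style sender serializes a string body
    // (DataContractSerializer writing binary XML).
    public static byte[] EncodeLikeBrokeredMessage(string text)
    {
        var serializer = new DataContractSerializer(typeof(string));
        var stream = new MemoryStream();
        using (var writer = XmlDictionaryWriter.CreateBinaryWriter(stream))
        {
            serializer.WriteObject(writer, text);
        }
        // ToArray still works after the writer has closed the stream.
        return stream.ToArray();
    }

    // Same decoding logic as the callback: binary XML for byte[], ToString() otherwise.
    public static string Decode(object rawBody)
    {
        if (rawBody is byte[] bytes)
        {
            using (var reader = XmlDictionaryReader.CreateBinaryReader(
                new MemoryStream(bytes), null, XmlDictionaryReaderQuotas.Max))
            {
                var doc = new XmlDocument();
                doc.Load(reader);
                return doc.InnerText;
            }
        }
        return rawBody?.ToString() ?? string.Empty;
    }

    public static void Main()
    {
        Console.WriteLine(Decode(EncodeLikeBrokeredMessage("hello")));
        Console.WriteLine(Decode("plain string body"));
    }
}
```

Messages sent by the program above carry a string body, so they take the second branch; the first branch only matters when other clients share the queue.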
Build the Program
Build the program by typing:
dotnet build
Run the Program
All Done! The only thing left to do is run the program. Simply type the following:
dotnet run