Lately, I’ve been working on a project where we need to decouple the implementation of the service bus to the specific service bus interface implementation in order to meet current and future technology requirements. We’ve been trying to keep the technology stack low and currently have two implementations of Message Queue Services that our systems target. During our analysis we found that a standard has been developed for Message Queue Service Bus Messaging known as Advanced Message Queuing Protocol (AMQP). Several implementations exists for this standard. For our .NET implementation we’ll use AMQP.Net Lite to reach these implementations to make sure that we are implementation independent.

We already have our project running with Active MQ, Active MQ – Artemis, and Rabbit MQ. Now this is what it will take to make it run with Azure Service Bus.

We’ll use C# .NET Core 3.1 for this example.

Lets do it!

Prerequisites

.Net Core 3.1 SDK installed.

Editor (I used Visual Studio Code)

Create Project

Open a command prompt and create a directory by typing:

mkdir azureservicebusclient.amqp.console

Next, create a project by moving to the created directory and typing:

cd azureservicebusclient.amqp.console

dotnet new console

Create appSettings.json File

Create an appSettings.json file in the directory containing the project (remember to replace the values with your own servicebus settings):

{
    "ServiceBus": {
        "NamespaceUrl": "[The namespace url (i.e. codeityourself.servicebus.windows.net)]",
        "PolicyName": "[The policy name]",
        "Key": "[The SAS key]",
        "QueueName": "[The queue name]",
    }
}

You will find your service bus settings in your portal.

Edit azureservicebus.amqp.console.csproj File:

Edit the azureservicebus.amqp.console.csproj project file to add the AMQPNetLite dependencies, and also specify that the appSettings.json file must be copied to the output (buildOptions section) so it becomes available to the application once you’ve built it. You can use the following code:

<Project Sdk="Microsoft.NET.Sdk">
  <PropertyGroup>
    <OutputType>Exe</OutputType>
    <TargetFramework>netcoreapp3.1</TargetFramework>
    <PackageId>azureservicebus.amqp.console</PackageId>
    <Authors>ChristianAndersen.dk</Authors>
    <Product>azureservicebus.amqp.console</Product>
    <PackageLicenseExpression>MIT</PackageLicenseExpression>
    <AssemblyVersion>1.0.0.1</AssemblyVersion>
    <FileVersion>1.0.0.1</FileVersion>
  </PropertyGroup>
  <ItemGroup>
    <PackageReference Include="AMQPNetLite" Version="2.3.0" />
    <PackageReference Include="Microsoft.Extensions.Configuration" Version="3.1.2" />
    <PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="3.1.2" />
    <PackageReference Include="System.Net.Http" Version="4.3.4" />
    <PackageReference Include="System.Runtime.Serialization.Xml" Version="4.3.0" />
    <PackageReference Include="System.Xml.XmlDocument" Version="4.3.0" />
  </ItemGroup>
  <ItemGroup>
    <None Update="appSettings.json">
      <CopyToOutputDirectory>Always</CopyToOutputDirectory>
    </None>
  </ItemGroup>
</Project>

Restore Packages

Next, we need to restore the packages, we can do this by typing:

dotnet restore

Edit Program.cs File

Edit the program.cs file to this:

using System;
using System.IO;
using System.Net;
using System.Xml;
using Amqp;
using Amqp.Framing;
using Microsoft.Extensions.Configuration;

namespace azureservicebus.amqp.console
{
    class Program
    {
        public static void Main(string[] args)
        {
            var builder = new ConfigurationBuilder().AddJsonFile("appSettings.json", optional: false, reloadOnChange: true);

            var configuration = builder.Build();
            
            // Azure service bus SAS key
            var policyName = WebUtility.UrlEncode(configuration["ServiceBus:PolicyName"]);
            var key = WebUtility.UrlEncode(configuration["ServiceBus:Key"]);

            // Azure service bus namespace
            var namespaceUrl = configuration["ServiceBus:NamespaceUrl"];

            // Create the AMQP connection string
            var connectionString = $"amqps://{policyName}:{key}@{namespaceUrl}/";

            // Create the AMQP connection
            var connection = new Connection(new Address(connectionString));

            // Create the AMQP session
            var amqpSession = new Session(connection);

            // Give a name to the sender
            var senderSubscriptionId = "christiandersen.amqp.sender";
            
            // Give a name to the receiver
            var receiverSubscriptionId = "christiandersen.amqp.receiver";

            // Azure service bus Queue Name
            var queueName = configuration["ServiceBus:QueueName"];

            // Name of the topic you will be sending messages (Name of the Queue)
            var topic = queueName;

            // Create the AMQP sender
            var sender = new SenderLink(amqpSession, senderSubscriptionId, topic);

            for (var i = 0; i < 10; i++)
            {
                // Create message
                var message = new Message($"Received message {i}");

                // Add a meesage id
                message.Properties = new Properties() { MessageId = Guid.NewGuid().ToString() };

                // Add some message properties
                message.ApplicationProperties = new ApplicationProperties();
                message.ApplicationProperties["Message.Type.FullName"] = typeof(string).FullName;

                // Send message
                sender.Send(message);
            }

            // Create the AMQP consumer
            var consumer = new ReceiverLink(amqpSession, receiverSubscriptionId, $"{topic}");

            // Start listening

            consumer.Start(5, OnMessageCallback);

            // Wait for a key to close the program
            Console.Read();
        }

        /// <summary>
        /// Method that will be called every time a message is received.
        /// </summary>
        static void OnMessageCallback(IReceiverLink receiver, Amqp.Message message)
        {
            try
            {
                // You can read the custom property
                var messageType = message.ApplicationProperties["Message.Type.FullName"];

                // Variable to save the body of the message.
                string body = string.Empty;

                // Get the body
                var rawBody = message.Body;

                // If the body is byte[] assume it was sent as a BrokeredMessage  
                // and deserialize it using a XmlDictionaryReader
                if (rawBody is byte[])
                {
                    using (var reader = XmlDictionaryReader.CreateBinaryReader(
                        new MemoryStream(rawBody as byte[]),
                        null,
                        XmlDictionaryReaderQuotas.Max))
                    {
                        var doc = new XmlDocument();
                        doc.Load(reader);
                        body = doc.InnerText;
                    }
                }
                else // Asume the body is a string
                {
                    body = rawBody.ToString();
                }

                // Write the body to the Console.
                Console.WriteLine(body);

                // Accept the messsage.
                receiver.Accept(message);
            }
            catch (Exception ex)
            {
                receiver.Reject(message);
                Console.WriteLine(ex);
            }
        }
    }
}

Build the program:

Build the program by typing:

Dotnet build

Run the Program

All Done! The only thing left to do is run the program. Simply type the following:

Dotnet run

Get the Code Here

Last modified: December 9, 2020

Author

Comments

Write a Reply or Comment

Your email address will not be published.