Simple message queue with RabbitMQ and EasyNetQ on .NET Core

RabbitMQ is one of the most popular open source message brokers and a critical component of distributed applications and platforms based on the microservice pattern, such as online trading, order processing software, and booking hubs. Customers choose RabbitMQ for its rich feature set, active community support, and broad range of supported clients and frameworks. However, RabbitMQ message brokers require significant investment in the expertise needed to create and patch complex clustered deployments. This article walks through a straightforward message queue built with RabbitMQ and EasyNetQ on the .NET Core platform.

RabbitMQ and EasyNetQ

A message broker is a software module that passes messages between message producers and consumers, which effectively decouples the software components involved. RabbitMQ is lightweight and easy to deploy on premises and in the cloud. Moreover, its official tutorials are extremely easy to follow. We have previously covered synchronous messaging in the context of DMS integration.

RabbitMQ installation

RabbitMQ runs on many platforms, including Windows, Debian, and Ubuntu. It integrates easily with Java, Spring, and .NET through the client libraries available for each of them. This article focuses on the .NET Core implementation.

First, download and install the latest version of RabbitMQ and a compatible version of the Erlang runtime (see the RabbitMQ documentation for the supported versions), or use the Docker image. Once the RabbitMQ server and the Erlang distribution are installed, make sure the firewall allows the ports RabbitMQ uses. By default these are 5672 for AMQP clients, 15672 for the management panel, and 25672 for communication between nodes and CLI tools.

To enable the RabbitMQ management panel, start “RabbitMQ Command Prompt (sbin dir)” and enable the rabbitmq_management plugin:

C:\Program Files (x86)\RabbitMQ Server\{rabbitmq-version}\sbin> rabbitmq-plugins enable rabbitmq_management

By default, the management console will be launched on port 15672 and will be available at http://localhost:15672/ or http://host_name:15672/.

The default user with administrative rights is guest / guest. This account can connect only from the loopback interface (i.e. localhost). In the administration panel (http://localhost:15672), in the users tab, you should create separate accounts for your service applications.

Create the users you need and grant each of them the appropriate permissions. A user can be granted access to specific virtual hosts only, and the permissions within each virtual host can be limited as well.

RabbitMQ cluster configuration

Every RabbitMQ broker starts out running on a single node. Nodes can be joined into clusters and later turned back into individual brokers. Clustering provides a robust, cohesive environment that can span multiple servers, which matters especially if your application needs additional delivery guarantees that only highly available queues can satisfy. Clusters are also a compelling way to scale RabbitMQ, and they provide a mechanism for building a structured architecture for your publishers and consumers.

A cluster in RabbitMQ is a group of one or more RabbitMQ servers connected to each other. In this article one node acts as the master server and the rest as slave servers. The cluster settings defined on the master, in particular access settings and policies, are replicated to the slaves. When the master fails, one of the slaves takes over its role and becomes the new master. For more information, see the RabbitMQ clustering guide.

Before creating a cluster, we need to synchronize the Erlang cookies of the nodes. The cookie is a secret value generated during installation that nodes use to authenticate to each other, so it must be identical on every server in the cluster.

Copy the .erlang.cookie file from the master server, located at

%WINDOWS%\.erlang.cookie

to the user profile location on each cluster machine:

C:\Users\%CurrentUser%\.erlang.cookie

Create the cluster by running the following commands on each slave:

rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@master
rabbitmqctl start_app

This procedure needs to be performed only once, when a new node is added. Afterwards the node reconnects to the cluster automatically, for example after the server hosting it is rebooted.

To check that the cluster is available, run the status command on each slave:

rabbitmqctl cluster_status

Cluster status of node 'rabbit@slave' ...
[{nodes, [{disc,['rabbit@master','rabbit@slave']}]},
{running_nodes, ['rabbit@master','rabbit@slave']},
{cluster_name, <<"rabbit@master">>},
…
]

The slave node should now be visible in the management panel on the master machine.

Assuming all cluster members are available, a client can connect to any node and perform any operation. Nodes will route operations to the quorum queue leader or queue leader replica transparently to clients.

With all supported messaging protocols a client is only connected to one node at a time.

In case of a node failure, clients should be able to reconnect to a different node, recover their topology and continue operation. For this reason, most client libraries accept a list of endpoints (hostnames or IP addresses) as a connection option. The list of hosts will be used during initial connection as well as connection recovery, if the client supports it. See documentation guides for individual clients to learn more.
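
With EasyNetQ, the list of endpoints is typically passed as a comma-separated host value in the connection string. The following is a minimal sketch that builds such a string using only the base class library. The helper name ClusterConnectionString.Build and the node names are hypothetical, and the exact set of connection-string keys should be checked against the EasyNetQ version in use.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ClusterConnectionString
{
    // Hypothetical helper: joins cluster node addresses into a single "host=" entry
    public static string Build(IEnumerable<string> hosts, string username, string password)
    {
        // Drop empty entries and trim stray whitespace around host names
        var hostList = hosts
            .Where(h => !string.IsNullOrWhiteSpace(h))
            .Select(h => h.Trim())
            .ToList();

        if (hostList.Count == 0)
        {
            throw new ArgumentException("At least one host is required", nameof(hosts));
        }

        return $"host={string.Join(",", hostList)};username={username};password={password}";
    }
}
```

For example, ClusterConnectionString.Build(new[] { "rabbit-node1", "rabbit-node2" }, "app_user", "secret") produces host=rabbit-node1,rabbit-node2;username=app_user;password=secret, which can then be handed to RabbitHutch.CreateBus.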

Message queue communication with .Net RabbitMQ and EasyNetQ

EasyNetQ is a simple client API for RabbitMQ on the .NET platform. It lets you work with messages as ordinary .NET types and routes them accordingly: messages are defined by .NET classes, and each distinct message type that you want to send is represented by a class. The class should be public with a default constructor and public read/write properties. You would not normally implement any functionality in a message, but treat it as a simple data container or Data Transfer Object (DTO).

EasyNetQ routes messages by their type. When you publish a message, EasyNetQ examines its type and gives it a routing key based on the type name, namespace and assembly. On the consuming side, subscribers subscribe to a type. After subscribing to a type, messages of that type get routed to the subscriber.
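
To make the idea of routing by type concrete, here is a small in-memory sketch. It is not EasyNetQ's implementation and does not talk to a broker. The class TypeRoutedBus, its routing-key format, and the two message types are assumptions made purely for illustration.

```csharp
using System;
using System.Collections.Generic;

// Two hypothetical message types
public class OrderCreated { public int OrderId { get; set; } }
public class OrderCancelled { public int OrderId { get; set; } }

// In-memory stand-in for a broker: handlers are looked up by message type
public class TypeRoutedBus
{
    private readonly Dictionary<string, List<Action<object>>> handlers =
        new Dictionary<string, List<Action<object>>>();

    // Routing key derived from the full type name (namespace included) and the assembly name
    public static string RoutingKeyFor(Type type) =>
        $"{type.FullName}, {type.Assembly.GetName().Name}";

    public void Subscribe<T>(Action<T> handler) where T : class
    {
        var key = RoutingKeyFor(typeof(T));
        if (!handlers.TryGetValue(key, out var list))
        {
            list = new List<Action<object>>();
            handlers[key] = list;
        }
        list.Add(message => handler((T)message));
    }

    // Delivers the message to every subscriber of its type and returns how many received it
    public int Publish<T>(T message) where T : class
    {
        if (!handlers.TryGetValue(RoutingKeyFor(typeof(T)), out var list))
        {
            return 0;
        }
        foreach (var handler in list)
        {
            handler(message);
        }
        return list.Count;
    }
}
```

A subscriber registered for OrderCreated receives only OrderCreated messages, while an OrderCancelled message published to the same bus finds no handler.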

By default, EasyNetQ serializes .NET types as JSON using the Newtonsoft.Json library. This has the advantage that messages are human readable, so you can use tools such as the RabbitMQ management application to debug message problems. A number of other serializers are compatible with EasyNetQ, such as protobuf-net or the ServiceStack serializer.
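
The sketch below shows what a message class and its JSON form might look like. To stay self-contained it uses System.Text.Json from the base class library as a stand-in rather than Newtonsoft.Json, so the exact output of EasyNetQ's default serializer may differ. TextMessage and MessageJson are hypothetical names.

```csharp
using System;
using System.Text.Json;

// Hypothetical message type: public class, default constructor, read/write properties
public class TextMessage
{
    public string Text { get; set; }
    public DateTime SentAtUtc { get; set; }
}

public static class MessageJson
{
    // Turns a message object into a human-readable JSON string
    public static string ToJson<T>(T message) => JsonSerializer.Serialize(message);

    // Rebuilds the message object on the consuming side
    public static T FromJson<T>(string json) => JsonSerializer.Deserialize<T>(json);
}
```

Because the payload is plain JSON, a message captured in the management panel can be read directly and compared with the class definition when debugging.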

The RabbitMQ .NET client implements the client side of the AMQP protocol (and RabbitMQ implements the server side). AMQP is intended as the HTTP of messaging. It is designed to be cross platform and language agnostic. It is also designed to flexibly support a wide range of messaging patterns based on the Exchange/Binding/Queue model.

EasyNetQ is a collection of components that provide services on top of the RabbitMQ.Client library. These do things like serialization, error handling, thread marshalling, connection management, etc. They are composed by a mini-IoC container. You can replace any component with your own implementation quite easily. So if you’d like XML serialization instead of the built in JSON, just write an implementation of ISerializer and register it with the container.

These components are fronted by the IAdvancedBus API. It closely resembles the AMQP specification as exposed by the RabbitMQ .NET client, and you can run most AMQP methods from it. The only AMQP concept it hides from you is channels. Layered on top of the advanced API is a set of messaging patterns: Publish/Subscribe, Request/Response, and Send/Receive. This is the ‘opinionated’ part of EasyNetQ.

These patterns offer very little flexibility: you simply publish a message and subscribe to it. This is deliberate, and serves EasyNetQ’s core goal of making working with RabbitMQ as easy as possible. The patterns sit behind the IBus API. IBus is intended to work for 80% of users, 80% of the time. If the pattern you want to implement is not provided by IBus, then you should use IAdvancedBus. There’s no problem with doing this, and it’s how EasyNetQ is designed to be used.

Simple .Net Core initialization

Let’s create a .NET Core console application and set it up.

Once the setup is complete, we can create WindsorMessageDispatcher, which implements the dispatch methods of the IAutoSubscriberMessageDispatcher interface. It hands each incoming message to a worker taken from a pool:

public class WindsorMessageDispatcher : IAutoSubscriberMessageDispatcher
{
    …
    // Create a pool of 20 workers
    public WindsorMessageDispatcher(IWindsorContainer container)
    {
        this.container = container;
        this.workerThreads = new BlockingCollection<ConsumerWorkerThread>();
        for (int i = 0; i < 20; i++)
        {
            this.workerThreads.Add(new ConsumerWorkerThread(container, i + 1));
        }
    }

    public Task DispatchAsync<TMessage, TConsumer>(TMessage message)
        where TMessage : class
        where TConsumer : IConsumeAsync<TMessage>
    {
        this.logger.InfoFormat("Dispatching message {0} async", message);
        return Task.Run(async () =>
        {
            // Take() blocks until a worker is free
            var worker = this.workerThreads.Take();
            try
            {
                // Await so that the worker goes back to the pool
                // only after the message has been consumed
                await worker.ConsumeMessageAsync<TConsumer, TMessage>(message);
            }
            finally
            {
                this.workerThreads.Add(worker);
            }
        });
    }
    …
}
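
The dispatcher borrows a worker from a BlockingCollection and puts it back when the message has been handled, which caps the number of messages processed at the same time. A stripped-down sketch of that borrow-and-return pattern, independent of EasyNetQ and Windsor, could look like this. WorkerPool and its members are hypothetical names, and workers are represented by plain numbers.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical pool: limits how many pieces of work run at once
public class WorkerPool
{
    private readonly BlockingCollection<int> freeWorkers = new BlockingCollection<int>();

    public WorkerPool(int size)
    {
        for (var i = 1; i <= size; i++)
        {
            freeWorkers.Add(i);
        }
    }

    // Number of workers currently waiting in the pool
    public int FreeCount => freeWorkers.Count;

    public Task<TResult> RunAsync<TResult>(Func<int, Task<TResult>> work)
    {
        return Task.Run(async () =>
        {
            // Take() blocks until a worker is free
            var workerNumber = freeWorkers.Take();
            try
            {
                // Awaiting keeps the worker checked out until the work completes
                return await work(workerNumber);
            }
            finally
            {
                freeWorkers.Add(workerNumber);
            }
        });
    }
}
```

The important detail is the await inside the try block: returning the inner task without awaiting it would run the finally block immediately and hand the worker back while the work is still in progress.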

The ConsumerWorkerThread implementation looks like this:

public class ConsumerWorkerThread
{
    …
    public ConsumerWorkerThread(IWindsorContainer container, int workerNumber)
    {
        this.container = container;
        this.logger = container.Resolve<ILogger>();
        this.WorkerNumber = workerNumber;
    }

    public async Task ConsumeMessageAsync<TConsumer, TMessage>(TMessage message)
        where TConsumer : IConsumeAsync<TMessage>
        where TMessage : class
    {
        this.logger.InfoFormat("Starting async consume on worker {0}", this.WorkerNumber);
        var consumer = this.container.Resolve<TConsumer>();
        try
        {
            // Await so that failures are logged here and the consumer
            // is released only after it has finished
            await consumer.Consume(message);
        }
        catch (Exception e)
        {
            this.logger.Error("Consumer failed to handle message", e);
            throw;
        }
        finally
        {
            this.container.Release(consumer);
        }
    }
    …
}

Next, we create the RabbitMQ message bus:

public class MessageBusBuilder
{
    public static IBus CreateMessageBus()
    {
        var connectionString = ConfigurationManager
            .ConnectionStrings["MessageQueue"];
        // Fail fast when the connection string is absent or blank
        if (connectionString == null
            || string.IsNullOrEmpty(connectionString.ConnectionString))
        {
            throw new ConfigurationErrorsException(
                "RabbitMQ [MessageQueue] connection string is missing or empty");
        }

        return RabbitHutch.CreateBus(connectionString.ConnectionString);
    }
}

The connection string points to the test RabbitMQ cluster created in the previous sections:

<connectionStrings>
    <add name="MessageQueue" connectionString="host=localhost;username=guest;password=guest; timeout=3600" />
</connectionStrings>

The console application below bootstraps the message bus on startup and attaches handlers to the basic RabbitMQ connection events:

public class BackgroundService
{
    public BackgroundService()
    {
        this.bus = MessageBusBuilder.CreateMessageBus();
    }

    void RabbitMq_MessageReturned(object sender, MessageReturnedEventArgs e)
    {
        this.logger.Error(string.Format(
            "Message returned from queue key= {0}, {1}",
            e.MessageReturnedInfo.RoutingKey,
            e.MessageReturnedInfo.ReturnReason));
    }

    void RabbitMq_Disconnected(object sender, EventArgs e)
    {
        this.logger.Error("Disconnected from queue");
    }

    private void RabbitMq_Connected(object sender, EventArgs e)
    {
        this.logger.Info("Connected to queue");
        this.TestServiceRestartOnFailure();
    }

    private void RestartService()
    {
        this.logger.Warn("Restarting service.");

        try
        {
            this.logger.Warn("Disposing service bus");
            bus.Dispose();
            this.logger.Warn("Disposing log4net");
            log4net.LogManager.GetAllRepositories()
                .ForEach(x => x.Shutdown());
            log4net.LogManager.Shutdown();
        }
        catch
        {
            // Ignore cleanup errors: the process is about to exit anyway
        }

        this.logger.Warn("Exiting with error code 666 - service will be restarted");
        System.Environment.Exit(666);
    }

    private void RabbitMq_Unblocked(object sender, EventArgs e)
    {
        this.logger.Warn("Queue got unblocked");
    }

    void RabbitMq_Blocked(object sender,
        RabbitMQ.Client.Events.ConnectionBlockedEventArgs e)
    {
        this.logger.Error("Queue got blocked");
    }

    public void Start()
    {
        this.Logger.Info("Starting up Background Service");
        this.autoSubscriber =
            new AutoSubscriber(this.bus, "BackgroundTaskService")
            {
                AutoSubscriberMessageDispatcher =
                    new WindsorMessageDispatcher(WindsorContainer.Current)
            };

        this.autoSubscriber.Subscribe(this.GetType().Assembly);
        this.autoSubscriber.SubscribeAsync(this.GetType().Assembly);
        this.Bootstrap();
        PerformanceCounter.Start(this.Logger);
        this.Logger.Info("Finished starting up Background Service");
    }

    /// <summary>
    /// Bootstraps this instance.
    /// </summary>
    /// <returns></returns>
    private Task Bootstrap()
    {
        return Task.Run(() =>
        {
            var boostrapable = WindsorContainer.Current
                .ResolveAll<IServiceBoostrapNeeded>()
                .OrderBy(x => x.Order);
            foreach (var serviceBoostrapNeeded in boostrapable)
            {
                this.Logger.Info("Bootstrapping " +
                    serviceBoostrapNeeded.GetType().Name);
                serviceBoostrapNeeded.Boostrap();
            }

            this.Logger.Info("Bootstrapping Tarification cache");
            WindsorContainer.Current
                .Resolve<ITarificationCacheResolver>().Boostrap();

            this.Logger.Info("Bootstrapping schedulers");
            WindsorContainer.Current.Resolve<IScheduler>().Start();

            this.Logger.Info("Attaching events to queue");
            this.bus.Advanced.Blocked += this.RabbitMq_Blocked;
            this.bus.Advanced.Unblocked += this.RabbitMq_Unblocked;
            this.bus.Advanced.Connected += this.RabbitMq_Connected;
            this.bus.Advanced.Disconnected += this.RabbitMq_Disconnected;
            this.bus.Advanced.MessageReturned += this.RabbitMq_MessageReturned;
            this.Logger.Info("Bootstrapping finished");
        });
    }

    /// <summary>
    /// On service stop
    /// </summary>
    public void Stop()
    {
        this.Logger.Info("Shutting down background service");
        this.autoSubscriber = null;
        try
        {
            this.logger.Warn("Disposing service bus");
            bus.Dispose();
            this.logger.Warn("Disposing log4net");
            log4net.LogManager.GetAllRepositories()
                .ForEach(x => x.Shutdown());
            log4net.LogManager.Shutdown();
        }
        catch
        {
            // Ignore cleanup errors during shutdown
        }
    }
}

Publisher/Subscriber communication

The publisher is a .NET Core application that acts as the sender, and the subscriber is a .NET Core application that acts as the receiver. A simple queue publisher is shown below. Its constructor initializes the RabbitMQ message bus, and its Put method puts a message on the corresponding message queue:

public class RabbitMqPublisher : IQueuePublisher
 {
    /// <summary>
    /// Rabbit mq bus
    /// </summary>
    private IBus bus;

    /// <summary>
    /// Initializes a new instance of the <see cref="RabbitMqPublisher"/>  
    /// </summary> 
    public RabbitMqPublisher()
    {
       if (ConfigurationManager.AppSettings["DisableMQ"] == "true")
       {
          return;
       }
       this.bus = MessageBusBuilder.CreateMessageBus();
    }

     public Task Put<TObject>(TObject request) where TObject : class
     {
        if (ConfigurationManager.AppSettings["DisableMQ"] == "true")
        {
           // Messaging is disabled: nothing to publish
           return Task.CompletedTask;
        }
        return this.bus.PublishAsync(request);
     }

    public Task RequestAsync<TRequest, TResponse>(TRequest request)
       where TRequest : class
       where TResponse : class
    {
       return this.bus.RequestAsync<TRequest, TResponse>(request);
    }
…
}

A request class is the message that EasyNetQ sends through RabbitMQ over AMQP, so it must be serializable. Here is a simple mail request:

[Serializable]
[DataContract]
 public class MultiNotificationRequest
 {...}

The send logic below puts the request on the queue through the publisher:

public class BackgroundSendLogic : ISendLogic
 {
     /// <summary>
     /// Queue publisher
     /// </summary>
     private readonly IQueuePublisher queuePublisher;

     /// <summary>
     /// Initializes a new instance of the
     /// <see cref="BackgroundSendLogic"/> class.
     /// </summary>
     /// <param name="queuePublisher">
     /// The queue publisher.
     /// </param>
     public BackgroundSendLogic(IQueuePublisher queuePublisher)
     {
         this.queuePublisher = queuePublisher;
     }

     /// <summary>
     /// Sends message by SMTP Server
     /// </summary>
     /// <param name="request">
     /// Message parameters
     /// </param>
     public void SendMessage(MultiNotificationRequest request)
     {
         this.queuePublisher.Put(request);
     }
 }

Now let’s create a consumer that implements the EasyNetQ IConsume interface. Passing the corresponding request class as the type parameter maps messages sent to the queue to a specific consumer in the backend logic:

public class MailSendConsumer : IConsume<MultiNotificationRequest>
{
   public void Consume(MultiNotificationRequest message)
   {
       this.Logger
           .Info(string.Format("Consuming message [{0}]", message));
       try
       {
           //// Implement SMTP logic here
           this.mailSendLogic.SendMessage(message);
       }
       catch (SmtpException exc)
       {
           this.Logger.Error("Error sending message", exc);
       }

       this.Logger
        .Info(string.Format("Finished Consuming message [{0}]", message));
   }
…
}

Once the code is compiled and the console application starts, the bootstrapping runs as described above, and the MultiNotificationRequest queue and exchange should be created on both RabbitMQ instances.

… and finally

EasyNetQ thus aims to encapsulate all these concerns in a simple-to-use library that sits on top of the existing AMQP client. It makes it simple to set up and communicate with RabbitMQ on .NET, and it can be used in a high-volume commercial environment.

The performance of EasyNetQ is directly related to the performance of the RabbitMQ broker. This can vary with network and server performance. In tests on a developer machine with a local instance of RabbitMQ, sustained over-night performance of around 2K messages per second was achieved. Memory use for all the EasyNetQ endpoints was stable for the overnight run.
