JN.RabbitMQClient

Simple implementation of RabbitMQ consumer and sender.

This is a simple implementation of RabbitMQ consumer and sender. It was initially developed with the aim of learning how to use the official RabbitMQ c# library in simple scenarios. It is in use in several projects in production environments and has proved to be stable. These projects include Rest APIs, windows services, .net Core services (windows and Linux) and others.

Features

Current version

Current version is 2.3.0

Release notes for current version

  • Update RabbitMQ.Client Library to 6.2.1
  • Changed namespace for IRabbitMqConsumerService and IRabbitMqSenderService
  • Changed behavior for StopConsumers(consumerTag) - now stops all consumers with tag starting with 'consumerTag'
  • Added limiter feature

Release notes for previous version (2.2.1)

  • Update RabbitMQ.Client Library to 6.0.0
  • Upgrade to .NET Standard 2.1
  • Solved bug in connect port
  • TLS connection support

To do list

Some new features will be added to future releases because they are needed by some of the projects that use this package.

Planned features

  • Optimization of the connection at the sender.

Install

Download the package from NuGet:

Install-Package JN.RabbitMQClient -version [version number]

The package is available here and source code is available here.

Usage

First, you must create the RabbitMqConsumerService and then define delegates for ReceiveMessage, ShutdownConsumer and ReceiveMessageError. The service will start the required number of consumers when StartConsumers is called. To use a retry queue, the method StartConsumers should be called with a RetryQueueDetails object.

Message processing instructions

The ReceiveMessage delegate receives and processes the message. After the message is processed it returns a message processing instruction.

Instructions

OK - message is considered as successfully processed

RequeueMessageWithDelay - message is removed from the queue, but sent to a retry queue for later processing (typically with a dead letter configuration)

IgnoreMessage - message is removed from the queue and ignored

IgnoreMessageWithRequeue - message is rejected and sent back to the queue

Requeue message with delay

The RequeueMessageWithDelay processing instructions allows a message to be processed later. This is to be used with a secondary queue that will receive the message to be processed. When the message is sent to that queue the timestamp and expiration properties are set. Later, when the message expires on the secondary queue, it is sent back to the main queue. When that happens, the timestamp can be verified and if the elapsed time is longer than allowed, then the message can be ignored (with IgnoreMessage instruction).

For this to work, a configuration like the following could be used.

Example

  • MainQeue - main queue where consumers are connected
  • HoldingQueue - secondary queue to hold retry messages; when a message needs to be processed later it will be sent to this queue.
  • TestExchangeHolding - a dead letter exchange to redirect messages from HoldingQueue to MainQeue when they expire

Configuration

  • HoldingQueue should be configured with "x-dead-letter-exchange" parameter as "TestExchangeHolding".
  • TestExchangeHolding exchange should have a binding to MainQeue

Consumer configuration

To use a retry queue, consumers must be configured. When consumers are started a RetryQueueDetails object must be provided.

Example:

var details = new RetryQueueDetails
{
    RetryQueue="HoldingQueue",
    RetentionPeriodInRetryQueueMilliseconds = 1000,
    RetentionPeriodInRetryQueueMillisecondsMax = 5000
};

This will define the retry queue as "HoldingQueue" and the retention period for each message will be a random value from 1 to 5 seconds. To disabled the random value RetentionPeriodInRetryQueueMillisecondsMax can be set to 0 or to same value as RetentionPeriodInRetryQueueMilliseconds.

About TLS connect support

It is possible to connect to a RabbitMQ using TLS. For this, UseTLS must be true in the configuration object. See the example below.

Client certificates are not supported.

Processing limiter

We can limit the processing of messages. This can be useful if consumers are unable to process all messages or simply need to slow down the processing of messages.

For this we have to provide an implementation of the ILimiter interface to the consumer service. Please see next example.

public class MyApp
{
    private readonly IRabbitMqConsumerService _consumerService;
    private readonly IRabbitMqSenderService _senderService;
    private readonly AppConfig _config;
	
    public MyApp(IRabbitMqConsumerService consumerService, IRabbitMqSenderService senderService, ILimiter limiter)
    {
        _consumerService = consumerService;
        _senderService = senderService;

        _consumerService.ServiceDescription = "Consumer Service";
        _consumerService.ReceiveMessage += ProcessMessage;
        _consumerService.ShutdownConsumer += ProcessShutdown;
        _consumerService.ReceiveMessageError += ProcessError;

        _consumerService.Limiter = limiter; // setup the limiter

        _senderService.ServiceDescription = "Sender Service";

    }

    //... ...
}

It's important to note that messages are always removed from the queue. The ILimiter provided to consumer service will decide if the received message can be processed or not - method IsAllowed(). If the message can't be processed then the processing delegate will not be executed and the processing instruction defined by DeniedProcessInstruction property is returned.

This feature can be useful when combined with an holding queue. In this case, messages that can't be processed are sent to the holding queue for later processing.

A default ILimiter implementation is provided. That is the WindowLimiter class that limits processing to N messages in the defined time window. The next example will return a limiter that allows 3 messages per second. If the message can't be processed then it will be requeded with a delay (sent to the holding queue).

private static WindowLimiter GetLimiter()
{
    const int maxAllowed = 3; // number of items to process in the time window
    const int windowSeconds = 1;
    const Constants.MessageProcessInstruction deniedInstruction = Constants.MessageProcessInstruction.RequeueMessageWithDelay;

    return new WindowLimiter(maxAllowed, windowSeconds, deniedInstruction);
}

Utilites service

A small utilites service class RabbitMqUtilitiesService is provided with methods to create, delete and get the total number of items in a queue.

Example

Example for consumer and sender services:

class Program
{
    static void Main(string[] args)
    {
        // consumer
        var consumerService = new RabbitMqConsumerService(GetBrokerConfigConsumers());

        consumerService.ReceiveMessage += ReceiveMessage;
        consumerService.ShutdownConsumer += ShutdownConsumer;
        consumerService.ReceiveMessageError += ReceiveMessageError;

        consumerService.StartConsumers("my consumer");

        // sender
        var senderService = new RabbitMqSenderService(GetBrokerConfigSender());

        senderService.Send("my message");

        Console.WriteLine("Press any key to exit...");
        Console.ReadKey();

        consumerService.Dispose();
    }

    private static IBrokerConfigSender GetBrokerConfigSender()
    {
        IBrokerConfigSender configSender = new BrokerConfigSender()
        {
            Username = "test",
            Password = "123",
            Host = "localhost",
            VirtualHost = "/",
            RoutingKeyOrQueueName = "MyTestQueue"
        };
        return configSender;
    }

    private static IBrokerConfigConsumers GetBrokerConfigConsumers()
    {
        IBrokerConfigConsumers configConsumers = new BrokerConfigConsumers()
        {
            Username = "test",
            Password = "123",
            Host = "localhost",
            VirtualHost = "/",
            RoutingKeyOrQueueName = "MyTestQueue",
            ShuffleHostList = false,
            Port = 0, // use default port
            TotalInstances = 3,
            UseTLS = true
        };
        return configConsumers;
    }

    private static async Task ReceiveMessageError(string routingKeyOrQueueName, string consumerTag, string exchange, string message, string errorMessage)
    {
        await Console.Out.WriteLineAsync($"Error: '{consumerTag}' | {errorMessage}");
    }

    private static async Task ShutdownConsumer(string consumerTag, ushort errorCode, string shutdownInitiator, string errorMessage)
    {
        await Console.Out.WriteLineAsync($"Shutdown '{consumerTag}' | {errorCode} | {shutdownInitiator} | {errorMessage}");
    }

    private static async Task ReceiveMessage(string routingKeyOrQueueName, string consumerTag, long firstErrorTimestamp, string exchange, string message)
    {
        await Console.Out.WriteLineAsync($"Message received from '{consumerTag}': {message}");
        return Constants.MessageProcessInstruction.OK;
    }
}