JN.RabbitMQClient

Simple implementation of RabbitMQ consumer and sender.

This package was initially developed to learn how to use the official RabbitMQ C# library in simple scenarios. It is now used in production by several projects and has proved to be stable. These projects include REST APIs, Windows services, .NET Core services (Windows and Linux) and others.

Features

  • Sender implementation
  • Multiple consumer instances supported
  • Multiple processing options for received messages
  • Random expiration for messages sent to a holding queue (depending on the processing option)
  • TLS connection support

Current version

Current version is 2.2.1

Release notes for current version

  • Update RabbitMQ.Client Library to 6.0.0
  • Upgrade to .NET Standard 2.1
  • Solved bug in connect port
  • TLS connection support

To do list

Some new features will be added to future releases because they are needed by some of the projects that use this package.

Planned features

  • Optimization of the connection at the sender
  • Processing limiter - process only a limited number of messages per minute

Install

Download the package from NuGet:

Install-Package JN.RabbitMQClient -version [version number]

The package is available here and source code is available here.

Usage

First, you must create the RabbitMqConsumerService and then define delegates for ReceiveMessage, ShutdownConsumer and ReceiveMessageError. The service will start the required number of consumers when StartConsumers is called. To use a retry queue, the method StartConsumers should be called with a RetryQueueDetails object.

Message processing instructions

The ReceiveMessage delegate receives and processes the message. After the message is processed it returns a message processing instruction.

Instructions

OK - message is considered as successfully processed

RequeueMessageWithDelay - message is removed from the queue, but sent to a retry queue for later processing (typically with a dead letter configuration)

IgnoreMessage - message is removed from the queue and ignored

IgnoreMessageWithRequeue - message is rejected and sent back to the queue
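
As an illustration of how a handler might choose between these instructions, the sketch below maps the result of an application's own processing step to one of the four values. The enum is a local stand-in that only mirrors the names listed above; the ProcessingOutcome type and the Decide helper are hypothetical and are not part of the package.

```csharp
using System;

// Local stand-in that mirrors the instruction names listed above.
public enum MessageProcessInstruction
{
    OK,
    RequeueMessageWithDelay,
    IgnoreMessage,
    IgnoreMessageWithRequeue
}

// Hypothetical outcomes of an application's own processing step.
public enum ProcessingOutcome
{
    Success,
    TransientFailure, // e.g. a downstream service is temporarily unavailable
    InvalidMessage,   // e.g. the payload cannot be parsed
    NotReadyYet       // e.g. this instance cannot handle the message right now
}

public static class InstructionSketch
{
    // Maps a processing outcome to the instruction returned to the consumer.
    public static MessageProcessInstruction Decide(ProcessingOutcome outcome)
    {
        switch (outcome)
        {
            case ProcessingOutcome.Success:
                return MessageProcessInstruction.OK;
            case ProcessingOutcome.TransientFailure:
                // Retry later through the retry queue.
                return MessageProcessInstruction.RequeueMessageWithDelay;
            case ProcessingOutcome.NotReadyYet:
                // Put the message straight back on the main queue.
                return MessageProcessInstruction.IgnoreMessageWithRequeue;
            default:
                // Anything else is dropped.
                return MessageProcessInstruction.IgnoreMessage;
        }
    }
}
```

Which outcome maps to which instruction is an application decision; the mapping above is only one plausible choice.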

Requeue message with delay

The RequeueMessageWithDelay processing instruction allows a message to be processed later. It is meant to be used with a secondary queue that holds the message until it is due. When the message is sent to that queue, its timestamp and expiration properties are set. When the message expires on the secondary queue, it is sent back to the main queue. At that point the timestamp can be checked and, if the elapsed time is longer than allowed, the message can be ignored (with the IgnoreMessage instruction).
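
The elapsed-time check described above could look roughly like the following sketch. It assumes the timestamp is Unix time in seconds and that a value of 0 means the message has not failed before; neither is confirmed by this document, so check the units of the firstErrorTimestamp parameter before relying on it. The enum is a local stand-in for the instruction names, and RetryAgeSketch, OnFailure and maxRetrySeconds are hypothetical names.

```csharp
using System;

// Local stand-in that mirrors the instruction names listed above.
public enum MessageProcessInstruction
{
    OK,
    RequeueMessageWithDelay,
    IgnoreMessage,
    IgnoreMessageWithRequeue
}

public static class RetryAgeSketch
{
    // Decides what to do with a message whose processing has just failed.
    // Assumption: both timestamps are Unix time in seconds, and a value of 0
    // (or less) for firstErrorTimestamp means this is the first failure.
    public static MessageProcessInstruction OnFailure(
        long firstErrorTimestamp, long nowUnixSeconds, long maxRetrySeconds)
    {
        if (firstErrorTimestamp <= 0)
        {
            // First failure: nothing to compare against yet, so retry later.
            return MessageProcessInstruction.RequeueMessageWithDelay;
        }

        long elapsedSeconds = nowUnixSeconds - firstErrorTimestamp;

        // Give up once the message has been retried for longer than allowed.
        return elapsedSeconds > maxRetrySeconds
            ? MessageProcessInstruction.IgnoreMessage
            : MessageProcessInstruction.RequeueMessageWithDelay;
    }
}
```

A handler could call it as RetryAgeSketch.OnFailure(firstErrorTimestamp, DateTimeOffset.UtcNow.ToUnixTimeSeconds(), 600) to stop retrying after ten minutes.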

For the delayed requeue to work, a configuration like the following could be used.

Example

  • MainQueue - main queue where consumers are connected
  • HoldingQueue - secondary queue to hold retry messages; when a message needs to be processed later it will be sent to this queue
  • TestExchangeHolding - a dead letter exchange to redirect messages from HoldingQueue to MainQueue when they expire

Configuration

  • HoldingQueue should be configured with the "x-dead-letter-exchange" parameter set to "TestExchangeHolding"
  • TestExchangeHolding exchange should have a binding to MainQueue
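
The configuration above can also be declared from code. The following is a minimal sketch that uses the official RabbitMQ.Client library directly, not this package; the class and method names are hypothetical. It assumes a fanout exchange, so that expired messages reach the main queue whatever routing key they carry, and durable, non-exclusive queues. Both are choices made for the sketch, not requirements stated in this document.

```csharp
using System.Collections.Generic;
using RabbitMQ.Client;

public static class RetryTopologySketch
{
    // Declares the main queue, the holding queue and the dead letter exchange
    // that moves expired messages from the holding queue back to the main queue.
    public static void Declare(
        IModel channel, string mainQueue, string holdingQueue, string deadLetterExchange)
    {
        // Main queue where consumers are connected.
        channel.QueueDeclare(queue: mainQueue, durable: true, exclusive: false,
            autoDelete: false, arguments: null);

        // Assumption: fanout, so the binding matches any routing key.
        channel.ExchangeDeclare(exchange: deadLetterExchange, type: ExchangeType.Fanout,
            durable: true, autoDelete: false, arguments: null);

        // Binding from the dead letter exchange to the main queue.
        channel.QueueBind(queue: mainQueue, exchange: deadLetterExchange,
            routingKey: "", arguments: null);

        // Holding queue: expired messages are dead-lettered to the exchange above.
        var holdingArguments = new Dictionary<string, object>
        {
            { "x-dead-letter-exchange", deadLetterExchange }
        };
        channel.QueueDeclare(queue: holdingQueue, durable: true, exclusive: false,
            autoDelete: false, arguments: holdingArguments);
    }
}
```

The method would be called once at startup with the three names from the example above, on a channel obtained from ConnectionFactory.CreateConnection().CreateModel(). Note that RabbitMQ rejects a declaration if a queue with the same name already exists with different arguments.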

Consumer configuration

To use a retry queue, a RetryQueueDetails object must be provided when the consumers are started.

Example:

var details = new RetryQueueDetails
{
    RetryQueue = "HoldingQueue",
    RetentionPeriodInRetryQueueMilliseconds = 1000,
    RetentionPeriodInRetryQueueMillisecondsMax = 5000
};

This will define the retry queue as "HoldingQueue", and the retention period for each message will be a random value from 1 to 5 seconds. To disable the random value, RetentionPeriodInRetryQueueMillisecondsMax can be set to 0 or to the same value as RetentionPeriodInRetryQueueMilliseconds.
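
To make the rule concrete, the retention period could be chosen along the lines of the sketch below. This is not the package's actual code: RetentionSketch and PickRetentionMilliseconds are hypothetical names, and treating both bounds as inclusive, as well as falling back to the minimum when the maximum is smaller than the minimum, are assumptions.

```csharp
using System;

public static class RetentionSketch
{
    // Returns the retention period, in milliseconds, for one message.
    // A maximum of 0, or one equal to the minimum, disables randomisation.
    // Assumption: any maximum below the minimum is treated the same way.
    public static int PickRetentionMilliseconds(int min, int max, Random random)
    {
        if (max <= min)
        {
            return min;
        }

        // Random.Next excludes its upper bound, so add 1 to make max reachable.
        return random.Next(min, max + 1);
    }
}
```

With the values from the example, PickRetentionMilliseconds(1000, 5000, new Random()) returns a value between 1000 and 5000, while PickRetentionMilliseconds(1000, 0, new Random()) always returns 1000.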

About TLS connection support

It is possible to connect to a RabbitMQ server using TLS. For this, UseTLS must be set to true in the configuration object. See the example below.

Client certificates are not supported.

Utilities service

A small utilities service class, RabbitMqUtilitiesService, is provided with methods to create a queue, delete a queue and get the total number of items in a queue.

Example

Example for consumer and sender services:

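using System;
using System.Threading.Tasks;
// The JN.RabbitMQClient namespaces that define the services, the configuration
// classes and Constants must also be imported.

class Program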
{
    static void Main(string[] args)
    {
        // consumer
        var consumerService = new RabbitMqConsumerService(GetBrokerConfigConsumers());

        consumerService.ReceiveMessage += ReceiveMessage;
        consumerService.ShutdownConsumer += ShutdownConsumer;
        consumerService.ReceiveMessageError += ReceiveMessageError;

        consumerService.StartConsumers("my consumer");

        // sender
        var senderService = new RabbitMqSenderService(GetBrokerConfigSender());

        senderService.Send("my message");

        Console.WriteLine("Press any key to exit...");
        Console.ReadKey();

        consumerService.Dispose();
    }

    private static IBrokerConfigSender GetBrokerConfigSender()
    {
        IBrokerConfigSender configSender = new BrokerConfigSender()
        {
            Username = "test",
            Password = "123",
            Host = "localhost",
            VirtualHost = "/",
            RoutingKeyOrQueueName = "MyTestQueue"
        };
        return configSender;
    }

    private static IBrokerConfigConsumers GetBrokerConfigConsumers()
    {
        IBrokerConfigConsumers configConsumers = new BrokerConfigConsumers()
        {
            Username = "test",
            Password = "123",
            Host = "localhost",
            VirtualHost = "/",
            RoutingKeyOrQueueName = "MyTestQueue",
            ShuffleHostList = false,
            Port = 0, // use default port
            TotalInstances = 3,
            UseTLS = true
        };
        return configConsumers;
    }

    private static async Task ReceiveMessageError(string routingKeyOrQueueName, string consumerTag, string exchange, string message, string errorMessage)
    {
        await Console.Out.WriteLineAsync($"Error: '{consumerTag}' | {errorMessage}");
    }

    private static async Task ShutdownConsumer(string consumerTag, ushort errorCode, string shutdownInitiator, string errorMessage)
    {
        await Console.Out.WriteLineAsync($"Shutdown '{consumerTag}' | {errorCode} | {shutdownInitiator} | {errorMessage}");
    }

    private static async Task<Constants.MessageProcessInstruction> ReceiveMessage(string routingKeyOrQueueName, string consumerTag, long firstErrorTimestamp, string exchange, string message)
    {
        await Console.Out.WriteLineAsync($"Message received from '{consumerTag}': {message}");
        return Constants.MessageProcessInstruction.OK;
    }
}