Saturday, August 17, 2013

Using WSO2 Message Broker with RabbitMQ C# .Net Client

As WSO2 Message Broker supports AMQP (Advanced Message Queuing Protocol) v 0.91 standard, this can be used to provide interoperability between different platforms via AMQP clients written on different languages. This blog post is about how to connect with WSO2 MB 2.1.0 using a C# .Net client and perform Queue/Topic based operations on Message Broker. 

In order to connect with MB first we need to find a C# .Net client library which supports AMQP standards. I have used RabbitMQ .Net client library here for this purpose.


1. How to Publish/Subscribe to Queues via .Net client

 

The following tutorial in WSO2 Library provides a complete guide on how to perform Queue operations  in by connecting to WSO2 MB via the .Net client. Therefore i am not going to repeat it here and let's see how to use manipulate Topics with a C# .Net client.


2. How to Publish/Subscribe to Topics via .Net client

 

The main difference between  connecting to a Topic and a Queue is, by default in WSO2 MB they use two different exchanges in order to connect with Queue and Topics. Therefore if you need to connect to a Queue/Topic, first it needs to be bound to the 'Exchange' in the broker by specifying the exchange name. There are two default exchanges used in WSO2 Message Broker.


  • 'amq.direct' : The pre-defined direct exchange, used with Queues
  • 'amq.topic'  :  The pre-defined topic exchange, used with Topics

You can still define and create new exchanges if needed, however they need to be first added into <MB_Home>/Repository/conf/advanced/qpid-virtualhosts.xml file under the <exchanges> section as the example below. However it is usually recommended to use the default direct/topic exchanges.
       
    
        
           direct
           carbon.direct
           true
       
       
            topic
            carbon.topic
       
  



Now let's get back to writing our sample code.

I. The following is a sample client code to publish into  a Topic in the broker. The code itself includes self-explanatory comments which describe its functions.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using RabbitMQ.Client;
 
 
namespace MB_Topic_Publisher
 
{
 
    class TopicPublisher
 
    {
        static void Main(string[] args)
 
        {
            TopicPublisher topicPublisher = new TopicPublisher();
            topicPublisher.PublishMessage("Test Message");
            Console.WriteLine("Message Sent..");
            Console.ReadLine();
        }
 
        public void PublishMessage(string message)
 
        {
 
//Setup the connection with the message broker
            ConnectionFactory factory = new ConnectionFactory();
            IProtocol protocol = Protocols.AMQP_0_8_QPID;
            factory.VirtualHost = "/carbon";
            factory.UserName = "admin";
            factory.Password = "admin";
            factory.HostName = "localhost";
            factory.Port = 5672;
            factory.Protocol = protocol;
 
            using (IConnection conn = factory.CreateConnection())
 
            {
                using (IModel ch = conn.CreateModel())
                {
 
// Declare a topic exchange to publish messages, here we have used the default topic exchange of WSO2 MB
                 ch.ExchangeDeclare("amq.topic", "topic");
 
// Publish the message to the exchange, it will send it to the routing key which is our name 'myTopic'.
// The syntax is ch.BasicPublish(exchange_name, topic_name, message_properties,message_body)
                 ch.BasicPublish("amq.topic", "test-topic", null, Encoding.UTF8.GetBytes(message));
 
                }
            }
        }
    }
}


II. Then let's write the sample client code to consume messages from  a Topic in the broker.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using RabbitMQ.Client;
 
namespace MB_TopicClient
 
{
    class TopicConsumer
    {
        static void Main(string[] args)
       {
            TopicConsumer topicConsumer = new TopicConsumer();
            topicConsumer.getMessages();
        }
 
        public void getMessages()
        {
 
//Setup the connection with the message broker
            ConnectionFactory factory = new ConnectionFactory(); 
            IProtocol protocol = Protocols.AMQP_0_8_QPID;
            factory.VirtualHost = "/carbon";
            factory.UserName = "admin";
            factory.Password = "admin";
            factory.HostName = "localhost";
            factory.Port = 5672;
            factory.Protocol = protocol;
 
            using (IConnection conn = factory.CreateConnection())
            {
                using (IModel ch = conn.CreateModel())
 
                {
// Declare a topic exchange to be bound to retrieve messages, here we have used the default topic exchange of WSO2 MB
                    ch.ExchangeDeclare("amq.topic", "topic");
             
// Declare a topic name, here we use a non-durable topic. To make it durable use the 2nd parameter as 'true' 
                    ch.QueueDeclare("test-topic", false, false, false, null);
  
// Bind the Topic in to the exchange
                    ch.QueueBind("test-topic", "amq.topic", "test-topic");
                     
// Declare a consumer which listens on the messages published to 'test-topic' topic, we need to declare an exclusive subscriber, in order to get this work.
// The syntax is BasicConsume(queuename, noAck,consumerTag, noLocal, exclusive, arguments, Consumer)                      
                     
                    QueueingBasicConsumer consumer = new QueueingBasicConsumer(ch);
 
                    ch.BasicConsume("test-topic", false, "1", false, true, null, consumer);
                    while (true)
                    {
                        try
                        {
                            RabbitMQ.Client.Events.BasicDeliverEventArgs e =(RabbitMQ.Client.Events.BasicDeliverEventArgs)consumer.Queue.Dequeue();
                            byte[] body = e.Body;
                            string message = Encoding.UTF8.GetString(body);
                            Console.WriteLine(message);
                            ch.BasicAck(e.DeliveryTag, false);
 
                        }
                        catch (OperationCanceledException e)
                        {
                            Console.WriteLine(e);
                            break;
                        }
                     }
                }
            }
        }
    }
}


3. Download and Running the sample


To run the clients first run the sample Topic Consumer where it will create a new topic subscription for the 'myTopic'. After the when you run the sample Topic Publisher client, the consumer will receive the message published and will display it in the console as "Test Message".

The complete source code along with the RabbitMQ.Client.dll library can be downloaded from here too.


4. A guide to RabbitMQ .Net library API


If you need to learn on how to use the API to customize the sample Publisher and Consumer before using it with WSO2 Message Broker, refer the guides given below. 

  • RabbitMQ .Net client v2.8.4 API Guide : find here 
  • RabbitMQ .Net client v2.8.4 User Guide : find here

4 comments :

  1. I am using WSO2 Message Broker for Windows. I can see the messages and the count in the management console if I use the amq.direct exchange. How can I see the messages and the count when I use amq.topic as exchange?

    ReplyDelete
    Replies
    1. Hi Vijay,

      When we publish messages into a topic, if there is no subscriber currently listening the messages are getting discarded. Therefore we do not store the messages addressed to a topic unless there is a durable subscriber is registered for that topic. Because of this you cannot view the message count from the admin console for a topic.

      Delete
    2. Hi Ishara,

      I have run the Subscriber and the Publisher in parallel. The Subscriber was listening and consuming 100,000 messages from the publisher, but still the Message Broker Admin UI doesn't show the messages and the count per topic. I have sent the screenshots and the .NET code to your email address "idpremadasa@gmail.com". Please go through and let me know.

      Delete
    3. Hi Vijay,

      Thanks for sending me the snapshots and details. As i have said in above comment the present topic UI doesn't show the message count. Message browsing and count is only provided for the Queues in the Admin Console.

      Delete