Getting Started with Service Bus for Windows

From talking to other developers, it appears that the fact that Microsoft shipped Service Bus for Windows last year with an update this year went largely unnoticed.  Hence, I want to document my experience in getting started with the product.

From high level perspective, service bus is a messaging system, promoting the following key values

  • Loose coupling between sender and receiver, whether or not they parts of the same software package or not.
  • High scalability, where sender just needs to leave a message instead of waiting for entire receiving process to complete
  • High availability, assuming service bus is setup with redundancy in mind.

This, this article decries how to get started with Service Bus for Windows.  Service bus is a messaging conduit between two software systems.

First of all, you have to install it locally.  The easiest way to go about it is to use Web Platform Installer.  if you have Visual Studio on your machine, you should already have it.  Just search in the tool for Service Bus for windows.  Make sure not to pick beta.  One word of caution.  If you are also looking into use Windows Workflow, you should install Workflow Manager instead.  Workflow Manager for Windows includes Service Bus in it.  If you just install SB, you will have troubles configuring WF manager down the road.  As a matter of fact, just pick Workflow Manger and install it to be on the safe side.  Once you install it, run Workflow Manager Configuration to setup both workflow and service bus.

Now it is time to write your first Service Bus program.

Start up Visual Studio 2012.  Yes, you will need 2012 to have support for workflow.  Create new project.  Just use Console app.  In my sample I have three projects – sender, receiver and common.  The reason for that setup is that I want to make sure that my independent software pieces can indeed communicate between each other.

Once you create a project, use Nuget Package Manager and search for Service Bus Online.  Add reference to Service Bus for Windows 1.0.  I also add Json.NET that I will use for messages.

image

Now we are ready to create some messages and send/receive them.  I will create one for sample – Person class

namespace ServiceBusAppDemo.Messages
{
    public class Person
    {
        public string FirstName { get; set; }
        public string LastName { get; set; }
    }
}

I added that to Messages project, keeping it isolated from Service Bus projects.  This way both sender and receiver can share the messages.  In my opinion  the easiest way to ensure version tolerance is to convert messages to JSON, hence the use of JSON.NET.

Here is my strategy for further coding.

  • I would like to hide Service Bus implementation from sender and receiver.
  • I would like to encapsulate the logic of queue creation
  • I would like to support strong typing in both message sender and receiver.

First, message class.  It will be generic type and in addition to payload (person class in my example) will also include unique message id, part of BrokeredMessage class.

namespace ServiceBusAppDemo.Common
{
    public class Message<T>
        where T : class, new()
    {
        internal Message() { }
        internal Message(T message)
        {
            MessageBody = message;
            MessageId = string.Empty;
        }
        public string MessageId { internal set; get; }

        public T MessageBody { internal set; get; }

        internal string Serialize()
        {
            return SerializationHelper.SerializeObject(this);
        }

        internal static Message<T> Deserialize(string message, string messageId)
        {
            var returnValue = SerializationHelper.DeserializeObject<Message<T>>(message);
            returnValue.MessageId = messageId;
            return returnValue;
        }

       

    }
}

As you can see, I encapsulate serialization to and from json inside the message class itself.  The serializer class is just as simple.

using System.Reflection;
using Newtonsoft.Json;
using Newtonsoft.Json.Serialization;

namespace ServiceBusAppDemo.Common
{
    internal static class SerializationHelper
    {
        internal static JsonSerializerSettings Settings = new JsonSerializerSettings
        {
            ContractResolver = new DefaultContractResolver
                {
                    DefaultMembersSearchFlags = BindingFlags.NonPublic | BindingFlags.Public | BindingFlags.FlattenHierarchy | BindingFlags.Instance
                }
        };

        internal static string SerializeObject(object target)
        {
            return JsonConvert.SerializeObject(target);
        }

        internal static T DeserializeObject<T>(string target)
            where T : class
        {
            return JsonConvert.DeserializeObject<T>(target, Settings);
        }
    }
}

You probably notices that I use a lot of internal access modifiers, hiding the logic from consumers.

The actual service bus implementation that sender will talk to is just as simple.

using System;

namespace ServiceBusAppDemo.Common
{
    public interface IServiceBus
    {
        void SendMessage<T>(T message)
            where T : class, new();

        bool ReceiveMessage<T>(Software software, Func<Message<T>, bool> processMessageFunction)
            where T: class, new();
    }
}

The implementation of this interface is the only class directly talking to Service Bus for Windows components.

using System;
using System.Configuration;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;


namespace ServiceBusAppDemo.Common
{

    /// <summary>
    /// Service bus implementation
    /// </summary>
    public class ServiceBus : IDisposable, IServiceBus
    {
        private readonly MessagingFactory _messageFactory;
        private readonly NamespaceManager _namespaceManager;
        private readonly ServiceBusConfiguration _serviceBusConfiguration;
        private readonly ILogger _logger;


        /// <summary>
        /// Initializes a new instance of the <see cref="ServiceBus"/> class.
        /// </summary>
        /// <param name="logger">The logger.</param>
        /// <exception cref="System.Configuration.ConfigurationErrorsException">
        /// Could not successfully create namespace manager from connection string
        /// </exception>
        public ServiceBus(ILogger logger)
        {
            try
            {
                _logger = logger;
                _messageFactory = MessagingFactory.CreateFromConnectionString(
                    ConfigurationManager.ConnectionStrings["serviceBus"].ConnectionString);
                _namespaceManager = NamespaceManager.CreateFromConnectionString(
                    ConfigurationManager.ConnectionStrings["serviceBus"].ConnectionString);
                _serviceBusConfiguration = ServiceBusConfiguration.Create();
            }
            catch (Exception exception)
            {
                _logger.LogException(exception);
                throw;
            }

            if (_namespaceManager == null)
            {
                throw new ConfigurationErrorsException(
                    "Could not successfully create namespace manager from connection string");
            }
        }

        /// <summary>
        /// Sends the message.
        /// </summary>
        /// <typeparam name="T">Type of message to send</typeparam>
        /// <param name="message">The message.</param>
        public void SendMessage<T>(T message)
           where T : class, new()
        {
            foreach (var software in _serviceBusConfiguration.Mappings[typeof(T)])
            {
                var queueName = GetQueueName<T>(software);
                CreateQueueIfDoesNotExist(queueName);
                QueueClient queueClient = _messageFactory.CreateQueueClient(queueName);
                try
                {
                    var stringMessage = new Message<T>(message).Serialize();
                    var sendMessage = new BrokeredMessage(stringMessage);
                    queueClient.Send(sendMessage);
                }
                catch (Exception exception)
                {
                    queueClient.Close();
                    _logger.LogException(exception);
                    throw;
                }
            }

        }

        /// <summary>
        /// Receives the message.
        /// </summary>
        /// <typeparam name="T">Type of message to receive</typeparam>
        /// <param name="software">The software that gets the message.</param>
        /// <param name="processMessageFunction">The process message function.</param>
        /// <returns></returns>
        public bool ReceiveMessage<T>(Software software, Func<Message<T>, bool> processMessageFunction)
           where T : class, new()
        {
            var queueName = GetQueueName<T>(software);
            bool received = false;
            CreateQueueIfDoesNotExist(queueName);
            QueueClient queueClient = _messageFactory.CreateQueueClient(queueName, ReceiveMode.PeekLock);

            try
            {
                var message = queueClient.Receive(TimeSpan.FromMilliseconds(100));
                while (message != null && !received)
                {
                    var stringMessage = message.GetBody<string>();
                    Message<T> sampleMessage = Message<T>.Deserialize(stringMessage, message.MessageId);
                    if (sampleMessage != null)
                    {
                        try
                        {
                            var result = processMessageFunction(sampleMessage);
                            if (result)
                            {
                                try
                                {
                                    message.Complete();
                                }
                                catch (Exception exception)
                                {
                                    _logger.LogMessage(
                                        LogLevel.Warning, "Error abandoning message: " + exception);
                                }

                            }
                            else
                            {
                                try
                                {
                                    message.Abandon();
                                }
                                catch (Exception exception)
                                {
                                    _logger.LogMessage(
                                        LogLevel.Warning, "Error abandoning message: " + exception);
                                }
                            }
                            received = true;
                        }
                        catch (Exception exception)
                        {
                            message.Abandon();
                            Console.WriteLine(exception.ToString());
                        }
                    }
                    else
                    {
                        message.Abandon();
                    }
                    message = queueClient.Receive(TimeSpan.FromMilliseconds(100));
                }
            }
            catch (Exception exception)
            {
                _logger.LogException(exception);
                queueClient.Close();
                throw;
            }
            return received;
        }

        /// <summary>
        /// Gets the name of the queue based on software type and message type.
        /// </summary>
        /// <typeparam name="T">Type of message</typeparam>
        /// <param name="software">The software to process 
        /// (send ore receive the message).</param>
        /// <returns></returns>
        private static string GetQueueName<T>(Software software) where T : class
        {
            string queueName = string.Format("{0}-{1}", 
                typeof(T).FullName.ToLower(), 
                software.Name.ToLower());
            return queueName;
        }


        /// <summary>
        /// Creates the queue if does not exist.
        /// </summary>
        /// <param name="queueName">Name of the queue.</param>
        private void CreateQueueIfDoesNotExist(string queueName)
        {
            if (!_namespaceManager.QueueExists(queueName))
            {
                _namespaceManager.CreateQueue(queueName);
            }
        }

        /// <summary>
        /// Performs application-defined tasks associated 
        /// with freeing, releasing, or resetting unmanaged resources.
        /// </summary>
        public void Dispose()
        {
            if (_messageFactory != null)
            {
                _messageFactory.Close();
            }
        }
    }
}

 

You can see most comments above.  Here are a few important points.

I am dynamically determining queue name based on message type and software.  The idea of “software” is very simple – it is the destination for each message.  Configuration can be done via database or some configuration file.  So, when you send a message you check configuration to see what “software” subscribes to each type of message.  This is just my approach, you can pick whatever suits you.  This is essentially all the sender does – sends the message to all destinations.  Receive method just picks a message from queue if one exists, then calls the passed in method to do the processing.  This way both sender and receiver are isolated from the bus implementation, talking to IServiceBus interface.

 

Here is my sample sender class

using System;
using ServiceBusAppDemo.Common;
using ServiceBusAppDemo.Messages;

namespace ServiceBusAppDemo.Sender
{
    class Program
    {
        static void Main(string[] args)
        {
            IServiceBus bus = new ServiceBus(new Logger());
            Console.Write("Press A or B to send one of two test messages, Q to quit");
            var key = Console.ReadKey();
            while (key.Key.ToString().ToUpper() != "Q")
            {
                Console.Clear();
                if (key.Key.ToString().ToUpper() == "A")
                {
                    var person = new Person
                        {
                            FirstName = "Sergey",
                            LastName = "Basrkiy"
                        };
                    bus.SendMessage(person);
                    Console.WriteLine("Person message sent");
                    Console.Write("Press A or B to send one of two test messages, Q to quit");
                    key = Console.ReadKey();
                }
                else if (key.Key.ToString().ToUpper() == "B")
                {
                    var company = new Company
                        {
                            CompanyName = "My",
                            DateAdded = DateTime.Now
                        };
                    bus.SendMessage(company);
                    Console.WriteLine("Company message sent");
                    Console.Write("Press A or B to send one of two test messages, Q to quit");
                    key = Console.ReadKey();
                }
                else
                {
                    Console.WriteLine("Invalid letter");
                    Console.Write("Press A or B to send one of two test messages, Q to quit");
                    key = Console.ReadKey();
                }
            }
            
        }
    }
}

You can use dependency injection of course to create an instance of IServiceBus.  Receiver is just as simple.

using System;
using Newtonsoft.Json;
using ServiceBusAppDemo.Common;
using ServiceBusAppDemo.Messages;

namespace ServiceBusAppDemo.Receiver
{
    class Program
    {
        static void Main(string[] args)
        {
            IServiceBus bus = new ServiceBus(new Logger());
            Console.WriteLine("Press Q to quit, any other key to receive");
            var key = Console.ReadKey();
            while (key.Key.ToString().ToUpper() != "Q")
            {
                Console.Clear();
                bus.ReceiveMessage<Person>(new Software("Package1"), p =>
                    {
                        Console.WriteLine(p != null ? JsonConvert.SerializeObject(p.MessageBody) : "No messages to receive");
                        return true;
                    });

                bus.ReceiveMessage<Company>(new Software("Package2"), p =>
                {
                    Console.WriteLine(p != null ? JsonConvert.SerializeObject(p.MessageBody) : "No messages to receive");
                    return true;
                });

                Console.WriteLine("Press Q to quit, any other key to receive");
                key = Console.ReadKey();
            }
        }
    }
}

You can download entire sample project here.

Thanks.

2 Comments

Leave a Reply

Your email address will not be published. Required fields are marked *