Develop with Azure Service Bus (3): Topic and Subscription

In this article, I will show you how to use the “Azure Service Bus Client for Java” to work with topics and subscriptions in Service Bus.

The architecture is shown below:

In contrast to queues, in which each message is processed by a single consumer, topics and subscriptions provide a one-to-many form of communication, in a publish/subscribe pattern. You can create multiple subscriptions for one topic. When sending messages to the topic, every subscription will be able to process the message.

Next, I will demonstrate these features with two pieces of code. To try them yourself, you need the following prerequisites:

  1. The connection string of your Service Bus namespace. Its shared access policy must have the “Send” and “Listen” permissions enabled.
  2. Java Runtime Environment
  3. Java IDE (Optional)

1. Configuration

Create a Maven project and add the following dependency:
<dependency>
 <groupId>com.microsoft.azure</groupId>
 <artifactId>azure-servicebus</artifactId>
 <version>1.0.0</version>
</dependency>

2. Send messages with Topic

2.1 AzureServiceBusTopic.java

/**
 * Sample publisher that sends numbered test messages to an Azure Service Bus topic.
 *
 * <p>Each message carries two custom properties, "priority" and "number". Subscription
 * rules can later filter on these properties to decide which subscription receives
 * which message.
 */
public class AzureServiceBusTopic {
    private final String connectionString;
    private final String topicName;
    private TopicClient topicClient;

    /**
     * Creates the sender and opens the underlying topic client.
     *
     * @param connectionString Service Bus connection string
     * @param topicName name of the topic to send to
     * @throws Exception if the topic client cannot be created
     */
    public AzureServiceBusTopic(String connectionString, String topicName) throws Exception {
        this.connectionString = connectionString;
        this.topicName = topicName;
        this.topicClient = getTopicClient();
    }

    /**
     * Returns the topic client, creating and caching it on first use.
     *
     * @return the client bound to {@code topicName}
     * @throws Exception if the topic client cannot be created
     */
    public TopicClient getTopicClient() throws Exception {
        if (topicClient == null) {
            // Store the new client so repeated calls do not open additional connections.
            topicClient = new TopicClient(new ConnectionStringBuilder(connectionString, topicName));
        }
        return topicClient;
    }

    /**
     * Sends {@code numMessages} messages with bodies "Message is 0", "Message is 1", ...
     *
     * <p>Custom properties are attached to every message so that rules can be set later
     * to deliver the message to the matching subscriptions: "priority" is "High" for the
     * first 20 messages (index below 20) and "Low" afterwards; "number" is the message index.
     *
     * @param clientName label printed with every log line
     * @param numMessages how many messages to send; nothing is sent when it is not positive
     * @throws Exception if sending a message fails
     */
    public void sendMessages(String clientName, int numMessages) throws Exception {
        for (int i = 0; i < numMessages; i++) {
            String messageBody = "Message is " + i;
            Message message = new Message(messageBody.getBytes(java.nio.charset.StandardCharsets.UTF_8));

            Map<String, String> properties = new HashMap<String, String>();
            properties.put("priority", (i < 20) ? "High" : "Low");
            // String.valueOf is locale-independent, unlike String.format("%d", ...).
            properties.put("number", String.valueOf(i));
            message.setProperties(properties);

            topicClient.send(message);
            System.out.println("Client -> " + clientName + " ; Send -> " + messageBody);
        }
    }

    /**
     * Closes the underlying topic client.
     *
     * @throws Exception if closing the client fails
     */
    public void close() throws Exception {
        topicClient.close();
    }

    public static void main(String[] args) throws Exception {
        String connectionString = "Your connection string";
        String topic = "testTopic"; // topic name

        AzureServiceBusTopic azureServiceBusTopic = new AzureServiceBusTopic(connectionString, topic);
        try {
            azureServiceBusTopic.sendMessages("TopicTest", 100);
        } finally {
            // Always release the connection, even when sending fails.
            azureServiceBusTopic.close();
        }
    }
}

3. Receive messages from the subscription

3.1 AzureServiceBusSubscription.java

public class AzureServiceBusSubscription {
 private String connectionString ;
 private String subscriptionName ;
 private SubscriptionClient subscriptionClient;
 
 //When creating SubscriptionClient object, SubscriptionName parameter is actually "topic/subscriptions/subscriptionName"
 public AzureServiceBusSubscription(String connectionString, String topic, String subscriptionName) throws Exception{
  this.connectionString = connectionString;
  this.subscriptionName = topic + "/subscriptions/" + subscriptionName;
  this.subscriptionClient = getSubscriptionClient();
 }
 
 public SubscriptionClient getSubscriptionClient() throws Exception{
  if(subscriptionClient == null)
   return new SubscriptionClient(new ConnectionStringBuilder(connectionString, subscriptionName), ReceiveMode.PEEKLOCK);
  else
   return subscriptionClient;
 }
 
 //Receive the message. MessageHandler is used here to handle the message asynchronously
 //MessageHandler implements IMessageHandler interface that defines onMessageAync and notifyException methods. 
 public void receiveMessages(String clientName) throws Exception {
  subscriptionClient.registerMessageHandler(new IMessageHandler() {
   public CompletableFuture<Void> onMessageAsync(IMessage message) {
    System.out.println("Client -> " + clientName + " ; Receive -> " + new String(message.getBody()));
    return subscriptionClient.completeAsync(message.getLockToken()).thenRunAsync(()->{
      // Do something.
    });
   }

   public void notifyException(Throwable exception, ExceptionPhase phase) {
    System.out.println(phase + " encountered exception:" + exception.getMessage());
   }
  }, new MessageHandlerOptions(1, false, Duration.ofSeconds(30)));
 }
 
 //get all the rules
 public RuleDescription[] getAllRules() throws Exception{
  RuleDescription[] rds = subscriptionClient.getRules().toArray(new RuleDescription[0]);
  for (RuleDescription rd : rds) {
   System.out.println(rd.getName());
   System.out.println(rd.getAction());
   System.out.println(rd.getFilter());
   System.out.println();
  }
  return rds;
 }
 
 //add rule
 //The rule to add:SqlFilter sqlFilter = new SqlFilter(filter);
 //SqlRuleAction is the action if the rule is met/triggered 
 public void addRule(String filter, String description) throws Exception{
  SqlFilter sqlFilter = new SqlFilter(filter);
  SqlRuleAction sqlRuleAction = new SqlRuleAction("set FilterTag = 'true'");
  RuleDescription ruleDescription = new RuleDescription(description, sqlFilter);
  ruleDescription.setAction(sqlRuleAction);
  subscriptionClient.addRule(ruleDescription);
 }
 
 //remove rule with removeRule method
 public void deleteAllRules() throws Exception{
  RuleDescription[] rules = getAllRules();
  for (RuleDescription ruleDescription : rules) {
   subscriptionClient.removeRule(ruleDescription.getName());
  }
 }
 
 //delete default rules
 //Default rules make all messages to be sent to the subscription 
 //After default rules are deleted, only the messages that match the rules will be sent to the subscription
 public void deleteDefaultRule() throws Exception{
  subscriptionClient.removeRule(subscriptionClient.DEFAULT_RULE_NAME);
 }
 
 public void close() throws Exception{
  subscriptionClient.close();
 }
 
 public static void main(String[] args) throws Exception {
 String connectionString = "your connection string";
 
 //one subscription with high priority and the other with low priority 
 String topic = "testTopic";
 String high_subscription = "high";
 String low_subscription = "low";

 AzureServiceBusSubscription high_sub = new AzureServiceBusSubscription(connectionString, topic, high_subscription);
 AzureServiceBusSubscription low_sub = new AzureServiceBusSubscription(connectionString, topic, low_subscription);

 //delete default rules
 high_sub.deleteDefaultRule();
 low_sub.deleteDefaultRule();

 //rule fitler
 String high_filter = "priority = 'High'";
 String low_filter = "priority = 'Low'";

 //add rule
 high_sub.addRule(high_filter, "High");
 low_sub.addRule(low_filter, "Low");
 
 //Receive message
 //In this sample, there is only one consumer for each priority. But in real world, the high priority messages should have more consumers, thus they are processed faster
 high_sub.receiveMessages("High");
 low_sub.receiveMessages("Low");

 //wait for process to finish
 Thread.sleep( 30 *1000 );

 high_sub.close();
 low_sub.close();
 }
}

4. Summary

This article showed how to use topics and subscriptions in Azure Service Bus, with sample code written against the latest SDK. If you have any questions, feel free to contact us via aka.ms/devchat

Leave a Reply

Your email address will not be published. Required fields are marked *