Read Azure IoT Hub Messages Via Service Bus Queue Using Java

In this post, you will learn at a high level how to read Azure IoT Hub messages via a Service Bus queue using a Java program.

So before starting let’s see,

What Is Azure IoT?

Azure IoT solutions are versatile and cover every aspect of IoT design and development, from connecting devices to delivering insights to decision-makers. Customers from different industries, including automotive, manufacturing, and energy, use this IoT platform to drive efficiency and intelligence to their operations.

Other cloud providers also offer IoT solutions, but this post focuses on Azure IoT Hub. Several blog posts compare Azure IoT with AWS IoT; you can find one of them below.

https://www.saviantconsulting.com/blog/azure-iot-vs-aws-iot.aspx

Before getting started, I presume that you have basic knowledge of Azure and its services. If you do not, you can visit this link.

The basic flow follows the architecture above. The request is made from the client side, which is a Java program in our case. It is sent to the Azure Service Bus, where it lands on the queue. The queued request is then sent to the IoT device, and the IoT device responds accordingly.

Let's start implementing. Log in to the Azure Portal, search for IoT Hub, and create an IoT hub.

  • Click the Add button
  • Select your subscription
  • Select a resource group
  • Select a location
  • Enter your IoT hub name and click Next
  • Select the pricing and scale tier
  • Add tags (optional)
  • Review and create

Once you have created your IoT hub, you need to add an IoT device to it. To add a device, go to Explorers -> IoT devices and add a new device. Once the IoT device is created, we need to configure Service Bus with IoT Hub.

Note: There are other ways to read and write messages on IoT Hub; the choice depends on your requirements. In my case, I want to read and write messages continuously, so I am using a Service Bus queue.

Now let's create a Service Bus namespace. Search for Service Bus and click Create/Add.

  • Select subscription
  • Select resource group
  • Add a namespace name. This becomes part of the Service Bus endpoint URL.
  • Select a location
  • Select the pricing tier
  • Add tags (optional) and create

Once the Service Bus namespace is created, we need to create a queue on it.

Open the Service Bus namespace you created and scroll to the Entities section. Select Queues and click the add icon to create a new queue.

  • Name the queue
  • Select the maximum queue size
  • Set the message time to live
  • Set the lock duration
  • Check the checkboxes as needed and click the Create button
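
The two timing settings above are easy to mix up. As a rough sketch (plain JDK only, not the Service Bus SDK), the message time to live bounds how long a message may sit on the queue before it expires, while the lock duration bounds how long a receiver in peek-lock mode has to finish processing. The class name QueueTimings, its method names, and the 14-day and 30-second values are assumptions chosen for illustration; use the values you entered in the portal.

```java
import java.time.Duration;
import java.time.Instant;

final class QueueTimings {

    // True once the message has been on the queue longer than its time to live.
    static boolean isExpired(Instant enqueuedAt, Duration timeToLive, Instant now) {
        return now.isAfter(enqueuedAt.plus(timeToLive));
    }

    // True if the handler is expected to finish before the peek lock runs out.
    static boolean fitsInLock(Duration expectedProcessing, Duration lockDuration) {
        return expectedProcessing.compareTo(lockDuration) < 0;
    }

    public static void main(String[] args) {
        Duration timeToLive = Duration.ofDays(14);      // illustrative value
        Duration lockDuration = Duration.ofSeconds(30); // illustrative value

        // Pretend the message was enqueued one day ago.
        Instant enqueuedAt = Instant.now().minus(Duration.ofDays(1));

        System.out.println("Expired: " + isExpired(enqueuedAt, timeToLive, Instant.now()));
        System.out.println("5s handler fits in lock: "
                + fitsInLock(Duration.ofSeconds(5), lockDuration));
    }
}
```

If your handler may take longer than the lock duration, the message can become visible to receivers again before you finish, so it is worth choosing the lock duration with your processing time in mind.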

Once the queue is created, it's time to connect the IoT hub to the Service Bus queue.

Go to the IoT hub you created, look for the Events section, and click Event Subscription.

  1. Name the Event Subscription
  2. Select Event Types
  3. In Endpoint details:
     • Set the endpoint type to Service Bus Queue
     • Click Select an endpoint
  4. Once you click Select an endpoint, a side panel opens where you configure your queue endpoint (see the image below):

  • Select Subscription
  • Select Resource Group
  • Select Service Bus Namespace
  • Select Service Bus Queue

Now we are done with the setup part. We just need a few keys to access the queue from the Java program.

  • Endpoint URL: To get the endpoint, go to Service Bus -> Settings -> Shared access policies (SAS) and create a policy. After creating it, you'll get the primary and secondary connection strings. Either connection string works as the endpoint URL. Copy one of them for later. Check the image below for reference.
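
A Service Bus connection string is a semicolon-separated list of key=value pairs, typically Endpoint, SharedAccessKeyName, and SharedAccessKey. The sketch below uses only the JDK to split one into its parts, which can help you sanity-check the value you copied before handing it to the client. The class name ConnectionStringParts and the sample values are made up for illustration and are not real credentials.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class ConnectionStringParts {

    // Splits "Key1=Value1;Key2=Value2" into an ordered map.
    // Only the first '=' separates key from value, because
    // base64-encoded keys can themselves end in '='.
    static Map<String, String> parse(String connectionString) {
        Map<String, String> parts = new LinkedHashMap<>();
        for (String segment : connectionString.split(";")) {
            if (segment.trim().isEmpty()) {
                continue; // skip empty segments such as a trailing ';'
            }
            int idx = segment.indexOf('=');
            if (idx <= 0) {
                throw new IllegalArgumentException("Malformed segment: " + segment);
            }
            parts.put(segment.substring(0, idx).trim(),
                      segment.substring(idx + 1).trim());
        }
        return parts;
    }

    public static void main(String[] args) {
        // Placeholder values, not real credentials.
        String sample = "Endpoint=sb://my-namespace.servicebus.windows.net/;"
                + "SharedAccessKeyName=my-policy;"
                + "SharedAccessKey=abc123=";

        Map<String, String> parts = parse(sample);
        System.out.println("Endpoint:    " + parts.get("Endpoint"));
        System.out.println("Policy name: " + parts.get("SharedAccessKeyName"));
        // Never log the key itself; only confirm that it is present.
        System.out.println("Has key:     " + parts.containsKey("SharedAccessKey"));
    }
}
```

If the Endpoint or SharedAccessKeyName printed here is not what you expect, you have probably copied the string from a different namespace or policy.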

Now that the setup is done, it's time to get our hands dirty 😀

Create a Java application with a main method, and use the code below to read messages.

import static java.nio.charset.StandardCharsets.UTF_8;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import com.microsoft.azure.servicebus.ExceptionPhase;
import com.microsoft.azure.servicebus.IMessage;
import com.microsoft.azure.servicebus.IMessageHandler;
import com.microsoft.azure.servicebus.Message;
import com.microsoft.azure.servicebus.QueueClient;
import com.microsoft.azure.servicebus.ReceiveMode;
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;

public class SendAndReceiveMessage {

 // Replace the placeholders with your own connection string and queue name
 static final String CONNECTIONSTRING = "<ENDPOINT_URL>";
 static final Gson GSON = new Gson();
 static final String QUEUE_NAME = "<QUEUE_NAME>";

 public void run(String connectionString) throws Exception {

 // Create a QueueClient instance for receiving using the connection string
 // builder
 // We set the receive mode to "PeekLock", meaning the message is delivered
 // under a lock and must be acknowledged ("completed") to be removed from the
 // queue
 QueueClient receiveClient = new QueueClient(new ConnectionStringBuilder(connectionString, QUEUE_NAME),
 ReceiveMode.PEEKLOCK);
 // Register the message handler; received messages are passed to its
 // callback as they arrive
 this.registerReceiver(receiveClient);

 // wait for ENTER or 10 seconds elapsing
 waitForEnter(10);

 // shut down receiver to close the receive loop
 receiveClient.close();
 }

 
 void registerReceiver(QueueClient queueClient) throws Exception {

 // register the message handler callback with the queue client
 queueClient.registerMessageHandler(new IMessageHandler() {
 // callback invoked when the message handler loop has obtained a message
 public CompletableFuture<Void> onMessageAsync(IMessage message) {
 // the received message is passed to the callback
 if (message.getBody() != null) {

 byte[] body = message.getBody();
 Map details = GSON.fromJson(new String(body, UTF_8), Map.class);

 System.out.println("Message received: "+details);
 }
 return CompletableFuture.completedFuture(null);
 }

 // callback invoked when the message handler has an exception to report
 public void notifyException(Throwable throwable, ExceptionPhase exceptionPhase) {
 // pass the values as arguments so a '%' in the message is not read as a format specifier
 System.out.printf("%s-%s%n", exceptionPhase, throwable.getMessage());
 }
 });

 }

 public static void main(String[] args) {

 System.exit(runApp(args, (connectionString) -> {
 SendAndReceiveMessage app = new SendAndReceiveMessage();
 try {
 app.run(connectionString);
 return 0;
 } catch (Exception e) {
 System.out.printf("%s", e.toString());
 return 1;
 }
 }));
 }

 public static int runApp(String[] args, Function<String, Integer> run) {
 try {

 String connectionString = CONNECTIONSTRING;

 // parse connection string from command line
 Options options = new Options();
 options.addOption(new Option("c", true, "Connection string"));
 CommandLineParser clp = new DefaultParser();
 CommandLine cl = clp.parse(options, args);
 if (cl.getOptionValue("c") != null) {
 connectionString = cl.getOptionValue("c");
 }

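 // get overrides from the environment; the variable name
 // SB_SAMPLES_CONNECTIONSTRING is an assumption, use whichever name you export
 String env = System.getenv("SB_SAMPLES_CONNECTIONSTRING");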
 if (env != null) {
 connectionString = env;
 }

 if (connectionString == null) {
 HelpFormatter formatter = new HelpFormatter();
 formatter.printHelp("run jar with", "", options, "", true);
 return 2;
 }
 return run.apply(connectionString);
 } catch (Exception e) {
 System.out.printf("%s", e.toString());
 return 3;
 }
 }

 private void waitForEnter(int seconds) {
 ExecutorService executor = Executors.newCachedThreadPool();
 try {
 executor.invokeAny(Arrays.asList(() -> {
 System.in.read();
 return 0;
 }, () -> {
 Thread.sleep(seconds * 1000);
 return 0;
 }));
 } catch (Exception e) {
 // absorb
 } finally {
 // request cancellation of whichever task is still running
 executor.shutdownNow();
 }
 }
}
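
The PeekLock comment in the listing is worth a closer look. The sketch below is a toy in-memory model, not the Service Bus SDK. It illustrates why a peek-locked message stays on the queue until the receiver completes it, and why it becomes available again if the receiver abandons it. All class and method names here (ToyPeekLockQueue, peekLock, complete, abandon) are invented for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

final class ToyPeekLockQueue {
    private final Deque<String> available = new ArrayDeque<>();
    private final Map<Integer, String> locked = new HashMap<>();
    private int nextToken = 1;

    void send(String message) {
        available.addLast(message);
    }

    // Hands out the next message under a lock.
    // Returns the lock token, or -1 if nothing is available.
    int peekLock() {
        String message = available.pollFirst();
        if (message == null) {
            return -1;
        }
        int token = nextToken++;
        locked.put(token, message);
        return token;
    }

    String body(int token) {
        return locked.get(token);
    }

    // Settles the message: it is removed for good.
    void complete(int token) {
        locked.remove(token);
    }

    // Releases the lock: the message goes back to the front of the queue.
    void abandon(int token) {
        String message = locked.remove(token);
        if (message != null) {
            available.addFirst(message);
        }
    }

    // Messages still owned by the queue, locked or not.
    int size() {
        return available.size() + locked.size();
    }

    public static void main(String[] args) {
        ToyPeekLockQueue queue = new ToyPeekLockQueue();
        queue.send("telemetry-1");

        int first = queue.peekLock();
        queue.abandon(first); // processing failed, give the message back
        System.out.println("After abandon, size = " + queue.size());

        int second = queue.peekLock();
        System.out.println("Redelivered: " + queue.body(second));
        queue.complete(second); // processing succeeded, remove the message
        System.out.println("After complete, size = " + queue.size());
    }
}
```

The design point is that receiving and removing are two separate steps, so a crash between them does not lose the message.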

To test, run the Java program, then open the IoT device we created earlier and click Message to Device. Enter your message and click Send Message. Once you send the message from the IoT device page, you will receive it in the Java program.

Hooray! We are done.

References:
Quickstart: Use Azure Service Bus queues with Java to send and receive messages
Quickstart: Use Service Bus topics and subscriptions with Java
