Integrating Spring JMS Message Driven POJO with ActiveMQ

JMS Message Driven POJOs using ActiveMQ

Message Driven Beans and problems with EJB

A Message Driven Bean (MDB) is an Enterprise JavaBean (EJB) that allows applications to process messages asynchronously using an EJB container residing inside an EJB server. An MDB usually acts as a JMS message listener, pulling in messages from either queues or topics. The problem with the original EJB standard was that it was far too heavy and too complex for most simple tasks. In addition, its reliance on CORBA for accessing fully distributed systems usually resulted in performance penalties. The complexity of the framework continued to hinder EJB's market penetration and began to stunt its adoption rate. During the same time, other frameworks like Hibernate and Spring began to emerge and slowly gain acceptance in businesses, even without large company influence and backing. These “lightweight” technologies allowed you to make use of these features while writing less verbose code, with less clutter, and without needing these services to be deployed in EJB containers.

The newer EJB 3.0 changes make it much easier to write EJBs, using annotations and convention over configuration to reduce code bloat. Further minor enhancements were made in versions 3.1 and 3.2 of the EJB specification, and there is now even more resemblance to Spring and Hibernate through dependency injection and annotation features.

Enter Message Driven POJOs

Spring allows us to use plain POJOs (defined in its IoC container) in place of MDBs so that they can process JMS messages asynchronously without an EJB container. These Message-Driven POJOs (MDPs), as they are affectionately called, give us the same functionality and offer added flexibility.

Benefits of Message Driven POJOs

  • MDBs must run in an EJB container; MDPs can run anywhere, even in simple standalone main() apps.
  • MDPs can utilize all of Spring’s features like dependency injection, AOP support, etc.
  • MDBs require you to implement the methods of the MessageDrivenBean interface, which leads to unnecessary work and overhead.
  • When using MDPs, we are able to use Spring adapter classes to reduce and simplify our code even more.

Typically, when writing your POJO class you will need to implement the MessageListener interface. Having done this, you will need to implement the onMessage method, check which type of javax.jms.Message you received, and do something to extract its payload.

ActiveMessageConsumer (First Approach)

package com.avaldes.tutorial;

import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class ActiveMessageConsumer implements MessageListener {

  @Override
  public void onMessage(Message message) {
    if (message instanceof TextMessage) {
      // Plain text payload
      TextMessage msg = (TextMessage) message;
      try {
        System.out.println("Received: " + msg.getText());
      } catch (JMSException e) {
        e.printStackTrace();
      }
    } else if (message instanceof MapMessage) {
      // Key/value payload; read the entry stored under "message"
      MapMessage msg = (MapMessage) message;
      try {
        System.out.println("Received: " + msg.getString("message"));
      } catch (JMSException e) {
        e.printStackTrace();
      }
    } else {
      System.out.println("I don't know how to handle this message...");
    }
  }
}

Typical spring-config.xml settings with this Approach

<bean id="messageListenerContainer"
  class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  <property name="connectionFactory" ref="cachingFactory" />
  <property name="destinationName" value="inbound.queue" />
  <property name="messageListener" ref="consumer" />
</bean>

ActiveMessageConsumer (Cleaner Approach)

I think a better alternative is to use a MessageListenerAdapter. This class relieves you of the burden of writing the code that checks for instanceof a certain type, and makes the code that much cleaner. In addition, you no longer need to implement the MessageListener interface. When you set up the adapter you assign a delegate (any POJO class) which will handle the messages without any additional work. Notice how the methods are overloaded to handle the different types of payload.

I will point out that I changed the defaultListenerMethod to processMessage. Without this additional entry, you would have to ensure that your class uses the default method name of handleMessage.

<bean id="consumer" class="com.avaldes.tutorial.ActiveMessageConsumer" />
  
<bean id="messageAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
  <property name="delegate" ref="consumer" />
  <property name="defaultListenerMethod" value="processMessage" />
</bean>

package com.avaldes.tutorial;

import java.util.HashMap;

import org.springframework.stereotype.Service;

@Service
public class ActiveMessageConsumer {
  public String processMessage(String message) {
    System.out.println("{processMessage} Text Received: " + message);
    return "ACK processMessage Text";
  }

  public String processMessage(HashMap<String,String> message) {
    System.out.println("{processMessage} HashMap Received: " + message.get("message"));
    return "ACK processMessage Map";
  }
}
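
To get a feel for what the adapter does with those overloaded methods, here is a minimal plain-Java sketch of dispatching a payload to a matching method by reflection. It is not Spring's implementation: the class AdapterDispatchSketch, its nested Delegate and the dispatch helper are names I made up for illustration, and the real MessageListenerAdapter also converts the incoming JMS message into a payload first and takes care of replies.

```java
import java.lang.reflect.Method;
import java.util.HashMap;

public class AdapterDispatchSketch {

  // Hypothetical delegate, shaped like the ActiveMessageConsumer above
  public static class Delegate {
    public String processMessage(String message) {
      return "text:" + message;
    }

    public String processMessage(HashMap<String, String> message) {
      return "map:" + message.get("message");
    }
  }

  // Looks for a public one-argument method with the given name whose
  // parameter type accepts the payload, then invokes it on the delegate
  public static Object dispatch(Object delegate, String methodName, Object payload) {
    for (Method method : delegate.getClass().getMethods()) {
      boolean matches = method.getName().equals(methodName)
          && method.getParameterCount() == 1
          && method.getParameterTypes()[0].isInstance(payload);
      if (matches) {
        try {
          return method.invoke(delegate, payload);
        } catch (ReflectiveOperationException e) {
          throw new IllegalStateException("Listener method failed", e);
        }
      }
    }
    throw new IllegalArgumentException(
        "No " + methodName + " method accepts " + payload.getClass().getName());
  }

  public static void main(String[] args) {
    Delegate delegate = new Delegate();
    HashMap<String, String> map = new HashMap<>();
    map.put("message", "hello");

    System.out.println(dispatch(delegate, "processMessage", "hi")); // prints "text:hi"
    System.out.println(dispatch(delegate, "processMessage", map));  // prints "map:hello"
  }
}
```

The point of the sketch is only that the payload type picks the overload, so the delegate never needs an instanceof check of its own.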

spring-config.xml

For this example, as you can see from the configuration, I am running the ActiveMQ broker on my local machine on the default port of 61616. If you decide to run the broker on a different machine or a different port please make sure to modify the settings for the brokerURL property. Also, you will notice that I have created a queue called “inbound.queue”. For the producer I will be using the JmsTemplate since it will handle the creation and release of resources when sending JMS messages.

A typical flow follows:

ConnectionFactory -> Connection -> Session -> MessageProducer -> send

Notice how we create the producer and consumer beans and use their references in the configuration below. The last thing I would pay attention to are the concurrentConsumers and maxConcurrentConsumers properties, which I have added to see how ActiveMQ handles some load. With these settings I will get five simultaneous sessions for the listener and, depending on the load of the system, it will dynamically scale to a maximum of ten, as defined by me in this example.
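
If the idea of several consumers sharing one queue is new to you, the following plain-Java sketch may help. It is only an analogy under my own assumptions: an in-memory BlockingQueue stands in for the broker queue, a fixed thread pool stands in for the listener sessions (it does not scale dynamically the way the container does), and CompetingConsumersSketch and drain are hypothetical names.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CompetingConsumersSketch {

  // Fills an in-memory queue with messageCount messages, lets `consumers`
  // worker threads drain it, and returns how many messages were processed
  public static int drain(int messageCount, int consumers) {
    BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    for (int i = 1; i <= messageCount; i++) {
      queue.add(String.format("MESSAGE #: %03d", i));
    }

    AtomicInteger processed = new AtomicInteger();
    ExecutorService pool = Executors.newFixedThreadPool(consumers);
    for (int c = 0; c < consumers; c++) {
      pool.submit(() -> {
        // poll() hands each message to exactly one consumer;
        // a null result means the queue is empty and the worker stops
        while (queue.poll() != null) {
          processed.incrementAndGet();
        }
      });
    }

    pool.shutdown();
    try {
      pool.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while draining", e);
    }
    return processed.get();
  }

  public static void main(String[] args) {
    System.out.println("Processed " + drain(50, 5) + " messages with 5 consumers");
  }
}
```

Each message is handled by exactly one worker, which is the same guarantee a JMS queue gives to its competing consumers.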

Upon completion of the sample run, you can see the following on the ActiveMQ Consumers screen. As you can see from the image below, it looks like we ramped up to the maximum number of consumers (10) during this test.

[Image: ActiveMQ connections]

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:beans="http://www.springframework.org/schema/beans"
  xmlns:context="http://www.springframework.org/schema/context"
  xsi:schemaLocation="
    http://www.springframework.org/schema/beans 
    http://www.springframework.org/schema/beans/spring-beans.xsd 
    http://www.springframework.org/schema/context 
    http://www.springframework.org/schema/context/spring-context.xsd">
 
  <context:component-scan base-package="com.avaldes.tutorial" />
 
  <!-- ActiveMQ Connection Factory -->
  <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://127.0.0.1:61616" />
  </bean>
 
  <!-- Caching Connection factory -->
  <bean id="cachingFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory" ref="connectionFactory" />
  </bean>
 
  <!-- JmsTemplate opens and releases JMS resources on every send, so give it the caching factory -->
  <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="cachingFactory" />
    <property name="defaultDestinationName" value="inbound.queue" />
  </bean>
     
  <bean id="producer" class="com.avaldes.tutorial.ActiveMessageProducer">
    <property name="jmsTemplate" ref="jmsTemplate" />
  </bean>
     
  <bean id="consumer" class="com.avaldes.tutorial.ActiveMessageConsumer" />
  
  <bean id="messageAdapter" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
    <property name="delegate" ref="consumer" />
    <property name="defaultListenerMethod" value="processMessage" />
  </bean>
  
  <bean id="messageListenerContainer"
    class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="cachingFactory" />
    <property name="destinationName" value="inbound.queue" />
    <property name="concurrentConsumers" value="5" />
    <property name="maxConcurrentConsumers" value="10" />
    <property name="messageListener" ref="messageAdapter" />
  </bean>
</beans>

ActiveMessageProducer (Producer POJO)

Our producer class is a very simple POJO with only two methods: one for setting the JmsTemplate, which is done via setter injection in the spring-config.xml file, and one for sending objects via the JmsTemplate method convertAndSend().

package com.avaldes.tutorial;

import org.springframework.jms.core.JmsTemplate;

public class ActiveMessageProducer {
  private JmsTemplate jmsTemplate;

  public void setJmsTemplate(JmsTemplate jmsTemplate) {
    this.jmsTemplate = jmsTemplate;
  }

  // Converts the payload to a JMS message and sends it to the default destination
  public void send(final Object message) {
    jmsTemplate.convertAndSend(message);
  }
}
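
It helps to know which JMS message type convertAndSend() ends up producing for a given payload, since that decides which overloaded consumer method gets called. The plain-Java sketch below mirrors, as far as I understand it, the order in which Spring's default SimpleMessageConverter checks the payload type. PayloadTypeSketch and jmsTypeFor are names I made up, and the sketch only returns the type name rather than building a real message.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

public class PayloadTypeSketch {

  // Returns the name of the JMS message type the payload would be wrapped in,
  // checking String, byte[], Map and Serializable in that order
  public static String jmsTypeFor(Object payload) {
    if (payload instanceof String) {
      return "TextMessage";
    }
    if (payload instanceof byte[]) {
      return "BytesMessage";
    }
    if (payload instanceof Map) {
      return "MapMessage";
    }
    if (payload instanceof Serializable) {
      return "ObjectMessage";
    }
    throw new IllegalArgumentException("Cannot convert payload: " + payload);
  }

  public static void main(String[] args) {
    HashMap<String, String> map = new HashMap<>();
    map.put("message", "MESSAGE #: 001");

    System.out.println(jmsTypeFor("HEARTBEAT..."));   // prints "TextMessage"
    System.out.println(jmsTypeFor(map));              // prints "MapMessage"
    System.out.println(jmsTypeFor(Integer.valueOf(7))); // prints "ObjectMessage"
  }
}
```

Note that a HashMap is also Serializable, but because the Map check comes first it travels as a MapMessage, which is why the HashMap version of processMessage receives it.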

Testing out our Message Driven POJO with ActiveMQ

In order for us to test this example we need to ensure that the ActiveMQ server is up and running. Go into the bin directory of where you installed your ActiveMQ distribution and look for either activemq.bat (Windows) or the activemq script (Unix). Run the script with start as a parameter to bring up the server listening on the default port of 61616.

Windows Based Systems
If your system name contains an underscore (_) like mine, you will need to make some additional changes in the activemq.xml configuration file located in the conf/ directory of your distribution. Just hardcode the machine’s IP address or put in the localhost settings.

ActiveMQ Configuration File (conf/activemq.xml)

<transportConnectors>
  <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
  <transportConnector name="openwire" uri="tcp://127.0.0.1:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
  <transportConnector name="amqp" uri="amqp://127.0.0.1:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
  <transportConnector name="stomp" uri="stomp://127.0.0.1:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
  <transportConnector name="mqtt" uri="mqtt://127.0.0.1:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
  <transportConnector name="ws" uri="ws://127.0.0.1:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
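
A likely reason for the underscore problem (this is my assumption, it is not something I have confirmed in the ActiveMQ sources) is that java.net.URI does not accept an underscore in a host name and reports the host as null. The small sketch below shows that behaviour; UnderscoreHostSketch, hostOf and the host name my_host are made up for illustration.

```java
import java.net.URI;

public class UnderscoreHostSketch {

  // Returns the host part that java.net.URI extracts from the given URI string,
  // or null when the authority cannot be parsed as host:port
  public static String hostOf(String uri) {
    return URI.create(uri).getHost();
  }

  public static void main(String[] args) {
    System.out.println(hostOf("tcp://127.0.0.1:61616")); // prints "127.0.0.1"
    System.out.println(hostOf("tcp://my_host:61616"));   // prints "null"
  }
}
```

Using the IP address or localhost in the connector URIs, as in the configuration above, sidesteps the issue.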

ActiveMQ Server Console

[Image: ActiveMQ console]

Dissecting our TestJMSExample

Looking into our test application, the first thing we do is load the Spring application context. We will be using ClassPathXmlApplicationContext, which implements the ApplicationContext interface and is one of several concrete classes Spring provides for loading the application context.

ClassPathXmlApplicationContext ctx = 
         new ClassPathXmlApplicationContext("spring-config.xml");

I will print out a few informative messages to the console and retrieve the ActiveMessageProducer bean, called producer, from the context. I will then create a message HashMap which will hold some arbitrary text under the key message, and use the producer’s send() method to put the message into ActiveMQ’s queue. I will do this repeatedly and put fifty (50) messages onto the queue.

ActiveMessageProducer producer = (ActiveMessageProducer) ctx.getBean("producer");
for (int i=1; i<=50; i++) {
  HashMap<String, String> message = new HashMap<String, String>();
  String text = String.format("MESSAGE #: %03d", i);
  message.put("message", text);
  producer.send(message);
}

The other part of the test application simply waits for the user to hit the ENTER key. The last thing I do is close out any resources to prevent memory leaks. Notice that I am calling ctx.close(); otherwise Spring will still retain a reference to that context.

TestJMSExample.java

package com.avaldes.tutorial;

import java.util.HashMap;
import java.util.Scanner;

import org.springframework.context.support.ClassPathXmlApplicationContext;
 
public class TestJMSExample {

  public static void main(String[] args) {
    Scanner keyScan = null;
    
    ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext("spring-config.xml");
    
    System.out.println("Starting TestJMSExample");
    System.out.println("Sending 50 messages to ActiveMQ using our Producer...");
    ActiveMessageProducer producer = (ActiveMessageProducer) ctx.getBean("producer");
    for (int i=1; i<=50; i++) {
      HashMap<String, String> message = new HashMap<String, String>();
      String text = String.format("MESSAGE #: %03d", i);
      message.put("message", text);
      producer.send(message);
    }
    
    try {
      System.out.println("Press ENTER to continue...");
      keyScan = new Scanner(System.in);
      while(!keyScan.nextLine().equals(""));
    } catch (Exception ex) {
      ex.printStackTrace();
    } finally {
      // Guard against a null scanner if its creation failed
      if (keyScan != null) {
        keyScan.close();
      }
    }
      
    ctx.close();
    System.out.println("TestJMSExample finished...");
  }
}

Running the TestJMSExample

[Image: ActiveMQ queues]

log4j:WARN No appenders could be found for logger (org.springframework.core.env.StandardEnvironment).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/C:/repo/.m2/repository/org/apache/activemq/activemq-all/5.10.1/activemq-all-5.10.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/C:/repo/.m2/repository/ch/qos/logback/logback-classic/1.0.13/logback-classic-1.0.13.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Starting TestJMSExample
Sending 50 messages to ActiveMQ using our Producer...
{processMessage} HashMap Received: MESSAGE #: 001
{processMessage} HashMap Received: MESSAGE #: 002
{processMessage} HashMap Received: MESSAGE #: 003
{processMessage} HashMap Received: MESSAGE #: 004
{processMessage} HashMap Received: MESSAGE #: 005
{processMessage} HashMap Received: MESSAGE #: 006
{processMessage} HashMap Received: MESSAGE #: 007
{processMessage} HashMap Received: MESSAGE #: 008
{processMessage} HashMap Received: MESSAGE #: 009
{processMessage} HashMap Received: MESSAGE #: 010
{processMessage} HashMap Received: MESSAGE #: 011
{processMessage} HashMap Received: MESSAGE #: 012
{processMessage} HashMap Received: MESSAGE #: 013
{processMessage} HashMap Received: MESSAGE #: 014
{processMessage} HashMap Received: MESSAGE #: 015
{processMessage} HashMap Received: MESSAGE #: 016
{processMessage} HashMap Received: MESSAGE #: 017
{processMessage} HashMap Received: MESSAGE #: 018
{processMessage} HashMap Received: MESSAGE #: 019
{processMessage} HashMap Received: MESSAGE #: 020
{processMessage} HashMap Received: MESSAGE #: 021
{processMessage} HashMap Received: MESSAGE #: 022
{processMessage} HashMap Received: MESSAGE #: 023
{processMessage} HashMap Received: MESSAGE #: 024
{processMessage} HashMap Received: MESSAGE #: 025
{processMessage} HashMap Received: MESSAGE #: 026
{processMessage} HashMap Received: MESSAGE #: 027
{processMessage} HashMap Received: MESSAGE #: 028
{processMessage} HashMap Received: MESSAGE #: 029
{processMessage} HashMap Received: MESSAGE #: 030
{processMessage} HashMap Received: MESSAGE #: 031
{processMessage} HashMap Received: MESSAGE #: 032
{processMessage} HashMap Received: MESSAGE #: 033
{processMessage} HashMap Received: MESSAGE #: 034
{processMessage} HashMap Received: MESSAGE #: 035
{processMessage} HashMap Received: MESSAGE #: 036
{processMessage} HashMap Received: MESSAGE #: 037
{processMessage} HashMap Received: MESSAGE #: 038
{processMessage} HashMap Received: MESSAGE #: 039
{processMessage} HashMap Received: MESSAGE #: 040
{processMessage} HashMap Received: MESSAGE #: 041
{processMessage} HashMap Received: MESSAGE #: 042
{processMessage} HashMap Received: MESSAGE #: 043
{processMessage} HashMap Received: MESSAGE #: 044
{processMessage} HashMap Received: MESSAGE #: 045
{processMessage} HashMap Received: MESSAGE #: 046
{processMessage} HashMap Received: MESSAGE #: 047
{processMessage} HashMap Received: MESSAGE #: 048
{processMessage} HashMap Received: MESSAGE #: 049
{processMessage} HashMap Received: MESSAGE #: 050
Press ENTER to continue...
{processMessage} Text Received: HEARTBEAT...

That’s It

Enjoy Spring Framework!
