Отправка и чтение JMS сообщений

В предыдущей статье был рассмотрен пример отправки и получения JMS сообщений в WEB приложении с контейнером JBoss. В качестве провайдера обслуживания очередей javax.jms.Queue был использован HornetQ, интегрированный в контейнер. Данная статья продолжает рассмотрение вопроса отправки и получения JMS сообщений из Java приложения; в качестве провайдера очередей MQ (Message Queue) будет использоваться Websphere MQ.

Websphere MQ не является предметом описания статьи. Данное программное обеспечение играет вспомогательную роль и ему будет отведено минимальное внимание. Если Вы не знакомы с Websphere MQ, но желаете освоить данное ПО или работу с JMS сообщениями, то можете установить пробную 90-дневную trial версию с официального сайта IBM. Данного времени вполне достаточно для освоения технологии работы с JMS-сообщениями.

Для работы с JMS сообщениями создадим три класса :

JmsProducer: издатель сообщений;
JmsBrowser : просмоторщик сообщений;
JmsConsumer : просмоторщик сообщений.

Отличия двух последних классов JmsBrowser и JmsConsumer связаны с тем, что если первый будет только читать сообщения из очереди, то второй класс будет читать сообщения вместе с их удалением из очереди.

В классы включим методы main, чтобы можно было бы не на словах, а на деле увидеть отправку и получение JMS сообщений. Кроме этого, JMS сообщение будет выведено в консоль, чтобы наглядно представить значимые атрибуты класса javax.jms.Message.

При разработке классов упростим себе жизнь и используем один из основных принципов ООП наследование. Для этого разработаем базовый класс JmsBase, в котором определим настройки канала подключения javax.jms.Connection, а классы издателя и просмоторщиков будут наследовать JmsBase. В результате бизнес-логика будет разделена на две части : родитель будет отвечать за подключение к провайдеру JMS очередей, а наследники будут реализовывать определенные им функции.

Листинг базового модуля JmsBase

Базовый класс JmsBase, как было отмечено выше, отвечает за подключение к провайдеру Websphere MQ. Создадим в JmsBase метод подключения createConnection и абстрактный класс doAction, обязательный для определения в наследниках. В качестве параметров подключения используются наименование и порт компьютера (host, port), наименования менеджера очередей (queueManagerName) и очередь (queueName). Подключение javax.jms.Connection настраивается на менеджера очередей 'host1/qm1' и очередь 'queue1'.

В конструкторе класса создается подключение javax.jms.Connection и вызывается метод doAction. Таким образом, в наследниках потребуется только соответствующим образом реализовать предначертанные им функции.

На скриншоте представлен фрагмент интерфейса приложения MQExplorer с открытым окном просмотра сообщений очереди. MQExplorer включен в дистрибутив Websphere MQ.

import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Connection;
import javax.jms.JMSException;

import com.ibm.msg.client.wmq.WMQConstants;

import com.ibm.msg.client.jms.JmsFactoryFactory;
import com.ibm.msg.client.jms.JmsConnectionFactory;

public abstract class JmsBase
{
    private String   host              = "localhost"         ;
    private int      port              = 1414                ;
    private String   channel           = "SYSTEM.DEF.SVRCONN";
    private String   queueManagerName  = "host1/qm1"         ;
    private String   queueName         = "queue1"            ;  
    private boolean  clientTransport   = false               ;

    protected Session      session     = null;
    protected Queue        destination = null;

    protected Connection   connection  = null;
                         // статус выполнения приложения
    protected static int status = 1; 

    protected abstract void doAction();
    //---------------------------------------------------------------
    private void createConnection()
    {
        JmsFactoryFactory    ff;
        JmsConnectionFactory cf;
        try {
            ff = JmsFactoryFactory.getInstance(
                                   WMQConstants.WMQ_PROVIDER);
            cf = ff.createConnectionFactory();

            // Set the properties
            cf.setStringProperty(WMQConstants.WMQ_HOST_NAME, host   );
            cf.setIntProperty   (WMQConstants.WMQ_PORT     , port   );
            cf.setStringProperty(WMQConstants.WMQ_CHANNEL  , channel);
            if (clientTransport) {
                cf.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE,
                                  WMQConstants.WMQ_CM_CLIENT);
            } else {
                cf.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE,
                                  WMQConstants.WMQ_CM_BINDINGS);
            }
            cf.setStringProperty(WMQConstants.WMQ_QUEUE_MANAGER,
                                 queueManagerName);
            connection  = cf.createConnection();
        } catch (JMSException jmsex) {
            recordFailure(jmsex);
        }
    }
    //---------------------------------------------------------------
    public JmsBase()
    {
        try {
            createConnection();
            session = connection.createSession(false,
                                          Session.AUTO_ACKNOWLEDGE);
            destination = session.createQueue(queueName);
            // Старт подключения
            connection.start();
            // Выполнение действий
            doAction();
        }
        catch (JMSException jmsex) {
            recordFailure(jmsex);
        } finally {
            try {
                if (session != null)
                    session.close();
                if (connection != null)
                    connection.close();
            } catch (JMSException jmsex) {
                System.err.println("" + jmsex.getMessage());
                recordFailure(jmsex);
            }
        }
    }
    //---------------------------------------------------------------
    protected void recordFailure(Exception ex)
    {
        // ...
    }
    //---------------------------------------------------------------
    protected void processJMSException(JMSException jmse)
    {
        // ...
    }
}

Код процедур обработки исключений на странице не приводится. Желающие могут скачать рабочие примеры.

Листинг издателя сообщений JmsProducer

Класс издателя JMS сообщений наследует все свойства родителя JmsBase и определяет процедуру doAction(), в которой выполняется отправка сообщений провайдеру Websphere MQ.

В конструкторе класса выполняется подключение к провайдеру MQ, после чего в процедуре doAction() формируется список сообщений messages и создается издатель сообщений producer типа MessageProducer. Для создания MessageProducer используется метод createProducer объекта сессии javax.jms.Session с параметром очереди destination (javax.jms.Queue). Если издатель создан, т.е. имеется канал связи и исключений не было, то в цикле вызывается метод send (TextMessage) для отправки двух сообщений.

import javax.jms.TextMessage;
import javax.jms.JMSException;
import javax.jms.MessageProducer;

public class JmsProducer extends JmsBase
{
    private MessageProducer producer = null;

    public JmsProducer()
    {
        super();
        try {
            if (producer != null)
                producer.close();
        } catch (JMSException jmse) {
            recordFailure(jmse);
        }
    }
    @Override
    protected void doAction() 
    {
        // Создание текстовых сообщений
        String[] messages = new String[2];
        messages[0] = "Good day";
        messages[1] = "Hello, world!";
        String TEMPL = "Cообщение #%d отправлено";
        try {
            producer = session.createProducer(destination);
            System.out.println("Отправка JMS сообщений\n");
            TextMessage message = null;
            for (int i = 0; i < messages.length; i++) {
                message = session.createTextMessage(messages[i]);
                producer.send(message);
                System.out.println(String.format(TEMPL, (i + 1)));
            }
        } catch (JMSException e) {
            status = -2;
            recordFailure(e);
        }
    }
    public static void main(String[] args)
    {
        new JmsProducer();
        System.exit(status);
    }
}

Класс издателя сообщений JmsProducer включает метод main, позволяющий протестировать отправку JMS сообщений. В результате работы класса в консоль будет выведена следующая информация об отправке сообщений.


Отправка JMS сообщений

Cообщение #1 отправлено
Cообщение #2 отправлено
 

На следующем скриншоте представлено окно просмотра сообщений в приложении MQExplorer : в дереве объектов слева выделены Очереди менеджера 'host1/qm1', в панели данных справа выделена очередь 'queue1'. Их перекрывает окно Просмотра сообщений в очереди, в котором отображены отправленные сообщения.

Листинг просмоторщика сообщений JmsBrowser

Класс просмоторщика JMS сообщений JmsBrowser наследует все свойства родителя JmsBase и определяет процедуру doAction(). В конструкторе класса выполняется подключение к провайдеру MQ, после чего в процедуре doAction() создается объект просмотра сообщений browser типа QueueBrowser и выполняется чтение сообщений messages. Сообщения выводятся в консоль и выполнение процедуры завершается. Все тривиально просто и прозрачно.

import javax.jms.Message;
import javax.jms.JMSException;
import javax.jms.QueueBrowser;

import java.util.Enumeration;

public class JmsBrowser extends JmsBase
{
    private QueueBrowser browser = null;

    public JmsBrowser()
    {
        super();
        try {
            if (browser != null)
                browser.close();
        } catch (JMSException jmsex) {
            recordFailure(jmsex);
        }
    }
    @Override
    protected void doAction() 
    {
        try {
            browser = session.createBrowser(destination);
            // Чтение сообщений
            Enumeration<?> messages = browser.getEnumeration();

            int count = 0;
            Message message;
            // Просмотр сообщений
            System.out.println("Просмотр JMS сообщений : ");
            while (messages.hasMoreElements()) {
                message = (Message) messages.nextElement();
                System.out.println("\nСообщение " + (++count) + " :");
                System.out.println(message);
            }
            System.out.println("\nПросмотр JMS сообщений завершен\n");
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
    public static void main(String[] args) 
    {
        new JmsBrowser ();
        System.exit(status);
    }
}

Класс включает метод main, позволяющий протестировать чтение JMS сообщений. В результате выполнения класса в консоль будут выведены два сообщения, отправленные из JmsProducer. Вы можете оценить количество атрибутов JMS сообщения. В последней строке представлен текст сообщения.

Просмотр JMS сообщений : 

Сообщение 1 :
  JMSMessage class: jms_text
  JMSType:          null
  JMSDeliveryMode:  1
  JMSDeliveryDelay: 0
  JMSDeliveryTime:  0
  JMSExpiration:    0
  JMSPriority:      0
  JMSMessageID:     ID:414d5120686f7374312f716d31202020cf428c5922ed5a02
  JMSTimestamp:     1502364406670
  JMSCorrelationID: null
  JMSDestination:   null
  JMSReplyTo:       null
  JMSRedelivered:   false
    JMSXAppID: \IBM\MQ\bin64\MQExplorer.exe
    JMSXDeliveryCount: 1
    JMSXUserID: Serg
    JMS_IBM_Character_Set: UTF-8
    JMS_IBM_Encoding: 546
    JMS_IBM_Format: MQSTR   
    JMS_IBM_MsgType: 8
    JMS_IBM_PutApplType: 11
    JMS_IBM_PutDate: 20170810
    JMS_IBM_PutTime: 11264667
Good day

Сообщение 2 :
  JMSMessage class: jms_text
  JMSType:          null
  JMSDeliveryMode:  1
  JMSDeliveryDelay: 0
  JMSDeliveryTime:  0
  JMSExpiration:    0
  JMSPriority:      0
  JMSMessageID:     ID:414d5120686f7374312f716d31202020cf428c5922ed5b02
  JMSTimestamp:     1502364423160
  JMSCorrelationID: null
  JMSDestination:   null
  JMSReplyTo:       null
  JMSRedelivered:   false
    JMSXAppID: \IBM\MQ\bin64\MQExplorer.exe
    JMSXDeliveryCount: 1
    JMSXUserID: Serg
    JMS_IBM_Character_Set: UTF-8
    JMS_IBM_Encoding: 546
    JMS_IBM_Format: MQSTR   
    JMS_IBM_MsgType: 8
    JMS_IBM_PutApplType: 11
    JMS_IBM_PutDate: 20170810
    JMS_IBM_PutTime: 11270316
Hello, world!

Просмотр JMS сообщений завершен
 

Листинг просмоторщика сообщений JmsConsumer

Класс просмоторщика JMS сообщений JmsConsumer также, как и предыдущие классы, наследует все свойства родителя JmsBase и определяет процедуру doAction(). В конструкторе класса также выполняется подключение к провайдеру MQ, после чего в процедуре doAction() создается объект чтения сообщений consumer типа MessageConsumer. Для чтения сообщений используется метод receive, который читает сообщение из очереди с удалением. В качестве параметра методу передается время ожидания в милисекундах. Сообщения выводятся в консоль и выполнение процедуры завершается по истечении установленного в timeout времени.

import javax.jms.Message;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;

public class JmsConsumer extends JmsBase
{
    private  final  int      timeout  = 30000;
    private  MessageConsumer consumer = null;

    @Override
    protected void doAction()
    {
        Message message;
        try {
            consumer = session.createConsumer(destination);
            do {
                message = consumer.receive(timeout);
                if (message != null)
                    System.out.println("\nСообщение :\n" + message);
            } while (message != null);
        } catch (JMSException e) {
            e.printStackTrace();
        }
        System.out.println("\nПросмотр JMS сообщений завершен\n");
    }
    public JmsConsumer()
    {
        super();
        try {
            if (consumer != null)
                consumer.close();
        } catch (JMSException jmsex) {
            recordFailure(jmsex);
        }
    }
    public static void main(String[] args)
    {
        new JmsConsumer();
        System.exit(status);
    }
}

В результате выполнения JmsConsumer в консоли будут также представлены JMS сообщения, как и в предыдущем JmsBrowser. Если Вы имеете доступ к провайдеру JMS очередей, то можете убедиться, что сообщения из очереди удалены.

Скачать примеры

Исходные коды рассмотренных классов в виде проекта Eclipse можно скачать здесь (7.42 Мб). Дистрибутив рабочий и включает весь необходимый набор библиотек.

Пример WEB-приложения отправки и чтения JMS-сообщений из контейнера JBoss представлен здесь.

  Рейтинг@Mail.ru