четверг, 10 июля 2014 г.

Передача сообщений между WebSocket и JMS (Java EE)

Появилась в одном проекте необходимость реализовать трансфер сообщений из JMS в веб-приложение конечному пользователю. Так как проект написан на Java EE 7, наиболее простым способом я счёт использование Websocket.

Передача сообщений из JMS в WebSocket

Первый способ передачи сообщений между JMS-листнером и эндпоинтом, который появился у меня в голове: добавляем к классу-эндпоинту аннотации @Named и @ApplicationScoped и внедряем его в JMS-листнер (Лишний код в примерах опущен).

@MessageDriven(mappedName = "websocket.out")
public class SimpleMessageListener implements MessageListener {
    @Inject
    private SimpleEndpoint endpoint;

    @Override
    public void onMessage(Message message) {
        try {
            endpoint.sendMessage(message);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}


@Named
@ApplicationScoped
@ServerEndpoint(value = "/wse")
public class SimpleEndpoint {

    public void sendMessage(Message message) throws IOException, JMSException {
        for (Session session : sessions) {
            if (session.isOpen()) {
                session.getBasicRemote().sendText(((TextMessage) message).getText());
            } else {
                sessions.remove(session);
            }
        }
    }
}


Тесты показали, что всё отлично работает.

Есть второй вариант, описанный Bruno Borges, который заключается в использовании CDI-событий. В данном случае JMS-листнер будет публиковать события, а эндпоинт будет их ловить. Изменения кода минимальны.

Изменения в JMS-листнере:
    @Inject
    @JmsToWsMessage
    private Event<Message> event;

    @Override
    public void onMessage(Message message) {
        try {
            event.fire(message);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
Изменения в эндпоинте:
    public void sendMessage(@Observes @JmsToWsMessage Message message) throws IOException, JMSException {
        for (Session session : sessions) {
            if (session.isOpen()) {
                session.getBasicRemote().sendText(((TextMessage) message).getText());
            } else {
                sessions.remove(session);
            }
        }
    }
    
Как видно, нам понадобится аннтоация, которой будут маркироваться события:

@Qualifier
@Retention(RUNTIME)
@Target({METHOD, FIELD, PARAMETER, TYPE})
public @interface JmsToWsMessage {
}


Ещё один способ пришёл в голову буквально за утренним кофе. Для этого нужно:
1. Реализовать возможность доступа к websocket-сессиям из статических методов класса-эндпоинта
2. Сделать метод sendMessage статическим
И в JMS-листнере можно будет отправлять сообщения следующим образом:

SimpleEndpoint.sendMessage(message);

Передача сообщений из WebSocket в JMS

А что если потребуется трансфер в обратном направлении, из вебсокетов в JMS? Тут всё оказалось немного сложнее. Первый вариант, появившийся у меня в голове заключался во внедрении и использовании JMS-контекста в классе-эндпоинте.
Способ оказался нерабочим. Как выяснилось позднее из статьи Bruno Borges, проблема заключается в баге, который не позволяет внедрять JMS-контекст в эндпоинты.
Второй способ заключался во внедрении ConnectionFactory, но и он у меня не сработал - любые JMS-ресурсы, внедрённые не в EJB, оказались null. Вполне возможно, что это из-за особенностей используемого мной сервера приложений (WildFly 8 с внедрённым ActiveMQ).
Но, раз осталась возможность внедрять JMS-ресурсы в EJB, появился третий вариант. Для его реализации создадим сессионный компонент, который будет отправлять сообщения:

@Stateless
public class MessageSender {

    @Resource(mappedName = "java:/activemq/ConnectionFactory")
    private ConnectionFactory connectionFactory;

    @Resource(mappedName = "java:/jms/queue/websocket.in")
    private Queue queue;

    public void sendMessage(String message) {
        try {
            System.out.println("Sending to " + queue.getQueueName());
            try (Connection connection = connectionFactory.createConnection()) {
                connection.start();
                javax.jms.Session session = connection.createSession(true, javax.jms.Session.AUTO_ACKNOWLEDGE);
                MessageProducer producer = session.createProducer(queue);
                TextMessage textMessage = session.createTextMessage();
                textMessage.setText(message);
                producer.send(textMessage);
                connection.stop();
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}
Дело остаётся за малым - внедрить наш сессионный компонент в класс-эндпоинт. Но и тут оказался сюрприз - сработало только внедрение через конструктор, в остальных случаях я получал NullPointerException.

Изменения в WebSocket-эндпоинте
    
    private MessageSender messageSender;

    public SimpleEndpoint() {}

    @Inject
    public SimpleEndpoint(MessageSender messageSender) {
        this.messageSender = messageSender;
    }
В следующем посте я опишу решение данной задачи силами Springframework. По опыту работу с ним, могу предположить, что там всё будет просто.

Полезные ссылки

Комментариев нет:

Отправить комментарий