From 5e5b673afa04b9ab9096d6c435af4cf99fb7c788 Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Fri, 5 Aug 2016 12:04:51 -0400 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-6386 Add test to help diagnose the issue with cross protocol AMQP -> STOMP message acking. --- activemq-amqp/pom.xml | 6 + .../amqp/AmqpAndStompInteropTest.java | 226 ++++++++++++++++++ .../src/test/resources/log4j.properties | 2 +- 3 files changed, 233 insertions(+), 1 deletion(-) create mode 100644 activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpAndStompInteropTest.java diff --git a/activemq-amqp/pom.xml b/activemq-amqp/pom.xml index 93e4fe2504..d5e2ca2533 100644 --- a/activemq-amqp/pom.xml +++ b/activemq-amqp/pom.xml @@ -168,6 +168,12 @@ ${netty-all-version} test + + org.fusesource.stompjms + stompjms-client + ${stompjms-version} + test + diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpAndStompInteropTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpAndStompInteropTest.java new file mode 100644 index 0000000000..ce98db361c --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpAndStompInteropTest.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import javax.jms.Connection; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.qpid.jms.JmsConnectionFactory; +import org.fusesource.stomp.jms.StompJmsConnectionFactory; +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Ignore +public class AmqpAndStompInteropTest { + + private static final Logger LOG = LoggerFactory.getLogger(AmqpAndStompInteropTest.class); + + @Rule + public TestName name = new TestName(); + + protected BrokerService broker; + private TransportConnector amqpConnector; + private TransportConnector stompConnector; + + @Before + public void setUp() throws Exception { + broker = createBroker(); + broker.start(); + broker.waitUntilStarted(); + } + + @After + public void tearDown() throws Exception { + if (broker != null) { + broker.stop(); + broker.waitUntilStopped(); + broker = null; + } + } + + protected BrokerService createBroker() throws Exception { + BrokerService broker = new BrokerService(); + broker.setPersistent(false); + broker.setUseJmx(false); + broker.setAdvisorySupport(false); + broker.setSchedulerSupport(false); + + amqpConnector = broker.addConnector("amqp://0.0.0.0:0"); + stompConnector = broker.addConnector("stomp://0.0.0.0:0"); + + return broker; + } + + @Test(timeout = 30000) + public void testSendFromAMQPToSTOMP() throws Exception { + sendMessageToQueueUsingAmqp(); + readMessageFromQueueUsingStomp(); + } + + @Test(timeout = 30000) + public void testSendFromSTOMPToAMQP() throws Exception { + sendMessageToQueueUsingStomp(); + readMessageFromQueueUsingAmqp(); + } + + @Test(timeout = 30000) + public void testSendFromSTOMPToSTOMP() throws Exception { + sendMessageToQueueUsingStomp(); + readMessageFromQueueUsingStomp(); + } + + @Test(timeout = 30000) + public void testSendFromAMQPToAMQP() throws Exception { + sendMessageToQueueUsingAmqp(); + readMessageFromQueueUsingAmqp(); + } + + private String getQueueName() { + return name.getMethodName() + "-Queue"; + } + + private void sendMessageToQueueUsingAmqp() throws Exception { + Connection connection = createAmqpConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(getQueueName()); + MessageProducer producer = session.createProducer(queue); + + try { + TextMessage message = session.createTextMessage("test-message-amqp-source"); + producer.send(message); + + LOG.info("Send AMQP message with Message ID -> {}", message.getJMSMessageID()); + } finally { + connection.close(); + } + } + + private void sendMessageToQueueUsingStomp() throws Exception { + Connection connection = createStompConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(getQueueName()); + MessageProducer producer = session.createProducer(queue); + + try { + TextMessage message = session.createTextMessage("test-message-stomp-source"); + producer.send(message); + + LOG.info("Send STOMP message with Message ID -> {}", message.getJMSMessageID()); + } finally { + connection.close(); + } + } + + private void readMessageFromQueueUsingAmqp() throws Exception { + Connection connection = createAmqpConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(getQueueName()); + MessageConsumer consumer = session.createConsumer(queue); + + connection.start(); + + Message received = consumer.receive(2000); + assertNotNull(received); + + LOG.info("Read from AMQP -> message ID = {}", received.getJMSMessageID()); + + assertTrue(received instanceof TextMessage); + + TextMessage textMessage = (TextMessage) received; + assertNotNull(textMessage.getText()); + } + + private void readMessageFromQueueUsingStomp() throws Exception { + Connection connection = createStompConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(getQueueName()); + MessageConsumer consumer = session.createConsumer(queue); + + connection.start(); + + Message received = consumer.receive(2000); + assertNotNull(received); + + LOG.info("Read from STOMP -> message ID = {}", received.getJMSMessageID()); + + assertTrue(received instanceof TextMessage); + + TextMessage textMessage = (TextMessage) received; + assertNotNull(textMessage.getText()); + } + + private Connection createStompConnection() throws Exception { + + String stompURI = "tcp://localhost:" + stompConnector.getConnectUri().getPort(); + + final StompJmsConnectionFactory factory = new StompJmsConnectionFactory(); + + factory.setBrokerURI(stompURI); + factory.setUsername("admin"); + factory.setPassword("password"); + + final Connection connection = factory.createConnection(); + connection.setExceptionListener(new ExceptionListener() { + @Override + public void onException(JMSException exception) { + exception.printStackTrace(); + } + }); + + connection.start(); + return connection; + } + + private Connection createAmqpConnection() throws Exception { + + String amqpURI = "amqp://localhost:" + amqpConnector.getConnectUri().getPort(); + + final JmsConnectionFactory factory = new JmsConnectionFactory(amqpURI); + + factory.setUsername("admin"); + factory.setPassword("password"); + + final Connection connection = factory.createConnection(); + connection.setExceptionListener(new ExceptionListener() { + @Override + public void onException(JMSException exception) { + exception.printStackTrace(); + } + }); + + connection.start(); + return connection; + } +} diff --git a/activemq-amqp/src/test/resources/log4j.properties b/activemq-amqp/src/test/resources/log4j.properties index f88b152ff3..d25017dac9 100755 --- a/activemq-amqp/src/test/resources/log4j.properties +++ b/activemq-amqp/src/test/resources/log4j.properties @@ -20,7 +20,7 @@ # log4j.rootLogger=WARN, console, file log4j.logger.org.apache.activemq=INFO -log4j.logger.org.apache.activemq.transport.amqp=TRACE +log4j.logger.org.apache.activemq.transport.amqp=DEBUG log4j.logger.org.apache.activemq.transport.amqp.FRAMES=INFO log4j.logger.org.fusesource=INFO