From 0e148fb4ba9e8f75bd39a641b697ec975a340fcb Mon Sep 17 00:00:00 2001 From: Christian Posta Date: Sun, 24 Feb 2013 22:42:09 +0000 Subject: [PATCH] Added tests for AMQP to JMS transformations git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1449562 13f79535-47bb-0310-9956-ffa450edef68 --- .../transport/amqp/AmqpTransportFilter.java | 2 +- .../transport/amqp/AmqpTransformerTest.java | 226 ++++++++++++++++++ 2 files changed, 227 insertions(+), 1 deletion(-) create mode 100644 activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransformerTest.java diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java index 6226626c08..641775b37b 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java @@ -33,7 +33,7 @@ import java.security.cert.X509Certificate; /** * The AMQPTransportFilter normally sits on top of a TcpTransport that has been - * configured with the StompWireFormat and is used to convert AMQP commands to + * configured with the AmqpWireFormat and is used to convert AMQP commands to * ActiveMQ commands. All of the conversion work is done by delegating to the * AMQPProtocolConverter */ diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransformerTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransformerTest.java new file mode 100644 index 0000000000..2f71ea3815 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransformerTest.java @@ -0,0 +1,226 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl; +import org.apache.qpid.amqp_1_0.jms.impl.QueueImpl; +import org.junit.After; +import org.junit.Test; + +import javax.jms.*; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * @author Christian Posta + */ +public class AmqpTransformerTest { + + private static final String AMQP_URL = "amqp://0.0.0.0:0%s"; + private BrokerService brokerService; + private int amqpPort; + private int openwirePort; + private static final String TEST_QUEUE = "txqueue"; + + @Test + public void testNativeTransformation() throws Exception { + + // default is native + startBrokerWithAmqpTransport(String.format(AMQP_URL, "?transport.transformer=native")); + + // send "text message" with AMQP JMS API + Connection amqpConnection = createAmqpConnection(); + QueueImpl queue = new QueueImpl("queue://" + TEST_QUEUE); + + Session amqpSession = amqpConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer p = amqpSession.createProducer(queue); + p.setPriority(7); + + TextMessage amqpMessage = amqpSession.createTextMessage(); + amqpMessage.setText("hello"); + p.send(amqpMessage); + + p.close(); + amqpSession.close(); + amqpConnection.close(); + + + // receive with openwire JMS + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://0.0.0.0:" + openwirePort); + Connection openwireConn = factory.createConnection(); + openwireConn.start(); + Session session = openwireConn.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue jmsDest = session.createQueue(TEST_QUEUE); + + MessageConsumer c = session.createConsumer(jmsDest); + + Message message = c.receive(1000); + + assertTrue(message instanceof BytesMessage); + Boolean nativeTransformationUsed = message.getBooleanProperty("JMS_AMQP_NATIVE"); + Long messageFormat = message.getLongProperty("JMS_AMQP_MESSAGE_FORMAT"); + assertEquals(0L, messageFormat.longValue()); + assertTrue("Didn't use the correct transformation, expected NATIVE", nativeTransformationUsed); + assertEquals(2, message.getJMSDeliveryMode()); + assertEquals(7, message.getJMSPriority()); + + c.close(); + session.close(); + openwireConn.close(); + + } + + @Test + public void testRawTransformation() throws Exception { + + // default is native + startBrokerWithAmqpTransport(String.format(AMQP_URL, "?transport.transformer=raw")); + + // send "text message" with AMQP JMS API + Connection amqpConnection = createAmqpConnection(); + QueueImpl queue = new QueueImpl("queue://" + TEST_QUEUE); + + Session amqpSession = amqpConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer p = amqpSession.createProducer(queue); + p.setPriority(7); + + TextMessage amqpMessage = amqpSession.createTextMessage(); + amqpMessage.setText("hello"); + p.send(amqpMessage); + + p.close(); + amqpSession.close(); + amqpConnection.close(); + + + // receive with openwire JMS + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://0.0.0.0:" + openwirePort); + Connection openwireConn = factory.createConnection(); + openwireConn.start(); + Session session = openwireConn.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue jmsDest = session.createQueue(TEST_QUEUE); + + MessageConsumer c = session.createConsumer(jmsDest); + + Message message = c.receive(1000); + + assertTrue(message instanceof BytesMessage); + Boolean nativeTransformationUsed = message.getBooleanProperty("JMS_AMQP_NATIVE"); + Long messageFormat = message.getLongProperty("JMS_AMQP_MESSAGE_FORMAT"); + assertEquals(0L, messageFormat.longValue()); + assertTrue("Didn't use the correct transformation, expected NATIVE", nativeTransformationUsed); + assertEquals(2, message.getJMSDeliveryMode()); + + // should not equal 7 (should equal the default) because "raw" does not map + // headers + assertEquals(4, message.getJMSPriority()); + + + c.close(); + session.close(); + openwireConn.close(); + + } + + @Test + public void testJmsTransformation() throws Exception { + + // default is native + startBrokerWithAmqpTransport(String.format(AMQP_URL, "?transport.transformer=jms")); + + // send "text message" with AMQP JMS API + Connection amqpConnection = createAmqpConnection(); + QueueImpl queue = new QueueImpl("queue://" + TEST_QUEUE); + + Session amqpSession = amqpConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer p = amqpSession.createProducer(queue); + + TextMessage amqpMessage = amqpSession.createTextMessage(); + amqpMessage.setText("hello"); + p.send(amqpMessage); + + p.close(); + amqpSession.close(); + amqpConnection.close(); + + + // receive with openwire JMS + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://0.0.0.0:" + openwirePort); + Connection openwireConn = factory.createConnection(); + openwireConn.start(); + Session session = openwireConn.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue jmsDest = session.createQueue(TEST_QUEUE); + + MessageConsumer c = session.createConsumer(jmsDest); + + Message message = c.receive(1000); + + assertTrue(message instanceof TextMessage); + Boolean nativeTransformationUsed = message.getBooleanProperty("JMS_AMQP_NATIVE"); + Long messageFormat = message.getLongProperty("JMS_AMQP_MESSAGE_FORMAT"); + assertEquals(0L, messageFormat.longValue()); + assertFalse("Didn't use the correct transformation, expected NOT to be NATIVE", nativeTransformationUsed); + assertEquals(2, message.getJMSDeliveryMode()); + + c.close(); + session.close(); + openwireConn.close(); + + } + + public Connection createAmqpConnection() throws JMSException { + final ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", amqpPort, "admin", "password"); + final Connection connection = factory.createConnection(); + connection.setExceptionListener(new ExceptionListener() { + @Override + public void onException(JMSException exception) { + exception.printStackTrace(); + } + }); + connection.start(); + return connection; + } + + + public void startBrokerWithAmqpTransport(String amqpUrl) throws Exception { + brokerService = new BrokerService(); + brokerService.setPersistent(false); + brokerService.setAdvisorySupport(false); + brokerService.setDeleteAllMessagesOnStartup(true); + + TransportConnector connector = brokerService.addConnector(amqpUrl); + amqpPort = connector.getConnectUri().getPort(); + connector = brokerService.addConnector("tcp://0.0.0.0:0"); + openwirePort = connector.getConnectUri().getPort(); + + brokerService.start(); + brokerService.waitUntilStarted(); + } + + @After + public void stopBroker() throws Exception { + if (brokerService != null) { + brokerService.stop(); + brokerService = null; + } + } +}