From 36bd069e8f65a7b615a188d34ad0b951ec086732 Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Wed, 25 Feb 2015 21:16:33 -0500 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-5617 First pass removes most direct usages of the qpid client bits and cleans up some tests so that they all start to use the common test support class features. --- .../activemq/transport/amqp/AMQ4563Test.java | 80 +++-------- .../activemq/transport/amqp/AMQ4696Test.java | 11 +- .../activemq/transport/amqp/AMQ4920Test.java | 21 ++- .../amqp/AmqpConnectTimeoutTest.java | 8 +- .../activemq/transport/amqp/AmqpNioTest.java | 26 ---- .../activemq/transport/amqp/AmqpSslTest.java | 59 -------- .../transport/amqp/AmqpTestSupport.java | 94 ++++++++---- .../transport/amqp/AmqpTransformerTest.java | 48 +++---- .../amqp/JMSClientNioPlusSslTest.java | 11 +- .../transport/amqp/JMSClientNioTest.java | 11 +- .../transport/amqp/JMSClientSslTest.java | 10 +- .../transport/amqp/JMSClientTestSupport.java | 18 ++- .../transport/amqp/JmsClientContext.java | 136 ++++++++++++++++++ .../amqp/JmsClientRequestResponseTest.java | 37 +---- .../transport/amqp/SimpleAMQPAuthTest.java | 19 ++- .../transport/amqp/bugs/AMQ4753Test.java | 35 +---- .../transport/amqp/bugs/AMQ4914Test.java | 21 +-- .../transport/amqp/bugs/AMQ5256Test.java | 103 +++++++++++-- .../amqp/protocol/UnsupportedClientTest.java | 40 +++--- .../src/test/resources/log4j.properties | 2 +- 20 files changed, 428 insertions(+), 362 deletions(-) delete mode 100644 activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpNioTest.java delete mode 100644 activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpSslTest.java create mode 100644 activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsClientContext.java diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4563Test.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4563Test.java index e739a51aef..63f32163e9 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4563Test.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4563Test.java @@ -20,8 +20,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; -import java.io.File; - import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; @@ -35,19 +33,14 @@ import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.jmx.QueueViewMBean; -import org.apache.activemq.store.kahadb.KahaDBStore; import org.apache.activemq.transport.amqp.joram.ActiveMQAdmin; -import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl; import org.junit.Test; public class AMQ4563Test extends AmqpTestSupport { public static final String KAHADB_DIRECTORY = "target/activemq-data/kahadb-amq4563"; - private String openwireUri; - @Test(timeout = 60000) public void testMessagesAreAckedAMQProducer() throws Exception { int messagesSent = 3; @@ -83,7 +76,8 @@ public class AMQ4563Test extends AmqpTestSupport { ActiveMQAdmin.enableJMSFrameTracing(); assertTrue(brokerService.isPersistent()); - Connection connection = createAMQPConnection(); + Connection connection = JmsClientContext.INSTANCE.createConnection(amqpURI); + connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(name.getMethodName()); MessageProducer p = session.createProducer(queue); @@ -128,7 +122,8 @@ public class AMQ4563Test extends AmqpTestSupport { ActiveMQAdmin.enableJMSFrameTracing(); assertTrue(brokerService.isPersistent()); - Connection connection = createAMQPConnection(); + Connection connection = JmsClientContext.INSTANCE.createConnection(amqpURI); + connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(name.getMethodName()); MessageProducer p = session.createProducer(queue); @@ -157,7 +152,9 @@ public class AMQ4563Test extends AmqpTestSupport { } private int readAllMessages(String queueName, String selector) throws JMSException { - Connection connection = createAMQPConnection(); + Connection connection = JmsClientContext.INSTANCE.createConnection(amqpURI); + connection.start(); + try { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(queueName); @@ -199,28 +196,12 @@ public class AMQ4563Test extends AmqpTestSupport { private void restartBroker(Connection connection, Session session) throws Exception { session.close(); connection.close(); - - stopBroker(); - createBroker(false); - } - - private Connection createAMQPConnection() throws JMSException { - LOG.debug(">>> In createConnection using port {}", port); - final ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", port, "admin", "password"); - final Connection connection = factory.createConnection(); - connection.setExceptionListener(new ExceptionListener() { - @Override - public void onException(JMSException exception) { - exception.printStackTrace(); - } - }); - connection.start(); - return connection; + restartBroker(); } private Connection createAMQConnection() throws JMSException { - LOG.debug(">>> In createConnection using port {}", port); - final ConnectionFactory factory = new ActiveMQConnectionFactory("admin", "password", openwireUri); + LOG.debug(">>> In createConnection using port {}", openwirePort); + final ConnectionFactory factory = new ActiveMQConnectionFactory("admin", "password", openwireURI); final Connection connection = factory.createConnection(); connection.setExceptionListener(new ExceptionListener() { @Override @@ -233,42 +214,17 @@ public class AMQ4563Test extends AmqpTestSupport { } @Override - public void startBroker() throws Exception { - createBroker(true); + protected boolean isUseOpenWireConnector() { + return true; } - /** - * Copied from AmqpTestSupport, modified to use persistence - */ @Override - public void createBroker(boolean deleteAllMessages) throws Exception { - KahaDBStore kaha = new KahaDBStore(); - kaha.setDirectory(new File(KAHADB_DIRECTORY)); + protected boolean isPersistent() { + return true; + } - brokerService = new BrokerService(); - brokerService.setDeleteAllMessagesOnStartup(deleteAllMessages); - brokerService.setPersistent(true); - brokerService.setPersistenceAdapter(kaha); - brokerService.setAdvisorySupport(false); - brokerService.setUseJmx(true); - brokerService.getManagementContext().setCreateMBeanServer(false); - brokerService.setStoreOpenWireVersion(10); - openwireUri = brokerService.addConnector("tcp://0.0.0.0:0").getPublishableConnectString(); - - // Setup SSL context... -// final File classesDir = new File(AmqpProtocolConverter.class.getProtectionDomain().getCodeSource().getLocation().getFile()); -// File keystore = new File(classesDir, "../../src/test/resources/keystore"); -// final SpringSslContext sslContext = new SpringSslContext(); -// sslContext.setKeyStore(keystore.getCanonicalPath()); -// sslContext.setKeyStorePassword("password"); -// sslContext.setTrustStore(keystore.getCanonicalPath()); -// sslContext.setTrustStorePassword("password"); -// sslContext.afterPropertiesSet(); -// brokerService.setSslContext(sslContext); - - addAMQPConnector(); - brokerService.start(); - brokerService.waitUntilStarted(); - this.numberOfMessages = 2000; + @Override + protected int getstoreOpenWireVersion() { + return 10; } } diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4696Test.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4696Test.java index 738b70da33..677374e74d 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4696Test.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4696Test.java @@ -22,20 +22,17 @@ import static org.junit.Assert.assertNotNull; import javax.jms.Session; import javax.jms.Topic; import javax.jms.TopicConnection; -import javax.jms.TopicConnectionFactory; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; import org.apache.activemq.broker.jmx.BrokerView; -import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl; -import org.apache.qpid.amqp_1_0.jms.impl.TopicImpl; import org.junit.Test; public class AMQ4696Test extends AmqpTestSupport { @Test(timeout=30*1000) public void simpleDurableTopicTest() throws Exception { - String TOPIC_NAME = "topic://AMQ4696Test" + System.currentTimeMillis(); + String TOPIC_NAME = "AMQ4696Test" + System.currentTimeMillis(); String durableClientId = "AMQPDurableTopicTestClient"; String durableSubscriberName = "durableSubscriberName"; @@ -44,11 +41,11 @@ public class AMQ4696Test extends AmqpTestSupport { int inactiveSubscribersAtStart = adminView.getInactiveDurableTopicSubscribers().length; LOG.debug(">>>> At Start, durable Subscribers {} inactiveDurableSubscribers {}", durableSubscribersAtStart, inactiveSubscribersAtStart); - TopicConnectionFactory factory = new ConnectionFactoryImpl("localhost", port, "admin", "password"); - Topic topic = new TopicImpl("topic://" + TOPIC_NAME); - TopicConnection subscriberConnection = factory.createTopicConnection(); + TopicConnection subscriberConnection = + JmsClientContext.INSTANCE.createTopicConnection(amqpURI, "admin", "password"); subscriberConnection.setClientID(durableClientId); TopicSession subscriberSession = subscriberConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = subscriberSession.createTopic(TOPIC_NAME); TopicSubscriber messageConsumer = subscriberSession.createDurableSubscriber(topic, durableSubscriberName); assertNotNull(messageConsumer); diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4920Test.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4920Test.java index 73720f1467..94123af6aa 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4920Test.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4920Test.java @@ -18,6 +18,7 @@ package org.apache.activemq.transport.amqp; import static org.junit.Assert.assertEquals; +import java.net.URI; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; @@ -25,7 +26,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import javax.jms.Connection; -import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; @@ -34,7 +34,6 @@ import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; -import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,17 +50,16 @@ public class AMQ4920Test extends AmqpTestSupport { @Test(timeout = 60000) public void testSendWithMultipleConsumers() throws Exception { - ConnectionFactoryImpl connectionFactory = new ConnectionFactoryImpl("localhost", port, "admin", "admin"); - connectionFactory.setSyncPublish(false); - Connection connection = connectionFactory.createConnection(); + Connection connection = + JmsClientContext.INSTANCE.createConnection(amqpURI, "admin", "password", false); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - String destinationName = "topic://AMQ4920Test" + System.currentTimeMillis(); + String destinationName = "AMQ4920Test" + System.currentTimeMillis(); Destination destination = session.createTopic(destinationName); connection.start(); ExecutorService executor = Executors.newCachedThreadPool(); for (int i = 0; i < CONSUMER_COUNT; i++) { - AMQ4930ConsumerTask consumerTask = new AMQ4930ConsumerTask(initLatch, destinationName, port, "Consumer-" + i, latch, ITERATIONS); + AMQ4930ConsumerTask consumerTask = new AMQ4930ConsumerTask(initLatch, destinationName, amqpURI, "Consumer-" + i, latch, ITERATIONS); executor.submit(consumerTask); } connection.start(); @@ -103,14 +101,14 @@ class AMQ4930ConsumerTask implements Callable { private final String destinationName; private final String consumerName; private final CountDownLatch messagesReceived; - private final int port; + private final URI amqpURI; private final int expectedMessageCount; private final CountDownLatch started; - public AMQ4930ConsumerTask(CountDownLatch started, String destinationName, int port, String consumerName, CountDownLatch latch, int expectedMessageCount) { + public AMQ4930ConsumerTask(CountDownLatch started, String destinationName, URI amqpURI, String consumerName, CountDownLatch latch, int expectedMessageCount) { this.started = started; this.destinationName = destinationName; - this.port = port; + this.amqpURI = amqpURI; this.consumerName = consumerName; this.messagesReceived = latch; this.expectedMessageCount = expectedMessageCount; @@ -121,8 +119,7 @@ class AMQ4930ConsumerTask implements Callable { LOG.debug(consumerName + " starting"); Connection connection = null; try { - ConnectionFactory connectionFactory = new ConnectionFactoryImpl("localhost", port, "admin", "admin"); - connection = connectionFactory.createConnection(); + connection = JmsClientContext.INSTANCE.createConnection(amqpURI, "admin", "admin", false); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createTopic(destinationName); MessageConsumer consumer = session.createConsumer(destination); diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpConnectTimeoutTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpConnectTimeoutTest.java index f3ff21e7d3..71a9dca208 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpConnectTimeoutTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpConnectTimeoutTest.java @@ -146,16 +146,16 @@ public class AmqpConnectTimeoutTest extends AmqpTestSupport { int port = 0; switch (connectorScheme) { case "amqp": - port = this.port; + port = this.amqpPort; break; case "amqp+ssl": - port = this.sslPort; + port = this.amqpSslPort; break; case "amqp+nio": - port = this.nioPort; + port = this.amqpNioPort; break; case "amqp+nio+ssl": - port = this.nioPlusSslPort; + port = this.amqpNioPlusSslPort; break; default: throw new IOException("Invalid AMQP connector scheme passed to test."); diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpNioTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpNioTest.java deleted file mode 100644 index 432ec9047f..0000000000 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpNioTest.java +++ /dev/null @@ -1,26 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.amqp; - -import org.apache.activemq.broker.BrokerService; - -public class AmqpNioTest extends AmqpTestSupport { - - protected void addAMQPConnector(BrokerService brokerService) throws Exception { - brokerService.addConnector("amqp+nio://localhost:1883?maxInactivityDuration=-1"); - } -} diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpSslTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpSslTest.java deleted file mode 100644 index 93728544f4..0000000000 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpSslTest.java +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.amqp; - -import java.security.cert.CertificateException; -import java.security.cert.X509Certificate; - -import javax.net.ssl.X509TrustManager; - -import org.apache.activemq.broker.BrokerService; -import org.junit.Ignore; - -@Ignore("hangs atm, needs investigation") -public class AmqpSslTest extends AmqpTestSupport { - @Override - public void startBroker() throws Exception { - System.setProperty("javax.net.ssl.trustStore", "src/test/resources/client.keystore"); - System.setProperty("javax.net.ssl.trustStorePassword", "password"); - System.setProperty("javax.net.ssl.trustStoreType", "jks"); - System.setProperty("javax.net.ssl.keyStore", "src/test/resources/server.keystore"); - System.setProperty("javax.net.ssl.keyStorePassword", "password"); - System.setProperty("javax.net.ssl.keyStoreType", "jks"); - super.startBroker(); - } - - protected void addAMQPConnector(BrokerService brokerService) throws Exception { - brokerService.addConnector("amqp+ssl://localhost:8883"); - } - - static class DefaultTrustManager implements X509TrustManager { - - @Override - public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException { - } - - @Override - public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException { - } - - @Override - public X509Certificate[] getAcceptedIssuers() { - return new X509Certificate[0]; - } - } -} diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java index fa608abb34..e20168cf5b 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java @@ -17,6 +17,7 @@ package org.apache.activemq.transport.amqp; import java.io.File; +import java.net.URI; import java.security.SecureRandom; import java.util.Set; import java.util.Vector; @@ -40,7 +41,9 @@ import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.broker.jmx.BrokerViewMBean; import org.apache.activemq.broker.jmx.ConnectorViewMBean; import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.spring.SpringSslContext; +import org.apache.activemq.store.kahadb.KahaDBStore; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -51,6 +54,7 @@ import org.slf4j.LoggerFactory; public class AmqpTestSupport { public static final String MESSAGE_NUMBER = "MessageNumber"; + public static final String KAHADB_DIRECTORY = "target/activemq-data/"; @Rule public TestName name = new TestName(); @@ -61,21 +65,18 @@ public class AmqpTestSupport { protected BrokerService brokerService; protected Vector exceptions = new Vector(); protected int numberOfMessages; - protected int port; - protected int sslPort; - protected int nioPort; - protected int nioPlusSslPort; - protected int openwirePort; - public static void main(String[] args) throws Exception { - final AmqpTestSupport s = new AmqpTestSupport(); - s.sslPort = 5671; - s.port = 5672; - s.startBroker(); - while (true) { - Thread.sleep(100000); - } - } + protected URI amqpURI; + protected int amqpPort; + protected URI amqpSslURI; + protected int amqpSslPort; + protected URI amqpNioURI; + protected int amqpNioPort; + protected URI amqpNioPlusSslURI; + protected int amqpNioPlusSslPort; + + protected URI openwireURI; + protected int openwirePort; @Before public void setUp() throws Exception { @@ -89,10 +90,17 @@ public class AmqpTestSupport { protected void createBroker(boolean deleteAllMessages) throws Exception { brokerService = new BrokerService(); - brokerService.setPersistent(false); + + brokerService.setPersistent(isPersistent()); + brokerService.setDeleteAllMessagesOnStartup(deleteAllMessages); + if (isPersistent()) { + KahaDBStore kaha = new KahaDBStore(); + kaha.setDirectory(new File(KAHADB_DIRECTORY + getTestName())); + brokerService.setPersistenceAdapter(kaha); + brokerService.setStoreOpenWireVersion(getstoreOpenWireVersion()); + } brokerService.setSchedulerSupport(false); brokerService.setAdvisorySupport(false); - brokerService.setDeleteAllMessagesOnStartup(deleteAllMessages); brokerService.setUseJmx(true); brokerService.getManagementContext().setCreateConnector(false); @@ -111,44 +119,64 @@ public class AmqpTestSupport { sslContext.afterPropertiesSet(); brokerService.setSslContext(sslContext); - addAMQPConnector(); + System.setProperty("javax.net.ssl.trustStore", keystore.getCanonicalPath()); + System.setProperty("javax.net.ssl.trustStorePassword", "password"); + System.setProperty("javax.net.ssl.trustStoreType", "jks"); + System.setProperty("javax.net.ssl.keyStore", keystore.getCanonicalPath()); + System.setProperty("javax.net.ssl.keyStorePassword", "password"); + System.setProperty("javax.net.ssl.keyStoreType", "jks"); + + addTranportConnectors(); } - protected void addAMQPConnector() throws Exception { + protected void addTranportConnectors() throws Exception { TransportConnector connector = null; if (isUseOpenWireConnector()) { connector = brokerService.addConnector( "tcp://0.0.0.0:" + openwirePort); openwirePort = connector.getConnectUri().getPort(); + openwireURI = connector.getPublishableConnectURI(); LOG.debug("Using openwire port " + openwirePort); } if (isUseTcpConnector()) { connector = brokerService.addConnector( - "amqp://0.0.0.0:" + port + "?transport.transformer=" + getAmqpTransformer() + getAdditionalConfig()); - port = connector.getConnectUri().getPort(); - LOG.debug("Using amqp port " + port); + "amqp://0.0.0.0:" + amqpPort + "?transport.transformer=" + getAmqpTransformer() + getAdditionalConfig()); + amqpPort = connector.getConnectUri().getPort(); + amqpURI = connector.getPublishableConnectURI(); + LOG.debug("Using amqp port " + amqpPort); } if (isUseSslConnector()) { connector = brokerService.addConnector( - "amqp+ssl://0.0.0.0:" + sslPort + "?transport.transformer=" + getAmqpTransformer() + getAdditionalConfig()); - sslPort = connector.getConnectUri().getPort(); - LOG.debug("Using amqp+ssl port " + sslPort); + "amqp+ssl://0.0.0.0:" + amqpSslPort + "?transport.transformer=" + getAmqpTransformer() + getAdditionalConfig()); + amqpSslPort = connector.getConnectUri().getPort(); + amqpSslURI = connector.getPublishableConnectURI(); + LOG.debug("Using amqp+ssl port " + amqpSslPort); } if (isUseNioConnector()) { connector = brokerService.addConnector( - "amqp+nio://0.0.0.0:" + nioPort + "?transport.transformer=" + getAmqpTransformer() + getAdditionalConfig()); - nioPort = connector.getConnectUri().getPort(); - LOG.debug("Using amqp+nio port " + nioPort); + "amqp+nio://0.0.0.0:" + amqpNioPort + "?transport.transformer=" + getAmqpTransformer() + getAdditionalConfig()); + amqpNioPort = connector.getConnectUri().getPort(); + amqpNioURI = connector.getPublishableConnectURI(); + LOG.debug("Using amqp+nio port " + amqpNioPort); } if (isUseNioPlusSslConnector()) { connector = brokerService.addConnector( - "amqp+nio+ssl://0.0.0.0:" + nioPlusSslPort + "?transport.transformer=" + getAmqpTransformer() + getAdditionalConfig()); - nioPlusSslPort = connector.getConnectUri().getPort(); - LOG.debug("Using amqp+nio+ssl port " + nioPlusSslPort); + "amqp+nio+ssl://0.0.0.0:" + amqpNioPlusSslPort + "?transport.transformer=" + getAmqpTransformer() + getAdditionalConfig()); + amqpNioPlusSslPort = connector.getConnectUri().getPort(); + amqpNioPlusSslURI = connector.getPublishableConnectURI(); + LOG.debug("Using amqp+nio+ssl port " + amqpNioPlusSslPort); } } + protected boolean isPersistent() { + return false; + } + + protected int getstoreOpenWireVersion() { + return OpenWireFormat.DEFAULT_VERSION; + } + protected boolean isUseOpenWireConnector() { return false; } @@ -188,8 +216,12 @@ public class AmqpTestSupport { } public void restartBroker() throws Exception { + restartBroker(false); + } + + public void restartBroker(boolean deleteAllOnStartup) throws Exception { stopBroker(); - createBroker(false); + createBroker(deleteAllOnStartup); brokerService.start(); brokerService.waitUntilStarted(); } diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransformerTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransformerTest.java index f0a25c7e17..65d38a4059 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransformerTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransformerTest.java @@ -21,10 +21,10 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import java.net.URI; + import javax.jms.BytesMessage; import javax.jms.Connection; -import javax.jms.ExceptionListener; -import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; @@ -35,8 +35,6 @@ import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.TransportConnector; -import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl; -import org.apache.qpid.amqp_1_0.jms.impl.QueueImpl; import org.junit.After; import org.junit.Test; import org.slf4j.Logger; @@ -48,8 +46,8 @@ public class AmqpTransformerTest { private static final String AMQP_URL = "amqp://0.0.0.0:0%s"; private BrokerService brokerService; - private int amqpPort; - private int openwirePort; + private URI amqpConnectionURI; + private URI openwireConnectionURI; private static final String TEST_QUEUE = "txqueue"; @Test(timeout = 30 * 1000) @@ -59,10 +57,11 @@ public class AmqpTransformerTest { startBrokerWithAmqpTransport(String.format(AMQP_URL, "?transport.transformer=native")); // send "text message" with AMQP JMS API - Connection amqpConnection = createAmqpConnection(); - QueueImpl queue = new QueueImpl("queue://" + TEST_QUEUE); + Connection amqpConnection = JmsClientContext.INSTANCE.createConnection(amqpConnectionURI); + amqpConnection.start(); Session amqpSession = amqpConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = amqpSession.createQueue(TEST_QUEUE); MessageProducer p = amqpSession.createProducer(queue); p.setPriority(7); @@ -75,7 +74,7 @@ public class AmqpTransformerTest { amqpConnection.close(); // receive with openwire JMS - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://0.0.0.0:" + openwirePort); + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(openwireConnectionURI); Connection openwireConn = factory.createConnection(); openwireConn.start(); Session session = openwireConn.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -105,10 +104,11 @@ public class AmqpTransformerTest { startBrokerWithAmqpTransport(String.format(AMQP_URL, "?transport.transformer=raw")); // send "text message" with AMQP JMS API - Connection amqpConnection = createAmqpConnection(); - QueueImpl queue = new QueueImpl("queue://" + TEST_QUEUE); + Connection amqpConnection = JmsClientContext.INSTANCE.createConnection(amqpConnectionURI); + amqpConnection.start(); Session amqpSession = amqpConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = amqpSession.createQueue(TEST_QUEUE); MessageProducer p = amqpSession.createProducer(queue); p.setPriority(7); @@ -121,7 +121,7 @@ public class AmqpTransformerTest { amqpConnection.close(); // receive with openwire JMS - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://0.0.0.0:" + openwirePort); + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(openwireConnectionURI); Connection openwireConn = factory.createConnection(); openwireConn.start(); Session session = openwireConn.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -156,10 +156,11 @@ public class AmqpTransformerTest { startBrokerWithAmqpTransport(String.format(AMQP_URL, "?transport.transformer=jms")); // send "text message" with AMQP JMS API - Connection amqpConnection = createAmqpConnection(); - QueueImpl queue = new QueueImpl("queue://" + TEST_QUEUE); + Connection amqpConnection = JmsClientContext.INSTANCE.createConnection(amqpConnectionURI); + amqpConnection.start(); Session amqpSession = amqpConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = amqpSession.createQueue(TEST_QUEUE); MessageProducer p = amqpSession.createProducer(queue); TextMessage amqpMessage = amqpSession.createTextMessage(); @@ -171,7 +172,7 @@ public class AmqpTransformerTest { amqpConnection.close(); // receive with openwire JMS - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://0.0.0.0:" + openwirePort); + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(openwireConnectionURI); Connection openwireConn = factory.createConnection(); openwireConn.start(); Session session = openwireConn.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -193,19 +194,6 @@ public class AmqpTransformerTest { openwireConn.close(); } - public Connection createAmqpConnection() throws JMSException { - final ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", amqpPort, "admin", "password"); - final Connection connection = factory.createConnection(); - connection.setExceptionListener(new ExceptionListener() { - @Override - public void onException(JMSException exception) { - exception.printStackTrace(); - } - }); - connection.start(); - return connection; - } - public void startBrokerWithAmqpTransport(String amqpUrl) throws Exception { brokerService = new BrokerService(); brokerService.setPersistent(false); @@ -214,9 +202,9 @@ public class AmqpTransformerTest { brokerService.setDeleteAllMessagesOnStartup(true); TransportConnector connector = brokerService.addConnector(amqpUrl); - amqpPort = connector.getConnectUri().getPort(); + amqpConnectionURI = connector.getPublishableConnectURI(); connector = brokerService.addConnector("tcp://0.0.0.0:0"); - openwirePort = connector.getConnectUri().getPort(); + openwireConnectionURI = connector.getPublishableConnectURI(); brokerService.start(); brokerService.waitUntilStarted(); diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientNioPlusSslTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientNioPlusSslTest.java index 758607d554..c7fc178995 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientNioPlusSslTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientNioPlusSslTest.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.transport.amqp; +import java.net.URI; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,8 +29,13 @@ public class JMSClientNioPlusSslTest extends JMSClientSslTest { @Override protected int getBrokerPort() { - LOG.debug("JMSClientNioPlusSslTest.getBrokerPort returning nioPlusSslPort {}", nioPlusSslPort); - return nioPlusSslPort; + LOG.debug("JMSClientNioPlusSslTest.getBrokerPort returning nioPlusSslPort {}", amqpNioPlusSslPort); + return amqpNioPlusSslPort; + } + + @Override + protected URI getBrokerURI() { + return amqpNioPlusSslURI; } @Override diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientNioTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientNioTest.java index fa7c6c01f2..a2f0417dd6 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientNioTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientNioTest.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.transport.amqp; +import java.net.URI; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,8 +29,13 @@ public class JMSClientNioTest extends JMSClientTest { @Override protected int getBrokerPort() { - LOG.debug("JMSClientNioTest.getBrokerPort returning nioPort {}", nioPort); - return nioPort; + LOG.debug("JMSClientNioTest.getBrokerPort returning nioPort {}", amqpNioPort); + return amqpNioPort; + } + + @Override + protected URI getBrokerURI() { + return amqpNioURI; } @Override diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientSslTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientSslTest.java index 9901217ff4..94af75978f 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientSslTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientSslTest.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.transport.amqp; +import java.net.URI; import java.security.SecureRandom; import javax.jms.Connection; @@ -49,8 +50,13 @@ public class JMSClientSslTest extends JMSClientTest { @Override protected int getBrokerPort() { - LOG.debug("JMSClientSslTest.getBrokerPort returning sslPort {}", sslPort); - return sslPort; + LOG.debug("JMSClientSslTest.getBrokerPort returning sslPort {}", amqpSslPort); + return amqpSslPort; + } + + @Override + protected URI getBrokerURI() { + return amqpSslURI; } @Override diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTestSupport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTestSupport.java index c7d81a500c..0da90c3426 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTestSupport.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTestSupport.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.transport.amqp; +import java.net.URI; import java.util.concurrent.Callable; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -26,7 +27,6 @@ import javax.jms.Connection; import javax.jms.ExceptionListener; import javax.jms.JMSException; -import org.apache.activemq.spring.SpringSslContext; import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl; import org.junit.After; @@ -84,7 +84,16 @@ public class JMSClientTestSupport extends AmqpTestSupport { * @return the port to connect to on the Broker. */ protected int getBrokerPort() { - return port; + return amqpPort; + } + + /** + * Can be overridden in subclasses to test against a different transport suchs as NIO. + * + * @return the URI to connect to on the Broker for AMQP. + */ + protected URI getBrokerURI() { + return amqpURI; } protected Connection createConnection() throws JMSException { @@ -106,10 +115,9 @@ public class JMSClientTestSupport extends AmqpTestSupport { final ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", brokerPort, "admin", "password", null, useSsl); if (useSsl) { - SpringSslContext context = (SpringSslContext) brokerService.getSslContext(); - factory.setKeyStorePath(context.getKeyStore()); + factory.setKeyStorePath(System.getProperty("javax.net.ssl.trustStore")); factory.setKeyStorePassword("password"); - factory.setTrustStorePath(context.getTrustStore()); + factory.setTrustStorePath(System.getProperty("javax.net.ssl.keyStore")); factory.setTrustStorePassword("password"); } diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsClientContext.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsClientContext.java new file mode 100644 index 0000000000..dd7b699e5e --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsClientContext.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp; + +import java.net.URI; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.QueueConnection; +import javax.jms.TopicConnection; + +import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Context used for AMQP JMS Clients to create connection instances. + */ +public class JmsClientContext { + + private static final Logger LOG = LoggerFactory.getLogger(JmsClientContext.class); + + public static final JmsClientContext INSTANCE = new JmsClientContext(); + + //----- Plain JMS Connection Create methods ------------------------------// + + public Connection createConnection(URI remoteURI) throws JMSException { + return createConnection(remoteURI, null, null, true); + } + + public Connection createConnection(URI remoteURI, String username, String password) throws JMSException { + return createConnection(remoteURI, username, password, null, true); + } + + public Connection createConnection(URI remoteURI, String username, String password, boolean syncPublish) throws JMSException { + return createConnection(remoteURI, username, password, null, syncPublish); + } + + public Connection createConnection(URI remoteURI, String username, String password, String clientId) throws JMSException { + return createConnection(remoteURI, username, password, clientId, true); + } + + public Connection createConnection(URI remoteURI, String username, String password, String clientId, boolean syncPublish) throws JMSException { + ConnectionFactoryImpl factory = createConnectionFactory(remoteURI, username, password, clientId, syncPublish); + + return factory.createConnection(); + } + + //----- JMS TopicConnection Create methods -------------------------------// + + public TopicConnection createTopicConnection(URI remoteURI) throws JMSException { + return createTopicConnection(remoteURI, null, null, true); + } + + public TopicConnection createTopicConnection(URI remoteURI, String username, String password) throws JMSException { + return createTopicConnection(remoteURI, username, password, null, true); + } + + public TopicConnection createTopicConnection(URI remoteURI, String username, String password, boolean syncPublish) throws JMSException { + return createTopicConnection(remoteURI, username, password, null, syncPublish); + } + + public TopicConnection createTopicConnection(URI remoteURI, String username, String password, String clientId) throws JMSException { + return createTopicConnection(remoteURI, username, password, clientId, true); + } + + public TopicConnection createTopicConnection(URI remoteURI, String username, String password, String clientId, boolean syncPublish) throws JMSException { + ConnectionFactoryImpl factory = createConnectionFactory(remoteURI, username, password, clientId, syncPublish); + + return factory.createTopicConnection(); + } + + //----- JMS QueueConnection Create methods -------------------------------// + + public QueueConnection createQueueConnection(URI remoteURI) throws JMSException { + return createQueueConnection(remoteURI, null, null, true); + } + + public QueueConnection createQueueConnection(URI remoteURI, String username, String password) throws JMSException { + return createQueueConnection(remoteURI, username, password, null, true); + } + + public QueueConnection createQueueConnection(URI remoteURI, String username, String password, boolean syncPublish) throws JMSException { + return createQueueConnection(remoteURI, username, password, null, syncPublish); + } + + public QueueConnection createQueueConnection(URI remoteURI, String username, String password, String clientId) throws JMSException { + return createQueueConnection(remoteURI, username, password, clientId, true); + } + + public QueueConnection createQueueConnection(URI remoteURI, String username, String password, String clientId, boolean syncPublish) throws JMSException { + ConnectionFactoryImpl factory = createConnectionFactory(remoteURI, username, password, clientId, syncPublish); + + return factory.createQueueConnection(); + } + + //------ Internal Implementation bits ------------------------------------// + + private ConnectionFactoryImpl createConnectionFactory( + URI remoteURI, String username, String password, String clientId, boolean syncPublish) { + + boolean useSSL = remoteURI.getScheme().toLowerCase().contains("ssl"); + + LOG.debug("In createConnectionFactory using port {} ssl? {}", remoteURI.getPort(), useSSL); + + ConnectionFactoryImpl factory = + new ConnectionFactoryImpl(remoteURI.getHost(), remoteURI.getPort(), username, password, clientId, useSSL); + + if (useSSL) { + factory.setKeyStorePath(System.getProperty("javax.net.ssl.trustStore")); + factory.setKeyStorePassword("password"); + factory.setTrustStorePath(System.getProperty("javax.net.ssl.keyStore")); + factory.setTrustStorePassword("password"); + } + + factory.setTopicPrefix("topic://"); + factory.setQueuePrefix("queue://"); + factory.setSyncPublish(syncPublish); + + return factory; + } +} diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsClientRequestResponseTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsClientRequestResponseTest.java index 03c870d409..1446c10bcd 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsClientRequestResponseTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsClientRequestResponseTest.java @@ -25,7 +25,6 @@ import java.util.Vector; import javax.jms.Connection; import javax.jms.Destination; -import javax.jms.ExceptionListener; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; @@ -36,7 +35,6 @@ import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; -import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl; import org.junit.After; import org.junit.Rule; import org.junit.Test; @@ -157,41 +155,8 @@ public class JmsClientRequestResponseTest extends AmqpTestSupport implements Mes assertEquals("Should not have had any failures: " + failures, 0, failures.size()); } - /** - * Can be overridden in subclasses to test against a different transport suchs as NIO. - * - * @return the port to connect to on the Broker. - */ - protected int getBrokerPort() { - return port; - } - private Connection createConnection(String clientId) throws JMSException { - return createConnection(clientId, false, false); - } - - protected Connection createConnection(String clientId, boolean syncPublish, boolean useSsl) throws JMSException { - - int brokerPort = getBrokerPort(); - LOG.debug("Creating connection on port {}", brokerPort); - final ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", brokerPort, "admin", "password", null, useSsl); - - factory.setSyncPublish(syncPublish); - factory.setTopicPrefix("topic://"); - factory.setQueuePrefix("queue://"); - - final Connection connection = factory.createConnection(); - if (clientId != null && !clientId.isEmpty()) { - connection.setClientID(clientId); - } - connection.setExceptionListener(new ExceptionListener() { - @Override - public void onException(JMSException exception) { - exception.printStackTrace(); - } - }); - connection.start(); - return connection; + return JmsClientContext.INSTANCE.createConnection(amqpURI, "admin", "password", clientId); } protected void syncConsumeLoop(MessageConsumer requestConsumer) { diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/SimpleAMQPAuthTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/SimpleAMQPAuthTest.java index 0e62a338bd..c9b8c23311 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/SimpleAMQPAuthTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/SimpleAMQPAuthTest.java @@ -1,4 +1,3 @@ -package org.apache.activemq.transport.amqp; /** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -15,6 +14,8 @@ package org.apache.activemq.transport.amqp; * See the License for the specific language governing permissions and * limitations under the License. */ +package org.apache.activemq.transport.amqp; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -28,6 +29,7 @@ import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; +import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; @@ -35,7 +37,6 @@ import org.apache.activemq.broker.BrokerFactory; import org.apache.activemq.broker.BrokerService; import org.apache.qpid.amqp_1_0.client.ConnectionClosedException; import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl; -import org.apache.qpid.amqp_1_0.jms.impl.QueueImpl; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -68,6 +69,9 @@ public class SimpleAMQPAuthTest { public void testNoUserOrPassword() throws Exception { try { ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", port, "", ""); + factory.setQueuePrefix("queue://"); + factory.setTopicPrefix("topic://"); + Connection connection = factory.createConnection(); connection.setExceptionListener(new ExceptionListener() { @Override @@ -96,6 +100,9 @@ public class SimpleAMQPAuthTest { public void testUnknownUser() throws Exception { try { ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", port, "admin", "password"); + factory.setQueuePrefix("queue://"); + factory.setTopicPrefix("topic://"); + Connection connection = factory.createConnection("nosuchuser", "blah"); connection.start(); Thread.sleep(500); @@ -117,6 +124,9 @@ public class SimpleAMQPAuthTest { public void testKnownUserWrongPassword() throws Exception { try { ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", port, "admin", "password"); + factory.setQueuePrefix("queue://"); + factory.setTopicPrefix("topic://"); + Connection connection = factory.createConnection("user", "wrongPassword"); connection.start(); Thread.sleep(500); @@ -137,9 +147,12 @@ public class SimpleAMQPAuthTest { @Test(timeout = 30000) public void testSendReceive() throws Exception { ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", port, "admin", "password"); + factory.setQueuePrefix("queue://"); + factory.setTopicPrefix("topic://"); + Connection connection = factory.createConnection("user", "userPassword"); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - QueueImpl queue = new QueueImpl("queue://txqueue"); + Queue queue = session.createQueue("txQueue"); MessageProducer p = session.createProducer(queue); TextMessage message = null; message = session.createTextMessage(); diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/bugs/AMQ4753Test.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/bugs/AMQ4753Test.java index 49ee0e72f8..42580394af 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/bugs/AMQ4753Test.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/bugs/AMQ4753Test.java @@ -21,18 +21,16 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import javax.jms.Connection; -import javax.jms.ExceptionListener; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; +import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; -import org.apache.activemq.spring.SpringSslContext; import org.apache.activemq.transport.amqp.AmqpTestSupport; -import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl; -import org.apache.qpid.amqp_1_0.jms.impl.QueueImpl; +import org.apache.activemq.transport.amqp.JmsClientContext; import org.junit.Test; public class AMQ4753Test extends AmqpTestSupport { @@ -48,14 +46,14 @@ public class AMQ4753Test extends AmqpTestSupport { } @Test(timeout = 120 * 1000) - public void testAmqpNioPlusSslSendReceive() throws JMSException{ - Connection connection = createAMQPConnection(nioPlusSslPort, true); + public void testAmqpNioPlusSslSendReceive() throws JMSException { + Connection connection = JmsClientContext.INSTANCE.createConnection(amqpNioPlusSslURI); runSimpleSendReceiveTest(connection); } public void runSimpleSendReceiveTest(Connection connection) throws JMSException{ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - QueueImpl queue = new QueueImpl("queue://txqueue"); + Queue queue =session.createQueue("txqueue"); MessageProducer producer = session.createProducer(queue); TextMessage message = session.createTextMessage(); String messageText = "hello sent at " + new java.util.Date().toString(); @@ -72,27 +70,4 @@ public class AMQ4753Test extends AmqpTestSupport { assertEquals(messageText, textMessage.getText()); connection.close(); } - - private Connection createAMQPConnection(int testPort, boolean useSSL) throws JMSException { - LOG.debug("In createConnection using port {} ssl? {}", testPort, useSSL); - final ConnectionFactoryImpl connectionFactory = new ConnectionFactoryImpl("localhost", testPort, "admin", "password", null, useSSL); - - if (useSSL) { - SpringSslContext sslContext = (SpringSslContext) brokerService.getSslContext(); - connectionFactory.setKeyStorePath(sslContext.getKeyStore()); - connectionFactory.setKeyStorePassword("password"); - connectionFactory.setTrustStorePath(sslContext.getTrustStore()); - connectionFactory.setTrustStorePassword("password"); - } - - final Connection connection = connectionFactory.createConnection(); - connection.setExceptionListener(new ExceptionListener() { - @Override - public void onException(JMSException exception) { - exception.printStackTrace(); - } - }); - connection.start(); - return connection; - } } diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/bugs/AMQ4914Test.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/bugs/AMQ4914Test.java index 90028540c4..d171ea9fcc 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/bugs/AMQ4914Test.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/bugs/AMQ4914Test.java @@ -21,7 +21,6 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import javax.jms.Connection; -import javax.jms.ExceptionListener; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; @@ -31,7 +30,7 @@ import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.transport.amqp.AmqpTestSupport; -import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl; +import org.apache.activemq.transport.amqp.JmsClientContext; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; @@ -80,8 +79,7 @@ public class AMQ4914Test extends AmqpTestSupport { String payload = createLargeString(expectedSize); assertEquals(expectedSize, payload.getBytes().length); - Connection connection = createAMQPConnection(port, false); - + Connection connection = JmsClientContext.INSTANCE.createConnection(amqpURI); long startTime = System.currentTimeMillis(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(testName.getMethodName()); @@ -108,19 +106,4 @@ public class AMQ4914Test extends AmqpTestSupport { assertEquals(payload, receivedText); connection.close(); } - - private Connection createAMQPConnection(int testPort, boolean useSSL) throws JMSException { - LOG.debug("In createConnection using port {} ssl? {}", testPort, useSSL); - final ConnectionFactoryImpl connectionFactory = new ConnectionFactoryImpl("localhost", testPort, "admin", "password", null, useSSL); - connectionFactory.setSyncPublish(true); - final Connection connection = connectionFactory.createConnection(); - connection.setExceptionListener(new ExceptionListener() { - @Override - public void onException(JMSException exception) { - exception.printStackTrace(); - } - }); - connection.start(); - return connection; - } } diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/bugs/AMQ5256Test.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/bugs/AMQ5256Test.java index 85acb640b7..989e5015f7 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/bugs/AMQ5256Test.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/bugs/AMQ5256Test.java @@ -16,18 +16,19 @@ */ package org.apache.activemq.transport.amqp.bugs; +import static org.junit.Assert.assertTrue; + import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; + import javax.jms.Connection; import javax.jms.JMSException; + import org.apache.activemq.transport.amqp.AmqpTestSupport; -import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl; +import org.apache.activemq.transport.amqp.JmsClientContext; import org.junit.Test; - -import static org.junit.Assert.assertTrue; - public class AMQ5256Test extends AmqpTestSupport { @Override @@ -36,13 +37,23 @@ public class AMQ5256Test extends AmqpTestSupport { } @Override - protected boolean isUseNioPlusSslConnector() { - return false; + protected boolean isUseSslConnector() { + return true; } - @Test(timeout = 40 * 1000) - public void testParallelConnect() throws Exception { - final int numThreads = 80; + @Override + protected boolean isUseNioConnector() { + return true; + } + + @Override + protected boolean isUseNioPlusSslConnector() { + return true; + } + + @Test(timeout = 60000) + public void testParallelConnectPlain() throws Exception { + final int numThreads = 40; ExecutorService executorService = Executors.newFixedThreadPool(numThreads); for (int i = 0; i < numThreads; i++) { executorService.execute(new Runnable() { @@ -50,8 +61,7 @@ public class AMQ5256Test extends AmqpTestSupport { public void run() { try { - final ConnectionFactoryImpl connectionFactory = new ConnectionFactoryImpl("localhost", port, "admin", "password", null, isUseSslConnector()); - Connection connection = connectionFactory.createConnection(); + Connection connection = JmsClientContext.INSTANCE.createConnection(amqpURI, "admin", "password"); connection.start(); connection.close(); } catch (JMSException e) { @@ -63,6 +73,77 @@ public class AMQ5256Test extends AmqpTestSupport { executorService.shutdown(); assertTrue("executor done on time", executorService.awaitTermination(30, TimeUnit.SECONDS)); + } + @Test(timeout = 60000) + public void testParallelConnectNio() throws Exception { + final int numThreads = 40; + ExecutorService executorService = Executors.newFixedThreadPool(numThreads); + for (int i = 0; i < numThreads; i++) { + executorService.execute(new Runnable() { + @Override + public void run() { + + try { + Connection connection = JmsClientContext.INSTANCE.createConnection(amqpNioURI, "admin", "password"); + connection.start(); + connection.close(); + } catch (JMSException e) { + e.printStackTrace(); + } + } + }); + } + + executorService.shutdown(); + assertTrue("executor done on time", executorService.awaitTermination(30, TimeUnit.SECONDS)); + } + + @Test(timeout = 60000) + public void testParallelConnectSsl() throws Exception { + final int numThreads = 40; + ExecutorService executorService = Executors.newFixedThreadPool(numThreads); + for (int i = 0; i < numThreads; i++) { + executorService.execute(new Runnable() { + @Override + public void run() { + + try { + Connection connection = JmsClientContext.INSTANCE.createConnection(amqpSslURI, "admin", "password"); + connection.start(); + connection.close(); + } catch (JMSException e) { + e.printStackTrace(); + } + } + }); + } + + executorService.shutdown(); + assertTrue("executor done on time", executorService.awaitTermination(30, TimeUnit.SECONDS)); + } + + @Test(timeout = 60000) + public void testParallelConnectNioPlusSsl() throws Exception { + final int numThreads = 40; + ExecutorService executorService = Executors.newFixedThreadPool(numThreads); + for (int i = 0; i < numThreads; i++) { + executorService.execute(new Runnable() { + @Override + public void run() { + + try { + Connection connection = JmsClientContext.INSTANCE.createConnection(amqpNioPlusSslURI, "admin", "password"); + connection.start(); + connection.close(); + } catch (JMSException e) { + e.printStackTrace(); + } + } + }); + } + + executorService.shutdown(); + assertTrue("executor done on time", executorService.awaitTermination(30, TimeUnit.SECONDS)); } } diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/protocol/UnsupportedClientTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/protocol/UnsupportedClientTest.java index 1e84fac389..d71aee28cc 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/protocol/UnsupportedClientTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/protocol/UnsupportedClientTest.java @@ -70,16 +70,16 @@ public class UnsupportedClientTest extends AmqpTestSupport { header.setRevision(1); // Test TCP - doTestInvalidHeaderProcessing(port, header, false); + doTestInvalidHeaderProcessing(amqpPort, header, false); // Test SSL - doTestInvalidHeaderProcessing(sslPort, header, true); + doTestInvalidHeaderProcessing(amqpSslPort, header, true); // Test NIO - doTestInvalidHeaderProcessing(nioPort, header, false); + doTestInvalidHeaderProcessing(amqpNioPort, header, false); // Test NIO+SSL - doTestInvalidHeaderProcessing(nioPlusSslPort, header, true); + doTestInvalidHeaderProcessing(amqpNioPlusSslPort, header, true); } @Test(timeout = 60000) @@ -92,16 +92,16 @@ public class UnsupportedClientTest extends AmqpTestSupport { header.setRevision(0); // Test TCP - doTestInvalidHeaderProcessing(port, header, false); + doTestInvalidHeaderProcessing(amqpPort, header, false); // Test SSL - doTestInvalidHeaderProcessing(sslPort, header, true); + doTestInvalidHeaderProcessing(amqpSslPort, header, true); // Test NIO - doTestInvalidHeaderProcessing(nioPort, header, false); + doTestInvalidHeaderProcessing(amqpNioPort, header, false); // Test NIO+SSL - doTestInvalidHeaderProcessing(nioPlusSslPort, header, true); + doTestInvalidHeaderProcessing(amqpNioPlusSslPort, header, true); } @Test(timeout = 60000) @@ -114,16 +114,16 @@ public class UnsupportedClientTest extends AmqpTestSupport { header.setRevision(0); // Test TCP - doTestInvalidHeaderProcessing(port, header, false); + doTestInvalidHeaderProcessing(amqpPort, header, false); // Test SSL - doTestInvalidHeaderProcessing(sslPort, header, true); + doTestInvalidHeaderProcessing(amqpSslPort, header, true); // Test NIO - doTestInvalidHeaderProcessing(nioPort, header, false); + doTestInvalidHeaderProcessing(amqpNioPort, header, false); // Test NIO+SSL - doTestInvalidHeaderProcessing(nioPlusSslPort, header, true); + doTestInvalidHeaderProcessing(amqpNioPlusSslPort, header, true); } @Test(timeout = 60000) @@ -136,16 +136,16 @@ public class UnsupportedClientTest extends AmqpTestSupport { header.setRevision(1); // Test TCP - doTestInvalidHeaderProcessing(port, header, false); + doTestInvalidHeaderProcessing(amqpPort, header, false); // Test SSL - doTestInvalidHeaderProcessing(sslPort, header, true); + doTestInvalidHeaderProcessing(amqpSslPort, header, true); // Test NIO - doTestInvalidHeaderProcessing(nioPort, header, false); + doTestInvalidHeaderProcessing(amqpNioPort, header, false); // Test NIO+SSL - doTestInvalidHeaderProcessing(nioPlusSslPort, header, true); + doTestInvalidHeaderProcessing(amqpNioPlusSslPort, header, true); } @Test(timeout = 60000) @@ -154,16 +154,16 @@ public class UnsupportedClientTest extends AmqpTestSupport { AmqpHeader header = new AmqpHeader(new Buffer(new byte[]{'S', 'T', 'O', 'M', 'P', 0, 0, 0}), false); // Test TCP - doTestInvalidHeaderProcessing(port, header, false); + doTestInvalidHeaderProcessing(amqpPort, header, false); // Test SSL - doTestInvalidHeaderProcessing(sslPort, header, true); + doTestInvalidHeaderProcessing(amqpSslPort, header, true); // Test NIO - doTestInvalidHeaderProcessing(nioPort, header, false); + doTestInvalidHeaderProcessing(amqpNioPort, header, false); // Test NIO+SSL - doTestInvalidHeaderProcessing(nioPlusSslPort, header, true); + doTestInvalidHeaderProcessing(amqpNioPlusSslPort, header, true); } protected void doTestInvalidHeaderProcessing(int port, final AmqpHeader header, boolean ssl) throws Exception { diff --git a/activemq-amqp/src/test/resources/log4j.properties b/activemq-amqp/src/test/resources/log4j.properties index 0b73c3bdda..2c3e8c3a7d 100755 --- a/activemq-amqp/src/test/resources/log4j.properties +++ b/activemq-amqp/src/test/resources/log4j.properties @@ -20,7 +20,7 @@ # log4j.rootLogger=WARN, console, file log4j.logger.org.apache.activemq=INFO -log4j.logger.org.apache.activemq.transport.amqp=TRACE +log4j.logger.org.apache.activemq.transport.amqp=DEBUG log4j.logger.org.apache.activemq.transport.amqp.FRAMES=INFO log4j.logger.org.fusesource=INFO