First pass removes most direct usages of the qpid client bits and cleans
up some tests so that they all start to use the common test support
class features.
This commit is contained in:
Timothy Bish 2015-02-25 21:16:33 -05:00
parent f988ca6e49
commit 36bd069e8f
20 changed files with 428 additions and 362 deletions

View File

@ -20,8 +20,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.File;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
@ -35,19 +33,14 @@ import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.store.kahadb.KahaDBStore;
import org.apache.activemq.transport.amqp.joram.ActiveMQAdmin;
import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
import org.junit.Test;
public class AMQ4563Test extends AmqpTestSupport {
public static final String KAHADB_DIRECTORY = "target/activemq-data/kahadb-amq4563";
private String openwireUri;
@Test(timeout = 60000)
public void testMessagesAreAckedAMQProducer() throws Exception {
int messagesSent = 3;
@ -83,7 +76,8 @@ public class AMQ4563Test extends AmqpTestSupport {
ActiveMQAdmin.enableJMSFrameTracing();
assertTrue(brokerService.isPersistent());
Connection connection = createAMQPConnection();
Connection connection = JmsClientContext.INSTANCE.createConnection(amqpURI);
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(name.getMethodName());
MessageProducer p = session.createProducer(queue);
@ -128,7 +122,8 @@ public class AMQ4563Test extends AmqpTestSupport {
ActiveMQAdmin.enableJMSFrameTracing();
assertTrue(brokerService.isPersistent());
Connection connection = createAMQPConnection();
Connection connection = JmsClientContext.INSTANCE.createConnection(amqpURI);
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(name.getMethodName());
MessageProducer p = session.createProducer(queue);
@ -157,7 +152,9 @@ public class AMQ4563Test extends AmqpTestSupport {
}
private int readAllMessages(String queueName, String selector) throws JMSException {
Connection connection = createAMQPConnection();
Connection connection = JmsClientContext.INSTANCE.createConnection(amqpURI);
connection.start();
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName);
@ -199,28 +196,12 @@ public class AMQ4563Test extends AmqpTestSupport {
private void restartBroker(Connection connection, Session session) throws Exception {
session.close();
connection.close();
stopBroker();
createBroker(false);
}
private Connection createAMQPConnection() throws JMSException {
LOG.debug(">>> In createConnection using port {}", port);
final ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", port, "admin", "password");
final Connection connection = factory.createConnection();
connection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException exception) {
exception.printStackTrace();
}
});
connection.start();
return connection;
restartBroker();
}
private Connection createAMQConnection() throws JMSException {
LOG.debug(">>> In createConnection using port {}", port);
final ConnectionFactory factory = new ActiveMQConnectionFactory("admin", "password", openwireUri);
LOG.debug(">>> In createConnection using port {}", openwirePort);
final ConnectionFactory factory = new ActiveMQConnectionFactory("admin", "password", openwireURI);
final Connection connection = factory.createConnection();
connection.setExceptionListener(new ExceptionListener() {
@Override
@ -233,42 +214,17 @@ public class AMQ4563Test extends AmqpTestSupport {
}
@Override
public void startBroker() throws Exception {
createBroker(true);
protected boolean isUseOpenWireConnector() {
return true;
}
/**
* Copied from AmqpTestSupport, modified to use persistence
*/
@Override
public void createBroker(boolean deleteAllMessages) throws Exception {
KahaDBStore kaha = new KahaDBStore();
kaha.setDirectory(new File(KAHADB_DIRECTORY));
protected boolean isPersistent() {
return true;
}
brokerService = new BrokerService();
brokerService.setDeleteAllMessagesOnStartup(deleteAllMessages);
brokerService.setPersistent(true);
brokerService.setPersistenceAdapter(kaha);
brokerService.setAdvisorySupport(false);
brokerService.setUseJmx(true);
brokerService.getManagementContext().setCreateMBeanServer(false);
brokerService.setStoreOpenWireVersion(10);
openwireUri = brokerService.addConnector("tcp://0.0.0.0:0").getPublishableConnectString();
// Setup SSL context...
// final File classesDir = new File(AmqpProtocolConverter.class.getProtectionDomain().getCodeSource().getLocation().getFile());
// File keystore = new File(classesDir, "../../src/test/resources/keystore");
// final SpringSslContext sslContext = new SpringSslContext();
// sslContext.setKeyStore(keystore.getCanonicalPath());
// sslContext.setKeyStorePassword("password");
// sslContext.setTrustStore(keystore.getCanonicalPath());
// sslContext.setTrustStorePassword("password");
// sslContext.afterPropertiesSet();
// brokerService.setSslContext(sslContext);
addAMQPConnector();
brokerService.start();
brokerService.waitUntilStarted();
this.numberOfMessages = 2000;
@Override
protected int getstoreOpenWireVersion() {
return 10;
}
}

View File

@ -22,20 +22,17 @@ import static org.junit.Assert.assertNotNull;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import org.apache.activemq.broker.jmx.BrokerView;
import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
import org.apache.qpid.amqp_1_0.jms.impl.TopicImpl;
import org.junit.Test;
public class AMQ4696Test extends AmqpTestSupport {
@Test(timeout=30*1000)
public void simpleDurableTopicTest() throws Exception {
String TOPIC_NAME = "topic://AMQ4696Test" + System.currentTimeMillis();
String TOPIC_NAME = "AMQ4696Test" + System.currentTimeMillis();
String durableClientId = "AMQPDurableTopicTestClient";
String durableSubscriberName = "durableSubscriberName";
@ -44,11 +41,11 @@ public class AMQ4696Test extends AmqpTestSupport {
int inactiveSubscribersAtStart = adminView.getInactiveDurableTopicSubscribers().length;
LOG.debug(">>>> At Start, durable Subscribers {} inactiveDurableSubscribers {}", durableSubscribersAtStart, inactiveSubscribersAtStart);
TopicConnectionFactory factory = new ConnectionFactoryImpl("localhost", port, "admin", "password");
Topic topic = new TopicImpl("topic://" + TOPIC_NAME);
TopicConnection subscriberConnection = factory.createTopicConnection();
TopicConnection subscriberConnection =
JmsClientContext.INSTANCE.createTopicConnection(amqpURI, "admin", "password");
subscriberConnection.setClientID(durableClientId);
TopicSession subscriberSession = subscriberConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = subscriberSession.createTopic(TOPIC_NAME);
TopicSubscriber messageConsumer = subscriberSession.createDurableSubscriber(topic, durableSubscriberName);
assertNotNull(messageConsumer);

View File

@ -18,6 +18,7 @@ package org.apache.activemq.transport.amqp;
import static org.junit.Assert.assertEquals;
import java.net.URI;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@ -25,7 +26,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
@ -34,7 +34,6 @@ import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -51,17 +50,16 @@ public class AMQ4920Test extends AmqpTestSupport {
@Test(timeout = 60000)
public void testSendWithMultipleConsumers() throws Exception {
ConnectionFactoryImpl connectionFactory = new ConnectionFactoryImpl("localhost", port, "admin", "admin");
connectionFactory.setSyncPublish(false);
Connection connection = connectionFactory.createConnection();
Connection connection =
JmsClientContext.INSTANCE.createConnection(amqpURI, "admin", "password", false);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
String destinationName = "topic://AMQ4920Test" + System.currentTimeMillis();
String destinationName = "AMQ4920Test" + System.currentTimeMillis();
Destination destination = session.createTopic(destinationName);
connection.start();
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < CONSUMER_COUNT; i++) {
AMQ4930ConsumerTask consumerTask = new AMQ4930ConsumerTask(initLatch, destinationName, port, "Consumer-" + i, latch, ITERATIONS);
AMQ4930ConsumerTask consumerTask = new AMQ4930ConsumerTask(initLatch, destinationName, amqpURI, "Consumer-" + i, latch, ITERATIONS);
executor.submit(consumerTask);
}
connection.start();
@ -103,14 +101,14 @@ class AMQ4930ConsumerTask implements Callable<Boolean> {
private final String destinationName;
private final String consumerName;
private final CountDownLatch messagesReceived;
private final int port;
private final URI amqpURI;
private final int expectedMessageCount;
private final CountDownLatch started;
public AMQ4930ConsumerTask(CountDownLatch started, String destinationName, int port, String consumerName, CountDownLatch latch, int expectedMessageCount) {
public AMQ4930ConsumerTask(CountDownLatch started, String destinationName, URI amqpURI, String consumerName, CountDownLatch latch, int expectedMessageCount) {
this.started = started;
this.destinationName = destinationName;
this.port = port;
this.amqpURI = amqpURI;
this.consumerName = consumerName;
this.messagesReceived = latch;
this.expectedMessageCount = expectedMessageCount;
@ -121,8 +119,7 @@ class AMQ4930ConsumerTask implements Callable<Boolean> {
LOG.debug(consumerName + " starting");
Connection connection = null;
try {
ConnectionFactory connectionFactory = new ConnectionFactoryImpl("localhost", port, "admin", "admin");
connection = connectionFactory.createConnection();
connection = JmsClientContext.INSTANCE.createConnection(amqpURI, "admin", "admin", false);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTopic(destinationName);
MessageConsumer consumer = session.createConsumer(destination);

View File

@ -146,16 +146,16 @@ public class AmqpConnectTimeoutTest extends AmqpTestSupport {
int port = 0;
switch (connectorScheme) {
case "amqp":
port = this.port;
port = this.amqpPort;
break;
case "amqp+ssl":
port = this.sslPort;
port = this.amqpSslPort;
break;
case "amqp+nio":
port = this.nioPort;
port = this.amqpNioPort;
break;
case "amqp+nio+ssl":
port = this.nioPlusSslPort;
port = this.amqpNioPlusSslPort;
break;
default:
throw new IOException("Invalid AMQP connector scheme passed to test.");

View File

@ -1,26 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp;
import org.apache.activemq.broker.BrokerService;
public class AmqpNioTest extends AmqpTestSupport {
protected void addAMQPConnector(BrokerService brokerService) throws Exception {
brokerService.addConnector("amqp+nio://localhost:1883?maxInactivityDuration=-1");
}
}

View File

@ -1,59 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import javax.net.ssl.X509TrustManager;
import org.apache.activemq.broker.BrokerService;
import org.junit.Ignore;
@Ignore("hangs atm, needs investigation")
public class AmqpSslTest extends AmqpTestSupport {
@Override
public void startBroker() throws Exception {
System.setProperty("javax.net.ssl.trustStore", "src/test/resources/client.keystore");
System.setProperty("javax.net.ssl.trustStorePassword", "password");
System.setProperty("javax.net.ssl.trustStoreType", "jks");
System.setProperty("javax.net.ssl.keyStore", "src/test/resources/server.keystore");
System.setProperty("javax.net.ssl.keyStorePassword", "password");
System.setProperty("javax.net.ssl.keyStoreType", "jks");
super.startBroker();
}
protected void addAMQPConnector(BrokerService brokerService) throws Exception {
brokerService.addConnector("amqp+ssl://localhost:8883");
}
static class DefaultTrustManager implements X509TrustManager {
@Override
public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
}
@Override
public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
}
@Override
public X509Certificate[] getAcceptedIssuers() {
return new X509Certificate[0];
}
}
}

View File

@ -17,6 +17,7 @@
package org.apache.activemq.transport.amqp;
import java.io.File;
import java.net.URI;
import java.security.SecureRandom;
import java.util.Set;
import java.util.Vector;
@ -40,7 +41,9 @@ import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.broker.jmx.ConnectorViewMBean;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.spring.SpringSslContext;
import org.apache.activemq.store.kahadb.KahaDBStore;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
@ -51,6 +54,7 @@ import org.slf4j.LoggerFactory;
public class AmqpTestSupport {
public static final String MESSAGE_NUMBER = "MessageNumber";
public static final String KAHADB_DIRECTORY = "target/activemq-data/";
@Rule public TestName name = new TestName();
@ -61,21 +65,18 @@ public class AmqpTestSupport {
protected BrokerService brokerService;
protected Vector<Throwable> exceptions = new Vector<Throwable>();
protected int numberOfMessages;
protected int port;
protected int sslPort;
protected int nioPort;
protected int nioPlusSslPort;
protected int openwirePort;
public static void main(String[] args) throws Exception {
final AmqpTestSupport s = new AmqpTestSupport();
s.sslPort = 5671;
s.port = 5672;
s.startBroker();
while (true) {
Thread.sleep(100000);
}
}
protected URI amqpURI;
protected int amqpPort;
protected URI amqpSslURI;
protected int amqpSslPort;
protected URI amqpNioURI;
protected int amqpNioPort;
protected URI amqpNioPlusSslURI;
protected int amqpNioPlusSslPort;
protected URI openwireURI;
protected int openwirePort;
@Before
public void setUp() throws Exception {
@ -89,10 +90,17 @@ public class AmqpTestSupport {
protected void createBroker(boolean deleteAllMessages) throws Exception {
brokerService = new BrokerService();
brokerService.setPersistent(false);
brokerService.setPersistent(isPersistent());
brokerService.setDeleteAllMessagesOnStartup(deleteAllMessages);
if (isPersistent()) {
KahaDBStore kaha = new KahaDBStore();
kaha.setDirectory(new File(KAHADB_DIRECTORY + getTestName()));
brokerService.setPersistenceAdapter(kaha);
brokerService.setStoreOpenWireVersion(getstoreOpenWireVersion());
}
brokerService.setSchedulerSupport(false);
brokerService.setAdvisorySupport(false);
brokerService.setDeleteAllMessagesOnStartup(deleteAllMessages);
brokerService.setUseJmx(true);
brokerService.getManagementContext().setCreateConnector(false);
@ -111,44 +119,64 @@ public class AmqpTestSupport {
sslContext.afterPropertiesSet();
brokerService.setSslContext(sslContext);
addAMQPConnector();
System.setProperty("javax.net.ssl.trustStore", keystore.getCanonicalPath());
System.setProperty("javax.net.ssl.trustStorePassword", "password");
System.setProperty("javax.net.ssl.trustStoreType", "jks");
System.setProperty("javax.net.ssl.keyStore", keystore.getCanonicalPath());
System.setProperty("javax.net.ssl.keyStorePassword", "password");
System.setProperty("javax.net.ssl.keyStoreType", "jks");
addTranportConnectors();
}
protected void addAMQPConnector() throws Exception {
protected void addTranportConnectors() throws Exception {
TransportConnector connector = null;
if (isUseOpenWireConnector()) {
connector = brokerService.addConnector(
"tcp://0.0.0.0:" + openwirePort);
openwirePort = connector.getConnectUri().getPort();
openwireURI = connector.getPublishableConnectURI();
LOG.debug("Using openwire port " + openwirePort);
}
if (isUseTcpConnector()) {
connector = brokerService.addConnector(
"amqp://0.0.0.0:" + port + "?transport.transformer=" + getAmqpTransformer() + getAdditionalConfig());
port = connector.getConnectUri().getPort();
LOG.debug("Using amqp port " + port);
"amqp://0.0.0.0:" + amqpPort + "?transport.transformer=" + getAmqpTransformer() + getAdditionalConfig());
amqpPort = connector.getConnectUri().getPort();
amqpURI = connector.getPublishableConnectURI();
LOG.debug("Using amqp port " + amqpPort);
}
if (isUseSslConnector()) {
connector = brokerService.addConnector(
"amqp+ssl://0.0.0.0:" + sslPort + "?transport.transformer=" + getAmqpTransformer() + getAdditionalConfig());
sslPort = connector.getConnectUri().getPort();
LOG.debug("Using amqp+ssl port " + sslPort);
"amqp+ssl://0.0.0.0:" + amqpSslPort + "?transport.transformer=" + getAmqpTransformer() + getAdditionalConfig());
amqpSslPort = connector.getConnectUri().getPort();
amqpSslURI = connector.getPublishableConnectURI();
LOG.debug("Using amqp+ssl port " + amqpSslPort);
}
if (isUseNioConnector()) {
connector = brokerService.addConnector(
"amqp+nio://0.0.0.0:" + nioPort + "?transport.transformer=" + getAmqpTransformer() + getAdditionalConfig());
nioPort = connector.getConnectUri().getPort();
LOG.debug("Using amqp+nio port " + nioPort);
"amqp+nio://0.0.0.0:" + amqpNioPort + "?transport.transformer=" + getAmqpTransformer() + getAdditionalConfig());
amqpNioPort = connector.getConnectUri().getPort();
amqpNioURI = connector.getPublishableConnectURI();
LOG.debug("Using amqp+nio port " + amqpNioPort);
}
if (isUseNioPlusSslConnector()) {
connector = brokerService.addConnector(
"amqp+nio+ssl://0.0.0.0:" + nioPlusSslPort + "?transport.transformer=" + getAmqpTransformer() + getAdditionalConfig());
nioPlusSslPort = connector.getConnectUri().getPort();
LOG.debug("Using amqp+nio+ssl port " + nioPlusSslPort);
"amqp+nio+ssl://0.0.0.0:" + amqpNioPlusSslPort + "?transport.transformer=" + getAmqpTransformer() + getAdditionalConfig());
amqpNioPlusSslPort = connector.getConnectUri().getPort();
amqpNioPlusSslURI = connector.getPublishableConnectURI();
LOG.debug("Using amqp+nio+ssl port " + amqpNioPlusSslPort);
}
}
protected boolean isPersistent() {
return false;
}
protected int getstoreOpenWireVersion() {
return OpenWireFormat.DEFAULT_VERSION;
}
protected boolean isUseOpenWireConnector() {
return false;
}
@ -188,8 +216,12 @@ public class AmqpTestSupport {
}
public void restartBroker() throws Exception {
restartBroker(false);
}
public void restartBroker(boolean deleteAllOnStartup) throws Exception {
stopBroker();
createBroker(false);
createBroker(deleteAllOnStartup);
brokerService.start();
brokerService.waitUntilStarted();
}

View File

@ -21,10 +21,10 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.net.URI;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
@ -35,8 +35,6 @@ import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
import org.apache.qpid.amqp_1_0.jms.impl.QueueImpl;
import org.junit.After;
import org.junit.Test;
import org.slf4j.Logger;
@ -48,8 +46,8 @@ public class AmqpTransformerTest {
private static final String AMQP_URL = "amqp://0.0.0.0:0%s";
private BrokerService brokerService;
private int amqpPort;
private int openwirePort;
private URI amqpConnectionURI;
private URI openwireConnectionURI;
private static final String TEST_QUEUE = "txqueue";
@Test(timeout = 30 * 1000)
@ -59,10 +57,11 @@ public class AmqpTransformerTest {
startBrokerWithAmqpTransport(String.format(AMQP_URL, "?transport.transformer=native"));
// send "text message" with AMQP JMS API
Connection amqpConnection = createAmqpConnection();
QueueImpl queue = new QueueImpl("queue://" + TEST_QUEUE);
Connection amqpConnection = JmsClientContext.INSTANCE.createConnection(amqpConnectionURI);
amqpConnection.start();
Session amqpSession = amqpConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = amqpSession.createQueue(TEST_QUEUE);
MessageProducer p = amqpSession.createProducer(queue);
p.setPriority(7);
@ -75,7 +74,7 @@ public class AmqpTransformerTest {
amqpConnection.close();
// receive with openwire JMS
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://0.0.0.0:" + openwirePort);
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(openwireConnectionURI);
Connection openwireConn = factory.createConnection();
openwireConn.start();
Session session = openwireConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
@ -105,10 +104,11 @@ public class AmqpTransformerTest {
startBrokerWithAmqpTransport(String.format(AMQP_URL, "?transport.transformer=raw"));
// send "text message" with AMQP JMS API
Connection amqpConnection = createAmqpConnection();
QueueImpl queue = new QueueImpl("queue://" + TEST_QUEUE);
Connection amqpConnection = JmsClientContext.INSTANCE.createConnection(amqpConnectionURI);
amqpConnection.start();
Session amqpSession = amqpConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = amqpSession.createQueue(TEST_QUEUE);
MessageProducer p = amqpSession.createProducer(queue);
p.setPriority(7);
@ -121,7 +121,7 @@ public class AmqpTransformerTest {
amqpConnection.close();
// receive with openwire JMS
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://0.0.0.0:" + openwirePort);
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(openwireConnectionURI);
Connection openwireConn = factory.createConnection();
openwireConn.start();
Session session = openwireConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
@ -156,10 +156,11 @@ public class AmqpTransformerTest {
startBrokerWithAmqpTransport(String.format(AMQP_URL, "?transport.transformer=jms"));
// send "text message" with AMQP JMS API
Connection amqpConnection = createAmqpConnection();
QueueImpl queue = new QueueImpl("queue://" + TEST_QUEUE);
Connection amqpConnection = JmsClientContext.INSTANCE.createConnection(amqpConnectionURI);
amqpConnection.start();
Session amqpSession = amqpConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = amqpSession.createQueue(TEST_QUEUE);
MessageProducer p = amqpSession.createProducer(queue);
TextMessage amqpMessage = amqpSession.createTextMessage();
@ -171,7 +172,7 @@ public class AmqpTransformerTest {
amqpConnection.close();
// receive with openwire JMS
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://0.0.0.0:" + openwirePort);
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(openwireConnectionURI);
Connection openwireConn = factory.createConnection();
openwireConn.start();
Session session = openwireConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
@ -193,19 +194,6 @@ public class AmqpTransformerTest {
openwireConn.close();
}
public Connection createAmqpConnection() throws JMSException {
final ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", amqpPort, "admin", "password");
final Connection connection = factory.createConnection();
connection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException exception) {
exception.printStackTrace();
}
});
connection.start();
return connection;
}
public void startBrokerWithAmqpTransport(String amqpUrl) throws Exception {
brokerService = new BrokerService();
brokerService.setPersistent(false);
@ -214,9 +202,9 @@ public class AmqpTransformerTest {
brokerService.setDeleteAllMessagesOnStartup(true);
TransportConnector connector = brokerService.addConnector(amqpUrl);
amqpPort = connector.getConnectUri().getPort();
amqpConnectionURI = connector.getPublishableConnectURI();
connector = brokerService.addConnector("tcp://0.0.0.0:0");
openwirePort = connector.getConnectUri().getPort();
openwireConnectionURI = connector.getPublishableConnectURI();
brokerService.start();
brokerService.waitUntilStarted();

View File

@ -16,6 +16,8 @@
*/
package org.apache.activemq.transport.amqp;
import java.net.URI;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -27,8 +29,13 @@ public class JMSClientNioPlusSslTest extends JMSClientSslTest {
@Override
protected int getBrokerPort() {
LOG.debug("JMSClientNioPlusSslTest.getBrokerPort returning nioPlusSslPort {}", nioPlusSslPort);
return nioPlusSslPort;
LOG.debug("JMSClientNioPlusSslTest.getBrokerPort returning nioPlusSslPort {}", amqpNioPlusSslPort);
return amqpNioPlusSslPort;
}
@Override
protected URI getBrokerURI() {
return amqpNioPlusSslURI;
}
@Override

View File

@ -16,6 +16,8 @@
*/
package org.apache.activemq.transport.amqp;
import java.net.URI;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -27,8 +29,13 @@ public class JMSClientNioTest extends JMSClientTest {
@Override
protected int getBrokerPort() {
LOG.debug("JMSClientNioTest.getBrokerPort returning nioPort {}", nioPort);
return nioPort;
LOG.debug("JMSClientNioTest.getBrokerPort returning nioPort {}", amqpNioPort);
return amqpNioPort;
}
@Override
protected URI getBrokerURI() {
return amqpNioURI;
}
@Override

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.transport.amqp;
import java.net.URI;
import java.security.SecureRandom;
import javax.jms.Connection;
@ -49,8 +50,13 @@ public class JMSClientSslTest extends JMSClientTest {
@Override
protected int getBrokerPort() {
LOG.debug("JMSClientSslTest.getBrokerPort returning sslPort {}", sslPort);
return sslPort;
LOG.debug("JMSClientSslTest.getBrokerPort returning sslPort {}", amqpSslPort);
return amqpSslPort;
}
@Override
protected URI getBrokerURI() {
return amqpSslURI;
}
@Override

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.transport.amqp;
import java.net.URI;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@ -26,7 +27,6 @@ import javax.jms.Connection;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import org.apache.activemq.spring.SpringSslContext;
import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
import org.junit.After;
@ -84,7 +84,16 @@ public class JMSClientTestSupport extends AmqpTestSupport {
* @return the port to connect to on the Broker.
*/
protected int getBrokerPort() {
return port;
return amqpPort;
}
/**
* Can be overridden in subclasses to test against a different transport suchs as NIO.
*
* @return the URI to connect to on the Broker for AMQP.
*/
protected URI getBrokerURI() {
return amqpURI;
}
protected Connection createConnection() throws JMSException {
@ -106,10 +115,9 @@ public class JMSClientTestSupport extends AmqpTestSupport {
final ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", brokerPort, "admin", "password", null, useSsl);
if (useSsl) {
SpringSslContext context = (SpringSslContext) brokerService.getSslContext();
factory.setKeyStorePath(context.getKeyStore());
factory.setKeyStorePath(System.getProperty("javax.net.ssl.trustStore"));
factory.setKeyStorePassword("password");
factory.setTrustStorePath(context.getTrustStore());
factory.setTrustStorePath(System.getProperty("javax.net.ssl.keyStore"));
factory.setTrustStorePassword("password");
}

View File

@ -0,0 +1,136 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp;
import java.net.URI;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.QueueConnection;
import javax.jms.TopicConnection;
import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Context used for AMQP JMS Clients to create connection instances.
*/
public class JmsClientContext {
private static final Logger LOG = LoggerFactory.getLogger(JmsClientContext.class);
public static final JmsClientContext INSTANCE = new JmsClientContext();
//----- Plain JMS Connection Create methods ------------------------------//
public Connection createConnection(URI remoteURI) throws JMSException {
return createConnection(remoteURI, null, null, true);
}
public Connection createConnection(URI remoteURI, String username, String password) throws JMSException {
return createConnection(remoteURI, username, password, null, true);
}
public Connection createConnection(URI remoteURI, String username, String password, boolean syncPublish) throws JMSException {
return createConnection(remoteURI, username, password, null, syncPublish);
}
public Connection createConnection(URI remoteURI, String username, String password, String clientId) throws JMSException {
return createConnection(remoteURI, username, password, clientId, true);
}
public Connection createConnection(URI remoteURI, String username, String password, String clientId, boolean syncPublish) throws JMSException {
ConnectionFactoryImpl factory = createConnectionFactory(remoteURI, username, password, clientId, syncPublish);
return factory.createConnection();
}
//----- JMS TopicConnection Create methods -------------------------------//
public TopicConnection createTopicConnection(URI remoteURI) throws JMSException {
return createTopicConnection(remoteURI, null, null, true);
}
public TopicConnection createTopicConnection(URI remoteURI, String username, String password) throws JMSException {
return createTopicConnection(remoteURI, username, password, null, true);
}
public TopicConnection createTopicConnection(URI remoteURI, String username, String password, boolean syncPublish) throws JMSException {
return createTopicConnection(remoteURI, username, password, null, syncPublish);
}
public TopicConnection createTopicConnection(URI remoteURI, String username, String password, String clientId) throws JMSException {
return createTopicConnection(remoteURI, username, password, clientId, true);
}
public TopicConnection createTopicConnection(URI remoteURI, String username, String password, String clientId, boolean syncPublish) throws JMSException {
ConnectionFactoryImpl factory = createConnectionFactory(remoteURI, username, password, clientId, syncPublish);
return factory.createTopicConnection();
}
//----- JMS QueueConnection Create methods -------------------------------//
public QueueConnection createQueueConnection(URI remoteURI) throws JMSException {
return createQueueConnection(remoteURI, null, null, true);
}
public QueueConnection createQueueConnection(URI remoteURI, String username, String password) throws JMSException {
return createQueueConnection(remoteURI, username, password, null, true);
}
public QueueConnection createQueueConnection(URI remoteURI, String username, String password, boolean syncPublish) throws JMSException {
return createQueueConnection(remoteURI, username, password, null, syncPublish);
}
public QueueConnection createQueueConnection(URI remoteURI, String username, String password, String clientId) throws JMSException {
return createQueueConnection(remoteURI, username, password, clientId, true);
}
public QueueConnection createQueueConnection(URI remoteURI, String username, String password, String clientId, boolean syncPublish) throws JMSException {
ConnectionFactoryImpl factory = createConnectionFactory(remoteURI, username, password, clientId, syncPublish);
return factory.createQueueConnection();
}
//------ Internal Implementation bits ------------------------------------//
private ConnectionFactoryImpl createConnectionFactory(
URI remoteURI, String username, String password, String clientId, boolean syncPublish) {
boolean useSSL = remoteURI.getScheme().toLowerCase().contains("ssl");
LOG.debug("In createConnectionFactory using port {} ssl? {}", remoteURI.getPort(), useSSL);
ConnectionFactoryImpl factory =
new ConnectionFactoryImpl(remoteURI.getHost(), remoteURI.getPort(), username, password, clientId, useSSL);
if (useSSL) {
factory.setKeyStorePath(System.getProperty("javax.net.ssl.trustStore"));
factory.setKeyStorePassword("password");
factory.setTrustStorePath(System.getProperty("javax.net.ssl.keyStore"));
factory.setTrustStorePassword("password");
}
factory.setTopicPrefix("topic://");
factory.setQueuePrefix("queue://");
factory.setSyncPublish(syncPublish);
return factory;
}
}

View File

@ -25,7 +25,6 @@ import java.util.Vector;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
@ -36,7 +35,6 @@ import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
import org.junit.After;
import org.junit.Rule;
import org.junit.Test;
@ -157,41 +155,8 @@ public class JmsClientRequestResponseTest extends AmqpTestSupport implements Mes
assertEquals("Should not have had any failures: " + failures, 0, failures.size());
}
/**
* Can be overridden in subclasses to test against a different transport suchs as NIO.
*
* @return the port to connect to on the Broker.
*/
protected int getBrokerPort() {
return port;
}
private Connection createConnection(String clientId) throws JMSException {
return createConnection(clientId, false, false);
}
protected Connection createConnection(String clientId, boolean syncPublish, boolean useSsl) throws JMSException {
int brokerPort = getBrokerPort();
LOG.debug("Creating connection on port {}", brokerPort);
final ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", brokerPort, "admin", "password", null, useSsl);
factory.setSyncPublish(syncPublish);
factory.setTopicPrefix("topic://");
factory.setQueuePrefix("queue://");
final Connection connection = factory.createConnection();
if (clientId != null && !clientId.isEmpty()) {
connection.setClientID(clientId);
}
connection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException exception) {
exception.printStackTrace();
}
});
connection.start();
return connection;
return JmsClientContext.INSTANCE.createConnection(amqpURI, "admin", "password", clientId);
}
protected void syncConsumeLoop(MessageConsumer requestConsumer) {

View File

@ -1,4 +1,3 @@
package org.apache.activemq.transport.amqp;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@ -15,6 +14,8 @@ package org.apache.activemq.transport.amqp;
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@ -28,6 +29,7 @@ import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
@ -35,7 +37,6 @@ import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.qpid.amqp_1_0.client.ConnectionClosedException;
import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
import org.apache.qpid.amqp_1_0.jms.impl.QueueImpl;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -68,6 +69,9 @@ public class SimpleAMQPAuthTest {
public void testNoUserOrPassword() throws Exception {
try {
ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", port, "", "");
factory.setQueuePrefix("queue://");
factory.setTopicPrefix("topic://");
Connection connection = factory.createConnection();
connection.setExceptionListener(new ExceptionListener() {
@Override
@ -96,6 +100,9 @@ public class SimpleAMQPAuthTest {
public void testUnknownUser() throws Exception {
try {
ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", port, "admin", "password");
factory.setQueuePrefix("queue://");
factory.setTopicPrefix("topic://");
Connection connection = factory.createConnection("nosuchuser", "blah");
connection.start();
Thread.sleep(500);
@ -117,6 +124,9 @@ public class SimpleAMQPAuthTest {
public void testKnownUserWrongPassword() throws Exception {
try {
ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", port, "admin", "password");
factory.setQueuePrefix("queue://");
factory.setTopicPrefix("topic://");
Connection connection = factory.createConnection("user", "wrongPassword");
connection.start();
Thread.sleep(500);
@ -137,9 +147,12 @@ public class SimpleAMQPAuthTest {
@Test(timeout = 30000)
public void testSendReceive() throws Exception {
ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", port, "admin", "password");
factory.setQueuePrefix("queue://");
factory.setTopicPrefix("topic://");
Connection connection = factory.createConnection("user", "userPassword");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
QueueImpl queue = new QueueImpl("queue://txqueue");
Queue queue = session.createQueue("txQueue");
MessageProducer p = session.createProducer(queue);
TextMessage message = null;
message = session.createTextMessage();

View File

@ -21,18 +21,16 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import javax.jms.Connection;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.spring.SpringSslContext;
import org.apache.activemq.transport.amqp.AmqpTestSupport;
import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
import org.apache.qpid.amqp_1_0.jms.impl.QueueImpl;
import org.apache.activemq.transport.amqp.JmsClientContext;
import org.junit.Test;
public class AMQ4753Test extends AmqpTestSupport {
@ -48,14 +46,14 @@ public class AMQ4753Test extends AmqpTestSupport {
}
@Test(timeout = 120 * 1000)
public void testAmqpNioPlusSslSendReceive() throws JMSException{
Connection connection = createAMQPConnection(nioPlusSslPort, true);
public void testAmqpNioPlusSslSendReceive() throws JMSException {
Connection connection = JmsClientContext.INSTANCE.createConnection(amqpNioPlusSslURI);
runSimpleSendReceiveTest(connection);
}
public void runSimpleSendReceiveTest(Connection connection) throws JMSException{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
QueueImpl queue = new QueueImpl("queue://txqueue");
Queue queue =session.createQueue("txqueue");
MessageProducer producer = session.createProducer(queue);
TextMessage message = session.createTextMessage();
String messageText = "hello sent at " + new java.util.Date().toString();
@ -72,27 +70,4 @@ public class AMQ4753Test extends AmqpTestSupport {
assertEquals(messageText, textMessage.getText());
connection.close();
}
private Connection createAMQPConnection(int testPort, boolean useSSL) throws JMSException {
LOG.debug("In createConnection using port {} ssl? {}", testPort, useSSL);
final ConnectionFactoryImpl connectionFactory = new ConnectionFactoryImpl("localhost", testPort, "admin", "password", null, useSSL);
if (useSSL) {
SpringSslContext sslContext = (SpringSslContext) brokerService.getSslContext();
connectionFactory.setKeyStorePath(sslContext.getKeyStore());
connectionFactory.setKeyStorePassword("password");
connectionFactory.setTrustStorePath(sslContext.getTrustStore());
connectionFactory.setTrustStorePassword("password");
}
final Connection connection = connectionFactory.createConnection();
connection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException exception) {
exception.printStackTrace();
}
});
connection.start();
return connection;
}
}

View File

@ -21,7 +21,6 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import javax.jms.Connection;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
@ -31,7 +30,7 @@ import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.transport.amqp.AmqpTestSupport;
import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
import org.apache.activemq.transport.amqp.JmsClientContext;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
@ -80,8 +79,7 @@ public class AMQ4914Test extends AmqpTestSupport {
String payload = createLargeString(expectedSize);
assertEquals(expectedSize, payload.getBytes().length);
Connection connection = createAMQPConnection(port, false);
Connection connection = JmsClientContext.INSTANCE.createConnection(amqpURI);
long startTime = System.currentTimeMillis();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(testName.getMethodName());
@ -108,19 +106,4 @@ public class AMQ4914Test extends AmqpTestSupport {
assertEquals(payload, receivedText);
connection.close();
}
private Connection createAMQPConnection(int testPort, boolean useSSL) throws JMSException {
LOG.debug("In createConnection using port {} ssl? {}", testPort, useSSL);
final ConnectionFactoryImpl connectionFactory = new ConnectionFactoryImpl("localhost", testPort, "admin", "password", null, useSSL);
connectionFactory.setSyncPublish(true);
final Connection connection = connectionFactory.createConnection();
connection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException exception) {
exception.printStackTrace();
}
});
connection.start();
return connection;
}
}

View File

@ -16,18 +16,19 @@
*/
package org.apache.activemq.transport.amqp.bugs;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.JMSException;
import org.apache.activemq.transport.amqp.AmqpTestSupport;
import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
import org.apache.activemq.transport.amqp.JmsClientContext;
import org.junit.Test;
import static org.junit.Assert.assertTrue;
public class AMQ5256Test extends AmqpTestSupport {
@Override
@ -36,13 +37,23 @@ public class AMQ5256Test extends AmqpTestSupport {
}
@Override
protected boolean isUseNioPlusSslConnector() {
return false;
protected boolean isUseSslConnector() {
return true;
}
@Test(timeout = 40 * 1000)
public void testParallelConnect() throws Exception {
final int numThreads = 80;
@Override
protected boolean isUseNioConnector() {
return true;
}
@Override
protected boolean isUseNioPlusSslConnector() {
return true;
}
@Test(timeout = 60000)
public void testParallelConnectPlain() throws Exception {
final int numThreads = 40;
ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
for (int i = 0; i < numThreads; i++) {
executorService.execute(new Runnable() {
@ -50,8 +61,7 @@ public class AMQ5256Test extends AmqpTestSupport {
public void run() {
try {
final ConnectionFactoryImpl connectionFactory = new ConnectionFactoryImpl("localhost", port, "admin", "password", null, isUseSslConnector());
Connection connection = connectionFactory.createConnection();
Connection connection = JmsClientContext.INSTANCE.createConnection(amqpURI, "admin", "password");
connection.start();
connection.close();
} catch (JMSException e) {
@ -63,6 +73,77 @@ public class AMQ5256Test extends AmqpTestSupport {
executorService.shutdown();
assertTrue("executor done on time", executorService.awaitTermination(30, TimeUnit.SECONDS));
}
@Test(timeout = 60000)
public void testParallelConnectNio() throws Exception {
final int numThreads = 40;
ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
for (int i = 0; i < numThreads; i++) {
executorService.execute(new Runnable() {
@Override
public void run() {
try {
Connection connection = JmsClientContext.INSTANCE.createConnection(amqpNioURI, "admin", "password");
connection.start();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
});
}
executorService.shutdown();
assertTrue("executor done on time", executorService.awaitTermination(30, TimeUnit.SECONDS));
}
@Test(timeout = 60000)
public void testParallelConnectSsl() throws Exception {
final int numThreads = 40;
ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
for (int i = 0; i < numThreads; i++) {
executorService.execute(new Runnable() {
@Override
public void run() {
try {
Connection connection = JmsClientContext.INSTANCE.createConnection(amqpSslURI, "admin", "password");
connection.start();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
});
}
executorService.shutdown();
assertTrue("executor done on time", executorService.awaitTermination(30, TimeUnit.SECONDS));
}
@Test(timeout = 60000)
public void testParallelConnectNioPlusSsl() throws Exception {
final int numThreads = 40;
ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
for (int i = 0; i < numThreads; i++) {
executorService.execute(new Runnable() {
@Override
public void run() {
try {
Connection connection = JmsClientContext.INSTANCE.createConnection(amqpNioPlusSslURI, "admin", "password");
connection.start();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
});
}
executorService.shutdown();
assertTrue("executor done on time", executorService.awaitTermination(30, TimeUnit.SECONDS));
}
}

View File

@ -70,16 +70,16 @@ public class UnsupportedClientTest extends AmqpTestSupport {
header.setRevision(1);
// Test TCP
doTestInvalidHeaderProcessing(port, header, false);
doTestInvalidHeaderProcessing(amqpPort, header, false);
// Test SSL
doTestInvalidHeaderProcessing(sslPort, header, true);
doTestInvalidHeaderProcessing(amqpSslPort, header, true);
// Test NIO
doTestInvalidHeaderProcessing(nioPort, header, false);
doTestInvalidHeaderProcessing(amqpNioPort, header, false);
// Test NIO+SSL
doTestInvalidHeaderProcessing(nioPlusSslPort, header, true);
doTestInvalidHeaderProcessing(amqpNioPlusSslPort, header, true);
}
@Test(timeout = 60000)
@ -92,16 +92,16 @@ public class UnsupportedClientTest extends AmqpTestSupport {
header.setRevision(0);
// Test TCP
doTestInvalidHeaderProcessing(port, header, false);
doTestInvalidHeaderProcessing(amqpPort, header, false);
// Test SSL
doTestInvalidHeaderProcessing(sslPort, header, true);
doTestInvalidHeaderProcessing(amqpSslPort, header, true);
// Test NIO
doTestInvalidHeaderProcessing(nioPort, header, false);
doTestInvalidHeaderProcessing(amqpNioPort, header, false);
// Test NIO+SSL
doTestInvalidHeaderProcessing(nioPlusSslPort, header, true);
doTestInvalidHeaderProcessing(amqpNioPlusSslPort, header, true);
}
@Test(timeout = 60000)
@ -114,16 +114,16 @@ public class UnsupportedClientTest extends AmqpTestSupport {
header.setRevision(0);
// Test TCP
doTestInvalidHeaderProcessing(port, header, false);
doTestInvalidHeaderProcessing(amqpPort, header, false);
// Test SSL
doTestInvalidHeaderProcessing(sslPort, header, true);
doTestInvalidHeaderProcessing(amqpSslPort, header, true);
// Test NIO
doTestInvalidHeaderProcessing(nioPort, header, false);
doTestInvalidHeaderProcessing(amqpNioPort, header, false);
// Test NIO+SSL
doTestInvalidHeaderProcessing(nioPlusSslPort, header, true);
doTestInvalidHeaderProcessing(amqpNioPlusSslPort, header, true);
}
@Test(timeout = 60000)
@ -136,16 +136,16 @@ public class UnsupportedClientTest extends AmqpTestSupport {
header.setRevision(1);
// Test TCP
doTestInvalidHeaderProcessing(port, header, false);
doTestInvalidHeaderProcessing(amqpPort, header, false);
// Test SSL
doTestInvalidHeaderProcessing(sslPort, header, true);
doTestInvalidHeaderProcessing(amqpSslPort, header, true);
// Test NIO
doTestInvalidHeaderProcessing(nioPort, header, false);
doTestInvalidHeaderProcessing(amqpNioPort, header, false);
// Test NIO+SSL
doTestInvalidHeaderProcessing(nioPlusSslPort, header, true);
doTestInvalidHeaderProcessing(amqpNioPlusSslPort, header, true);
}
@Test(timeout = 60000)
@ -154,16 +154,16 @@ public class UnsupportedClientTest extends AmqpTestSupport {
AmqpHeader header = new AmqpHeader(new Buffer(new byte[]{'S', 'T', 'O', 'M', 'P', 0, 0, 0}), false);
// Test TCP
doTestInvalidHeaderProcessing(port, header, false);
doTestInvalidHeaderProcessing(amqpPort, header, false);
// Test SSL
doTestInvalidHeaderProcessing(sslPort, header, true);
doTestInvalidHeaderProcessing(amqpSslPort, header, true);
// Test NIO
doTestInvalidHeaderProcessing(nioPort, header, false);
doTestInvalidHeaderProcessing(amqpNioPort, header, false);
// Test NIO+SSL
doTestInvalidHeaderProcessing(nioPlusSslPort, header, true);
doTestInvalidHeaderProcessing(amqpNioPlusSslPort, header, true);
}
protected void doTestInvalidHeaderProcessing(int port, final AmqpHeader header, boolean ssl) throws Exception {

View File

@ -20,7 +20,7 @@
#
log4j.rootLogger=WARN, console, file
log4j.logger.org.apache.activemq=INFO
log4j.logger.org.apache.activemq.transport.amqp=TRACE
log4j.logger.org.apache.activemq.transport.amqp=DEBUG
log4j.logger.org.apache.activemq.transport.amqp.FRAMES=INFO
log4j.logger.org.fusesource=INFO