mirror of https://github.com/apache/activemq.git
Consolidate remaining dirct JMS client type usages to the context. Rename some tests so be consistent.
This commit is contained in:
parent
8f0bf6060a
commit
240278dbef
|
@ -76,7 +76,7 @@ public class AMQ4563Test extends AmqpTestSupport {
|
|||
ActiveMQAdmin.enableJMSFrameTracing();
|
||||
assertTrue(brokerService.isPersistent());
|
||||
|
||||
Connection connection = JmsClientContext.INSTANCE.createConnection(amqpURI);
|
||||
Connection connection = JMSClientContext.INSTANCE.createConnection(amqpURI);
|
||||
connection.start();
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Queue queue = session.createQueue(name.getMethodName());
|
||||
|
@ -122,7 +122,7 @@ public class AMQ4563Test extends AmqpTestSupport {
|
|||
ActiveMQAdmin.enableJMSFrameTracing();
|
||||
assertTrue(brokerService.isPersistent());
|
||||
|
||||
Connection connection = JmsClientContext.INSTANCE.createConnection(amqpURI);
|
||||
Connection connection = JMSClientContext.INSTANCE.createConnection(amqpURI);
|
||||
connection.start();
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Queue queue = session.createQueue(name.getMethodName());
|
||||
|
@ -152,7 +152,7 @@ public class AMQ4563Test extends AmqpTestSupport {
|
|||
}
|
||||
|
||||
private int readAllMessages(String queueName, String selector) throws JMSException {
|
||||
Connection connection = JmsClientContext.INSTANCE.createConnection(amqpURI);
|
||||
Connection connection = JMSClientContext.INSTANCE.createConnection(amqpURI);
|
||||
connection.start();
|
||||
|
||||
try {
|
||||
|
|
|
@ -57,7 +57,7 @@ public class AmqpTransformerTest {
|
|||
startBrokerWithAmqpTransport(String.format(AMQP_URL, "?transport.transformer=native"));
|
||||
|
||||
// send "text message" with AMQP JMS API
|
||||
Connection amqpConnection = JmsClientContext.INSTANCE.createConnection(amqpConnectionURI);
|
||||
Connection amqpConnection = JMSClientContext.INSTANCE.createConnection(amqpConnectionURI);
|
||||
amqpConnection.start();
|
||||
|
||||
Session amqpSession = amqpConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
@ -104,7 +104,7 @@ public class AmqpTransformerTest {
|
|||
startBrokerWithAmqpTransport(String.format(AMQP_URL, "?transport.transformer=raw"));
|
||||
|
||||
// send "text message" with AMQP JMS API
|
||||
Connection amqpConnection = JmsClientContext.INSTANCE.createConnection(amqpConnectionURI);
|
||||
Connection amqpConnection = JMSClientContext.INSTANCE.createConnection(amqpConnectionURI);
|
||||
amqpConnection.start();
|
||||
|
||||
Session amqpSession = amqpConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
@ -156,7 +156,7 @@ public class AmqpTransformerTest {
|
|||
startBrokerWithAmqpTransport(String.format(AMQP_URL, "?transport.transformer=jms"));
|
||||
|
||||
// send "text message" with AMQP JMS API
|
||||
Connection amqpConnection = JmsClientContext.INSTANCE.createConnection(amqpConnectionURI);
|
||||
Connection amqpConnection = JMSClientContext.INSTANCE.createConnection(amqpConnectionURI);
|
||||
amqpConnection.start();
|
||||
|
||||
Session amqpSession = amqpConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.activemq.transport.amqp;
|
|||
import java.net.URI;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ExceptionListener;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.QueueConnection;
|
||||
import javax.jms.TopicConnection;
|
||||
|
@ -30,11 +31,11 @@ import org.slf4j.LoggerFactory;
|
|||
/**
|
||||
* Context used for AMQP JMS Clients to create connection instances.
|
||||
*/
|
||||
public class JmsClientContext {
|
||||
public class JMSClientContext {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(JmsClientContext.class);
|
||||
private static final Logger LOG = LoggerFactory.getLogger(JMSClientContext.class);
|
||||
|
||||
public static final JmsClientContext INSTANCE = new JmsClientContext();
|
||||
public static final JMSClientContext INSTANCE = new JMSClientContext();
|
||||
|
||||
//----- Plain JMS Connection Create methods ------------------------------//
|
||||
|
||||
|
@ -57,7 +58,16 @@ public class JmsClientContext {
|
|||
public Connection createConnection(URI remoteURI, String username, String password, String clientId, boolean syncPublish) throws JMSException {
|
||||
ConnectionFactoryImpl factory = createConnectionFactory(remoteURI, username, password, clientId, syncPublish);
|
||||
|
||||
return factory.createConnection();
|
||||
Connection connection = factory.createConnection();
|
||||
connection.setExceptionListener(new ExceptionListener() {
|
||||
@Override
|
||||
public void onException(JMSException exception) {
|
||||
LOG.error("Unexpected exception ", exception);
|
||||
exception.printStackTrace();
|
||||
}
|
||||
});
|
||||
|
||||
return connection;
|
||||
}
|
||||
|
||||
//----- JMS TopicConnection Create methods -------------------------------//
|
||||
|
@ -81,7 +91,16 @@ public class JmsClientContext {
|
|||
public TopicConnection createTopicConnection(URI remoteURI, String username, String password, String clientId, boolean syncPublish) throws JMSException {
|
||||
ConnectionFactoryImpl factory = createConnectionFactory(remoteURI, username, password, clientId, syncPublish);
|
||||
|
||||
return factory.createTopicConnection();
|
||||
TopicConnection connection = factory.createTopicConnection();
|
||||
connection.setExceptionListener(new ExceptionListener() {
|
||||
@Override
|
||||
public void onException(JMSException exception) {
|
||||
LOG.error("Unexpected exception ", exception);
|
||||
exception.printStackTrace();
|
||||
}
|
||||
});
|
||||
|
||||
return connection;
|
||||
}
|
||||
|
||||
//----- JMS QueueConnection Create methods -------------------------------//
|
||||
|
@ -105,7 +124,16 @@ public class JmsClientContext {
|
|||
public QueueConnection createQueueConnection(URI remoteURI, String username, String password, String clientId, boolean syncPublish) throws JMSException {
|
||||
ConnectionFactoryImpl factory = createConnectionFactory(remoteURI, username, password, clientId, syncPublish);
|
||||
|
||||
return factory.createQueueConnection();
|
||||
QueueConnection connection = factory.createQueueConnection();
|
||||
connection.setExceptionListener(new ExceptionListener() {
|
||||
@Override
|
||||
public void onException(JMSException exception) {
|
||||
LOG.error("Unexpected exception ", exception);
|
||||
exception.printStackTrace();
|
||||
}
|
||||
});
|
||||
|
||||
return connection;
|
||||
}
|
||||
|
||||
//------ Internal Implementation bits ------------------------------------//
|
|
@ -42,9 +42,9 @@ import org.junit.rules.TestName;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class JmsClientRequestResponseTest extends AmqpTestSupport implements MessageListener {
|
||||
public class JMSClientRequestResponseTest extends AmqpTestSupport implements MessageListener {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(JmsClientRequestResponseTest.class);
|
||||
private static final Logger LOG = LoggerFactory.getLogger(JMSClientRequestResponseTest.class);
|
||||
|
||||
@Rule public TestName name = new TestName();
|
||||
|
||||
|
@ -156,7 +156,7 @@ public class JmsClientRequestResponseTest extends AmqpTestSupport implements Mes
|
|||
}
|
||||
|
||||
private Connection createConnection(String clientId) throws JMSException {
|
||||
return JmsClientContext.INSTANCE.createConnection(amqpURI, "admin", "password", clientId);
|
||||
return JMSClientContext.INSTANCE.createConnection(amqpURI, "admin", "password", clientId);
|
||||
}
|
||||
|
||||
protected void syncConsumeLoop(MessageConsumer requestConsumer) {
|
|
@ -24,7 +24,6 @@ import static org.junit.Assert.fail;
|
|||
import java.net.URI;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ExceptionListener;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
|
@ -36,21 +35,20 @@ import javax.jms.TextMessage;
|
|||
import org.apache.activemq.broker.BrokerFactory;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.qpid.amqp_1_0.client.ConnectionClosedException;
|
||||
import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class SimpleAMQPAuthTest {
|
||||
public class JMSClientSimpleAuthTest {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(SimpleAMQPAuthTest.class);
|
||||
private static final Logger LOG = LoggerFactory.getLogger(JMSClientSimpleAuthTest.class);
|
||||
|
||||
private final String SIMPLE_AUTH_AMQP_BROKER_XML =
|
||||
"org/apache/activemq/transport/amqp/simple-auth-amqp-broker.xml";
|
||||
private BrokerService brokerService;
|
||||
private int port;
|
||||
private URI amqpURI;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
|
@ -68,18 +66,7 @@ public class SimpleAMQPAuthTest {
|
|||
@Test(timeout = 10000)
|
||||
public void testNoUserOrPassword() throws Exception {
|
||||
try {
|
||||
ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", port, "", "");
|
||||
factory.setQueuePrefix("queue://");
|
||||
factory.setTopicPrefix("topic://");
|
||||
|
||||
Connection connection = factory.createConnection();
|
||||
connection.setExceptionListener(new ExceptionListener() {
|
||||
@Override
|
||||
public void onException(JMSException exception) {
|
||||
LOG.error("Unexpected exception ", exception);
|
||||
exception.printStackTrace();
|
||||
}
|
||||
});
|
||||
Connection connection = JMSClientContext.INSTANCE.createConnection(amqpURI, "", "");
|
||||
connection.start();
|
||||
Thread.sleep(500);
|
||||
connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
@ -99,11 +86,7 @@ public class SimpleAMQPAuthTest {
|
|||
@Test(timeout = 10000)
|
||||
public void testUnknownUser() throws Exception {
|
||||
try {
|
||||
ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", port, "admin", "password");
|
||||
factory.setQueuePrefix("queue://");
|
||||
factory.setTopicPrefix("topic://");
|
||||
|
||||
Connection connection = factory.createConnection("nosuchuser", "blah");
|
||||
Connection connection = JMSClientContext.INSTANCE.createConnection(amqpURI, "nosuchuser", "blah");
|
||||
connection.start();
|
||||
Thread.sleep(500);
|
||||
connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
@ -123,11 +106,7 @@ public class SimpleAMQPAuthTest {
|
|||
@Test(timeout = 10000)
|
||||
public void testKnownUserWrongPassword() throws Exception {
|
||||
try {
|
||||
ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", port, "admin", "password");
|
||||
factory.setQueuePrefix("queue://");
|
||||
factory.setTopicPrefix("topic://");
|
||||
|
||||
Connection connection = factory.createConnection("user", "wrongPassword");
|
||||
Connection connection = JMSClientContext.INSTANCE.createConnection(amqpURI, "user", "wrongPassword");
|
||||
connection.start();
|
||||
Thread.sleep(500);
|
||||
connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
@ -146,11 +125,7 @@ public class SimpleAMQPAuthTest {
|
|||
|
||||
@Test(timeout = 30000)
|
||||
public void testSendReceive() throws Exception {
|
||||
ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", port, "admin", "password");
|
||||
factory.setQueuePrefix("queue://");
|
||||
factory.setTopicPrefix("topic://");
|
||||
|
||||
Connection connection = factory.createConnection("user", "userPassword");
|
||||
Connection connection = JMSClientContext.INSTANCE.createConnection(amqpURI, "user", "userPassword");
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Queue queue = session.createQueue("txQueue");
|
||||
MessageProducer p = session.createProducer(queue);
|
||||
|
@ -183,7 +158,7 @@ public class SimpleAMQPAuthTest {
|
|||
public void startBroker() throws Exception {
|
||||
brokerService = createBroker();
|
||||
brokerService.start();
|
||||
port = brokerService.getTransportConnectorByName("amqp").getPublishableConnectURI().getPort();
|
||||
amqpURI = brokerService.getTransportConnectorByName("amqp").getPublishableConnectURI();
|
||||
brokerService.waitUntilStarted();
|
||||
}
|
||||
}
|
|
@ -896,7 +896,7 @@ public class JMSClientTest extends JMSClientTestSupport {
|
|||
LOG.debug(">>>> At Start, durable Subscribers {} inactiveDurableSubscribers {}", durableSubscribersAtStart, inactiveSubscribersAtStart);
|
||||
|
||||
TopicConnection subscriberConnection =
|
||||
JmsClientContext.INSTANCE.createTopicConnection(amqpURI, "admin", "password");
|
||||
JMSClientContext.INSTANCE.createTopicConnection(amqpURI, "admin", "password");
|
||||
subscriberConnection.setClientID(durableClientId);
|
||||
TopicSession subscriberSession = subscriberConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Topic topic = subscriberSession.createTopic(getDestinationName());
|
||||
|
|
|
@ -101,7 +101,7 @@ public class JMSConcurrentConsumersTest extends AmqpTestSupport {
|
|||
|
||||
public void doTestSendWithMultipleConsumers(URI remoteURI) throws Exception {
|
||||
|
||||
Connection connection = JmsClientContext.INSTANCE.createConnection(remoteURI, "admin", "password", false);
|
||||
Connection connection = JMSClientContext.INSTANCE.createConnection(remoteURI, "admin", "password", false);
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
String destinationName = "AMQ4920Test" + System.currentTimeMillis();
|
||||
Destination destination = session.createTopic(destinationName);
|
||||
|
@ -170,7 +170,7 @@ public class JMSConcurrentConsumersTest extends AmqpTestSupport {
|
|||
LOG.debug(consumerName + " starting");
|
||||
Connection connection = null;
|
||||
try {
|
||||
connection = JmsClientContext.INSTANCE.createConnection(amqpURI, "admin", "admin", false);
|
||||
connection = JMSClientContext.INSTANCE.createConnection(amqpURI, "admin", "admin", false);
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Destination destination = session.createTopic(destinationName);
|
||||
MessageConsumer consumer = session.createConsumer(destination);
|
||||
|
|
|
@ -77,7 +77,7 @@ public class JMSLargeMessageSendRecvTest extends AmqpTestSupport {
|
|||
String payload = createLargeString(expectedSize);
|
||||
assertEquals(expectedSize, payload.getBytes().length);
|
||||
|
||||
Connection connection = JmsClientContext.INSTANCE.createConnection(amqpURI);
|
||||
Connection connection = JMSClientContext.INSTANCE.createConnection(amqpURI);
|
||||
long startTime = System.currentTimeMillis();
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Queue queue = session.createQueue(testName.getMethodName());
|
||||
|
|
|
@ -32,9 +32,9 @@ import org.junit.Test;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class JmsMessageGroupsTest extends JMSClientTestSupport {
|
||||
public class JMSMessageGroupsTest extends JMSClientTestSupport {
|
||||
|
||||
protected static final Logger LOG = LoggerFactory.getLogger(JmsMessageGroupsTest.class);
|
||||
protected static final Logger LOG = LoggerFactory.getLogger(JMSMessageGroupsTest.class);
|
||||
|
||||
private static final int ITERATIONS = 10;
|
||||
private static final int MESSAGE_COUNT = 10;
|
|
@ -59,7 +59,7 @@ public class JMSParallelConnectTest extends AmqpTestSupport {
|
|||
public void run() {
|
||||
|
||||
try {
|
||||
Connection connection = JmsClientContext.INSTANCE.createConnection(amqpURI, "admin", "password");
|
||||
Connection connection = JMSClientContext.INSTANCE.createConnection(amqpURI, "admin", "password");
|
||||
connection.start();
|
||||
connection.close();
|
||||
} catch (JMSException e) {
|
||||
|
@ -83,7 +83,7 @@ public class JMSParallelConnectTest extends AmqpTestSupport {
|
|||
public void run() {
|
||||
|
||||
try {
|
||||
Connection connection = JmsClientContext.INSTANCE.createConnection(amqpNioURI, "admin", "password");
|
||||
Connection connection = JMSClientContext.INSTANCE.createConnection(amqpNioURI, "admin", "password");
|
||||
connection.start();
|
||||
connection.close();
|
||||
} catch (JMSException e) {
|
||||
|
@ -107,7 +107,7 @@ public class JMSParallelConnectTest extends AmqpTestSupport {
|
|||
public void run() {
|
||||
|
||||
try {
|
||||
Connection connection = JmsClientContext.INSTANCE.createConnection(amqpSslURI, "admin", "password");
|
||||
Connection connection = JMSClientContext.INSTANCE.createConnection(amqpSslURI, "admin", "password");
|
||||
connection.start();
|
||||
connection.close();
|
||||
} catch (JMSException e) {
|
||||
|
@ -131,7 +131,7 @@ public class JMSParallelConnectTest extends AmqpTestSupport {
|
|||
public void run() {
|
||||
|
||||
try {
|
||||
Connection connection = JmsClientContext.INSTANCE.createConnection(amqpNioPlusSslURI, "admin", "password");
|
||||
Connection connection = JMSClientContext.INSTANCE.createConnection(amqpNioPlusSslURI, "admin", "password");
|
||||
connection.start();
|
||||
connection.close();
|
||||
} catch (JMSException e) {
|
||||
|
|
Loading…
Reference in New Issue