mirror of https://github.com/apache/activemq.git
Fix tests to always set an client ID for the durable subscription cases, the legacy client doesn't seem to care but the new one will throw an exception if not set.
This commit is contained in:
parent
709b64b3d2
commit
adef03e5a4
|
@ -19,10 +19,13 @@ package org.apache.activemq.transport.amqp;
|
|||
import java.net.URI;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.ExceptionListener;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.QueueConnection;
|
||||
import javax.jms.QueueConnectionFactory;
|
||||
import javax.jms.TopicConnection;
|
||||
import javax.jms.TopicConnectionFactory;
|
||||
|
||||
import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -56,7 +59,7 @@ public class JMSClientContext {
|
|||
}
|
||||
|
||||
public Connection createConnection(URI remoteURI, String username, String password, String clientId, boolean syncPublish) throws JMSException {
|
||||
ConnectionFactoryImpl factory = createConnectionFactory(remoteURI, username, password, clientId, syncPublish);
|
||||
ConnectionFactory factory = createConnectionFactory(remoteURI, username, password, syncPublish);
|
||||
|
||||
Connection connection = factory.createConnection();
|
||||
connection.setExceptionListener(new ExceptionListener() {
|
||||
|
@ -67,6 +70,10 @@ public class JMSClientContext {
|
|||
}
|
||||
});
|
||||
|
||||
if (clientId != null && !clientId.isEmpty()) {
|
||||
connection.setClientID(clientId);
|
||||
}
|
||||
|
||||
return connection;
|
||||
}
|
||||
|
||||
|
@ -89,7 +96,7 @@ public class JMSClientContext {
|
|||
}
|
||||
|
||||
public TopicConnection createTopicConnection(URI remoteURI, String username, String password, String clientId, boolean syncPublish) throws JMSException {
|
||||
ConnectionFactoryImpl factory = createConnectionFactory(remoteURI, username, password, clientId, syncPublish);
|
||||
TopicConnectionFactory factory = createTopicConnectionFactory(remoteURI, username, password, syncPublish);
|
||||
|
||||
TopicConnection connection = factory.createTopicConnection();
|
||||
connection.setExceptionListener(new ExceptionListener() {
|
||||
|
@ -100,6 +107,10 @@ public class JMSClientContext {
|
|||
}
|
||||
});
|
||||
|
||||
if (clientId != null && !clientId.isEmpty()) {
|
||||
connection.setClientID(clientId);
|
||||
}
|
||||
|
||||
return connection;
|
||||
}
|
||||
|
||||
|
@ -122,7 +133,7 @@ public class JMSClientContext {
|
|||
}
|
||||
|
||||
public QueueConnection createQueueConnection(URI remoteURI, String username, String password, String clientId, boolean syncPublish) throws JMSException {
|
||||
ConnectionFactoryImpl factory = createConnectionFactory(remoteURI, username, password, clientId, syncPublish);
|
||||
QueueConnectionFactory factory = createQueueConnectionFactory(remoteURI, username, password, syncPublish);
|
||||
|
||||
QueueConnection connection = factory.createQueueConnection();
|
||||
connection.setExceptionListener(new ExceptionListener() {
|
||||
|
@ -133,20 +144,36 @@ public class JMSClientContext {
|
|||
}
|
||||
});
|
||||
|
||||
if (clientId != null && !clientId.isEmpty()) {
|
||||
connection.setClientID(clientId);
|
||||
}
|
||||
|
||||
return connection;
|
||||
}
|
||||
|
||||
//------ Internal Implementation bits ------------------------------------//
|
||||
|
||||
private ConnectionFactoryImpl createConnectionFactory(
|
||||
URI remoteURI, String username, String password, String clientId, boolean syncPublish) {
|
||||
private QueueConnectionFactory createQueueConnectionFactory(
|
||||
URI remoteURI, String username, String password, boolean syncPublish) {
|
||||
|
||||
return (QueueConnectionFactory) createConnectionFactory(remoteURI, username, password, syncPublish);
|
||||
}
|
||||
|
||||
private TopicConnectionFactory createTopicConnectionFactory(
|
||||
URI remoteURI, String username, String password, boolean syncPublish) {
|
||||
|
||||
return (TopicConnectionFactory) createConnectionFactory(remoteURI, username, password, syncPublish);
|
||||
}
|
||||
|
||||
private ConnectionFactory createConnectionFactory(
|
||||
URI remoteURI, String username, String password, boolean syncPublish) {
|
||||
|
||||
boolean useSSL = remoteURI.getScheme().toLowerCase().contains("ssl");
|
||||
|
||||
LOG.debug("In createConnectionFactory using port {} ssl? {}", remoteURI.getPort(), useSSL);
|
||||
|
||||
ConnectionFactoryImpl factory =
|
||||
new ConnectionFactoryImpl(remoteURI.getHost(), remoteURI.getPort(), username, password, clientId, useSSL);
|
||||
new ConnectionFactoryImpl(remoteURI.getHost(), remoteURI.getPort(), username, password, null, useSSL);
|
||||
|
||||
if (useSSL) {
|
||||
factory.setKeyStorePath(System.getProperty("javax.net.ssl.trustStore"));
|
||||
|
|
|
@ -600,8 +600,9 @@ public class JMSClientTest extends JMSClientTestSupport {
|
|||
ActiveMQAdmin.enableJMSFrameTracing();
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
final AtomicReference<Message> received = new AtomicReference<Message>();
|
||||
String durableClientId = getDestinationName() + "-ClientId";
|
||||
|
||||
connection = createConnection();
|
||||
connection = createConnection(durableClientId);
|
||||
{
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Topic topic = session.createTopic(getDestinationName());
|
||||
|
@ -632,8 +633,9 @@ public class JMSClientTest extends JMSClientTestSupport {
|
|||
@Test(timeout=30000)
|
||||
public void testDurableConsumerSync() throws Exception {
|
||||
ActiveMQAdmin.enableJMSFrameTracing();
|
||||
String durableClientId = getDestinationName() + "-ClientId";
|
||||
|
||||
connection = createConnection();
|
||||
connection = createConnection(durableClientId);
|
||||
{
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Topic topic = session.createTopic(getDestinationName());
|
||||
|
@ -923,9 +925,11 @@ public class JMSClientTest extends JMSClientTestSupport {
|
|||
public void testDurableConsumerUnsubscribe() throws Exception {
|
||||
ActiveMQAdmin.enableJMSFrameTracing();
|
||||
|
||||
String durableClientId = getDestinationName() + "-ClientId";
|
||||
|
||||
final BrokerViewMBean broker = getProxyToBroker();
|
||||
|
||||
connection = createConnection();
|
||||
connection = createConnection(durableClientId);
|
||||
connection.start();
|
||||
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
@ -993,10 +997,11 @@ public class JMSClientTest extends JMSClientTestSupport {
|
|||
@Test(timeout=30000)
|
||||
public void testDurableConsumerUnsubscribeWhileActive() throws Exception {
|
||||
ActiveMQAdmin.enableJMSFrameTracing();
|
||||
String durableClientId = getDestinationName() + "-ClientId";
|
||||
|
||||
final BrokerViewMBean broker = getProxyToBroker();
|
||||
|
||||
connection = createConnection();
|
||||
connection = createConnection(durableClientId);
|
||||
connection.start();
|
||||
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
|
Loading…
Reference in New Issue