Switch to the new QPid JMS client v0.1.0
This commit is contained in:
Timothy Bish 2015-03-23 12:15:49 -04:00
parent df3ff9c65e
commit 3051882f92
12 changed files with 130 additions and 162 deletions
activemq-amqp
activemq-karaf-itest
pom.xml
src/test/java/org/apache/activemq/karaf/itest
activemq-unit-tests
pom.xml
src/test/java/org/apache/activemq/conversions
pom.xml

View File

@ -54,7 +54,7 @@
<!-- Testing Dependencies -->
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-amqp-1-0-client-jms</artifactId>
<artifactId>qpid-jms-client</artifactId>
<version>${qpid-jms-version}</version>
<scope>test</scope>
</dependency>

View File

@ -27,7 +27,7 @@ import javax.jms.QueueConnectionFactory;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -170,21 +170,21 @@ public class JMSClientContext {
boolean useSSL = remoteURI.getScheme().toLowerCase().contains("ssl");
LOG.debug("In createConnectionFactory using port {} ssl? {}", remoteURI.getPort(), useSSL);
ConnectionFactoryImpl factory =
new ConnectionFactoryImpl(remoteURI.getHost(), remoteURI.getPort(), username, password, null, useSSL);
String amqpURI = (useSSL ? "amqps://" : "amqp://") + remoteURI.getHost() + ":" + remoteURI.getPort();
if (useSSL) {
factory.setKeyStorePath(System.getProperty("javax.net.ssl.trustStore"));
factory.setKeyStorePassword("password");
factory.setTrustStorePath(System.getProperty("javax.net.ssl.keyStore"));
factory.setTrustStorePassword("password");
amqpURI += "?transport.verifyHost=false";
}
LOG.debug("In createConnectionFactory using URI: {}", amqpURI);
JmsConnectionFactory factory = new JmsConnectionFactory(amqpURI);
factory.setUsername(username);
factory.setPassword(password);
factory.setAlwaysSyncSend(syncPublish);
factory.setTopicPrefix("topic://");
factory.setQueuePrefix("queue://");
factory.setSyncPublish(syncPublish);
return factory;
}

View File

@ -35,7 +35,6 @@ import javax.jms.TextMessage;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.qpid.amqp_1_0.client.ConnectionClosedException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -72,15 +71,6 @@ public class JMSClientSimpleAuthTest {
fail("Expected JMSException");
} catch (JMSSecurityException ex) {
LOG.debug("Failed to authenticate connection with no user / password.");
} catch (JMSException e) {
Exception linkedException = e.getLinkedException();
if (linkedException != null && linkedException instanceof ConnectionClosedException) {
ConnectionClosedException cce = (ConnectionClosedException) linkedException;
assertEquals("Error{condition=unauthorized-access,description=User name [null] or password is invalid.}", cce.getRemoteError().toString());
} else {
LOG.error("Unexpected Exception", e);
fail("Unexpected exception: " + e.getMessage());
}
}
}
@ -92,15 +82,6 @@ public class JMSClientSimpleAuthTest {
fail("Expected JMSException");
} catch (JMSSecurityException ex) {
LOG.debug("Failed to authenticate connection with no user / password.");
} catch (JMSException e) {
Exception linkedException = e.getLinkedException();
if (linkedException != null && linkedException instanceof ConnectionClosedException) {
ConnectionClosedException cce = (ConnectionClosedException) linkedException;
assertEquals("Error{condition=unauthorized-access,description=User name [nosuchuser] or password is invalid.}", cce.getRemoteError().toString());
} else {
LOG.error("Unexpected Exception", e);
fail("Unexpected exception: " + e.getMessage());
}
}
}
@ -112,15 +93,6 @@ public class JMSClientSimpleAuthTest {
fail("Expected JMSException");
} catch (JMSSecurityException ex) {
LOG.debug("Failed to authenticate connection with no user / password.");
} catch (JMSException e) {
Exception linkedException = e.getLinkedException();
if (linkedException != null && linkedException instanceof ConnectionClosedException) {
ConnectionClosedException cce = (ConnectionClosedException) linkedException;
assertEquals("Error{condition=unauthorized-access,description=User name [user] or password is invalid.}", cce.getRemoteError().toString());
} else {
LOG.error("Unexpected Exception", e);
fail("Unexpected exception: " + e.getMessage());
}
}
}
@ -134,15 +106,6 @@ public class JMSClientSimpleAuthTest {
fail("Expected JMSException");
} catch (JMSSecurityException ex) {
LOG.debug("Failed to authenticate connection with no user / password.");
} catch (JMSException e) {
Exception linkedException = e.getLinkedException();
if (linkedException != null && linkedException instanceof ConnectionClosedException) {
ConnectionClosedException cce = (ConnectionClosedException) linkedException;
assertEquals("Error{condition=unauthorized-access,description=User name [user] or password is invalid.}", cce.getRemoteError().toString());
} else {
LOG.error("Unexpected Exception", e);
fail("Unexpected exception: " + e.getMessage());
}
} finally {
if (connection != null) {
connection.close();

View File

@ -1108,6 +1108,7 @@ public class JMSClientTest extends JMSClientTestSupport {
}
}
@Ignore("Legacy QPid client does not support creation of TemporaryTopics correctly")
@Test(timeout=30000)
public void testDeleteTemporaryQueue() throws Exception {
ActiveMQAdmin.enableJMSFrameTracing();
@ -1135,7 +1136,6 @@ public class JMSClientTest extends JMSClientTestSupport {
}
}
@Ignore("Legacy QPid client does not support creation of TemporaryTopics correctly")
@Test(timeout=30000)
public void testCreateTemporaryTopic() throws Exception {
ActiveMQAdmin.enableJMSFrameTracing();

View File

@ -27,7 +27,6 @@ import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;
import javax.jms.ConnectionFactory;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
@ -35,9 +34,9 @@ import javax.naming.NamingException;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
import org.apache.qpid.amqp_1_0.jms.impl.QueueImpl;
import org.apache.qpid.amqp_1_0.jms.impl.TopicImpl;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.jms.JmsQueue;
import org.apache.qpid.jms.JmsTopic;
import org.objectweb.jtests.jms.admin.Admin;
/**
@ -52,7 +51,8 @@ public class ActiveMQAdmin implements Admin {
// Use the jetty JNDI context since it's mutable.
final Hashtable<String, String> env = new Hashtable<String, String>();
env.put("java.naming.factory.initial", "org.eclipse.jetty.jndi.InitialContextFactory");
env.put("java.naming.factory.url.pkgs", "org.eclipse.jetty.jndi");;
env.put("java.naming.factory.url.pkgs", "org.eclipse.jetty.jndi");
;
context = new InitialContext(env);
} catch (NamingException e) {
throw new RuntimeException(e);
@ -90,8 +90,7 @@ public class ActiveMQAdmin implements Admin {
}
protected BrokerService createBroker() throws Exception {
return BrokerFactory.createBroker(new URI("broker://()/localhost" +
"?persistent=false&useJmx=false&advisorySupport=false&schedulerSupport=false"));
return BrokerFactory.createBroker(new URI("broker://()/localhost" + "?persistent=false&useJmx=false&advisorySupport=false&schedulerSupport=false"));
}
@Override
@ -104,7 +103,7 @@ public class ActiveMQAdmin implements Admin {
@Override
public void startServer() throws Exception {
if( broker!=null ) {
if (broker != null) {
stopServer();
}
if (System.getProperty("basedir") == null) {
@ -143,7 +142,7 @@ public class ActiveMQAdmin implements Admin {
@Override
public void createQueue(String name) {
try {
context.bind(name, new QueueImpl("queue://"+name));
context.bind(name, new JmsQueue(name));
} catch (NamingException e) {
throw new RuntimeException(e);
}
@ -152,7 +151,7 @@ public class ActiveMQAdmin implements Admin {
@Override
public void createTopic(String name) {
try {
context.bind(name, new TopicImpl("topic://"+name));
context.bind(name, new JmsTopic(name));
} catch (NamingException e) {
throw new RuntimeException(e);
}
@ -160,7 +159,6 @@ public class ActiveMQAdmin implements Admin {
@Override
public void deleteQueue(String name) {
// BrokerTestSupport.delete_queue((Broker)base.broker, name);
try {
context.unbind(name);
} catch (NamingException e) {
@ -180,7 +178,7 @@ public class ActiveMQAdmin implements Admin {
@Override
public void createConnectionFactory(String name) {
try {
final ConnectionFactory factory = new ConnectionFactoryImpl("localhost", port, null, null);
final JmsConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:" + port);
context.bind(name, factory);
} catch (NamingException e) {
throw new RuntimeException(e);

View File

@ -19,7 +19,6 @@ package org.apache.activemq.transport.amqp.joram;
import java.io.File;
import java.security.SecureRandom;
import javax.jms.ConnectionFactory;
import javax.naming.NamingException;
import javax.net.ssl.KeyManager;
import javax.net.ssl.SSLContext;
@ -28,7 +27,7 @@ import javax.net.ssl.TrustManager;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.spring.SpringSslContext;
import org.apache.activemq.transport.amqp.DefaultTrustManager;
import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -80,15 +79,16 @@ public class ActiveMQNIOPlusSSLAdmin extends ActiveMQAdmin {
public void createConnectionFactory(String name) {
try {
LOG.debug("Creating a connection factory using port {}", port);
final ConnectionFactory factory = new ConnectionFactoryImpl("localhost", port, null, null, null, true);
ConnectionFactoryImpl implFactory = (ConnectionFactoryImpl) factory;
final JmsConnectionFactory factory = new JmsConnectionFactory("amqps://localhost:" + port);
SpringSslContext sslContext = (SpringSslContext) broker.getSslContext();
implFactory.setKeyStorePath(sslContext.getKeyStore());
implFactory.setKeyStorePassword("password");
implFactory.setTrustStorePath(sslContext.getTrustStore());
implFactory.setTrustStorePassword("password");
System.setProperty("javax.net.ssl.trustStore", sslContext.getTrustStore());
System.setProperty("javax.net.ssl.trustStorePassword", "password");
System.setProperty("javax.net.ssl.trustStoreType", "jks");
System.setProperty("javax.net.ssl.keyStore", sslContext.getKeyStore());
System.setProperty("javax.net.ssl.keyStorePassword", "password");
System.setProperty("javax.net.ssl.keyStoreType", "jks");
context.bind(name, factory);
} catch (NamingException e) {

View File

@ -19,7 +19,6 @@ package org.apache.activemq.transport.amqp.joram;
import java.io.File;
import java.security.SecureRandom;
import javax.jms.ConnectionFactory;
import javax.naming.NamingException;
import javax.net.ssl.KeyManager;
import javax.net.ssl.SSLContext;
@ -28,7 +27,7 @@ import javax.net.ssl.TrustManager;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.spring.SpringSslContext;
import org.apache.activemq.transport.amqp.DefaultTrustManager;
import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -80,15 +79,17 @@ public class ActiveMQSSLAdmin extends ActiveMQAdmin {
public void createConnectionFactory(String name) {
try {
LOG.debug("Creating a connection factory using port {}", port);
final ConnectionFactory factory = new ConnectionFactoryImpl("localhost", port, null, null, null, true);
ConnectionFactoryImpl implFactory = (ConnectionFactoryImpl) factory;
final JmsConnectionFactory factory =
new JmsConnectionFactory("amqps://localhost:" + port + "?transport.verifyHost=false");
SpringSslContext sslContext = (SpringSslContext) broker.getSslContext();
implFactory.setKeyStorePath(sslContext.getKeyStore());
implFactory.setKeyStorePassword("password");
implFactory.setTrustStorePath(sslContext.getTrustStore());
implFactory.setTrustStorePassword("password");
System.setProperty("javax.net.ssl.trustStore", sslContext.getTrustStore());
System.setProperty("javax.net.ssl.trustStorePassword", "password");
System.setProperty("javax.net.ssl.trustStoreType", "jks");
System.setProperty("javax.net.ssl.keyStore", sslContext.getKeyStore());
System.setProperty("javax.net.ssl.keyStorePassword", "password");
System.setProperty("javax.net.ssl.keyStoreType", "jks");
context.bind(name, factory);
} catch (NamingException e) {

View File

@ -103,19 +103,7 @@
</dependency>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-amqp-1-0-common</artifactId>
<version>${qpid-jms-version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-amqp-1-0-client</artifactId>
<version>${qpid-jms-version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-amqp-1-0-client-jms</artifactId>
<artifactId>qpid-jms-client</artifactId>
<version>${qpid-jms-version}</version>
<scope>test</scope>
</dependency>
@ -186,39 +174,39 @@
</plugin>
</plugins>
<pluginManagement>
<plugins>
<!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
<plugin>
<groupId>org.eclipse.m2e</groupId>
<artifactId>lifecycle-mapping</artifactId>
<version>1.0.0</version>
<configuration>
<lifecycleMappingMetadata>
<pluginExecutions>
<pluginExecution>
<pluginExecutionFilter>
<groupId>
org.apache.servicemix.tooling
</groupId>
<artifactId>
depends-maven-plugin
</artifactId>
<versionRange>[1.2,)</versionRange>
<goals>
<goal>
generate-depends-file
</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore />
</action>
</pluginExecution>
</pluginExecutions>
</lifecycleMappingMetadata>
</configuration>
</plugin>
</plugins>
<plugins>
<!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
<plugin>
<groupId>org.eclipse.m2e</groupId>
<artifactId>lifecycle-mapping</artifactId>
<version>1.0.0</version>
<configuration>
<lifecycleMappingMetadata>
<pluginExecutions>
<pluginExecution>
<pluginExecutionFilter>
<groupId>
org.apache.servicemix.tooling
</groupId>
<artifactId>
depends-maven-plugin
</artifactId>
<versionRange>[1.2,)</versionRange>
<goals>
<goal>
generate-depends-file
</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore />
</action>
</pluginExecution>
</pluginExecutions>
</lifecycleMappingMetadata>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>

View File

@ -16,17 +16,18 @@
*/
package org.apache.activemq.karaf.itest;
import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
import javax.jms.Connection;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.ops4j.pax.exam.Configuration;
import org.ops4j.pax.exam.CoreOptions;
import org.ops4j.pax.exam.Option;
import org.ops4j.pax.exam.Configuration;
import org.ops4j.pax.exam.junit.PaxExam;
import javax.jms.Connection;
@Ignore
@RunWith(PaxExam.class)
public class ActiveMQAMQPBrokerFeatureTest extends ActiveMQBrokerFeatureTest {
private static final Integer AMQP_PORT = 61636;
@ -34,14 +35,9 @@ public class ActiveMQAMQPBrokerFeatureTest extends ActiveMQBrokerFeatureTest {
@Configuration
public static Option[] configure() {
Option[] activeMQOptions = configure("activemq");
final String fragmentHost = "qpid-amqp-jms-client";
Option qpidClient = CoreOptions.wrappedBundle(CoreOptions.mavenBundle("org.apache.qpid", "qpid-amqp-1-0-client").versionAsInProject().getURL().toString() + "$Bundle-SymbolicName=qpid-amqp-client&Fragment-Host=" + fragmentHost);
Option qpidClientJms = CoreOptions.wrappedBundle(CoreOptions.mavenBundle("org.apache.qpid", "qpid-amqp-1-0-client-jms").versionAsInProject().getURL().toString() + "$Bundle-SymbolicName=" + fragmentHost);
Option qpidCommon = CoreOptions.wrappedBundle(CoreOptions.mavenBundle("org.apache.qpid", "qpid-amqp-1-0-common").versionAsInProject().getURL().toString());
Option qpidClient = CoreOptions.wrappedBundle(CoreOptions.mavenBundle("org.apache.qpid", "qpid-jms-client").versionAsInProject().getURL().toString() + "$Bundle-SymbolicName=qpid-jms-client");
Option[] options = append(qpidClient, activeMQOptions);
options = append(qpidClientJms, options);
options = append(qpidCommon, options);
Option[] configuredOptions = configureBrokerStart(options);
return configuredOptions;
@ -50,7 +46,12 @@ public class ActiveMQAMQPBrokerFeatureTest extends ActiveMQBrokerFeatureTest {
@Override
protected Connection getConnection() throws Throwable {
ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", AMQP_PORT, AbstractFeatureTest.USER, AbstractFeatureTest.PASSWORD);
String amqpURI = "amqp://localhost" + AMQP_PORT;
JmsConnectionFactory factory = new JmsConnectionFactory(amqpURI);
factory.setUsername(AbstractFeatureTest.USER);
factory.setPassword(AbstractFeatureTest.PASSWORD);
Connection connection = null;
ClassLoader originalLoader = Thread.currentThread().getContextClassLoader();
try {
@ -64,6 +65,7 @@ public class ActiveMQAMQPBrokerFeatureTest extends ActiveMQBrokerFeatureTest {
return connection;
}
@Override
@Ignore
@Test(timeout = 5 * 60 * 1000)
public void testTemporaryDestinations() throws Throwable {

View File

@ -157,7 +157,7 @@
<!-- =============================== -->
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-amqp-1-0-client-jms</artifactId>
<artifactId>qpid-jms-client</artifactId>
<version>${qpid-jms-version}</version>
<scope>test</scope>
</dependency>

View File

@ -16,55 +16,66 @@
*/
package org.apache.activemq.conversions;
import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
import org.apache.qpid.amqp_1_0.jms.impl.QueueImpl;
import org.apache.qpid.amqp_1_0.jms.impl.TopicImpl;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.QoS;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import javax.jms.*;
import java.io.UnsupportedEncodingException;
import java.util.Arrays;
/**
*/
public class AmqpAndMqttTest extends CombinationTestSupport {
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.QoS;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class AmqpAndMqttTest {
protected BrokerService broker;
private TransportConnector amqpConnector;
private TransportConnector mqttConnector;
@Override
protected void setUp() throws Exception {
super.setUp();
@Before
public void setUp() throws Exception {
broker = createBroker();
broker.start();
broker.waitUntilStarted();
}
@Override
protected void tearDown() throws Exception {
if( broker!=null ) {
@After
public void tearDown() throws Exception {
if (broker != null) {
broker.stop();
broker.waitUntilStopped();
broker = null;
}
super.tearDown();
}
protected BrokerService createBroker() throws Exception {
BrokerService broker = new BrokerService();
broker.setPersistent(false);
broker.setUseJmx(false);
broker.setAdvisorySupport(false);
broker.setSchedulerSupport(false);
amqpConnector = broker.addConnector("amqp://0.0.0.0:0");
mqttConnector = broker.addConnector("mqtt://0.0.0.0:0");
return broker;
}
@Test(timeout = 60000)
public void testFromMqttToAmqp() throws Exception {
Connection amqp = createAmqpConnection();
Session session = amqp.createSession(false, Session.AUTO_ACKNOWLEDGE);
@ -95,7 +106,6 @@ public class AmqpAndMqttTest extends CombinationTestSupport {
}
}
protected MQTT createMQTTConnection() throws Exception {
MQTT mqtt = new MQTT();
mqtt.setConnectAttemptsMax(1);
@ -105,7 +115,14 @@ public class AmqpAndMqttTest extends CombinationTestSupport {
}
public Connection createAmqpConnection() throws Exception {
final ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", amqpConnector.getConnectUri().getPort(), "admin", "password");
String amqpURI = "amqp://localhost:" + amqpConnector.getConnectUri().getPort();
final JmsConnectionFactory factory = new JmsConnectionFactory(amqpURI);
factory.setUsername("admin");
factory.setPassword("password");
final Connection connection = factory.createConnection();
connection.setExceptionListener(new ExceptionListener() {
@Override
@ -116,5 +133,4 @@ public class AmqpAndMqttTest extends CombinationTestSupport {
connection.start();
return connection;
}
}

View File

@ -105,7 +105,7 @@
<linkedin-zookeeper-version>1.4.0</linkedin-zookeeper-version>
<zookeeper-version>3.4.6</zookeeper-version>
<qpid-proton-version>0.8</qpid-proton-version>
<qpid-jms-version>0.32-SNAPSHOT</qpid-jms-version>
<qpid-jms-version>0.1.0</qpid-jms-version>
<regexp-version>1.3</regexp-version>
<rome-version>1.0</rome-version>
<saxon-version>9.5.1-2</saxon-version>