diff --git a/artemis-cli/pom.xml b/artemis-cli/pom.xml
index 7870c599ad..88a48db478 100644
--- a/artemis-cli/pom.xml
+++ b/artemis-cli/pom.xml
@@ -94,6 +94,12 @@
org.apache.geronimo.specs
geronimo-json_1.0_spec
+
+
+ org.apache.qpid
+ qpid-jms-client
+ ${qpid.jms.version}
+
org.jboss.logging
jboss-logging-annotations
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/AbstractAction.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/AbstractAction.java
index 3619ed70db..37f08c35f1 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/AbstractAction.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/AbstractAction.java
@@ -30,8 +30,8 @@ public abstract class AbstractAction extends ConnectionAbstract {
public void performCoreManagement(ManagementCallback cb) throws Exception {
- try (ActiveMQConnectionFactory factory = createConnectionFactory();
- ServerLocator locator = factory.getServerLocator();
+ try (ActiveMQConnectionFactory factory = createCoreConnectionFactory();
+ ServerLocator locator = factory.getServerLocator();
ClientSessionFactory sessionFactory = locator.createSessionFactory();
ClientSession session = sessionFactory.createSession(user, password, false, true, true, false, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE)) {
session.start();
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Browse.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Browse.java
index 9562b59a16..e249cbf077 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Browse.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Browse.java
@@ -18,14 +18,13 @@
package org.apache.activemq.artemis.cli.commands.messages;
import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Session;
import io.airlift.airline.Command;
import io.airlift.airline.Option;
import org.apache.activemq.artemis.cli.commands.ActionContext;
-import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
-import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
@Command(name = "browser", description = "It will browse messages on an instance")
public class Browse extends DestAbstract {
@@ -39,9 +38,8 @@ public class Browse extends DestAbstract {
System.out.println("Consumer:: filter = " + filter);
- ActiveMQConnectionFactory factory = createConnectionFactory();
+ ConnectionFactory factory = createConnectionFactory();
- Destination dest = ActiveMQDestination.createDestination(this.destination, ActiveMQDestination.TYPE.QUEUE);
try (Connection connection = factory.createConnection()) {
ConsumerThread[] threadsArray = new ConsumerThread[threads];
for (int i = 0; i < threads; i++) {
@@ -51,6 +49,7 @@ public class Browse extends DestAbstract {
} else {
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
+ Destination dest = lookupDestination(session);
threadsArray[i] = new ConsumerThread(session, dest, i);
threadsArray[i].setVerbose(verbose).setSleep(sleep).setMessageCount(messageCount).setFilter(filter).setBrowse(true);
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConnectionAbstract.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConnectionAbstract.java
index 41443c4d6c..90882e6c0b 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConnectionAbstract.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConnectionAbstract.java
@@ -18,12 +18,14 @@
package org.apache.activemq.artemis.cli.commands.messages;
import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.JMSSecurityException;
import io.airlift.airline.Option;
import org.apache.activemq.artemis.cli.commands.InputAbstract;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.qpid.jms.JmsConnectionFactory;
public class ConnectionAbstract extends InputAbstract {
@@ -39,7 +41,48 @@ public class ConnectionAbstract extends InputAbstract {
@Option(name = "--clientID", description = "ClientID to be associated with connection")
String clientID;
- protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
+ @Option(name = "--protocol", description = "Protocol used. Valid values are amqp or core. Default=core.")
+ String protocol = "core";
+
+ protected ConnectionFactory createConnectionFactory() throws Exception {
+ if (protocol.equals("core")) {
+ return createCoreConnectionFactory();
+ } else if (protocol.equals("amqp")) {
+ return createAMQPConnectionFactory();
+ } else {
+ throw new IllegalStateException("protocol " + protocol + " not supported");
+ }
+ }
+
+ private ConnectionFactory createAMQPConnectionFactory() {
+ if (brokerURL.startsWith("tcp://")) {
+ // replacing tcp:// by amqp://
+ brokerURL = "amqp" + brokerURL.substring(3);
+ }
+ JmsConnectionFactory cf = new JmsConnectionFactory(user, password, brokerURL);
+ if (clientID != null) {
+ cf.setClientID(clientID);
+ }
+
+ try {
+ Connection connection = cf.createConnection();
+ connection.close();
+ return cf;
+ } catch (JMSSecurityException e) {
+ // if a security exception will get the user and password through an input
+ context.err.println("Connection failed::" + e.getMessage());
+ userPassword();
+ return new JmsConnectionFactory(user, password, brokerURL);
+ } catch (JMSException e) {
+ // if a connection exception will ask for the URL, user and password
+ context.err.println("Connection failed::" + e.getMessage());
+ brokerURL = input("--url", "Type in the broker URL for a retry (e.g. tcp://localhost:61616)", brokerURL);
+ userPassword();
+ return new JmsConnectionFactory(user, password, brokerURL);
+ }
+ }
+
+ protected ActiveMQConnectionFactory createCoreConnectionFactory() {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerURL, user, password);
if (clientID != null) {
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Consumer.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Consumer.java
index c58f79221c..ee15a66a90 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Consumer.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Consumer.java
@@ -18,14 +18,13 @@
package org.apache.activemq.artemis.cli.commands.messages;
import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Session;
import io.airlift.airline.Command;
import io.airlift.airline.Option;
import org.apache.activemq.artemis.cli.commands.ActionContext;
-import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
-import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
@Command(name = "consumer", description = "It will consume messages from an instance")
public class Consumer extends DestAbstract {
@@ -48,9 +47,8 @@ public class Consumer extends DestAbstract {
System.out.println("Consumer:: filter = " + filter);
- ActiveMQConnectionFactory factory = createConnectionFactory();
+ ConnectionFactory factory = createConnectionFactory();
- Destination dest = ActiveMQDestination.createDestination(this.destination, ActiveMQDestination.TYPE.QUEUE);
try (Connection connection = factory.createConnection()) {
ConsumerThread[] threadsArray = new ConsumerThread[threads];
for (int i = 0; i < threads; i++) {
@@ -60,6 +58,7 @@ public class Consumer extends DestAbstract {
} else {
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
+ Destination dest = lookupDestination(session);
threadsArray[i] = new ConsumerThread(session, dest, i);
threadsArray[i].setVerbose(verbose).setSleep(sleep).setDurable(durable).setBatchSize(txBatchSize).setBreakOnNull(breakOnNull).setMessageCount(messageCount).setReceiveTimeOut(receiveTimeout).setFilter(filter).setBrowse(false);
@@ -82,4 +81,5 @@ public class Consumer extends DestAbstract {
}
}
+
}
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConsumerThread.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConsumerThread.java
index 7883e5844d..ecffa349f1 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConsumerThread.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConsumerThread.java
@@ -146,12 +146,16 @@ public class ConsumerThread extends Thread {
consumer = session.createConsumer(destination);
}
}
+ int count = 0;
while (running && received < messageCount) {
Message msg = consumer.receive(receiveTimeOut);
if (msg != null) {
- System.out.println(threadName + " Received " + (msg instanceof TextMessage ? ((TextMessage) msg).getText() : msg.getJMSMessageID()));
if (verbose) {
- System.out.println("..." + msg);
+ System.out.println(threadName + " Received " + (msg instanceof TextMessage ? ((TextMessage) msg).getText() : msg.getJMSMessageID()));
+ } else {
+ if (++count % 1000 == 0) {
+ System.out.println("Received " + count);
+ }
}
if (bytesAsText && (msg instanceof BytesMessage)) {
long length = ((BytesMessage) msg).getBodyLength();
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/DestAbstract.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/DestAbstract.java
index acf0473b64..2f4a34c72e 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/DestAbstract.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/DestAbstract.java
@@ -17,7 +17,11 @@
package org.apache.activemq.artemis.cli.commands.messages;
+import javax.jms.Destination;
+import javax.jms.Session;
+
import io.airlift.airline.Option;
+import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
public class DestAbstract extends ConnectionAbstract {
@@ -36,4 +40,12 @@ public class DestAbstract extends ConnectionAbstract {
@Option(name = "--threads", description = "Number of Threads to be used (Default: 1)")
int threads = 1;
+ protected Destination lookupDestination(Session session) throws Exception {
+ if (protocol.equals("AMQP")) {
+ return session.createQueue(destination);
+ } else {
+ return ActiveMQDestination.createDestination(this.destination, ActiveMQDestination.TYPE.QUEUE);
+ }
+ }
+
}
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Producer.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Producer.java
index 3ed4b57f25..e077fb0adb 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Producer.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Producer.java
@@ -18,14 +18,13 @@
package org.apache.activemq.artemis.cli.commands.messages;
import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Session;
import io.airlift.airline.Command;
import io.airlift.airline.Option;
import org.apache.activemq.artemis.cli.commands.ActionContext;
-import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
-import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
@Command(name = "producer", description = "It will send messages to an instance")
public class Producer extends DestAbstract {
@@ -49,9 +48,8 @@ public class Producer extends DestAbstract {
public Object execute(ActionContext context) throws Exception {
super.execute(context);
- ActiveMQConnectionFactory factory = createConnectionFactory();
+ ConnectionFactory factory = createConnectionFactory();
- Destination dest = ActiveMQDestination.createDestination(this.destination, ActiveMQDestination.TYPE.QUEUE);
try (Connection connection = factory.createConnection()) {
ProducerThread[] threadsArray = new ProducerThread[threads];
for (int i = 0; i < threads; i++) {
@@ -61,6 +59,7 @@ public class Producer extends DestAbstract {
} else {
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
+ Destination dest = lookupDestination(session);
threadsArray[i] = new ProducerThread(session, dest, i);
threadsArray[i].setVerbose(verbose).setSleep(sleep).setPersistent(!nonpersistent).
diff --git a/artemis-distribution/pom.xml b/artemis-distribution/pom.xml
index 84116a4479..656543050a 100644
--- a/artemis-distribution/pom.xml
+++ b/artemis-distribution/pom.xml
@@ -170,7 +170,15 @@
tomcat-servlet-api
-
+
+
+ org.apache.qpid
+ qpid-jms-client
+ ${qpid.jms.version}
+
+
+
+
org.apache.activemq
artemis-console
diff --git a/artemis-distribution/src/main/assembly/dep.xml b/artemis-distribution/src/main/assembly/dep.xml
index f80971395c..aad4ab1c3d 100644
--- a/artemis-distribution/src/main/assembly/dep.xml
+++ b/artemis-distribution/src/main/assembly/dep.xml
@@ -69,6 +69,7 @@
org.apache.activemq:artemis-service-extensions
org.apache.activemq:artemis-web
org.apache.activemq.rest:artemis-rest
+ org.apache.qpid:qpid-jms-client
org.apache.geronimo.specs:geronimo-jms_2.0_spec