ARTEMIS-1777 Adding Protocol specific into producer / consumer
This commit is contained in:
parent
b6a29a5b5f
commit
c2955af164
|
@ -94,6 +94,12 @@
|
|||
<groupId>org.apache.geronimo.specs</groupId>
|
||||
<artifactId>geronimo-json_1.0_spec</artifactId>
|
||||
</dependency>
|
||||
<!-- artemis producer and consumer can use amqp as the protocol -->
|
||||
<dependency>
|
||||
<groupId>org.apache.qpid</groupId>
|
||||
<artifactId>qpid-jms-client</artifactId>
|
||||
<version>${qpid.jms.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.jboss.logging</groupId>
|
||||
<artifactId>jboss-logging-annotations</artifactId>
|
||||
|
|
|
@ -30,7 +30,7 @@ public abstract class AbstractAction extends ConnectionAbstract {
|
|||
|
||||
public void performCoreManagement(ManagementCallback<ClientMessage> cb) throws Exception {
|
||||
|
||||
try (ActiveMQConnectionFactory factory = createConnectionFactory();
|
||||
try (ActiveMQConnectionFactory factory = createCoreConnectionFactory();
|
||||
ServerLocator locator = factory.getServerLocator();
|
||||
ClientSessionFactory sessionFactory = locator.createSessionFactory();
|
||||
ClientSession session = sessionFactory.createSession(user, password, false, true, true, false, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE)) {
|
||||
|
|
|
@ -18,14 +18,13 @@
|
|||
package org.apache.activemq.artemis.cli.commands.messages;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.Session;
|
||||
|
||||
import io.airlift.airline.Command;
|
||||
import io.airlift.airline.Option;
|
||||
import org.apache.activemq.artemis.cli.commands.ActionContext;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
|
||||
|
||||
@Command(name = "browser", description = "It will browse messages on an instance")
|
||||
public class Browse extends DestAbstract {
|
||||
|
@ -39,9 +38,8 @@ public class Browse extends DestAbstract {
|
|||
|
||||
System.out.println("Consumer:: filter = " + filter);
|
||||
|
||||
ActiveMQConnectionFactory factory = createConnectionFactory();
|
||||
ConnectionFactory factory = createConnectionFactory();
|
||||
|
||||
Destination dest = ActiveMQDestination.createDestination(this.destination, ActiveMQDestination.TYPE.QUEUE);
|
||||
try (Connection connection = factory.createConnection()) {
|
||||
ConsumerThread[] threadsArray = new ConsumerThread[threads];
|
||||
for (int i = 0; i < threads; i++) {
|
||||
|
@ -51,6 +49,7 @@ public class Browse extends DestAbstract {
|
|||
} else {
|
||||
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
}
|
||||
Destination dest = lookupDestination(session);
|
||||
threadsArray[i] = new ConsumerThread(session, dest, i);
|
||||
|
||||
threadsArray[i].setVerbose(verbose).setSleep(sleep).setMessageCount(messageCount).setFilter(filter).setBrowse(true);
|
||||
|
|
|
@ -18,12 +18,14 @@
|
|||
package org.apache.activemq.artemis.cli.commands.messages;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.JMSSecurityException;
|
||||
|
||||
import io.airlift.airline.Option;
|
||||
import org.apache.activemq.artemis.cli.commands.InputAbstract;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||
import org.apache.qpid.jms.JmsConnectionFactory;
|
||||
|
||||
public class ConnectionAbstract extends InputAbstract {
|
||||
|
||||
|
@ -39,7 +41,48 @@ public class ConnectionAbstract extends InputAbstract {
|
|||
@Option(name = "--clientID", description = "ClientID to be associated with connection")
|
||||
String clientID;
|
||||
|
||||
protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
|
||||
@Option(name = "--protocol", description = "Protocol used. Valid values are amqp or core. Default=core.")
|
||||
String protocol = "core";
|
||||
|
||||
protected ConnectionFactory createConnectionFactory() throws Exception {
|
||||
if (protocol.equals("core")) {
|
||||
return createCoreConnectionFactory();
|
||||
} else if (protocol.equals("amqp")) {
|
||||
return createAMQPConnectionFactory();
|
||||
} else {
|
||||
throw new IllegalStateException("protocol " + protocol + " not supported");
|
||||
}
|
||||
}
|
||||
|
||||
private ConnectionFactory createAMQPConnectionFactory() {
|
||||
if (brokerURL.startsWith("tcp://")) {
|
||||
// replacing tcp:// by amqp://
|
||||
brokerURL = "amqp" + brokerURL.substring(3);
|
||||
}
|
||||
JmsConnectionFactory cf = new JmsConnectionFactory(user, password, brokerURL);
|
||||
if (clientID != null) {
|
||||
cf.setClientID(clientID);
|
||||
}
|
||||
|
||||
try {
|
||||
Connection connection = cf.createConnection();
|
||||
connection.close();
|
||||
return cf;
|
||||
} catch (JMSSecurityException e) {
|
||||
// if a security exception will get the user and password through an input
|
||||
context.err.println("Connection failed::" + e.getMessage());
|
||||
userPassword();
|
||||
return new JmsConnectionFactory(user, password, brokerURL);
|
||||
} catch (JMSException e) {
|
||||
// if a connection exception will ask for the URL, user and password
|
||||
context.err.println("Connection failed::" + e.getMessage());
|
||||
brokerURL = input("--url", "Type in the broker URL for a retry (e.g. tcp://localhost:61616)", brokerURL);
|
||||
userPassword();
|
||||
return new JmsConnectionFactory(user, password, brokerURL);
|
||||
}
|
||||
}
|
||||
|
||||
protected ActiveMQConnectionFactory createCoreConnectionFactory() {
|
||||
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerURL, user, password);
|
||||
|
||||
if (clientID != null) {
|
||||
|
|
|
@ -18,14 +18,13 @@
|
|||
package org.apache.activemq.artemis.cli.commands.messages;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.Session;
|
||||
|
||||
import io.airlift.airline.Command;
|
||||
import io.airlift.airline.Option;
|
||||
import org.apache.activemq.artemis.cli.commands.ActionContext;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
|
||||
|
||||
@Command(name = "consumer", description = "It will consume messages from an instance")
|
||||
public class Consumer extends DestAbstract {
|
||||
|
@ -48,9 +47,8 @@ public class Consumer extends DestAbstract {
|
|||
|
||||
System.out.println("Consumer:: filter = " + filter);
|
||||
|
||||
ActiveMQConnectionFactory factory = createConnectionFactory();
|
||||
ConnectionFactory factory = createConnectionFactory();
|
||||
|
||||
Destination dest = ActiveMQDestination.createDestination(this.destination, ActiveMQDestination.TYPE.QUEUE);
|
||||
try (Connection connection = factory.createConnection()) {
|
||||
ConsumerThread[] threadsArray = new ConsumerThread[threads];
|
||||
for (int i = 0; i < threads; i++) {
|
||||
|
@ -60,6 +58,7 @@ public class Consumer extends DestAbstract {
|
|||
} else {
|
||||
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
}
|
||||
Destination dest = lookupDestination(session);
|
||||
threadsArray[i] = new ConsumerThread(session, dest, i);
|
||||
|
||||
threadsArray[i].setVerbose(verbose).setSleep(sleep).setDurable(durable).setBatchSize(txBatchSize).setBreakOnNull(breakOnNull).setMessageCount(messageCount).setReceiveTimeOut(receiveTimeout).setFilter(filter).setBrowse(false);
|
||||
|
@ -82,4 +81,5 @@ public class Consumer extends DestAbstract {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -146,12 +146,16 @@ public class ConsumerThread extends Thread {
|
|||
consumer = session.createConsumer(destination);
|
||||
}
|
||||
}
|
||||
int count = 0;
|
||||
while (running && received < messageCount) {
|
||||
Message msg = consumer.receive(receiveTimeOut);
|
||||
if (msg != null) {
|
||||
System.out.println(threadName + " Received " + (msg instanceof TextMessage ? ((TextMessage) msg).getText() : msg.getJMSMessageID()));
|
||||
if (verbose) {
|
||||
System.out.println("..." + msg);
|
||||
System.out.println(threadName + " Received " + (msg instanceof TextMessage ? ((TextMessage) msg).getText() : msg.getJMSMessageID()));
|
||||
} else {
|
||||
if (++count % 1000 == 0) {
|
||||
System.out.println("Received " + count);
|
||||
}
|
||||
}
|
||||
if (bytesAsText && (msg instanceof BytesMessage)) {
|
||||
long length = ((BytesMessage) msg).getBodyLength();
|
||||
|
|
|
@ -17,7 +17,11 @@
|
|||
|
||||
package org.apache.activemq.artemis.cli.commands.messages;
|
||||
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.Session;
|
||||
|
||||
import io.airlift.airline.Option;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
|
||||
|
||||
public class DestAbstract extends ConnectionAbstract {
|
||||
|
||||
|
@ -36,4 +40,12 @@ public class DestAbstract extends ConnectionAbstract {
|
|||
@Option(name = "--threads", description = "Number of Threads to be used (Default: 1)")
|
||||
int threads = 1;
|
||||
|
||||
protected Destination lookupDestination(Session session) throws Exception {
|
||||
if (protocol.equals("AMQP")) {
|
||||
return session.createQueue(destination);
|
||||
} else {
|
||||
return ActiveMQDestination.createDestination(this.destination, ActiveMQDestination.TYPE.QUEUE);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -18,14 +18,13 @@
|
|||
package org.apache.activemq.artemis.cli.commands.messages;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.Session;
|
||||
|
||||
import io.airlift.airline.Command;
|
||||
import io.airlift.airline.Option;
|
||||
import org.apache.activemq.artemis.cli.commands.ActionContext;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
|
||||
|
||||
@Command(name = "producer", description = "It will send messages to an instance")
|
||||
public class Producer extends DestAbstract {
|
||||
|
@ -49,9 +48,8 @@ public class Producer extends DestAbstract {
|
|||
public Object execute(ActionContext context) throws Exception {
|
||||
super.execute(context);
|
||||
|
||||
ActiveMQConnectionFactory factory = createConnectionFactory();
|
||||
ConnectionFactory factory = createConnectionFactory();
|
||||
|
||||
Destination dest = ActiveMQDestination.createDestination(this.destination, ActiveMQDestination.TYPE.QUEUE);
|
||||
try (Connection connection = factory.createConnection()) {
|
||||
ProducerThread[] threadsArray = new ProducerThread[threads];
|
||||
for (int i = 0; i < threads; i++) {
|
||||
|
@ -61,6 +59,7 @@ public class Producer extends DestAbstract {
|
|||
} else {
|
||||
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
}
|
||||
Destination dest = lookupDestination(session);
|
||||
threadsArray[i] = new ProducerThread(session, dest, i);
|
||||
|
||||
threadsArray[i].setVerbose(verbose).setSleep(sleep).setPersistent(!nonpersistent).
|
||||
|
|
|
@ -170,6 +170,14 @@
|
|||
<artifactId>tomcat-servlet-api</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- for artemis cli producer/consumer -->
|
||||
<dependency>
|
||||
<groupId>org.apache.qpid</groupId>
|
||||
<artifactId>qpid-jms-client</artifactId>
|
||||
<version>${qpid.jms.version}</version>
|
||||
</dependency>
|
||||
|
||||
|
||||
<!-- Management Console Dependencies -->
|
||||
<dependency>
|
||||
<groupId>org.apache.activemq</groupId>
|
||||
|
|
|
@ -69,6 +69,7 @@
|
|||
<include>org.apache.activemq:artemis-service-extensions</include>
|
||||
<include>org.apache.activemq:artemis-web</include>
|
||||
<include>org.apache.activemq.rest:artemis-rest</include>
|
||||
<include>org.apache.qpid:qpid-jms-client</include>
|
||||
|
||||
<!-- dependencies -->
|
||||
<include>org.apache.geronimo.specs:geronimo-jms_2.0_spec</include>
|
||||
|
|
Loading…
Reference in New Issue