ARTEMIS-2645 refactor CLI FQQN support
FQQN support for the CLI was implemented via ARTEMIS-1840 before general FQQN support was added for producers via ARTEMIS-1867. The CLI's FQQN functionality is slightly different from what is now generally available and it can be confusing for users. By refactoring the CLI to use the general FQQN support the code can be much simpler and consistent with the expected behavior. Refactoring includes: - Deprecating the use of "fqqn://". The CLI commands use JMS so using "fqqn://" (instead of "queue://" or "topic://") makes the destination type ambiguous which can yield unexpected message routing behavior. Now "queue://" and "topic://" can be used with the normal FQQN syntax (e.g. address::queue). - Eliminating the use of the _AMQ_ROUTE_TO header when sending messags to an FQQN. The _AMQ_ROUTE_TO header is an internal header used when routing messages over a cluster bridge. Using it in the CLI for FQQN support was a clever hack, but using the general FQQN support eliminates complexity and makes behavior consistent between standalone JMS clients using FQQN and the CLI. - De-duplicating MessageSerializer initialization boilerplate. - Removing limitation where using an FQQN with an anycast address required the same name for the address and queue.
This commit is contained in:
parent
8927d07fb7
commit
7ad53e5748
|
@ -49,10 +49,15 @@ public class Browse extends DestAbstract {
|
|||
} else {
|
||||
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
}
|
||||
Destination dest = lookupDestination(session);
|
||||
Destination dest = getDestination(session);
|
||||
threadsArray[i] = new ConsumerThread(session, dest, i);
|
||||
|
||||
threadsArray[i].setVerbose(verbose).setSleep(sleep).setMessageCount(messageCount).setFilter(filter).setBrowse(true);
|
||||
threadsArray[i]
|
||||
.setVerbose(verbose)
|
||||
.setSleep(sleep)
|
||||
.setMessageCount(messageCount)
|
||||
.setFilter(filter)
|
||||
.setBrowse(true);
|
||||
}
|
||||
|
||||
for (ConsumerThread thread : threadsArray) {
|
||||
|
@ -69,11 +74,6 @@ public class Browse extends DestAbstract {
|
|||
}
|
||||
|
||||
return received;
|
||||
} finally {
|
||||
if (factory instanceof AutoCloseable) {
|
||||
((AutoCloseable) factory).close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -56,30 +56,25 @@ public class Consumer extends DestAbstract {
|
|||
System.out.println("Consumer:: filter = " + filter);
|
||||
|
||||
SerialiserMessageListener listener = null;
|
||||
MessageSerializer messageSerializer = null;
|
||||
MessageSerializer serializer = null;
|
||||
if (file != null) {
|
||||
try {
|
||||
String className = serializer == null ? DEFAULT_MESSAGE_SERIALIZER : serializer;
|
||||
if (className.equals(DEFAULT_MESSAGE_SERIALIZER) && !protocol.equalsIgnoreCase("CORE")) {
|
||||
System.err.println("Default Serializer does not support: " + protocol + " protocol");
|
||||
return null;
|
||||
}
|
||||
messageSerializer = (MessageSerializer) Class.forName(className).getConstructor().newInstance();
|
||||
} catch (Exception e) {
|
||||
System.err.println("Error. Unable to instantiate serializer class: " + serializer);
|
||||
serializer = getMessageSerializer();
|
||||
if (serializer == null) {
|
||||
System.err.println("Error. Unable to instantiate serializer class: " + this.serializer);
|
||||
return null;
|
||||
}
|
||||
|
||||
OutputStream out;
|
||||
try {
|
||||
OutputStream out = new FileOutputStream(file);
|
||||
listener = new SerialiserMessageListener(messageSerializer, out);
|
||||
out = new FileOutputStream(file);
|
||||
} catch (Exception e) {
|
||||
System.err.println("Error: Unable to open file for writing\n" + e.getMessage());
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
if (messageSerializer != null) messageSerializer.start();
|
||||
listener = new SerialiserMessageListener(serializer, out);
|
||||
serializer.start();
|
||||
}
|
||||
|
||||
ConnectionFactory factory = createConnectionFactory();
|
||||
|
||||
|
@ -94,12 +89,20 @@ public class Consumer extends DestAbstract {
|
|||
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
}
|
||||
|
||||
// Do validation on FQQN
|
||||
Destination dest = isFQQN() ? session.createQueue(getFQQNFromDestination(destination)) : lookupDestination(session);
|
||||
Destination dest = getDestination(session);
|
||||
threadsArray[i] = new ConsumerThread(session, dest, i);
|
||||
|
||||
threadsArray[i].setVerbose(verbose).setSleep(sleep).setDurable(durable).setBatchSize(txBatchSize).setBreakOnNull(breakOnNull)
|
||||
.setMessageCount(messageCount).setReceiveTimeOut(receiveTimeout).setFilter(filter).setBrowse(false).setListener(listener);
|
||||
threadsArray[i]
|
||||
.setVerbose(verbose)
|
||||
.setSleep(sleep)
|
||||
.setDurable(durable)
|
||||
.setBatchSize(txBatchSize)
|
||||
.setBreakOnNull(breakOnNull)
|
||||
.setMessageCount(messageCount)
|
||||
.setReceiveTimeOut(receiveTimeout)
|
||||
.setFilter(filter)
|
||||
.setBrowse(false)
|
||||
.setListener(listener);
|
||||
}
|
||||
|
||||
for (ConsumerThread thread : threadsArray) {
|
||||
|
@ -115,13 +118,11 @@ public class Consumer extends DestAbstract {
|
|||
received += thread.getReceived();
|
||||
}
|
||||
|
||||
if (messageSerializer != null) messageSerializer.stop();
|
||||
if (serializer != null) {
|
||||
serializer.stop();
|
||||
}
|
||||
|
||||
return received;
|
||||
} finally {
|
||||
if (factory instanceof AutoCloseable) {
|
||||
((AutoCloseable) factory).close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -18,33 +18,17 @@
|
|||
package org.apache.activemq.artemis.cli.commands.messages;
|
||||
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Session;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
import io.airlift.airline.Option;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientRequestor;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
|
||||
import org.apache.activemq.artemis.api.core.management.ResourceNames;
|
||||
import org.apache.activemq.artemis.cli.factory.serialize.MessageSerializer;
|
||||
import org.apache.activemq.artemis.cli.factory.serialize.XMLMessageSerializer;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
|
||||
|
||||
public class DestAbstract extends ConnectionAbstract {
|
||||
|
||||
public static final String DEFAULT_MESSAGE_SERIALIZER = "org.apache.activemq.artemis.cli.factory.serialize.XMLMessageSerializer";
|
||||
|
||||
private static final String FQQN_PREFIX = "fqqn://";
|
||||
|
||||
private static final String FQQN_SEPERATOR = "::";
|
||||
|
||||
@Option(name = "--destination", description = "Destination to be used. It can be prefixed with queue:// or topic:// or fqqn:// (Default: queue://TEST)")
|
||||
@Option(name = "--destination", description = "Destination to be used. It can be prefixed with queue:// or topic:// and can be an FQQN in the form of <address>::<queue>. (Default: queue://TEST)")
|
||||
String destination = "queue://TEST";
|
||||
|
||||
@Option(name = "--message-count", description = "Number of messages to act on (Default: 1000)")
|
||||
|
@ -62,86 +46,37 @@ public class DestAbstract extends ConnectionAbstract {
|
|||
@Option(name = "--serializer", description = "Override the default serializer with a custom implementation")
|
||||
String serializer;
|
||||
|
||||
protected boolean isFQQN() throws ActiveMQException {
|
||||
boolean fqqn = destination.contains("::");
|
||||
if (fqqn) {
|
||||
if (!destination.startsWith("fqqn://")) {
|
||||
throw new ActiveMQException("FQQN destinations must start with the fqqn:// prefix");
|
||||
}
|
||||
|
||||
if (protocol.equalsIgnoreCase("AMQP")) {
|
||||
throw new ActiveMQException("Sending to FQQN destinations is not support via AMQP protocol");
|
||||
}
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
protected Destination lookupDestination(Session session) throws Exception {
|
||||
if (protocol.equals("AMQP")) {
|
||||
return session.createQueue(destination);
|
||||
} else {
|
||||
return ActiveMQDestination.createDestination(this.destination, ActiveMQDestination.TYPE.QUEUE);
|
||||
}
|
||||
}
|
||||
|
||||
protected MessageSerializer getMessageSerializer() {
|
||||
if (serializer == null) return new XMLMessageSerializer();
|
||||
try {
|
||||
return (MessageSerializer) Class.forName(serializer).getConstructor().newInstance();
|
||||
} catch (Exception e) {
|
||||
System.out.println("Error: unable to instantiate serializer class: " + serializer);
|
||||
System.out.println("Defaulting to: " + DEFAULT_MESSAGE_SERIALIZER);
|
||||
if (serializer != null) {
|
||||
try {
|
||||
return (MessageSerializer) Class.forName(serializer).getConstructor().newInstance();
|
||||
} catch (Exception e) {
|
||||
System.err.println("Error: unable to instantiate serializer class: " + serializer);
|
||||
System.err.println("Defaulting to: " + XMLMessageSerializer.class.getName());
|
||||
}
|
||||
}
|
||||
|
||||
if (!protocol.equalsIgnoreCase("CORE")) {
|
||||
System.err.println("Default Serializer does not support: " + protocol + " protocol");
|
||||
return null;
|
||||
}
|
||||
|
||||
return new XMLMessageSerializer();
|
||||
}
|
||||
|
||||
public byte[] getQueueIdFromName(String queueName) throws Exception {
|
||||
try {
|
||||
ClientMessage message = getQueueAttribute(queueName, "ID");
|
||||
Number idObject = (Number) ManagementHelper.getResult(message);
|
||||
ByteBuffer byteBuffer = ByteBuffer.allocate(8);
|
||||
byteBuffer.putLong(idObject.longValue());
|
||||
return byteBuffer.array();
|
||||
} catch (Exception e) {
|
||||
throw new ActiveMQException("Error occured when looking up FQQN. Please ensure the FQQN exists.", e, ActiveMQExceptionType.ILLEGAL_STATE);
|
||||
protected Destination getDestination(Session session) throws JMSException {
|
||||
if (destination.startsWith(ActiveMQDestination.TOPIC_QUALIFIED_PREFIX)) {
|
||||
return session.createTopic(stripPrefix(destination));
|
||||
}
|
||||
return session.createQueue(stripPrefix(destination));
|
||||
}
|
||||
|
||||
protected ClientMessage getQueueAttribute(String queueName, String attribute) throws Exception {
|
||||
try (ServerLocator serverLocator = ActiveMQClient.createServerLocator(brokerURL)) {
|
||||
try (ClientSessionFactory sf = serverLocator.createSessionFactory()) {
|
||||
ClientSession managementSession;
|
||||
if (user != null || password != null) {
|
||||
managementSession = sf.createSession(user, password, false, true, true, false, 0);
|
||||
} else {
|
||||
managementSession = sf.createSession(false, true, true);
|
||||
}
|
||||
managementSession.start();
|
||||
|
||||
try (ClientRequestor requestor = new ClientRequestor(managementSession, "activemq.management")) {
|
||||
ClientMessage managementMessage = managementSession.createMessage(false);
|
||||
ManagementHelper.putAttribute(managementMessage, ResourceNames.QUEUE + queueName, attribute);
|
||||
managementSession.start();
|
||||
ClientMessage reply = requestor.request(managementMessage);
|
||||
return reply;
|
||||
} finally {
|
||||
managementSession.stop();
|
||||
}
|
||||
}
|
||||
private String stripPrefix(String destination) {
|
||||
int index = destination.indexOf("://");
|
||||
if (index != -1) {
|
||||
return destination.substring(index + 3);
|
||||
} else {
|
||||
return destination;
|
||||
}
|
||||
}
|
||||
|
||||
protected String getQueueFromFQQN(String fqqn) {
|
||||
return fqqn.substring(fqqn.indexOf(FQQN_SEPERATOR) + FQQN_SEPERATOR.length());
|
||||
}
|
||||
|
||||
protected String getAddressFromFQQN(String fqqn) {
|
||||
return fqqn.substring(fqqn.indexOf(FQQN_PREFIX) + FQQN_PREFIX.length(), fqqn.indexOf(FQQN_SEPERATOR));
|
||||
}
|
||||
|
||||
protected String getFQQNFromDestination(String destination) {
|
||||
return destination.substring(destination.indexOf(FQQN_PREFIX) + FQQN_PREFIX.length());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,15 +25,12 @@ import javax.jms.Message;
|
|||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.InputStream;
|
||||
|
||||
import io.airlift.airline.Command;
|
||||
import io.airlift.airline.Option;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientMessage;
|
||||
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
|
||||
import org.apache.activemq.artemis.cli.commands.ActionContext;
|
||||
import org.apache.activemq.artemis.cli.factory.serialize.MessageSerializer;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
|
||||
|
||||
@Command(name = "producer", description = "It will send messages to an instance")
|
||||
public class Producer extends DestAbstract {
|
||||
|
@ -72,16 +69,10 @@ public class Producer extends DestAbstract {
|
|||
|
||||
try (Connection connection = factory.createConnection()) {
|
||||
|
||||
byte[] queueId = null;
|
||||
boolean isFQQN = isFQQN();
|
||||
if (isFQQN) {
|
||||
queueId = getQueueIdFromName(getQueueFromFQQN(destination));
|
||||
}
|
||||
|
||||
// If we are reading from file, we process messages sequentially to guarantee ordering. i.e. no thread creation.
|
||||
if (fileName != null) {
|
||||
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||
Destination dest = lookupDestination(session, isFQQN);
|
||||
Destination dest = getDestination(session);
|
||||
|
||||
MessageProducer producer = session.createProducer(dest);
|
||||
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
|
||||
|
@ -89,13 +80,25 @@ public class Producer extends DestAbstract {
|
|||
int messageCount = 0;
|
||||
try {
|
||||
MessageSerializer serializer = getMessageSerializer();
|
||||
serializer.setInput(new FileInputStream(fileName), session);
|
||||
if (serializer == null) {
|
||||
System.err.println("Error. Unable to instantiate serializer class: " + serializer);
|
||||
return null;
|
||||
}
|
||||
|
||||
InputStream in;
|
||||
try {
|
||||
in = new FileInputStream(fileName);
|
||||
} catch (Exception e) {
|
||||
System.err.println("Error: Unable to open file for reading\n" + e.getMessage());
|
||||
return null;
|
||||
}
|
||||
|
||||
serializer.setInput(in, session);
|
||||
serializer.start();
|
||||
|
||||
Message message = serializer.read();
|
||||
|
||||
while (message != null) {
|
||||
if (queueId != null) ((ActiveMQMessage) message).getCoreMessage().putBytesProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTE_TO_IDS, queueId);
|
||||
producer.send(message);
|
||||
message = serializer.read();
|
||||
messageCount++;
|
||||
|
@ -120,13 +123,21 @@ public class Producer extends DestAbstract {
|
|||
} else {
|
||||
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
}
|
||||
Destination dest = lookupDestination(session, isFQQN);
|
||||
Destination dest = getDestination(session);
|
||||
threadsArray[i] = new ProducerThread(session, dest, i);
|
||||
|
||||
threadsArray[i].setVerbose(verbose).setSleep(sleep).setPersistent(!nonpersistent).
|
||||
setMessageSize(messageSize).setTextMessageSize(textMessageSize).setMessage(message).setObjectSize(objectSize).
|
||||
setMsgTTL(msgTTL).setMsgGroupID(msgGroupID).setTransactionBatchSize(txBatchSize).
|
||||
setMessageCount(messageCount).setQueueId(queueId);
|
||||
threadsArray[i]
|
||||
.setVerbose(verbose)
|
||||
.setSleep(sleep)
|
||||
.setPersistent(!nonpersistent)
|
||||
.setMessageSize(messageSize)
|
||||
.setTextMessageSize(textMessageSize)
|
||||
.setMessage(message)
|
||||
.setObjectSize(objectSize)
|
||||
.setMsgTTL(msgTTL)
|
||||
.setMsgGroupID(msgGroupID)
|
||||
.setTransactionBatchSize(txBatchSize)
|
||||
.setMessageCount(messageCount);
|
||||
}
|
||||
|
||||
for (ProducerThread thread : threadsArray) {
|
||||
|
@ -140,35 +151,6 @@ public class Producer extends DestAbstract {
|
|||
}
|
||||
return messagesProduced;
|
||||
}
|
||||
} finally {
|
||||
if (factory instanceof AutoCloseable) {
|
||||
((AutoCloseable) factory).close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public Destination lookupDestination(Session session, boolean isFQQN) throws Exception {
|
||||
Destination dest;
|
||||
if (!isFQQN) {
|
||||
dest = lookupDestination(session);
|
||||
} else {
|
||||
String address = getAddressFromFQQN(destination);
|
||||
if (isFQQNAnycast(getQueueFromFQQN(destination))) {
|
||||
String queue = getQueueFromFQQN(destination);
|
||||
if (!queue.equals(address)) {
|
||||
throw new ActiveMQException("FQQN support is limited to Anycast queues where the queue name equals the address.");
|
||||
}
|
||||
dest = session.createQueue(address);
|
||||
} else {
|
||||
dest = session.createTopic(address);
|
||||
}
|
||||
}
|
||||
return dest;
|
||||
}
|
||||
|
||||
protected boolean isFQQNAnycast(String queueName) throws Exception {
|
||||
ClientMessage message = getQueueAttribute(queueName, "RoutingType");
|
||||
String routingType = (String) ManagementHelper.getResult(message);
|
||||
return routingType.equalsIgnoreCase("anycast");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,7 +30,6 @@ import java.io.InputStreamReader;
|
|||
import java.net.URL;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
|
||||
import org.apache.activemq.artemis.utils.ReusableLatch;
|
||||
|
||||
public class ProducerThread extends Thread {
|
||||
|
@ -49,7 +48,6 @@ public class ProducerThread extends Thread {
|
|||
long msgTTL = 0L;
|
||||
String msgGroupID = null;
|
||||
int transactionBatchSize;
|
||||
byte[] queueId = null;
|
||||
|
||||
int transactions = 0;
|
||||
final AtomicInteger sentCount = new AtomicInteger(0);
|
||||
|
@ -124,10 +122,6 @@ public class ProducerThread extends Thread {
|
|||
private void sendMessage(MessageProducer producer, String threadName) throws Exception {
|
||||
Message message = createMessage(sentCount.get(), threadName);
|
||||
|
||||
if (queueId != null) {
|
||||
((ActiveMQMessage) message).getCoreMessage().putBytesProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTE_TO_IDS, queueId);
|
||||
}
|
||||
|
||||
producer.send(message);
|
||||
if (verbose) {
|
||||
System.out.println(threadName + " Sent: " + (message instanceof TextMessage ? ((TextMessage) message).getText() : message.getJMSMessageID()));
|
||||
|
@ -377,8 +371,4 @@ public class ProducerThread extends Thread {
|
|||
this.objectSize = objectSize;
|
||||
return this;
|
||||
}
|
||||
|
||||
public void setQueueId(byte[] queueId) {
|
||||
this.queueId = queueId;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@ package org.apache.activemq.cli.test;
|
|||
|
||||
import org.apache.activemq.artemis.cli.Artemis;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.artemis.utils.CompositeAddress;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
@ -73,10 +74,8 @@ public class CliProducerTest extends CliTestBase {
|
|||
|
||||
private void checkSentMessages(Session session, String address, String messageBody) throws Exception {
|
||||
final boolean isCustomMessageBody = messageBody != null;
|
||||
boolean fqqn = false;
|
||||
if (address.contains("::")) fqqn = true;
|
||||
|
||||
List<Message> received = consumeMessages(session, address, TEST_MESSAGE_COUNT, fqqn);
|
||||
List<Message> received = consumeMessages(session, address, TEST_MESSAGE_COUNT, CompositeAddress.isFullyQualified(address));
|
||||
for (int i = 0; i < TEST_MESSAGE_COUNT; i++) {
|
||||
if (!isCustomMessageBody) messageBody = "test message: " + String.valueOf(i);
|
||||
assertEquals(messageBody, ((TextMessage) received.get(i)).getText());
|
||||
|
|
|
@ -27,17 +27,18 @@ import javax.jms.Session;
|
|||
import javax.jms.TextMessage;
|
||||
import javax.jms.Topic;
|
||||
import javax.jms.TopicSubscriber;
|
||||
import java.io.BufferedReader;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStreamReader;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.RoutingType;
|
||||
import org.apache.activemq.artemis.cli.Artemis;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
|
||||
import org.apache.activemq.artemis.utils.CompositeAddress;
|
||||
import org.apache.activemq.artemis.utils.RandomUtil;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
|
@ -100,10 +101,7 @@ public class MessageSerializerTest extends CliTestBase {
|
|||
}
|
||||
|
||||
private void checkSentMessages(Session session, List<Message> messages, String address) throws Exception {
|
||||
boolean fqqn = false;
|
||||
if (address.contains("::")) fqqn = true;
|
||||
|
||||
List<Message> recieved = consumeMessages(session, address, TEST_MESSAGE_COUNT, fqqn);
|
||||
List<Message> recieved = consumeMessages(session, address, TEST_MESSAGE_COUNT, CompositeAddress.isFullyQualified(address));
|
||||
for (int i = 0; i < TEST_MESSAGE_COUNT; i++) {
|
||||
assertEquals(((TextMessage) messages.get(i)).getText(), ((TextMessage) recieved.get(i)).getText());
|
||||
}
|
||||
|
@ -128,26 +126,18 @@ public class MessageSerializerTest extends CliTestBase {
|
|||
}
|
||||
|
||||
private void exportMessages(String address, int noMessages, boolean durable, String clientId, File output) throws Exception {
|
||||
List<String> args = new ArrayList<>(Arrays.asList("consumer",
|
||||
"--user", "admin",
|
||||
"--password", "admin",
|
||||
"--destination", address,
|
||||
"--message-count", Integer.toString(noMessages),
|
||||
"--data", output.getAbsolutePath(),
|
||||
"--clientID", clientId));
|
||||
if (durable) {
|
||||
String[] args = {"consumer",
|
||||
"--user", "admin",
|
||||
"--password", "admin",
|
||||
"--destination", address,
|
||||
"--message-count", Integer.toString(noMessages),
|
||||
"--data", output.getAbsolutePath(),
|
||||
"--clientID", clientId,
|
||||
"--durable"};
|
||||
Artemis.main(args);
|
||||
} else {
|
||||
String[] args = {"consumer",
|
||||
"--user", "admin",
|
||||
"--password", "admin",
|
||||
"--destination", address,
|
||||
"--message-count", Integer.toString(noMessages),
|
||||
"--data", output.getAbsolutePath(),
|
||||
"--clientID", clientId};
|
||||
Artemis.main(args);
|
||||
args.add("--durable");
|
||||
}
|
||||
|
||||
Artemis.main(args.toArray(new String[0]));
|
||||
}
|
||||
|
||||
private void importMessages(String address, File input) throws Exception {
|
||||
|
@ -246,22 +236,31 @@ public class MessageSerializerTest extends CliTestBase {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testSendDirectToQueue() throws Exception {
|
||||
public void testSendDirectToMulticastQueue() throws Exception {
|
||||
internalTestSendDirectToQueue(RoutingType.MULTICAST);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSendDirectToAnycastQueue() throws Exception {
|
||||
internalTestSendDirectToQueue(RoutingType.ANYCAST);
|
||||
}
|
||||
|
||||
private void internalTestSendDirectToQueue(RoutingType routingType) throws Exception {
|
||||
|
||||
String address = "test";
|
||||
String queue1Name = "queue1";
|
||||
String queue2Name = "queue2";
|
||||
|
||||
createQueue("--multicast", address, queue1Name);
|
||||
createQueue("--multicast", address, queue2Name);
|
||||
createQueue("--" + routingType.toString().toLowerCase(), address, queue1Name);
|
||||
createQueue("--" + routingType.toString().toLowerCase(), address, queue2Name);
|
||||
|
||||
try (ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection connection = cf.createConnection("admin", "admin");) {
|
||||
|
||||
// send messages to queue
|
||||
Session session = createSession(connection);
|
||||
|
||||
Destination queue1 = session.createQueue(address + "::" + queue1Name);
|
||||
Destination queue2 = session.createQueue(address + "::" + queue2Name);
|
||||
Destination queue1 = session.createQueue(CompositeAddress.toFullyQualified(address, queue1Name));
|
||||
Destination queue2 = session.createQueue(CompositeAddress.toFullyQualified(address, queue2Name));
|
||||
|
||||
MessageConsumer consumer1 = session.createConsumer(queue1);
|
||||
MessageConsumer consumer2 = session.createConsumer(queue2);
|
||||
|
@ -269,7 +268,7 @@ public class MessageSerializerTest extends CliTestBase {
|
|||
Artemis.main("producer",
|
||||
"--user", "admin",
|
||||
"--password", "admin",
|
||||
"--destination", "fqqn://" + address + "::" + queue1Name,
|
||||
"--destination", (routingType == RoutingType.ANYCAST ? ActiveMQDestination.QUEUE_QUALIFIED_PREFIX : ActiveMQDestination.TOPIC_QUALIFIED_PREFIX) + CompositeAddress.toFullyQualified(address, queue1Name),
|
||||
"--message-count", "5");
|
||||
|
||||
assertNull(consumer2.receive(1000));
|
||||
|
@ -281,8 +280,8 @@ public class MessageSerializerTest extends CliTestBase {
|
|||
public void exportFromFQQN() throws Exception {
|
||||
String addr = "address";
|
||||
String queue = "queue";
|
||||
String fqqn = addr + "::" + queue;
|
||||
String destination = "fqqn://" + fqqn;
|
||||
String fqqn = CompositeAddress.toFullyQualified(addr, queue);
|
||||
String destination = ActiveMQDestination.TOPIC_QUALIFIED_PREFIX + fqqn;
|
||||
|
||||
File file = createMessageFile();
|
||||
|
||||
|
@ -294,7 +293,7 @@ public class MessageSerializerTest extends CliTestBase {
|
|||
|
||||
List<Message> messages = generateTextMessages(session, topic);
|
||||
|
||||
exportMessages(destination, file);
|
||||
exportMessages(fqqn, file);
|
||||
importMessages(destination, file);
|
||||
|
||||
checkSentMessages(session, messages, fqqn);
|
||||
|
@ -317,7 +316,7 @@ public class MessageSerializerTest extends CliTestBase {
|
|||
List<Message> messages = generateTextMessages(session, aAddress);
|
||||
|
||||
exportMessages(aAddress, file);
|
||||
importMessages("topic://" + mAddress, file);
|
||||
importMessages(ActiveMQDestination.TOPIC_QUALIFIED_PREFIX + mAddress, file);
|
||||
|
||||
checkSentMessages(session, messages, queueM1Name);
|
||||
checkSentMessages(session, messages, queueM2Name);
|
||||
|
@ -329,8 +328,8 @@ public class MessageSerializerTest extends CliTestBase {
|
|||
String aAddress = "testAnycast";
|
||||
String queueM1Name = "queueM1";
|
||||
String queueM2Name = "queueM2";
|
||||
String fqqnMulticast1 = mAddress + "::" + queueM1Name;
|
||||
String fqqnMulticast2 = mAddress + "::" + queueM2Name;
|
||||
String fqqnMulticast1 = CompositeAddress.toFullyQualified(mAddress, queueM1Name);
|
||||
String fqqnMulticast2 = CompositeAddress.toFullyQualified(mAddress, queueM2Name);
|
||||
|
||||
File file = createMessageFile();
|
||||
|
||||
|
@ -342,7 +341,7 @@ public class MessageSerializerTest extends CliTestBase {
|
|||
List<Message> messages = generateTextMessages(session, aAddress);
|
||||
|
||||
exportMessages(aAddress, file);
|
||||
importMessages("fqqn://" + fqqnMulticast1, file);
|
||||
importMessages(ActiveMQDestination.TOPIC_QUALIFIED_PREFIX + fqqnMulticast1, file);
|
||||
|
||||
checkSentMessages(session, messages, fqqnMulticast1);
|
||||
|
||||
|
@ -359,13 +358,13 @@ public class MessageSerializerTest extends CliTestBase {
|
|||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
createBothTypeAddress(address);
|
||||
|
||||
exportMessages("topic://" + address, 0, true, clientId, file);
|
||||
exportMessages(ActiveMQDestination.TOPIC_QUALIFIED_PREFIX + address, 0, true, clientId, file);
|
||||
|
||||
connection.start();
|
||||
|
||||
List<Message> messages = generateTextMessages(session, getTopicDestination(address));
|
||||
|
||||
exportMessages("topic://" + address, TEST_MESSAGE_COUNT, true, clientId, file);
|
||||
exportMessages(ActiveMQDestination.TOPIC_QUALIFIED_PREFIX + address, TEST_MESSAGE_COUNT, true, clientId, file);
|
||||
|
||||
importMessages(address, file);
|
||||
|
||||
|
@ -390,32 +389,11 @@ public class MessageSerializerTest extends CliTestBase {
|
|||
|
||||
exportMessages(address, file);
|
||||
|
||||
importMessages("topic://" + address, file);
|
||||
importMessages(ActiveMQDestination.TOPIC_QUALIFIED_PREFIX + address, file);
|
||||
for (int i = 0; i < TEST_MESSAGE_COUNT; i++) {
|
||||
TextMessage messageReceived = (TextMessage) subscriber.receive(1000);
|
||||
assertNotNull(messageReceived);
|
||||
assertEquals(((TextMessage) messages.get(i)).getText(), messageReceived.getText());
|
||||
}
|
||||
}
|
||||
|
||||
//read individual lines from byteStream
|
||||
private ArrayList<String> getOutputLines(TestActionContext context, boolean errorOutput) throws IOException {
|
||||
byte[] bytes;
|
||||
|
||||
if (errorOutput) {
|
||||
bytes = context.getStdErrBytes();
|
||||
} else {
|
||||
bytes = context.getStdoutBytes();
|
||||
}
|
||||
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(bytes)));
|
||||
ArrayList<String> lines = new ArrayList<>();
|
||||
|
||||
String currentLine = bufferedReader.readLine();
|
||||
while (currentLine != null) {
|
||||
lines.add(currentLine);
|
||||
currentLine = bufferedReader.readLine();
|
||||
}
|
||||
|
||||
return lines;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue