diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/address/AddressAbstract.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/address/AddressAbstract.java index 3eceb034eb..7b352627cc 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/address/AddressAbstract.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/address/AddressAbstract.java @@ -38,8 +38,9 @@ public abstract class AddressAbstract extends AbstractAction { private Boolean noMulticast; - public void setName(String name) { + public AbstractAction setName(String name) { this.name = name; + return this; } public String getName(boolean requireInput) { diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConnectionAbstract.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConnectionAbstract.java index c170259e64..1d252c3a19 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConnectionAbstract.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConnectionAbstract.java @@ -48,24 +48,27 @@ public class ConnectionAbstract extends InputAbstract { return user; } - public void setUser(String user) { + public ConnectionAbstract setUser(String user) { this.user = user; + return this; } public String getPassword() { return password; } - public void setPassword(String password) { + public ConnectionAbstract setPassword(String password) { this.password = password; + return this; } public String getClientID() { return clientID; } - public void setClientID(String clientID) { + public ConnectionAbstract setClientID(String clientID) { this.clientID = clientID; + return this; } public String getProtocol() { diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Consumer.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Consumer.java index 71eac78019..498d850bec 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Consumer.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Consumer.java @@ -140,4 +140,49 @@ public class Consumer extends DestAbstract { messageSerializer.write(message); } } + + public boolean isDurable() { + return durable; + } + + public Consumer setDurable(boolean durable) { + this.durable = durable; + return this; + } + + public boolean isBreakOnNull() { + return breakOnNull; + } + + public Consumer setBreakOnNull(boolean breakOnNull) { + this.breakOnNull = breakOnNull; + return this; + } + + public int getReceiveTimeout() { + return receiveTimeout; + } + + public Consumer setReceiveTimeout(int receiveTimeout) { + this.receiveTimeout = receiveTimeout; + return this; + } + + public String getFilter() { + return filter; + } + + public Consumer setFilter(String filter) { + this.filter = filter; + return this; + } + + public String getFile() { + return file; + } + + public Consumer setFile(String file) { + this.file = file; + return this; + } } diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/DestAbstract.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/DestAbstract.java index 0611ec5d46..360bac6dd2 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/DestAbstract.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/DestAbstract.java @@ -79,4 +79,58 @@ public class DestAbstract extends ConnectionAbstract { return destination; } } + + public String getDestination() { + return destination; + } + + public DestAbstract setDestination(String destination) { + this.destination = destination; + return this; + } + + public int getMessageCount() { + return messageCount; + } + + public DestAbstract setMessageCount(int messageCount) { + this.messageCount = messageCount; + return this; + } + + public int getSleep() { + return sleep; + } + + public DestAbstract setSleep(int sleep) { + this.sleep = sleep; + return this; + } + + public int getTxBatchSize() { + return txBatchSize; + } + + public DestAbstract setTxBatchSize(int txBatchSize) { + this.txBatchSize = txBatchSize; + return this; + } + + public int getThreads() { + return threads; + } + + public DestAbstract setThreads(int threads) { + this.threads = threads; + return this; + } + + public String getSerializer() { + return serializer; + } + + public DestAbstract setSerializer(String serializer) { + this.serializer = serializer; + return this; + } } diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Producer.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Producer.java index 442017c173..6d060fe552 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Producer.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Producer.java @@ -59,7 +59,79 @@ public class Producer extends DestAbstract { String msgGroupID = null; @Option(name = "--data", description = "Messages will be read form the specified file, other message options will be ignored.") - String fileName = null; + String file = null; + + public boolean isNonpersistent() { + return nonpersistent; + } + + public Producer setNonpersistent(boolean nonpersistent) { + this.nonpersistent = nonpersistent; + return this; + } + + public int getMessageSize() { + return messageSize; + } + + public Producer setMessageSize(int messageSize) { + this.messageSize = messageSize; + return this; + } + + public String getMessage() { + return message; + } + + public Producer setMessage(String message) { + this.message = message; + return this; + } + + public int getTextMessageSize() { + return textMessageSize; + } + + public Producer setTextMessageSize(int textMessageSize) { + this.textMessageSize = textMessageSize; + return this; + } + + public int getObjectSize() { + return objectSize; + } + + public Producer setObjectSize(int objectSize) { + this.objectSize = objectSize; + return this; + } + + public long getMsgTTL() { + return msgTTL; + } + + public Producer setMsgTTL(long msgTTL) { + this.msgTTL = msgTTL; + return this; + } + + public String getMsgGroupID() { + return msgGroupID; + } + + public Producer setMsgGroupID(String msgGroupID) { + this.msgGroupID = msgGroupID; + return this; + } + + public String getFile() { + return file; + } + + public Producer setFile(String file) { + this.file = file; + return this; + } @Override public Object execute(ActionContext context) throws Exception { @@ -70,7 +142,7 @@ public class Producer extends DestAbstract { try (Connection connection = factory.createConnection()) { // If we are reading from file, we process messages sequentially to guarantee ordering. i.e. no thread creation. - if (fileName != null) { + if (file != null) { Session session = connection.createSession(true, Session.SESSION_TRANSACTED); Destination dest = getDestination(session); @@ -87,7 +159,7 @@ public class Producer extends DestAbstract { InputStream in; try { - in = new FileInputStream(fileName); + in = new FileInputStream(file); } catch (Exception e) { System.err.println("Error: Unable to open file for reading\n" + e.getMessage()); return null; diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/queue/QueueAbstract.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/queue/QueueAbstract.java index 2ff1b6dc26..12f0e3700d 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/queue/QueueAbstract.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/queue/QueueAbstract.java @@ -203,12 +203,14 @@ public class QueueAbstract extends AbstractAction { this.purgeOnNoConsumers = purgeOnNoConsumers; } - public void setAddress(String address) { + public QueueAbstract setAddress(String address) { this.address = address; + return this; } - public void setName(String name) { + public QueueAbstract setName(String name) { this.name = name; + return this; } public String getName() { diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/queue/StatQueue.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/queue/StatQueue.java index 2d74fc44e9..e9895bdfd6 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/queue/StatQueue.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/queue/StatQueue.java @@ -70,28 +70,34 @@ public class StatQueue extends AbstractAction { private int maxRows = 50; //easier for testing - public void setQueueName(String queueName) { + public StatQueue setQueueName(String queueName) { this.queueName = queueName; + return this; } - public void setOperationName(String operationName) { + public StatQueue setOperationName(String operationName) { this.operationName = operationName; + return this; } - public void setFieldName(String fieldName) { + public StatQueue setFieldName(String fieldName) { this.fieldName = fieldName; + return this; } - public void setValue(String value) { + public StatQueue setValue(String value) { this.value = value; + return this; } - public void setMaxRows(int maxRows) { + public StatQueue setMaxRows(int maxRows) { this.maxRows = maxRows; + return this; } - public void setverbose(boolean verbose) { + public StatQueue setverbose(boolean verbose) { this.verbose = verbose; + return this; } @Override diff --git a/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java b/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java index 42bc468ce4..7a26c34a7c 100644 --- a/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java +++ b/artemis-cli/src/test/java/org/apache/activemq/cli/test/ArtemisTest.java @@ -1356,7 +1356,7 @@ public class ArtemisTest extends CliTestBase { } //read individual lines from byteStream - private ArrayList getOutputLines(TestActionContext context, boolean errorOutput) throws IOException { + public static ArrayList getOutputLines(TestActionContext context, boolean errorOutput) throws IOException { byte[] bytes; if (errorOutput) { diff --git a/artemis-cli/src/test/java/org/apache/activemq/cli/test/CliProducerTest.java b/artemis-cli/src/test/java/org/apache/activemq/cli/test/CliProducerTest.java index 0f39a1f657..fa747131ed 100644 --- a/artemis-cli/src/test/java/org/apache/activemq/cli/test/CliProducerTest.java +++ b/artemis-cli/src/test/java/org/apache/activemq/cli/test/CliProducerTest.java @@ -16,7 +16,8 @@ */ package org.apache.activemq.cli.test; -import org.apache.activemq.artemis.cli.Artemis; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.cli.commands.messages.Producer; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.utils.CompositeAddress; import org.junit.After; @@ -54,22 +55,17 @@ public class CliProducerTest extends CliTestBase { } private void produceMessages(String address, String message, int msgCount) throws Exception { - Artemis.main("producer", - "--user", "admin", - "--password", "admin", - "--destination", address, - "--message", message, - "--message-count", String.valueOf(msgCount) - ); + new Producer() + .setMessage(message) + .setMessageCount(msgCount) + .setDestination(address) + .setUser("admin") + .setPassword("admin") + .execute(new TestActionContext()); } private void produceMessages(String address, int msgCount) throws Exception { - Artemis.main("producer", - "--user", "admin", - "--password", "admin", - "--destination", address, - "--message-count", String.valueOf(msgCount) - ); + produceMessages(address, null, msgCount); } private void checkSentMessages(Session session, String address, String messageBody) throws Exception { @@ -98,7 +94,7 @@ public class CliProducerTest extends CliTestBase { String queue = "queue"; String fqqn = address + "::" + queue; - createQueue("--multicast", address, queue); + createQueue(RoutingType.MULTICAST, address, queue); Session session = createSession(connection); produceMessages("topic://" + address, TEST_MESSAGE_COUNT); @@ -113,7 +109,7 @@ public class CliProducerTest extends CliTestBase { String fqqn = address + "::" + queue; String messageBody = new StringGenerator().generateRandomString(20); - createQueue("--multicast", address, queue); + createQueue(RoutingType.MULTICAST, address, queue); Session session = createSession(connection); produceMessages("topic://" + address, messageBody, TEST_MESSAGE_COUNT); diff --git a/artemis-cli/src/test/java/org/apache/activemq/cli/test/CliTestBase.java b/artemis-cli/src/test/java/org/apache/activemq/cli/test/CliTestBase.java index f3d15916df..f77b2426be 100644 --- a/artemis-cli/src/test/java/org/apache/activemq/cli/test/CliTestBase.java +++ b/artemis-cli/src/test/java/org/apache/activemq/cli/test/CliTestBase.java @@ -16,9 +16,11 @@ */ package org.apache.activemq.cli.test; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.cli.Artemis; import org.apache.activemq.artemis.cli.commands.Run; +import org.apache.activemq.artemis.cli.commands.queue.CreateQueue; import org.apache.activemq.artemis.cli.commands.tools.LockAbstract; import org.apache.activemq.artemis.nativo.jlibaio.LibaioContext; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; @@ -87,7 +89,7 @@ public class CliTestBase { File rootDirectory = new File(temporaryFolder.getRoot(), "broker"); setupAuth(rootDirectory); Run.setEmbedded(true); - Artemis.main("create", rootDirectory.getAbsolutePath(), "--silent", "--no-fsync", "--no-autotune", "--no-web", "--require-login"); + Artemis.main("create", rootDirectory.getAbsolutePath(), "--silent", "--no-fsync", "--no-autotune", "--no-web", "--require-login", "--disable-persistence"); System.setProperty("artemis.instance", rootDirectory.getAbsolutePath()); Artemis.internalExecute("run"); } @@ -110,16 +112,18 @@ public class CliTestBase { return new ActiveMQConnectionFactory("tcp://localhost:" + String.valueOf(serverPort)); } - protected void createQueue(String routingTypeOption, String address, String queueName) throws Exception { - Artemis.main("queue", "create", - "--user", "admin", - "--password", "admin", - "--address", address, - "--name", queueName, - routingTypeOption, - "--durable", - "--preserve-on-no-consumers", - "--auto-create-address"); + protected void createQueue(RoutingType routingType, String address, String queueName) throws Exception { + new CreateQueue() + .setAddress(address) + .setName(queueName) + .setAnycast(RoutingType.ANYCAST.equals(routingType)) + .setMulticast(RoutingType.MULTICAST.equals(routingType)) + .setDurable(true) + .setPreserveOnNoConsumers(true) + .setAutoCreateAddress(true) + .setUser("admin") + .setPassword("admin") + .execute(new TestActionContext()); } void closeConnection(ActiveMQConnectionFactory cf, Connection connection) throws Exception { diff --git a/artemis-cli/src/test/java/org/apache/activemq/cli/test/MessageSerializerTest.java b/artemis-cli/src/test/java/org/apache/activemq/cli/test/MessageSerializerTest.java index 51cc3e5fa0..92e1567de9 100644 --- a/artemis-cli/src/test/java/org/apache/activemq/cli/test/MessageSerializerTest.java +++ b/artemis-cli/src/test/java/org/apache/activemq/cli/test/MessageSerializerTest.java @@ -30,20 +30,24 @@ import javax.jms.TopicSubscriber; import java.io.File; import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.UUID; import org.apache.activemq.artemis.api.core.RoutingType; -import org.apache.activemq.artemis.cli.Artemis; +import org.apache.activemq.artemis.cli.commands.address.CreateAddress; +import org.apache.activemq.artemis.cli.commands.messages.Consumer; +import org.apache.activemq.artemis.cli.commands.messages.Producer; +import org.apache.activemq.artemis.cli.commands.queue.StatQueue; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.jms.client.ActiveMQDestination; import org.apache.activemq.artemis.utils.CompositeAddress; import org.apache.activemq.artemis.utils.RandomUtil; +import org.apache.activemq.artemis.utils.Wait; import org.junit.After; import org.junit.Before; import org.junit.Test; +import static org.apache.activemq.cli.test.ArtemisTest.getOutputLines; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; @@ -78,17 +82,6 @@ public class MessageSerializerTest extends CliTestBase { return temporaryFolder.newFile("messages.xml"); } - private List generateTextMessages(Session session, String address) throws Exception { - List messages = new ArrayList<>(TEST_MESSAGE_COUNT); - for (int i = 0; i < TEST_MESSAGE_COUNT; i++) { - messages.add(session.createTextMessage(RandomUtil.randomString())); - } - - sendMessages(session, address, messages); - - return messages; - } - private List generateTextMessages(Session session, Destination destination) throws Exception { List messages = new ArrayList<>(TEST_MESSAGE_COUNT); for (int i = 0; i < TEST_MESSAGE_COUNT; i++) { @@ -101,17 +94,38 @@ public class MessageSerializerTest extends CliTestBase { } private void checkSentMessages(Session session, List messages, String address) throws Exception { - List recieved = consumeMessages(session, address, TEST_MESSAGE_COUNT, CompositeAddress.isFullyQualified(address)); + checkSentMessages(session, messages, address, null); + } + + private void checkSentMessages(Session session, List messages, String address, String key) throws Exception { + List received = consumeMessages(session, address, TEST_MESSAGE_COUNT, CompositeAddress.isFullyQualified(address)); for (int i = 0; i < TEST_MESSAGE_COUNT; i++) { - assertEquals(((TextMessage) messages.get(i)).getText(), ((TextMessage) recieved.get(i)).getText()); + Message m = messages.get(i); + if (m instanceof TextMessage) { + assertEquals(((TextMessage) m).getText(), ((TextMessage) received.get(i)).getText()); + } else if (m instanceof ObjectMessage) { + assertEquals(((ObjectMessage) m).getObject(), ((ObjectMessage) received.get(i)).getObject()); + } else if (m instanceof MapMessage) { + assertEquals(((MapMessage) m).getString(key), ((MapMessage) received.get(i)).getString(key)); + } } } - private void sendMessages(Session session, String address, List messages) throws Exception { - MessageProducer producer = session.createProducer(getDestination(address)); - for (Message m : messages) { - producer.send(m); + private boolean verifyMessageCount(String address, int messageCount) throws Exception { + TestActionContext context = new TestActionContext(); + new StatQueue() + .setQueueName(address) + .setUser("admin") + .setPassword("admin") + .execute(context); + int currentMessageCount; + try { + // parse the value for MESSAGE_COUNT from the output + currentMessageCount = Integer.parseInt(getOutputLines(context, false).get(1).split("\\|")[4].trim()); + } catch (Exception e) { + currentMessageCount = 0; } + return (messageCount == currentMessageCount); } private void sendMessages(Session session, Destination destination, List messages) throws Exception { @@ -125,35 +139,35 @@ public class MessageSerializerTest extends CliTestBase { exportMessages(address, TEST_MESSAGE_COUNT, false, "test-client", output); } - private void exportMessages(String address, int noMessages, boolean durable, String clientId, File output) throws Exception { - List args = new ArrayList<>(Arrays.asList("consumer", - "--user", "admin", - "--password", "admin", - "--destination", address, - "--message-count", Integer.toString(noMessages), - "--data", output.getAbsolutePath(), - "--clientID", clientId)); - if (durable) { - args.add("--durable"); - } - - Artemis.main(args.toArray(new String[0])); + private void exportMessages(String address, int messageCount, boolean durable, String clientId, File output) throws Exception { + new Consumer() + .setFile(output.getAbsolutePath()) + .setDurable(durable) + .setDestination(address) + .setMessageCount(messageCount) + .setUser("admin") + .setPassword("admin") + .setClientID(clientId) + .execute(new TestActionContext()); } private void importMessages(String address, File input) throws Exception { - Artemis.main("producer", - "--user", "admin", - "--password", "admin", - "--destination", address, - "--data", input.getAbsolutePath()); + new Producer() + .setFile(input.getAbsolutePath()) + .setDestination(address) + .setUser("admin") + .setPassword("admin") + .execute(new TestActionContext()); } private void createBothTypeAddress(String address) throws Exception { - Artemis.main("address", "create", - "--user", "admin", - "--password", "admin", - "--name", address, - "--anycast", "--multicast"); + new CreateAddress() + .setAnycast(true) + .setMulticast(true) + .setName(address) + .setUser("admin") + .setPassword("admin") + .execute(new TestActionContext()); } @Test @@ -163,18 +177,15 @@ public class MessageSerializerTest extends CliTestBase { Session session = createSession(connection); - List messages = generateTextMessages(session, address); + List sent = generateTextMessages(session, getDestination(address)); exportMessages(address, file); - // Ensure there's nothing left to consume - MessageConsumer consumer = session.createConsumer(getDestination(address)); - assertNull(consumer.receive(1000)); - consumer.close(); + Wait.assertTrue(() -> verifyMessageCount(address, 0), 2000, 100); importMessages(address, file); - - checkSentMessages(session, messages, address); + Wait.assertTrue(() -> verifyMessageCount(address, TEST_MESSAGE_COUNT), 2000, 100); + checkSentMessages(session, sent, address); } @Test @@ -190,19 +201,14 @@ public class MessageSerializerTest extends CliTestBase { sent.add(session.createObjectMessage(UUID.randomUUID())); } - sendMessages(session, address, sent); + sendMessages(session, getDestination(address), sent); exportMessages(address, file); - // Ensure there's nothing left to consume - MessageConsumer consumer = session.createConsumer(getDestination(address)); - assertNull(consumer.receive(1000)); - consumer.close(); + Wait.assertTrue(() -> verifyMessageCount(address, 0), 2000, 100); importMessages(address, file); - List received = consumeMessages(session, address, TEST_MESSAGE_COUNT, false); - for (int i = 0; i < TEST_MESSAGE_COUNT; i++) { - assertEquals(((ObjectMessage) sent.get(i)).getObject(), ((ObjectMessage) received.get(i)).getObject()); - } + Wait.assertTrue(() -> verifyMessageCount(address, TEST_MESSAGE_COUNT), 2000, 100); + checkSentMessages(session, sent, address); } @Test @@ -220,19 +226,14 @@ public class MessageSerializerTest extends CliTestBase { sent.add(m); } - sendMessages(session, address, sent); + sendMessages(session, getDestination(address), sent); exportMessages(address, file); - // Ensure there's nothing left to consume - MessageConsumer consumer = session.createConsumer(getDestination(address)); - assertNull(consumer.receive(1000)); - consumer.close(); + Wait.assertTrue(() -> verifyMessageCount(address, 0), 2000, 100); importMessages(address, file); - List received = consumeMessages(session, address, TEST_MESSAGE_COUNT, false); - for (int i = 0; i < TEST_MESSAGE_COUNT; i++) { - assertEquals(((MapMessage) sent.get(i)).getString(key), ((MapMessage) received.get(i)).getString(key)); - } + Wait.assertTrue(() -> verifyMessageCount(address, TEST_MESSAGE_COUNT), 2000, 100); + checkSentMessages(session, sent, address, key); } @Test @@ -251,8 +252,8 @@ public class MessageSerializerTest extends CliTestBase { String queue1Name = "queue1"; String queue2Name = "queue2"; - createQueue("--" + routingType.toString().toLowerCase(), address, queue1Name); - createQueue("--" + routingType.toString().toLowerCase(), address, queue2Name); + createQueue(routingType, address, queue1Name); + createQueue(routingType, address, queue2Name); try (ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection connection = cf.createConnection("admin", "admin");) { @@ -265,11 +266,12 @@ public class MessageSerializerTest extends CliTestBase { MessageConsumer consumer1 = session.createConsumer(queue1); MessageConsumer consumer2 = session.createConsumer(queue2); - Artemis.main("producer", - "--user", "admin", - "--password", "admin", - "--destination", (routingType == RoutingType.ANYCAST ? ActiveMQDestination.QUEUE_QUALIFIED_PREFIX : ActiveMQDestination.TOPIC_QUALIFIED_PREFIX) + CompositeAddress.toFullyQualified(address, queue1Name), - "--message-count", "5"); + new Producer() + .setDestination((routingType == RoutingType.ANYCAST ? ActiveMQDestination.QUEUE_QUALIFIED_PREFIX : ActiveMQDestination.TOPIC_QUALIFIED_PREFIX) + CompositeAddress.toFullyQualified(address, queue1Name)) + .setMessageCount(5) + .setUser("admin") + .setPassword("admin") + .execute(new TestActionContext()); assertNull(consumer2.receive(1000)); assertNotNull(consumer1.receive(1000)); @@ -285,7 +287,7 @@ public class MessageSerializerTest extends CliTestBase { File file = createMessageFile(); - createQueue("--multicast", addr, queue); + createQueue(RoutingType.MULTICAST, addr, queue); Session session = createSession(connection); @@ -308,12 +310,12 @@ public class MessageSerializerTest extends CliTestBase { File file = createMessageFile(); - createQueue("--multicast", mAddress, queueM1Name); - createQueue("--multicast", mAddress, queueM2Name); + createQueue(RoutingType.MULTICAST, mAddress, queueM1Name); + createQueue(RoutingType.MULTICAST, mAddress, queueM2Name); Session session = createSession(connection); - List messages = generateTextMessages(session, aAddress); + List messages = generateTextMessages(session, getDestination(aAddress)); exportMessages(aAddress, file); importMessages(ActiveMQDestination.TOPIC_QUALIFIED_PREFIX + mAddress, file); @@ -333,12 +335,12 @@ public class MessageSerializerTest extends CliTestBase { File file = createMessageFile(); - createQueue("--multicast", mAddress, queueM1Name); - createQueue("--multicast", mAddress, queueM2Name); + createQueue(RoutingType.MULTICAST, mAddress, queueM1Name); + createQueue(RoutingType.MULTICAST, mAddress, queueM2Name); Session session = createSession(connection); - List messages = generateTextMessages(session, aAddress); + List messages = generateTextMessages(session, getDestination(aAddress)); exportMessages(aAddress, file); importMessages(ActiveMQDestination.TOPIC_QUALIFIED_PREFIX + fqqnMulticast1, file); @@ -380,12 +382,12 @@ public class MessageSerializerTest extends CliTestBase { connection.setClientID(clientId); createBothTypeAddress(address); - createQueue("--anycast", address, address); + createQueue(RoutingType.ANYCAST, address, address); Session session = createSession(connection); TopicSubscriber subscriber = session.createDurableSubscriber(session.createTopic(address), "test-subscriber"); - List messages = generateTextMessages(session, address); + List messages = generateTextMessages(session, getDestination(address)); exportMessages(address, file);