This commit is contained in:
Clebert Suconic 2020-03-11 11:10:32 -04:00
commit df014631a8
11 changed files with 311 additions and 126 deletions

View File

@ -38,8 +38,9 @@ public abstract class AddressAbstract extends AbstractAction {
private Boolean noMulticast;
public void setName(String name) {
public AbstractAction setName(String name) {
this.name = name;
return this;
}
public String getName(boolean requireInput) {

View File

@ -48,24 +48,27 @@ public class ConnectionAbstract extends InputAbstract {
return user;
}
public void setUser(String user) {
public ConnectionAbstract setUser(String user) {
this.user = user;
return this;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
public ConnectionAbstract setPassword(String password) {
this.password = password;
return this;
}
public String getClientID() {
return clientID;
}
public void setClientID(String clientID) {
public ConnectionAbstract setClientID(String clientID) {
this.clientID = clientID;
return this;
}
public String getProtocol() {

View File

@ -140,4 +140,49 @@ public class Consumer extends DestAbstract {
messageSerializer.write(message);
}
}
public boolean isDurable() {
return durable;
}
public Consumer setDurable(boolean durable) {
this.durable = durable;
return this;
}
public boolean isBreakOnNull() {
return breakOnNull;
}
public Consumer setBreakOnNull(boolean breakOnNull) {
this.breakOnNull = breakOnNull;
return this;
}
public int getReceiveTimeout() {
return receiveTimeout;
}
public Consumer setReceiveTimeout(int receiveTimeout) {
this.receiveTimeout = receiveTimeout;
return this;
}
public String getFilter() {
return filter;
}
public Consumer setFilter(String filter) {
this.filter = filter;
return this;
}
public String getFile() {
return file;
}
public Consumer setFile(String file) {
this.file = file;
return this;
}
}

View File

@ -79,4 +79,58 @@ public class DestAbstract extends ConnectionAbstract {
return destination;
}
}
public String getDestination() {
return destination;
}
public DestAbstract setDestination(String destination) {
this.destination = destination;
return this;
}
public int getMessageCount() {
return messageCount;
}
public DestAbstract setMessageCount(int messageCount) {
this.messageCount = messageCount;
return this;
}
public int getSleep() {
return sleep;
}
public DestAbstract setSleep(int sleep) {
this.sleep = sleep;
return this;
}
public int getTxBatchSize() {
return txBatchSize;
}
public DestAbstract setTxBatchSize(int txBatchSize) {
this.txBatchSize = txBatchSize;
return this;
}
public int getThreads() {
return threads;
}
public DestAbstract setThreads(int threads) {
this.threads = threads;
return this;
}
public String getSerializer() {
return serializer;
}
public DestAbstract setSerializer(String serializer) {
this.serializer = serializer;
return this;
}
}

View File

@ -59,7 +59,79 @@ public class Producer extends DestAbstract {
String msgGroupID = null;
@Option(name = "--data", description = "Messages will be read form the specified file, other message options will be ignored.")
String fileName = null;
String file = null;
public boolean isNonpersistent() {
return nonpersistent;
}
public Producer setNonpersistent(boolean nonpersistent) {
this.nonpersistent = nonpersistent;
return this;
}
public int getMessageSize() {
return messageSize;
}
public Producer setMessageSize(int messageSize) {
this.messageSize = messageSize;
return this;
}
public String getMessage() {
return message;
}
public Producer setMessage(String message) {
this.message = message;
return this;
}
public int getTextMessageSize() {
return textMessageSize;
}
public Producer setTextMessageSize(int textMessageSize) {
this.textMessageSize = textMessageSize;
return this;
}
public int getObjectSize() {
return objectSize;
}
public Producer setObjectSize(int objectSize) {
this.objectSize = objectSize;
return this;
}
public long getMsgTTL() {
return msgTTL;
}
public Producer setMsgTTL(long msgTTL) {
this.msgTTL = msgTTL;
return this;
}
public String getMsgGroupID() {
return msgGroupID;
}
public Producer setMsgGroupID(String msgGroupID) {
this.msgGroupID = msgGroupID;
return this;
}
public String getFile() {
return file;
}
public Producer setFile(String file) {
this.file = file;
return this;
}
@Override
public Object execute(ActionContext context) throws Exception {
@ -70,7 +142,7 @@ public class Producer extends DestAbstract {
try (Connection connection = factory.createConnection()) {
// If we are reading from file, we process messages sequentially to guarantee ordering. i.e. no thread creation.
if (fileName != null) {
if (file != null) {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Destination dest = getDestination(session);
@ -87,7 +159,7 @@ public class Producer extends DestAbstract {
InputStream in;
try {
in = new FileInputStream(fileName);
in = new FileInputStream(file);
} catch (Exception e) {
System.err.println("Error: Unable to open file for reading\n" + e.getMessage());
return null;

View File

@ -203,12 +203,14 @@ public class QueueAbstract extends AbstractAction {
this.purgeOnNoConsumers = purgeOnNoConsumers;
}
public void setAddress(String address) {
public QueueAbstract setAddress(String address) {
this.address = address;
return this;
}
public void setName(String name) {
public QueueAbstract setName(String name) {
this.name = name;
return this;
}
public String getName() {

View File

@ -70,28 +70,34 @@ public class StatQueue extends AbstractAction {
private int maxRows = 50;
//easier for testing
public void setQueueName(String queueName) {
public StatQueue setQueueName(String queueName) {
this.queueName = queueName;
return this;
}
public void setOperationName(String operationName) {
public StatQueue setOperationName(String operationName) {
this.operationName = operationName;
return this;
}
public void setFieldName(String fieldName) {
public StatQueue setFieldName(String fieldName) {
this.fieldName = fieldName;
return this;
}
public void setValue(String value) {
public StatQueue setValue(String value) {
this.value = value;
return this;
}
public void setMaxRows(int maxRows) {
public StatQueue setMaxRows(int maxRows) {
this.maxRows = maxRows;
return this;
}
public void setverbose(boolean verbose) {
public StatQueue setverbose(boolean verbose) {
this.verbose = verbose;
return this;
}
@Override

View File

@ -1356,7 +1356,7 @@ public class ArtemisTest extends CliTestBase {
}
//read individual lines from byteStream
private ArrayList<String> getOutputLines(TestActionContext context, boolean errorOutput) throws IOException {
public static ArrayList<String> getOutputLines(TestActionContext context, boolean errorOutput) throws IOException {
byte[] bytes;
if (errorOutput) {

View File

@ -16,7 +16,8 @@
*/
package org.apache.activemq.cli.test;
import org.apache.activemq.artemis.cli.Artemis;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.cli.commands.messages.Producer;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.junit.After;
@ -54,22 +55,17 @@ public class CliProducerTest extends CliTestBase {
}
private void produceMessages(String address, String message, int msgCount) throws Exception {
Artemis.main("producer",
"--user", "admin",
"--password", "admin",
"--destination", address,
"--message", message,
"--message-count", String.valueOf(msgCount)
);
new Producer()
.setMessage(message)
.setMessageCount(msgCount)
.setDestination(address)
.setUser("admin")
.setPassword("admin")
.execute(new TestActionContext());
}
private void produceMessages(String address, int msgCount) throws Exception {
Artemis.main("producer",
"--user", "admin",
"--password", "admin",
"--destination", address,
"--message-count", String.valueOf(msgCount)
);
produceMessages(address, null, msgCount);
}
private void checkSentMessages(Session session, String address, String messageBody) throws Exception {
@ -98,7 +94,7 @@ public class CliProducerTest extends CliTestBase {
String queue = "queue";
String fqqn = address + "::" + queue;
createQueue("--multicast", address, queue);
createQueue(RoutingType.MULTICAST, address, queue);
Session session = createSession(connection);
produceMessages("topic://" + address, TEST_MESSAGE_COUNT);
@ -113,7 +109,7 @@ public class CliProducerTest extends CliTestBase {
String fqqn = address + "::" + queue;
String messageBody = new StringGenerator().generateRandomString(20);
createQueue("--multicast", address, queue);
createQueue(RoutingType.MULTICAST, address, queue);
Session session = createSession(connection);
produceMessages("topic://" + address, messageBody, TEST_MESSAGE_COUNT);

View File

@ -16,9 +16,11 @@
*/
package org.apache.activemq.cli.test;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.cli.Artemis;
import org.apache.activemq.artemis.cli.commands.Run;
import org.apache.activemq.artemis.cli.commands.queue.CreateQueue;
import org.apache.activemq.artemis.cli.commands.tools.LockAbstract;
import org.apache.activemq.artemis.nativo.jlibaio.LibaioContext;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
@ -87,7 +89,7 @@ public class CliTestBase {
File rootDirectory = new File(temporaryFolder.getRoot(), "broker");
setupAuth(rootDirectory);
Run.setEmbedded(true);
Artemis.main("create", rootDirectory.getAbsolutePath(), "--silent", "--no-fsync", "--no-autotune", "--no-web", "--require-login");
Artemis.main("create", rootDirectory.getAbsolutePath(), "--silent", "--no-fsync", "--no-autotune", "--no-web", "--require-login", "--disable-persistence");
System.setProperty("artemis.instance", rootDirectory.getAbsolutePath());
Artemis.internalExecute("run");
}
@ -110,16 +112,18 @@ public class CliTestBase {
return new ActiveMQConnectionFactory("tcp://localhost:" + String.valueOf(serverPort));
}
protected void createQueue(String routingTypeOption, String address, String queueName) throws Exception {
Artemis.main("queue", "create",
"--user", "admin",
"--password", "admin",
"--address", address,
"--name", queueName,
routingTypeOption,
"--durable",
"--preserve-on-no-consumers",
"--auto-create-address");
protected void createQueue(RoutingType routingType, String address, String queueName) throws Exception {
new CreateQueue()
.setAddress(address)
.setName(queueName)
.setAnycast(RoutingType.ANYCAST.equals(routingType))
.setMulticast(RoutingType.MULTICAST.equals(routingType))
.setDurable(true)
.setPreserveOnNoConsumers(true)
.setAutoCreateAddress(true)
.setUser("admin")
.setPassword("admin")
.execute(new TestActionContext());
}
void closeConnection(ActiveMQConnectionFactory cf, Connection connection) throws Exception {

View File

@ -30,20 +30,24 @@ import javax.jms.TopicSubscriber;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.cli.Artemis;
import org.apache.activemq.artemis.cli.commands.address.CreateAddress;
import org.apache.activemq.artemis.cli.commands.messages.Consumer;
import org.apache.activemq.artemis.cli.commands.messages.Producer;
import org.apache.activemq.artemis.cli.commands.queue.StatQueue;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.Wait;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.apache.activemq.cli.test.ArtemisTest.getOutputLines;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
@ -78,17 +82,6 @@ public class MessageSerializerTest extends CliTestBase {
return temporaryFolder.newFile("messages.xml");
}
private List<Message> generateTextMessages(Session session, String address) throws Exception {
List<Message> messages = new ArrayList<>(TEST_MESSAGE_COUNT);
for (int i = 0; i < TEST_MESSAGE_COUNT; i++) {
messages.add(session.createTextMessage(RandomUtil.randomString()));
}
sendMessages(session, address, messages);
return messages;
}
private List<Message> generateTextMessages(Session session, Destination destination) throws Exception {
List<Message> messages = new ArrayList<>(TEST_MESSAGE_COUNT);
for (int i = 0; i < TEST_MESSAGE_COUNT; i++) {
@ -101,17 +94,38 @@ public class MessageSerializerTest extends CliTestBase {
}
private void checkSentMessages(Session session, List<Message> messages, String address) throws Exception {
List<Message> recieved = consumeMessages(session, address, TEST_MESSAGE_COUNT, CompositeAddress.isFullyQualified(address));
checkSentMessages(session, messages, address, null);
}
private void checkSentMessages(Session session, List<Message> messages, String address, String key) throws Exception {
List<Message> received = consumeMessages(session, address, TEST_MESSAGE_COUNT, CompositeAddress.isFullyQualified(address));
for (int i = 0; i < TEST_MESSAGE_COUNT; i++) {
assertEquals(((TextMessage) messages.get(i)).getText(), ((TextMessage) recieved.get(i)).getText());
Message m = messages.get(i);
if (m instanceof TextMessage) {
assertEquals(((TextMessage) m).getText(), ((TextMessage) received.get(i)).getText());
} else if (m instanceof ObjectMessage) {
assertEquals(((ObjectMessage) m).getObject(), ((ObjectMessage) received.get(i)).getObject());
} else if (m instanceof MapMessage) {
assertEquals(((MapMessage) m).getString(key), ((MapMessage) received.get(i)).getString(key));
}
}
}
private void sendMessages(Session session, String address, List<Message> messages) throws Exception {
MessageProducer producer = session.createProducer(getDestination(address));
for (Message m : messages) {
producer.send(m);
private boolean verifyMessageCount(String address, int messageCount) throws Exception {
TestActionContext context = new TestActionContext();
new StatQueue()
.setQueueName(address)
.setUser("admin")
.setPassword("admin")
.execute(context);
int currentMessageCount;
try {
// parse the value for MESSAGE_COUNT from the output
currentMessageCount = Integer.parseInt(getOutputLines(context, false).get(1).split("\\|")[4].trim());
} catch (Exception e) {
currentMessageCount = 0;
}
return (messageCount == currentMessageCount);
}
private void sendMessages(Session session, Destination destination, List<Message> messages) throws Exception {
@ -125,35 +139,35 @@ public class MessageSerializerTest extends CliTestBase {
exportMessages(address, TEST_MESSAGE_COUNT, false, "test-client", output);
}
private void exportMessages(String address, int noMessages, boolean durable, String clientId, File output) throws Exception {
List<String> args = new ArrayList<>(Arrays.asList("consumer",
"--user", "admin",
"--password", "admin",
"--destination", address,
"--message-count", Integer.toString(noMessages),
"--data", output.getAbsolutePath(),
"--clientID", clientId));
if (durable) {
args.add("--durable");
}
Artemis.main(args.toArray(new String[0]));
private void exportMessages(String address, int messageCount, boolean durable, String clientId, File output) throws Exception {
new Consumer()
.setFile(output.getAbsolutePath())
.setDurable(durable)
.setDestination(address)
.setMessageCount(messageCount)
.setUser("admin")
.setPassword("admin")
.setClientID(clientId)
.execute(new TestActionContext());
}
private void importMessages(String address, File input) throws Exception {
Artemis.main("producer",
"--user", "admin",
"--password", "admin",
"--destination", address,
"--data", input.getAbsolutePath());
new Producer()
.setFile(input.getAbsolutePath())
.setDestination(address)
.setUser("admin")
.setPassword("admin")
.execute(new TestActionContext());
}
private void createBothTypeAddress(String address) throws Exception {
Artemis.main("address", "create",
"--user", "admin",
"--password", "admin",
"--name", address,
"--anycast", "--multicast");
new CreateAddress()
.setAnycast(true)
.setMulticast(true)
.setName(address)
.setUser("admin")
.setPassword("admin")
.execute(new TestActionContext());
}
@Test
@ -163,18 +177,15 @@ public class MessageSerializerTest extends CliTestBase {
Session session = createSession(connection);
List<Message> messages = generateTextMessages(session, address);
List<Message> sent = generateTextMessages(session, getDestination(address));
exportMessages(address, file);
// Ensure there's nothing left to consume
MessageConsumer consumer = session.createConsumer(getDestination(address));
assertNull(consumer.receive(1000));
consumer.close();
Wait.assertTrue(() -> verifyMessageCount(address, 0), 2000, 100);
importMessages(address, file);
checkSentMessages(session, messages, address);
Wait.assertTrue(() -> verifyMessageCount(address, TEST_MESSAGE_COUNT), 2000, 100);
checkSentMessages(session, sent, address);
}
@Test
@ -190,19 +201,14 @@ public class MessageSerializerTest extends CliTestBase {
sent.add(session.createObjectMessage(UUID.randomUUID()));
}
sendMessages(session, address, sent);
sendMessages(session, getDestination(address), sent);
exportMessages(address, file);
// Ensure there's nothing left to consume
MessageConsumer consumer = session.createConsumer(getDestination(address));
assertNull(consumer.receive(1000));
consumer.close();
Wait.assertTrue(() -> verifyMessageCount(address, 0), 2000, 100);
importMessages(address, file);
List<Message> received = consumeMessages(session, address, TEST_MESSAGE_COUNT, false);
for (int i = 0; i < TEST_MESSAGE_COUNT; i++) {
assertEquals(((ObjectMessage) sent.get(i)).getObject(), ((ObjectMessage) received.get(i)).getObject());
}
Wait.assertTrue(() -> verifyMessageCount(address, TEST_MESSAGE_COUNT), 2000, 100);
checkSentMessages(session, sent, address);
}
@Test
@ -220,19 +226,14 @@ public class MessageSerializerTest extends CliTestBase {
sent.add(m);
}
sendMessages(session, address, sent);
sendMessages(session, getDestination(address), sent);
exportMessages(address, file);
// Ensure there's nothing left to consume
MessageConsumer consumer = session.createConsumer(getDestination(address));
assertNull(consumer.receive(1000));
consumer.close();
Wait.assertTrue(() -> verifyMessageCount(address, 0), 2000, 100);
importMessages(address, file);
List<Message> received = consumeMessages(session, address, TEST_MESSAGE_COUNT, false);
for (int i = 0; i < TEST_MESSAGE_COUNT; i++) {
assertEquals(((MapMessage) sent.get(i)).getString(key), ((MapMessage) received.get(i)).getString(key));
}
Wait.assertTrue(() -> verifyMessageCount(address, TEST_MESSAGE_COUNT), 2000, 100);
checkSentMessages(session, sent, address, key);
}
@Test
@ -251,8 +252,8 @@ public class MessageSerializerTest extends CliTestBase {
String queue1Name = "queue1";
String queue2Name = "queue2";
createQueue("--" + routingType.toString().toLowerCase(), address, queue1Name);
createQueue("--" + routingType.toString().toLowerCase(), address, queue2Name);
createQueue(routingType, address, queue1Name);
createQueue(routingType, address, queue2Name);
try (ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection connection = cf.createConnection("admin", "admin");) {
@ -265,11 +266,12 @@ public class MessageSerializerTest extends CliTestBase {
MessageConsumer consumer1 = session.createConsumer(queue1);
MessageConsumer consumer2 = session.createConsumer(queue2);
Artemis.main("producer",
"--user", "admin",
"--password", "admin",
"--destination", (routingType == RoutingType.ANYCAST ? ActiveMQDestination.QUEUE_QUALIFIED_PREFIX : ActiveMQDestination.TOPIC_QUALIFIED_PREFIX) + CompositeAddress.toFullyQualified(address, queue1Name),
"--message-count", "5");
new Producer()
.setDestination((routingType == RoutingType.ANYCAST ? ActiveMQDestination.QUEUE_QUALIFIED_PREFIX : ActiveMQDestination.TOPIC_QUALIFIED_PREFIX) + CompositeAddress.toFullyQualified(address, queue1Name))
.setMessageCount(5)
.setUser("admin")
.setPassword("admin")
.execute(new TestActionContext());
assertNull(consumer2.receive(1000));
assertNotNull(consumer1.receive(1000));
@ -285,7 +287,7 @@ public class MessageSerializerTest extends CliTestBase {
File file = createMessageFile();
createQueue("--multicast", addr, queue);
createQueue(RoutingType.MULTICAST, addr, queue);
Session session = createSession(connection);
@ -308,12 +310,12 @@ public class MessageSerializerTest extends CliTestBase {
File file = createMessageFile();
createQueue("--multicast", mAddress, queueM1Name);
createQueue("--multicast", mAddress, queueM2Name);
createQueue(RoutingType.MULTICAST, mAddress, queueM1Name);
createQueue(RoutingType.MULTICAST, mAddress, queueM2Name);
Session session = createSession(connection);
List<Message> messages = generateTextMessages(session, aAddress);
List<Message> messages = generateTextMessages(session, getDestination(aAddress));
exportMessages(aAddress, file);
importMessages(ActiveMQDestination.TOPIC_QUALIFIED_PREFIX + mAddress, file);
@ -333,12 +335,12 @@ public class MessageSerializerTest extends CliTestBase {
File file = createMessageFile();
createQueue("--multicast", mAddress, queueM1Name);
createQueue("--multicast", mAddress, queueM2Name);
createQueue(RoutingType.MULTICAST, mAddress, queueM1Name);
createQueue(RoutingType.MULTICAST, mAddress, queueM2Name);
Session session = createSession(connection);
List<Message> messages = generateTextMessages(session, aAddress);
List<Message> messages = generateTextMessages(session, getDestination(aAddress));
exportMessages(aAddress, file);
importMessages(ActiveMQDestination.TOPIC_QUALIFIED_PREFIX + fqqnMulticast1, file);
@ -380,12 +382,12 @@ public class MessageSerializerTest extends CliTestBase {
connection.setClientID(clientId);
createBothTypeAddress(address);
createQueue("--anycast", address, address);
createQueue(RoutingType.ANYCAST, address, address);
Session session = createSession(connection);
TopicSubscriber subscriber = session.createDurableSubscriber(session.createTopic(address), "test-subscriber");
List<Message> messages = generateTextMessages(session, address);
List<Message> messages = generateTextMessages(session, getDestination(address));
exportMessages(address, file);