From eecf7b6eade1ab168511d37835e2e089950c3584 Mon Sep 17 00:00:00 2001 From: Ken Liao Date: Sat, 19 Oct 2024 16:06:21 -0700 Subject: [PATCH 1/2] Add priority and disableMessageTimestamp to the producer command of activemq CLI --- .../org/apache/activemq/util/ProducerThread.java | 12 ++++++++++++ .../activemq/console/command/ProducerCommand.java | 12 ++++++++++++ .../org/apache/activemq/console/command/producer.txt | 4 +++- 3 files changed, 27 insertions(+), 1 deletion(-) diff --git a/activemq-client/src/main/java/org/apache/activemq/util/ProducerThread.java b/activemq-client/src/main/java/org/apache/activemq/util/ProducerThread.java index da032320f4..447639ac83 100644 --- a/activemq-client/src/main/java/org/apache/activemq/util/ProducerThread.java +++ b/activemq-client/src/main/java/org/apache/activemq/util/ProducerThread.java @@ -40,6 +40,8 @@ public class ProducerThread extends Thread { long msgTTL = 0L; String msgGroupID=null; int transactionBatchSize; + int priority = 4; + boolean disableMessageTimestamp = false; int transactions = 0; AtomicInteger sentCount = new AtomicInteger(0); @@ -64,6 +66,8 @@ public class ProducerThread extends Thread { producer = session.createProducer(destination); producer.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); producer.setTimeToLive(msgTTL); + producer.setPriority(priority); + producer.setDisableMessageTimestamp(disableMessageTimestamp); initPayLoad(); running = true; @@ -306,4 +310,12 @@ public class ProducerThread extends Thread { public void resetCounters(){ this.sentCount.set(0); } + + public void setMessagePriority(int priority) { + this.priority = priority; + } + + public void setDisableMessageTimestamp(boolean disableMessageTimestamp) { + this.disableMessageTimestamp = disableMessageTimestamp; + } } diff --git a/activemq-console/src/main/java/org/apache/activemq/console/command/ProducerCommand.java b/activemq-console/src/main/java/org/apache/activemq/console/command/ProducerCommand.java index 28409fd437..5546610490 100644 --- a/activemq-console/src/main/java/org/apache/activemq/console/command/ProducerCommand.java +++ b/activemq-console/src/main/java/org/apache/activemq/console/command/ProducerCommand.java @@ -45,6 +45,8 @@ public class ProducerCommand extends AbstractCommand { String msgGroupID=null; int transactionBatchSize; private int parallelThreads = 1; + int priority = 4; // Default priority + boolean disableMessageTimestamp = false; @Override protected void runTask(List tokens) throws Exception { @@ -82,6 +84,8 @@ public class ProducerCommand extends AbstractCommand { producer.setMsgGroupID(msgGroupID); producer.setTextMessageSize(textMessageSize); producer.setFinished(active); + producer.setMessagePriority(priority); + producer.setDisableMessageTimestamp(disableMessageTimestamp); producer.start(); } @@ -213,6 +217,14 @@ public class ProducerCommand extends AbstractCommand { this.message = message; } + public void setMessagePriority(int priority) { + this.priority = priority; + } + + public void setDisableMessageTimestamp(boolean disableMessageTimestamp) { + this.disableMessageTimestamp = disableMessageTimestamp; + } + @Override protected void printHelp() { printHelpFromFile(); diff --git a/activemq-console/src/main/resources/org/apache/activemq/console/command/producer.txt b/activemq-console/src/main/resources/org/apache/activemq/console/command/producer.txt index 1f37586a9f..d836744710 100644 --- a/activemq-console/src/main/resources/org/apache/activemq/console/command/producer.txt +++ b/activemq-console/src/main/resources/org/apache/activemq/console/command/producer.txt @@ -15,4 +15,6 @@ Options : [--textMessageSize N] - size in bytes of a TextMessage, a Lorem ipsum demo TextMessage is used [--message ..] - a text string to use as the message body [--payloadUrl URL] - a url pointing to a document to use as the message body - [--msgGroupID ..] - JMS message group identifier \ No newline at end of file + [--msgGroupID ..] - JMS message group identifier + [--messagePriority N] - The message priority. Default is 4 + [--disableMessageTimestamp true|false] - Whether or not to disable timestamp on message, default false \ No newline at end of file From 99d95672781940bf84b14947df7b6fff79af5970 Mon Sep 17 00:00:00 2001 From: Ken Liao Date: Tue, 22 Oct 2024 10:59:08 -0700 Subject: [PATCH 2/2] Use default priority value from JMS interface --- .../main/java/org/apache/activemq/util/ProducerThread.java | 4 +++- .../org/apache/activemq/console/command/ProducerCommand.java | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/activemq-client/src/main/java/org/apache/activemq/util/ProducerThread.java b/activemq-client/src/main/java/org/apache/activemq/util/ProducerThread.java index 447639ac83..f315f5275d 100644 --- a/activemq-client/src/main/java/org/apache/activemq/util/ProducerThread.java +++ b/activemq-client/src/main/java/org/apache/activemq/util/ProducerThread.java @@ -25,6 +25,8 @@ import java.net.URL; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; +import static jakarta.jms.Message.DEFAULT_PRIORITY; + public class ProducerThread extends Thread { private static final Logger LOG = LoggerFactory.getLogger(ProducerThread.class); @@ -40,7 +42,7 @@ public class ProducerThread extends Thread { long msgTTL = 0L; String msgGroupID=null; int transactionBatchSize; - int priority = 4; + int priority = DEFAULT_PRIORITY; boolean disableMessageTimestamp = false; int transactions = 0; diff --git a/activemq-console/src/main/java/org/apache/activemq/console/command/ProducerCommand.java b/activemq-console/src/main/java/org/apache/activemq/console/command/ProducerCommand.java index 5546610490..79f36842de 100644 --- a/activemq-console/src/main/java/org/apache/activemq/console/command/ProducerCommand.java +++ b/activemq-console/src/main/java/org/apache/activemq/console/command/ProducerCommand.java @@ -27,6 +27,8 @@ import jakarta.jms.Session; import java.util.List; import java.util.concurrent.CountDownLatch; +import static jakarta.jms.Message.DEFAULT_PRIORITY; + public class ProducerCommand extends AbstractCommand { private static final Logger LOG = LoggerFactory.getLogger(ProducerCommand.class); @@ -45,7 +47,7 @@ public class ProducerCommand extends AbstractCommand { String msgGroupID=null; int transactionBatchSize; private int parallelThreads = 1; - int priority = 4; // Default priority + int priority = DEFAULT_PRIORITY; boolean disableMessageTimestamp = false; @Override