mirror of https://github.com/apache/activemq.git
Merge pull request #1326 from kenliao94/improve_cli
[AMQ-9596] Add priority and disableMessageTimestamp to the producer command
This commit is contained in:
commit
f4e895ed46
|
@ -25,6 +25,8 @@ import java.net.URL;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
import static jakarta.jms.Message.DEFAULT_PRIORITY;
|
||||||
|
|
||||||
public class ProducerThread extends Thread {
|
public class ProducerThread extends Thread {
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(ProducerThread.class);
|
private static final Logger LOG = LoggerFactory.getLogger(ProducerThread.class);
|
||||||
|
@ -40,6 +42,8 @@ public class ProducerThread extends Thread {
|
||||||
long msgTTL = 0L;
|
long msgTTL = 0L;
|
||||||
String msgGroupID=null;
|
String msgGroupID=null;
|
||||||
int transactionBatchSize;
|
int transactionBatchSize;
|
||||||
|
int priority = DEFAULT_PRIORITY;
|
||||||
|
boolean disableMessageTimestamp = false;
|
||||||
|
|
||||||
int transactions = 0;
|
int transactions = 0;
|
||||||
AtomicInteger sentCount = new AtomicInteger(0);
|
AtomicInteger sentCount = new AtomicInteger(0);
|
||||||
|
@ -64,6 +68,8 @@ public class ProducerThread extends Thread {
|
||||||
producer = session.createProducer(destination);
|
producer = session.createProducer(destination);
|
||||||
producer.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
|
producer.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
|
||||||
producer.setTimeToLive(msgTTL);
|
producer.setTimeToLive(msgTTL);
|
||||||
|
producer.setPriority(priority);
|
||||||
|
producer.setDisableMessageTimestamp(disableMessageTimestamp);
|
||||||
initPayLoad();
|
initPayLoad();
|
||||||
running = true;
|
running = true;
|
||||||
|
|
||||||
|
@ -306,4 +312,12 @@ public class ProducerThread extends Thread {
|
||||||
public void resetCounters(){
|
public void resetCounters(){
|
||||||
this.sentCount.set(0);
|
this.sentCount.set(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setMessagePriority(int priority) {
|
||||||
|
this.priority = priority;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setDisableMessageTimestamp(boolean disableMessageTimestamp) {
|
||||||
|
this.disableMessageTimestamp = disableMessageTimestamp;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,6 +27,8 @@ import jakarta.jms.Session;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
|
||||||
|
import static jakarta.jms.Message.DEFAULT_PRIORITY;
|
||||||
|
|
||||||
public class ProducerCommand extends AbstractCommand {
|
public class ProducerCommand extends AbstractCommand {
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(ProducerCommand.class);
|
private static final Logger LOG = LoggerFactory.getLogger(ProducerCommand.class);
|
||||||
|
|
||||||
|
@ -45,6 +47,8 @@ public class ProducerCommand extends AbstractCommand {
|
||||||
String msgGroupID=null;
|
String msgGroupID=null;
|
||||||
int transactionBatchSize;
|
int transactionBatchSize;
|
||||||
private int parallelThreads = 1;
|
private int parallelThreads = 1;
|
||||||
|
int priority = DEFAULT_PRIORITY;
|
||||||
|
boolean disableMessageTimestamp = false;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void runTask(List<String> tokens) throws Exception {
|
protected void runTask(List<String> tokens) throws Exception {
|
||||||
|
@ -82,6 +86,8 @@ public class ProducerCommand extends AbstractCommand {
|
||||||
producer.setMsgGroupID(msgGroupID);
|
producer.setMsgGroupID(msgGroupID);
|
||||||
producer.setTextMessageSize(textMessageSize);
|
producer.setTextMessageSize(textMessageSize);
|
||||||
producer.setFinished(active);
|
producer.setFinished(active);
|
||||||
|
producer.setMessagePriority(priority);
|
||||||
|
producer.setDisableMessageTimestamp(disableMessageTimestamp);
|
||||||
producer.start();
|
producer.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -213,6 +219,14 @@ public class ProducerCommand extends AbstractCommand {
|
||||||
this.message = message;
|
this.message = message;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setMessagePriority(int priority) {
|
||||||
|
this.priority = priority;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setDisableMessageTimestamp(boolean disableMessageTimestamp) {
|
||||||
|
this.disableMessageTimestamp = disableMessageTimestamp;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void printHelp() {
|
protected void printHelp() {
|
||||||
printHelpFromFile();
|
printHelpFromFile();
|
||||||
|
|
|
@ -15,4 +15,6 @@ Options :
|
||||||
[--textMessageSize N] - size in bytes of a TextMessage, a Lorem ipsum demo TextMessage is used
|
[--textMessageSize N] - size in bytes of a TextMessage, a Lorem ipsum demo TextMessage is used
|
||||||
[--message ..] - a text string to use as the message body
|
[--message ..] - a text string to use as the message body
|
||||||
[--payloadUrl URL] - a url pointing to a document to use as the message body
|
[--payloadUrl URL] - a url pointing to a document to use as the message body
|
||||||
[--msgGroupID ..] - JMS message group identifier
|
[--msgGroupID ..] - JMS message group identifier
|
||||||
|
[--messagePriority N] - The message priority. Default is 4
|
||||||
|
[--disableMessageTimestamp true|false] - Whether or not to disable timestamp on message, default false
|
Loading…
Reference in New Issue