https://issues.apache.org/jira/browse/AMQ-5558 - support durable consumers and ack modes for consumers

This commit is contained in:
Dejan Bosanac 2015-03-24 13:17:50 +01:00
parent 3051882f92
commit ebb3df7681
6 changed files with 103 additions and 34 deletions

View File

@ -30,9 +30,10 @@ public class ConsumerThread extends Thread {
int receiveTimeOut = 3000;
Destination destination;
Session session;
boolean durable;
boolean breakOnNull = true;
int sleep;
int transactionBatchSize;
int batchSize;
int received = 0;
int transactions = 0;
@ -52,7 +53,11 @@ public class ConsumerThread extends Thread {
String threadName = Thread.currentThread().getName();
LOG.info(threadName + " wait until " + messageCount + " messages are consumed");
try {
consumer = session.createConsumer(destination);
if (durable && destination instanceof Topic) {
consumer = session.createDurableSubscriber((Topic) destination, getName());
} else {
consumer = session.createConsumer(destination);
}
while (running && received < messageCount) {
Message msg = consumer.receive(receiveTimeOut);
if (msg != null) {
@ -70,11 +75,17 @@ public class ConsumerThread extends Thread {
}
}
if (transactionBatchSize > 0 && received > 0 && received % transactionBatchSize == 0) {
LOG.info(threadName + " Committing transaction: " + transactions++);
session.commit();
if (session.getTransacted()) {
if (batchSize > 0 && received > 0 && received % batchSize == 0) {
LOG.info(threadName + " Committing transaction: " + transactions++);
session.commit();
}
} else if (session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) {
if (batchSize > 0 && received > 0 && received % batchSize == 0) {
LOG.info("Acknowledging last " + batchSize + " messages; messages so far = " + received);
msg.acknowledge();
}
}
if (sleep > 0) {
Thread.sleep(sleep);
}
@ -103,6 +114,14 @@ public class ConsumerThread extends Thread {
return received;
}
public boolean isDurable() {
return durable;
}
public void setDurable(boolean durable) {
this.durable = durable;
}
public void setMessageCount(int messageCount) {
this.messageCount = messageCount;
}
@ -111,12 +130,12 @@ public class ConsumerThread extends Thread {
this.breakOnNull = breakOnNull;
}
public int getTransactionBatchSize() {
return transactionBatchSize;
public int getBatchSize() {
return batchSize;
}
public void setTransactionBatchSize(int transactionBatchSize) {
this.transactionBatchSize = transactionBatchSize;
public void setBatchSize(int batchSize) {
this.batchSize = batchSize;
}
public int getMessageCount() {

View File

@ -43,7 +43,7 @@ public class ProducerThread extends Thread {
int sentCount = 0;
String message;
String messageText = null;
String url = null;
String payloadUrl = null;
byte[] payload = null;
boolean running = false;
CountDownLatch finished;
@ -123,8 +123,8 @@ public class ProducerThread extends Thread {
if (messageText == null) {
messageText = readInputStream(getClass().getResourceAsStream("demo.txt"), textMessageSize, i);
}
} else if (url != null) {
messageText = readInputStream(new URL(url).openStream(), -1, i);
} else if (payloadUrl != null) {
messageText = readInputStream(new URL(payloadUrl).openStream(), -1, i);
} else if (message != null) {
messageText = message;
} else {
@ -249,12 +249,12 @@ public class ProducerThread extends Thread {
this.finished = finished;
}
public String getUrl() {
return url;
public String getPayloadUrl() {
return payloadUrl;
}
public void setUrl(String url) {
this.url = url;
public void setPayloadUrl(String payloadUrl) {
this.payloadUrl = payloadUrl;
}
public String getMessage() {

View File

@ -19,7 +19,6 @@ package org.apache.activemq.console.command;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.util.ConsumerThread;
import org.apache.activemq.util.IntrospectionSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -37,7 +36,11 @@ public class ConsumerCommand extends AbstractCommand {
String destination = "queue://TEST";
int messageCount = 1000;
int sleep;
int transactionBatchSize;
boolean transacted;
private boolean durable;
private String clientId;
int batchSize = 10;
int ackMode = Session.AUTO_ACKNOWLEDGE;
int parallelThreads = 1;
boolean bytesAsText;
@ -52,13 +55,16 @@ public class ConsumerCommand extends AbstractCommand {
Connection conn = null;
try {
conn = factory.createConnection(user, password);
if (durable && clientId != null && clientId.length() > 0 && !"null".equals(clientId)) {
conn.setClientID(clientId);
}
conn.start();
Session sess;
if (transactionBatchSize != 0) {
if (transacted) {
sess = conn.createSession(true, Session.SESSION_TRANSACTED);
} else {
sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
sess = conn.createSession(false, ackMode);
}
@ -67,10 +73,11 @@ public class ConsumerCommand extends AbstractCommand {
for (int i = 1; i <= parallelThreads; i++) {
ConsumerThread consumer = new ConsumerThread(sess, ActiveMQDestination.createDestination(destination, ActiveMQDestination.QUEUE_TYPE));
consumer.setName("consumer-" + i);
consumer.setDurable(durable);
consumer.setBreakOnNull(false);
consumer.setMessageCount(messageCount);
consumer.setSleep(sleep);
consumer.setTransactionBatchSize(transactionBatchSize);
consumer.setBatchSize(batchSize);
consumer.setFinished(active);
consumer.setBytesAsText(bytesAsText);
consumer.start();
@ -132,12 +139,12 @@ public class ConsumerCommand extends AbstractCommand {
this.sleep = sleep;
}
public int getTransactionBatchSize() {
return transactionBatchSize;
public int getBatchSize() {
return batchSize;
}
public void setTransactionBatchSize(int transactionBatchSize) {
this.transactionBatchSize = transactionBatchSize;
public void setBatchSize(int batchSize) {
this.batchSize = batchSize;
}
public int getParallelThreads() {
@ -156,6 +163,46 @@ public class ConsumerCommand extends AbstractCommand {
this.bytesAsText = bytesAsText;
}
public boolean isTransacted() {
return transacted;
}
public void setTransacted(boolean transacted) {
this.transacted = transacted;
}
public int getAckMode() {
return ackMode;
}
public void setAckMode(String ackMode) {
if ("CLIENT_ACKNOWLEDGE".equals(ackMode)) {
this.ackMode = Session.CLIENT_ACKNOWLEDGE;
}
if ("AUTO_ACKNOWLEDGE".equals(ackMode)) {
this.ackMode = Session.AUTO_ACKNOWLEDGE;
}
if ("DUPS_OK_ACKNOWLEDGE".equals(ackMode)) {
this.ackMode = Session.DUPS_OK_ACKNOWLEDGE;
}
}
public boolean isDurable() {
return durable;
}
public void setDurable(boolean durable) {
this.durable = durable;
}
public String getClientId() {
return clientId;
}
public void setClientId(String clientId) {
this.clientId = clientId;
}
@Override
protected void printHelp() {
printHelpFromFile();

View File

@ -38,7 +38,7 @@ public class ProducerCommand extends AbstractCommand {
int sleep = 0;
boolean persistent = true;
String message = null;
String url = null;
String payloadUrl = null;
int messageSize = 0;
int textMessageSize;
long msgTTL = 0L;
@ -78,7 +78,7 @@ public class ProducerCommand extends AbstractCommand {
producer.setPersistent(persistent);
producer.setTransactionBatchSize(transactionBatchSize);
producer.setMessage(message);
producer.setUrl(url);
producer.setPayloadUrl(payloadUrl);
producer.setMessageSize(messageSize);
producer.setMsgGroupID(msgGroupID);
producer.setTextMessageSize(textMessageSize);
@ -198,12 +198,12 @@ public class ProducerCommand extends AbstractCommand {
this.parallelThreads = parallelThreads;
}
public String getUrl() {
return url;
public String getPayloadUrl() {
return payloadUrl;
}
public void setUrl(String url) {
this.url = url;
public void setPayloadUrl(String payloadUrl) {
this.payloadUrl = payloadUrl;
}
public String getMessage() {

View File

@ -7,6 +7,9 @@ Options :
[--destination queue://..|topic://..] - consumer destination; default queue://TEST
[--messageCount N] - number of messages to send; default 1000
[--sleep N] - millisecond sleep period between sends or receives; default 0
[--transactionBatchSize N] - use send transaction batches of size N; default 0, no jms transactions
[--ackMode AUTO_ACKNOWLEDGE|CLIENT_ACKNOWLEDGE] - the type of message acknowledgement to use; default auto acknowledge
[--batchSize N] - batch size for transactions and client acknowledgment (default 10)
[--durable true|false] - create durable topic
[--clientId ..] - connection client id; must be set for durable topics
[--parallelThreads N] - number of threads to run in parallel; default 1
[--bytesAsText true|false] - try to treat a BytesMessage as a text string

View File

@ -14,5 +14,5 @@ Options :
[--messageSize N] - size in bytes of a BytesMessage; default 0, a simple TextMessage is used
[--textMessageSize N] - size in bytes of a TextMessage, a Lorem ipsum demo TextMessage is used
[--message ..] - a text string to use as the message body
[--url URL] - a url pointing to a document to use as the message body
[--payloadUrl URL] - a url pointing to a document to use as the message body
[--msgGroupID ..] - JMS message group identifier