https://issues.apache.org/jira/browse/AMQ-5558 - some more options for producer/consumer tools

This commit is contained in:
Dejan Bosanac 2015-03-23 14:46:14 +01:00
parent 85d9d4e941
commit df3ff9c65e
6 changed files with 109 additions and 28 deletions

View File

@ -38,6 +38,7 @@ public class ConsumerThread extends Thread {
int transactions = 0;
boolean running = false;
CountDownLatch finished;
boolean bytesAsText;
public ConsumerThread(Session session, Destination destination) {
this.destination = destination;
@ -56,6 +57,12 @@ public class ConsumerThread extends Thread {
Message msg = consumer.receive(receiveTimeOut);
if (msg != null) {
LOG.info(threadName + " Received " + (msg instanceof TextMessage ? ((TextMessage) msg).getText() : msg.getJMSMessageID()));
if (bytesAsText && (msg instanceof BytesMessage)) {
long length = ((BytesMessage) msg).getBodyLength();
byte[] bytes = new byte[(int) length];
((BytesMessage) msg).readBytes(bytes);
LOG.info("BytesMessage as text string: " + new String(bytes));
}
received++;
} else {
if (breakOnNull) {
@ -151,4 +158,12 @@ public class ConsumerThread extends Thread {
public void setFinished(CountDownLatch finished) {
this.finished = finished;
}
public boolean isBytesAsText() {
return bytesAsText;
}
public void setBytesAsText(boolean bytesAsText) {
this.bytesAsText = bytesAsText;
}
}

View File

@ -20,10 +20,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.*;
import java.io.File;
import java.io.FileReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.*;
import java.net.URL;
import java.util.concurrent.CountDownLatch;
@ -44,6 +41,9 @@ public class ProducerThread extends Thread {
int transactions = 0;
int sentCount = 0;
String message;
String messageText = null;
String url = null;
byte[] payload = null;
boolean running = false;
CountDownLatch finished;
@ -114,35 +114,55 @@ public class ProducerThread extends Thread {
}
protected Message createMessage(int i) throws Exception {
Message message = null;
Message answer;
if (payload != null) {
message = session.createBytesMessage();
((BytesMessage)message).writeBytes(payload);
answer = session.createBytesMessage();
((BytesMessage) answer).writeBytes(payload);
} else {
if (textMessageSize > 0) {
InputStreamReader reader = null;
if (messageText == null) {
messageText = readInputStream(getClass().getResourceAsStream("demo.txt"), textMessageSize, i);
}
} else if (url != null) {
messageText = readInputStream(new URL(url).openStream(), -1, i);
} else if (message != null) {
messageText = message;
} else {
messageText = createDefaultMessage(i);
}
answer = session.createTextMessage(messageText);
}
if ((msgGroupID != null) && (!msgGroupID.isEmpty())) {
answer.setStringProperty("JMSXGroupID", msgGroupID);
}
return answer;
}
private String readInputStream(InputStream is, int size, int messageNumber) throws IOException {
InputStreamReader reader = new InputStreamReader(is);
try {
InputStream is = getClass().getResourceAsStream("demo.txt");
reader = new InputStreamReader(is);
char[] chars = new char[textMessageSize];
reader.read(chars);
message = session.createTextMessage(String.valueOf(chars));
} catch (Exception e) {
LOG.warn(Thread.currentThread().getName() + " Failed to load " + textMessageSize + " bytes of demo text. Using default text message instead");
message = session.createTextMessage("test message: " + i);
char[] buffer;
if (size > 0) {
buffer = new char[size];
} else {
buffer = new char[1024];
}
int count;
StringBuilder builder = new StringBuilder();
while ((count = reader.read(buffer)) != -1) {
builder.append(buffer, 0, count);
if (size > 0) break;
}
return builder.toString();
} catch (IOException ioe) {
return createDefaultMessage(messageNumber);
} finally {
if (reader != null) {
reader.close();
}
}
} else {
message = session.createTextMessage("test message: " + i);
}
}
if ((msgGroupID != null) && (!msgGroupID.isEmpty())) {
message.setStringProperty("JMSXGroupID", msgGroupID);
}
return message;
private String createDefaultMessage(int messageNumber) {
return "test message: " + messageNumber;
}
public void setMessageCount(int messageCount) {
@ -228,4 +248,20 @@ public class ProducerThread extends Thread {
public void setFinished(CountDownLatch finished) {
this.finished = finished;
}
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
}

View File

@ -39,6 +39,7 @@ public class ConsumerCommand extends AbstractCommand {
int sleep;
int transactionBatchSize;
int parallelThreads = 1;
boolean bytesAsText;
@Override
protected void runTask(List<String> tokens) throws Exception {
@ -71,6 +72,7 @@ public class ConsumerCommand extends AbstractCommand {
consumer.setSleep(sleep);
consumer.setTransactionBatchSize(transactionBatchSize);
consumer.setFinished(active);
consumer.setBytesAsText(bytesAsText);
consumer.start();
}
@ -146,6 +148,14 @@ public class ConsumerCommand extends AbstractCommand {
this.parallelThreads = parallelThreads;
}
public boolean isBytesAsText() {
return bytesAsText;
}
public void setBytesAsText(boolean bytesAsText) {
this.bytesAsText = bytesAsText;
}
@Override
protected void printHelp() {
printHelpFromFile();
@ -160,5 +170,4 @@ public class ConsumerCommand extends AbstractCommand {
public String getOneLineDescription() {
return "Receives messages from the broker";
}
}

View File

@ -18,14 +18,12 @@ package org.apache.activemq.console.command;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.ProducerThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.Connection;
import javax.jms.Session;
import java.io.*;
import java.util.List;
import java.util.concurrent.CountDownLatch;
@ -39,6 +37,8 @@ public class ProducerCommand extends AbstractCommand {
int messageCount = 1000;
int sleep = 0;
boolean persistent = true;
String message = null;
String url = null;
int messageSize = 0;
int textMessageSize;
long msgTTL = 0L;
@ -77,6 +77,8 @@ public class ProducerCommand extends AbstractCommand {
producer.setMsgTTL(msgTTL);
producer.setPersistent(persistent);
producer.setTransactionBatchSize(transactionBatchSize);
producer.setMessage(message);
producer.setUrl(url);
producer.setMessageSize(messageSize);
producer.setMsgGroupID(msgGroupID);
producer.setTextMessageSize(textMessageSize);
@ -196,6 +198,22 @@ public class ProducerCommand extends AbstractCommand {
this.parallelThreads = parallelThreads;
}
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
@Override
protected void printHelp() {
printHelpFromFile();

View File

@ -9,3 +9,4 @@ Options :
[--sleep N] - millisecond sleep period between sends or receives; default 0
[--transactionBatchSize N] - use send transaction batches of size N; default 0, no jms transactions
[--parallelThreads N] - number of threads to run in parallel; default 1
[--bytesAsText true|false] - try to treat a BytesMessage as a text string

View File

@ -13,4 +13,6 @@ Options :
[--msgTTL N] - message TTL in milliseconds
[--messageSize N] - size in bytes of a BytesMessage; default 0, a simple TextMessage is used
[--textMessageSize N] - size in bytes of a TextMessage, a Lorem ipsum demo TextMessage is used
[--message ..] - a text string to use as the message body
[--url URL] - a url pointing to a document to use as the message body
[--msgGroupID ..] - JMS message group identifier