This commit is contained in:
Dejan Bosanac 2015-02-05 13:58:22 +01:00
parent b0a1bd833c
commit 9f0ab46e29
14 changed files with 828 additions and 167 deletions

View File

@ -0,0 +1,154 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.*;
import java.util.concurrent.CountDownLatch;
public class ConsumerThread extends Thread {
private static final Logger LOG = LoggerFactory.getLogger(ConsumerThread.class);
int messageCount = 1000;
int receiveTimeOut = 3000;
Destination destination;
Session session;
boolean breakOnNull = true;
int sleep;
int transactionBatchSize;
int received = 0;
int transactions = 0;
boolean running = false;
CountDownLatch finished;
public ConsumerThread(Session session, Destination destination) {
this.destination = destination;
this.session = session;
}
@Override
public void run() {
running = true;
MessageConsumer consumer = null;
String threadName = Thread.currentThread().getName();
LOG.info(threadName + " wait until " + messageCount + " messages are consumed");
try {
consumer = session.createConsumer(destination);
while (running && received < messageCount) {
Message msg = consumer.receive(receiveTimeOut);
if (msg != null) {
LOG.info(threadName + " Received " + (msg instanceof TextMessage ? ((TextMessage) msg).getText() : msg.getJMSMessageID()));
received++;
} else {
if (breakOnNull) {
break;
}
}
if (transactionBatchSize > 0 && received > 0 && received % transactionBatchSize == 0) {
LOG.info(threadName + " Committing transaction: " + transactions++);
session.commit();
}
if (sleep > 0) {
Thread.sleep(sleep);
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (finished != null) {
finished.countDown();
}
if (consumer != null) {
LOG.info(threadName + " Consumed: " + this.getReceived() + " messages");
try {
consumer.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
LOG.info(threadName + " Consumer thread finished");
}
public int getReceived() {
return received;
}
public void setMessageCount(int messageCount) {
this.messageCount = messageCount;
}
public void setBreakOnNull(boolean breakOnNull) {
this.breakOnNull = breakOnNull;
}
public int getTransactionBatchSize() {
return transactionBatchSize;
}
public void setTransactionBatchSize(int transactionBatchSize) {
this.transactionBatchSize = transactionBatchSize;
}
public int getMessageCount() {
return messageCount;
}
public boolean isBreakOnNull() {
return breakOnNull;
}
public int getReceiveTimeOut() {
return receiveTimeOut;
}
public void setReceiveTimeOut(int receiveTimeOut) {
this.receiveTimeOut = receiveTimeOut;
}
public boolean isRunning() {
return running;
}
public void setRunning(boolean running) {
this.running = running;
}
public int getSleep() {
return sleep;
}
public void setSleep(int sleep) {
this.sleep = sleep;
}
public CountDownLatch getFinished() {
return finished;
}
public void setFinished(CountDownLatch finished) {
this.finished = finished;
}
}

View File

@ -0,0 +1,231 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.*;
import java.io.File;
import java.io.FileReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URL;
import java.util.concurrent.CountDownLatch;
public class ProducerThread extends Thread {
private static final Logger LOG = LoggerFactory.getLogger(ProducerThread.class);
int messageCount = 1000;
Destination destination;
protected Session session;
int sleep = 0;
boolean persistent = true;
int messageSize = 0;
int textMessageSize;
long msgTTL = 0L;
String msgGroupID=null;
int transactionBatchSize;
int transactions = 0;
int sentCount = 0;
byte[] payload = null;
boolean running = false;
CountDownLatch finished;
public ProducerThread(Session session, Destination destination) {
this.destination = destination;
this.session = session;
}
public void run() {
MessageProducer producer = null;
String threadName = Thread.currentThread().getName();
try {
producer = session.createProducer(destination);
producer.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
producer.setTimeToLive(msgTTL);
initPayLoad();
running = true;
LOG.info(threadName + " Started to calculate elapsed time ...\n");
long tStart = System.currentTimeMillis();
for (sentCount = 0; sentCount < messageCount && running; sentCount++) {
Message message = createMessage(sentCount);
producer.send(message);
LOG.info(threadName + " Sent: " + (message instanceof TextMessage ? ((TextMessage) message).getText() : message.getJMSMessageID()));
if (transactionBatchSize > 0 && sentCount > 0 && sentCount % transactionBatchSize == 0) {
LOG.info(threadName + " Committing transaction: " + transactions++);
session.commit();
}
if (sleep > 0) {
Thread.sleep(sleep);
}
}
LOG.info(threadName + " Produced: " + this.getSentCount() + " messages");
long tEnd = System.currentTimeMillis();
long elapsed = (tEnd - tStart) / 1000;
LOG.info(threadName + " Elapsed time in second : " + elapsed + " s");
LOG.info(threadName + " Elapsed time in milli second : " + (tEnd - tStart) + " milli seconds");
} catch (Exception e) {
e.printStackTrace();
} finally {
if (finished != null) {
finished.countDown();
}
if (producer != null) {
try {
producer.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
private void initPayLoad() {
if (messageSize > 0) {
payload = new byte[messageSize];
for (int i = 0; i < payload.length; i++) {
payload[i] = '.';
}
}
}
protected Message createMessage(int i) throws Exception {
Message message = null;
if (payload != null) {
message = session.createBytesMessage();
((BytesMessage)message).writeBytes(payload);
} else {
if (textMessageSize > 0) {
InputStreamReader reader = null;
try {
InputStream is = getClass().getResourceAsStream("demo.txt");
reader = new InputStreamReader(is);
char[] chars = new char[textMessageSize];
reader.read(chars);
message = session.createTextMessage(String.valueOf(chars));
} catch (Exception e) {
LOG.warn(Thread.currentThread().getName() + " Failed to load " + textMessageSize + " bytes of demo text. Using default text message instead");
message = session.createTextMessage("test message: " + i);
} finally {
if (reader != null) {
reader.close();
}
}
} else {
message = session.createTextMessage("test message: " + i);
}
}
if ((msgGroupID != null) && (!msgGroupID.isEmpty())) {
message.setStringProperty("JMSXGroupID", msgGroupID);
}
return message;
}
public void setMessageCount(int messageCount) {
this.messageCount = messageCount;
}
public int getSleep() {
return sleep;
}
public void setSleep(int sleep) {
this.sleep = sleep;
}
public int getMessageCount() {
return messageCount;
}
public int getSentCount() {
return sentCount;
}
public boolean isPersistent() {
return persistent;
}
public void setPersistent(boolean persistent) {
this.persistent = persistent;
}
public boolean isRunning() {
return running;
}
public void setRunning(boolean running) {
this.running = running;
}
public long getMsgTTL() {
return msgTTL;
}
public void setMsgTTL(long msgTTL) {
this.msgTTL = msgTTL;
}
public int getTransactionBatchSize() {
return transactionBatchSize;
}
public void setTransactionBatchSize(int transactionBatchSize) {
this.transactionBatchSize = transactionBatchSize;
}
public String getMsgGroupID() {
return msgGroupID;
}
public void setMsgGroupID(String msgGroupID) {
this.msgGroupID = msgGroupID;
}
public int getTextMessageSize() {
return textMessageSize;
}
public void setTextMessageSize(int textMessageSize) {
this.textMessageSize = textMessageSize;
}
public int getMessageSize() {
return messageSize;
}
public void setMessageSize(int messageSize) {
this.messageSize = messageSize;
}
public CountDownLatch getFinished() {
return finished;
}
public void setFinished(CountDownLatch finished) {
this.finished = finished;
}
}

View File

@ -0,0 +1,15 @@
Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet.
Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis at vero eros et accumsan et iusto odio dignissim qui blandit praesent luptatum zzril delenit augue duis dolore te feugait nulla facilisi. Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy nibh euismod tincidunt ut laoreet dolore magna aliquam erat volutpat.
Ut wisi enim ad minim veniam, quis nostrud exerci tation ullamcorper suscipit lobortis nisl ut aliquip ex ea commodo consequat. Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis at vero eros et accumsan et iusto odio dignissim qui blandit praesent luptatum zzril delenit augue duis dolore te feugait nulla facilisi.
Nam liber tempor cum soluta nobis eleifend option congue nihil imperdiet doming id quod mazim placerat facer possim assum. Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy nibh euismod tincidunt ut laoreet dolore magna aliquam erat volutpat. Ut wisi enim ad minim veniam, quis nostrud exerci tation ullamcorper suscipit lobortis nisl ut aliquip ex ea commodo consequat.
Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis.
At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, At accusam aliquyam diam diam dolore dolores duo eirmod eos erat, et nonumy sed tempor et et invidunt justo labore Stet clita ea et gubergren, kasd magna no rebum. sanctus sea sed takimata ut vero voluptua. est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat.
Consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet.
Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis at vero eros et accumsan et iusto odio dignissim qui blandit praesent luptatum zzril delenit augue duis dolore te feugait nulla facilisi. Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy nibh euismod tincidunt ut laoreet dolore magna aliquam erat volutpat.
Ut wisi enim ad minim veniam, quis nostrud exerci tation ullamcorper suscipit lobortis nisl ut aliquip ex ea commodo consequat. Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis at vero eros et accumsan et iusto odio dignissim qui blandit praesent luptatum zzril delenit augue duis dolore te feugait nulla facilisi.
Nam liber tempor cum soluta nobis eleifend option congue nihil imperdiet doming id quod mazim placerat facer possim assum. Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy nibh euismod tincidunt ut laoreet dolore magna aliquam erat volutpat. Ut wisi enim ad minim veniam, quis nostrud exerci tation ullamcorper suscipit lobortis nisl ut aliquip ex ea commodo consequat.Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis.
At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, At accusam aliquyam diam diam dolore dolores duo eirmod eos erat, et nonumy sed tempor et et invidunt justo labore Stet clita ea et gubergren, kasd magna no rebum. sanctus sea sed takimata ut vero voluptua. est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat.
Consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscing elitr, sed diam nonumy eirmod tempor invidunt ut labore et dolore magna aliquyam erat, sed diam voluptua. At vero eos et accusam et justo duo dolores et ea rebum. Stet clita kasd gubergren, no sea takimata sanctus est Lorem ipsum dolor sit amet.
Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis at vero eros et accumsan et iusto odio dignissim qui blandit praesent luptatum zzril delenit augue duis dolore te feugait nulla facilisi. Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy nibh euismod tincidunt ut laoreet dolore magna aliquam erat volutpat.
Ut wisi enim ad minim veniam, quis nostrud exerci tation ullamcorper suscipit lobortis nisl ut aliquip ex ea commodo consequat. Duis autem vel eum iriure dolor in hendrerit in vulputate velit esse molestie consequat, vel illum dolore eu feugiat nulla facilisis at vero eros et accumsan et iusto odio dignissim qui blandit praesent luptatum zzril delenit augue duis dolore te feugait nulla facilisi.
Nam liber tempor cum soluta nobis eleifend option congue nihil imperdiet doming id quod mazim placerat facer possim assum. Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy nibh euismod tincidunt ut laoreet dolore magna aliquam erat volutpat. Ut wisi enim ad minim veniam, quis nostrud exerci tation ullamcorper suscipit lobortis nisl ut aliquip ex ea commodo consequat.

View File

@ -16,10 +16,15 @@
*/
package org.apache.activemq.console.command;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.List;
import org.apache.activemq.ActiveMQConnectionMetaData;
import org.apache.activemq.console.CommandContext;
import org.apache.activemq.util.IntrospectionSupport;
public abstract class AbstractCommand implements Command {
public static final String COMMAND_OPTION_DELIMETER = ",";
@ -110,6 +115,14 @@ public abstract class AbstractCommand implements Command {
}
System.setProperty(key, value);
} else {
if (token.startsWith("--")) {
String prop = token.substring(2);
if (tokens.isEmpty() || tokens.get(0).startsWith("-")) {
context.print("Property '" + prop + "' is not specified!");
} else if (IntrospectionSupport.setProperty(this, prop, tokens.remove(0))) {
return;
}
}
// Token is unrecognized
context.printInfo("Unrecognized option: " + token);
isPrintHelp = true;
@ -128,4 +141,22 @@ public abstract class AbstractCommand implements Command {
* Print the help messages for the specific task
*/
protected abstract void printHelp();
protected void printHelpFromFile() {
BufferedReader reader = null;
try {
InputStream is = getClass().getResourceAsStream(getName() + ".txt");
reader = new BufferedReader(new InputStreamReader(is));
String line;
while ((line = reader.readLine()) != null) {
context.print(line);
}
} catch (Exception e) {} finally {
if (reader != null) {
try {
reader.close();
} catch (IOException e) {}
}
}
}
}

View File

@ -0,0 +1,157 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.console.command;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.util.ConsumerThread;
import org.apache.activemq.util.IntrospectionSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.Connection;
import javax.jms.Session;
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class ConsumerCommand extends AbstractCommand {
private static final Logger LOG = LoggerFactory.getLogger(ConsumerCommand.class);
String brokerUrl = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
String user = ActiveMQConnectionFactory.DEFAULT_USER;
String password = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
String destination = "queue://TEST";
int messageCount = 1000;
int sleep;
int transactionBatchSize;
int parallelThreads = 1;
@Override
protected void runTask(List<String> tokens) throws Exception {
LOG.info("Connecting to URL: " + brokerUrl + " (" + user + ":" + password + ")");
LOG.info("Consuming " + destination);
LOG.info("Sleeping between receives " + sleep + " ms");
LOG.info("Running " + parallelThreads + " parallel threads");
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
Connection conn = factory.createConnection(user, password);
conn.start();
Session sess;
if (transactionBatchSize != 0) {
sess = conn.createSession(true, Session.SESSION_TRANSACTED);
} else {
sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
CountDownLatch active = new CountDownLatch(parallelThreads);
for (int i = 1; i <= parallelThreads; i++) {
ConsumerThread consumer = new ConsumerThread(sess, ActiveMQDestination.createDestination(destination, ActiveMQDestination.QUEUE_TYPE));
consumer.setName("consumer-" + i);
consumer.setBreakOnNull(false);
consumer.setMessageCount(messageCount);
consumer.setSleep(sleep);
consumer.setTransactionBatchSize(transactionBatchSize);
consumer.setFinished(active);
consumer.start();
}
active.await();
}
public String getBrokerUrl() {
return brokerUrl;
}
public void setBrokerUrl(String brokerUrl) {
this.brokerUrl = brokerUrl;
}
public String getUser() {
return user;
}
public void setUser(String user) {
this.user = user;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public String getDestination() {
return destination;
}
public void setDestination(String destination) {
this.destination = destination;
}
public int getMessageCount() {
return messageCount;
}
public void setMessageCount(int messageCount) {
this.messageCount = messageCount;
}
public int getSleep() {
return sleep;
}
public void setSleep(int sleep) {
this.sleep = sleep;
}
public int getTransactionBatchSize() {
return transactionBatchSize;
}
public void setTransactionBatchSize(int transactionBatchSize) {
this.transactionBatchSize = transactionBatchSize;
}
public int getParallelThreads() {
return parallelThreads;
}
public void setParallelThreads(int parallelThreads) {
this.parallelThreads = parallelThreads;
}
@Override
protected void printHelp() {
printHelpFromFile();
}
@Override
public String getName() {
return "consumer";
}
@Override
public String getOneLineDescription() {
return "Receives messages from the broker";
}
}

View File

@ -0,0 +1,206 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.console.command;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.ProducerThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.Connection;
import javax.jms.Session;
import java.io.*;
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class ProducerCommand extends AbstractCommand {
private static final Logger LOG = LoggerFactory.getLogger(ProducerCommand.class);
String brokerUrl = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
String user = ActiveMQConnectionFactory.DEFAULT_USER;
String password = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
String destination = "queue://TEST";
int messageCount = 1000;
int sleep = 0;
boolean persistent = true;
int messageSize = 0;
int textMessageSize;
long msgTTL = 0L;
String msgGroupID=null;
int transactionBatchSize;
private int parallelThreads = 1;
@Override
protected void runTask(List<String> tokens) throws Exception {
LOG.info("Connecting to URL: " + brokerUrl + " (" + user + ":" + password + ")");
LOG.info("Producing messages to " + destination);
LOG.info("Using " + (persistent ? "persistent" : "non-persistent") + " messages");
LOG.info("Sleeping between sends " + sleep + " ms");
LOG.info("Running " + parallelThreads + " parallel threads");
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
Connection conn = factory.createConnection(user, password);
conn.start();
Session sess;
if (transactionBatchSize != 0) {
sess = conn.createSession(true, Session.SESSION_TRANSACTED);
} else {
sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
CountDownLatch active = new CountDownLatch(parallelThreads);
for (int i = 1; i <= parallelThreads; i++) {
ProducerThread producer = new ProducerThread(sess, ActiveMQDestination.createDestination(destination, ActiveMQDestination.QUEUE_TYPE));
producer.setName("producer-" + i);
producer.setMessageCount(messageCount);
producer.setSleep(sleep);
producer.setMsgTTL(msgTTL);
producer.setPersistent(persistent);
producer.setTransactionBatchSize(transactionBatchSize);
producer.setMessageSize(messageSize);
producer.setMsgGroupID(msgGroupID);
producer.setTextMessageSize(textMessageSize);
producer.setFinished(active);
producer.start();
}
active.await();
}
public String getBrokerUrl() {
return brokerUrl;
}
public void setBrokerUrl(String brokerUrl) {
this.brokerUrl = brokerUrl;
}
public String getDestination() {
return destination;
}
public void setDestination(String destination) {
this.destination = destination;
}
public int getMessageCount() {
return messageCount;
}
public void setMessageCount(int messageCount) {
this.messageCount = messageCount;
}
public int getSleep() {
return sleep;
}
public void setSleep(int sleep) {
this.sleep = sleep;
}
public boolean isPersistent() {
return persistent;
}
public void setPersistent(boolean persistent) {
this.persistent = persistent;
}
public int getMessageSize() {
return messageSize;
}
public void setMessageSize(int messageSize) {
this.messageSize = messageSize;
}
public int getTextMessageSize() {
return textMessageSize;
}
public void setTextMessageSize(int textMessageSize) {
this.textMessageSize = textMessageSize;
}
public long getMsgTTL() {
return msgTTL;
}
public void setMsgTTL(long msgTTL) {
this.msgTTL = msgTTL;
}
public String getMsgGroupID() {
return msgGroupID;
}
public void setMsgGroupID(String msgGroupID) {
this.msgGroupID = msgGroupID;
}
public int getTransactionBatchSize() {
return transactionBatchSize;
}
public void setTransactionBatchSize(int transactionBatchSize) {
this.transactionBatchSize = transactionBatchSize;
}
public String getUser() {
return user;
}
public void setUser(String user) {
this.user = user;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public int getParallelThreads() {
return parallelThreads;
}
public void setParallelThreads(int parallelThreads) {
this.parallelThreads = parallelThreads;
}
@Override
protected void printHelp() {
printHelpFromFile();
}
@Override
public String getName() {
return "producer";
}
@Override
public String getOneLineDescription() {
return "Sends messages to the broker";
}
}

View File

@ -26,3 +26,5 @@ org.apache.activemq.console.command.EncryptCommand
org.apache.activemq.console.command.DecryptCommand
org.apache.activemq.console.command.StoreExportCommand
org.apache.activemq.console.command.PurgeCommand
org.apache.activemq.console.command.ProducerCommand
org.apache.activemq.console.command.ConsumerCommand

View File

@ -0,0 +1,11 @@
Usage: consumer [OPTIONS]
Description: Demo consumer that can be used to receive messages to the broker
Options :
[--brokerUrl URL] - connection factory url; default " + ActiveMQConnectionFactory.DEFAULT_BROKER_URL
[--user ..] - connection user name
[--password ..] - connection password
[--destination queue://..|topic://..] - ; default TEST
[--messageCount N] - number of messages to send; default 1000
[--sleep N] - millisecond sleep period between sends or receives; default 0
[--transactionBatchSize N] - use send transaction batches of size N; default 0, no jms transactions
[--parallelThreads N] - number of threads to run in parallel; default 1

View File

@ -0,0 +1,16 @@
Usage: producer [OPTIONS]
Description: Demo producer that can be used to send messages to the broker
Options :
[--brokerUrl URL] - connection factory url; default " + ActiveMQConnectionFactory.DEFAULT_BROKER_URL
[--user ..] - connection user name
[--password ..] - connection password
[--destination queue://..|topic://..] - ; default TEST
[--persistent true|false] - use persistent or non persistent messages; default true
[--messageCount N] - number of messages to send; default 1000
[--sleep N] - millisecond sleep period between sends or receives; default 0
[--transactionBatchSize N] - use send transaction batches of size N; default 0, no jms transactions
[--parallelThreads N] - number of threads to run in parallel; default 1
[--msgTTL N] - message TTL in milliseconds
[--messageSize N] - size in bytes of a BytesMessage; default 0, a simple TextMessage is used
[--textMessageSize N] - size in bytes of a TextMessage, a Lorem ipsum demo TextMessage is used
[--msgGroupID ..] - JMS message group identifier

View File

@ -123,7 +123,7 @@ public class AMQ3120Test {
ProducerThread producer = new ProducerThread(producerSess, destination) {
@Override
protected Message createMessage(int i) throws Exception {
return sess.createTextMessage(payload + "::" + i);
return session.createTextMessage(payload + "::" + i);
}
};
producer.setSleep(650);

View File

@ -127,7 +127,7 @@ public class AMQ4323Test {
ProducerThread producer = new ProducerThread(producerSess, destination) {
@Override
protected Message createMessage(int i) throws Exception {
return sess.createTextMessage(payload + "::" + i);
return session.createTextMessage(payload + "::" + i);
}
};
producer.setMessageCount(messageCount);

View File

@ -105,7 +105,7 @@ public class MemoryLimitTest extends TestSupport {
final ProducerThread producer = new ProducerThread(sess, queue) {
@Override
protected Message createMessage(int i) throws Exception {
BytesMessage bytesMessage = sess.createBytesMessage();
BytesMessage bytesMessage = session.createBytesMessage();
bytesMessage.writeBytes(payload);
return bytesMessage;
}
@ -168,7 +168,7 @@ public class MemoryLimitTest extends TestSupport {
final ProducerThread producer = new ProducerThread(sess, sess.createQueue("STORE.1")) {
@Override
protected Message createMessage(int i) throws Exception {
return sess.createTextMessage(payload + "::" + i);
return session.createTextMessage(payload + "::" + i);
}
};
producer.setMessageCount(1000);
@ -176,7 +176,7 @@ public class MemoryLimitTest extends TestSupport {
final ProducerThread producer2 = new ProducerThread(sess, sess.createQueue("STORE.2")) {
@Override
protected Message createMessage(int i) throws Exception {
return sess.createTextMessage(payload + "::" + i);
return session.createTextMessage(payload + "::" + i);
}
};
producer2.setMessageCount(1000);

View File

@ -1,80 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.*;
public class ConsumerThread extends Thread {
private static final Logger LOG = LoggerFactory.getLogger(ConsumerThread.class);
int messageCount = 1000;
int received = 0;
Destination dest;
Session sess;
boolean breakOnNull = true;
public ConsumerThread(Session sess, Destination dest) {
this.dest = dest;
this.sess = sess;
}
@Override
public void run() {
MessageConsumer consumer = null;
try {
consumer = sess.createConsumer(dest);
while (received < messageCount) {
Message msg = consumer.receive(3000);
if (msg != null) {
LOG.info("Received " + received + ": " + ((TextMessage)msg).getText());
received++;
} else {
if (breakOnNull) {
break;
}
}
}
} catch (JMSException e) {
e.printStackTrace();
} finally {
if (consumer != null) {
try {
consumer.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
public int getReceived() {
return received;
}
public void setMessageCount(int messageCount) {
this.messageCount = messageCount;
}
public void setBreakOnNull(boolean breakOnNull) {
this.breakOnNull = breakOnNull;
}
}

View File

@ -1,82 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.*;
public class ProducerThread extends Thread {
private static final Logger LOG = LoggerFactory.getLogger(ProducerThread.class);
int messageCount = 1000;
Destination dest;
protected Session sess;
int sleep = 0;
int sentCount = 0;
public ProducerThread(Session sess, Destination dest) {
this.dest = dest;
this.sess = sess;
}
public void run() {
MessageProducer producer = null;
try {
producer = sess.createProducer(dest);
for (sentCount = 0; sentCount < messageCount; sentCount++) {
producer.send(createMessage(sentCount));
LOG.info("Sent 'test message: " + sentCount + "'");
if (sleep > 0) {
Thread.sleep(sleep);
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (producer != null) {
try {
producer.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
protected Message createMessage(int i) throws Exception {
return sess.createTextMessage("test message: " + i);
}
public void setMessageCount(int messageCount) {
this.messageCount = messageCount;
}
public void setSleep(int sleep) {
this.sleep = sleep;
}
public int getMessageCount() {
return messageCount;
}
public int getSentCount() {
return sentCount;
}
}