mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-3280 - support selectors in perf plugin
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1091434 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
26cf6a7f26
commit
a072126722
|
@ -193,7 +193,11 @@ public class JmsConsumerClient extends AbstractJmsMeasurableClient {
|
|||
|
||||
public MessageConsumer createJmsConsumer() throws JMSException {
|
||||
Destination[] dest = createDestination(destIndex, destCount);
|
||||
return createJmsConsumer(dest[0]);
|
||||
|
||||
if (this.client.getMessageSelector() == null)
|
||||
return createJmsConsumer(dest[0]);
|
||||
else
|
||||
return createJmsConsumer(dest[0], this.client.getMessageSelector(), false);
|
||||
}
|
||||
|
||||
public MessageConsumer createJmsConsumer(Destination dest) throws JMSException {
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
package org.apache.activemq.tool;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Set;
|
||||
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.DeliveryMode;
|
||||
|
@ -248,6 +249,13 @@ public class JmsProducerClient extends AbstractJmsMeasurableClient {
|
|||
|
||||
public TextMessage createJmsTextMessage(int size) throws JMSException {
|
||||
jmsTextMessage = getSession().createTextMessage(buildText("", size));
|
||||
|
||||
// support for adding message headers
|
||||
Set<String> headerKeys = this.client.getHeaderKeys();
|
||||
for (String key : headerKeys) {
|
||||
jmsTextMessage.setObjectProperty(key, this.client.getHeaderValue(key));
|
||||
}
|
||||
|
||||
return jmsTextMessage;
|
||||
}
|
||||
|
||||
|
|
|
@ -28,7 +28,8 @@ public class JmsConsumerProperties extends JmsClientProperties {
|
|||
protected long recvDuration = 5 * 60 * 1000; // Receive for 5 mins by default
|
||||
protected long recvDelay = 0; // delay in milliseconds for processing received msg
|
||||
protected String recvType = TIME_BASED_RECEIVING;
|
||||
|
||||
protected String messageSelector = null;
|
||||
|
||||
public boolean isDurable() {
|
||||
return durable;
|
||||
}
|
||||
|
@ -84,4 +85,13 @@ public class JmsConsumerProperties extends JmsClientProperties {
|
|||
public long getRecvDelay() {
|
||||
return this.recvDelay;
|
||||
}
|
||||
|
||||
public String getMessageSelector() {
|
||||
return this.messageSelector;
|
||||
}
|
||||
|
||||
public void setMessageSelector(String selector) {
|
||||
if (selector != null )
|
||||
this.messageSelector = new String(selector);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,7 +16,17 @@
|
|||
*/
|
||||
package org.apache.activemq.tool.properties;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class JmsProducerProperties extends JmsClientProperties {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ReflectionUtil.class);
|
||||
|
||||
public static final String TIME_BASED_SENDING = "time"; // Produce messages base on a time interval
|
||||
public static final String COUNT_BASED_SENDING = "count"; // Produce a specific count of messages
|
||||
public static final String DELIVERY_MODE_PERSISTENT = "persistent"; // Persistent message delivery
|
||||
|
@ -29,8 +39,15 @@ public class JmsProducerProperties extends JmsClientProperties {
|
|||
protected String sendType = TIME_BASED_SENDING;
|
||||
protected long sendDelay = 0; // delay in milliseconds between each producer send
|
||||
|
||||
protected Map<String,Object> headerMap = null;
|
||||
|
||||
|
||||
// If true, create a different message on each send, otherwise reuse.
|
||||
protected boolean createNewMsg;
|
||||
|
||||
public JmsProducerProperties() {
|
||||
this.headerMap = new HashMap();
|
||||
}
|
||||
|
||||
public String getDeliveryMode() {
|
||||
return deliveryMode;
|
||||
|
@ -87,4 +104,53 @@ public class JmsProducerProperties extends JmsClientProperties {
|
|||
public long getSendDelay() {
|
||||
return this.sendDelay;
|
||||
}
|
||||
|
||||
|
||||
/* Operations for supporting message headers */
|
||||
|
||||
/**
|
||||
* Method for setting a message header.
|
||||
* @param encodedHeader - the header is encoded as a string using this syntax:
|
||||
* encodedHeader = [headerkey '=' headervalue ':' ]*
|
||||
* E.g. an encodedHeader could read "JMSType=car", or
|
||||
* "JMSType=car:MyHeader=MyValue"
|
||||
*
|
||||
* That implies neither the header key nor the value
|
||||
* can contain any of the characters ':' and '='.
|
||||
*/
|
||||
public void setHeader(String encodedHeader) {
|
||||
|
||||
// remove any trailing ':' characters
|
||||
if (encodedHeader.endsWith(":")) {
|
||||
encodedHeader = encodedHeader.substring(0, encodedHeader.length()-1);
|
||||
}
|
||||
|
||||
// split headers
|
||||
String headers[] = encodedHeader.split(":");
|
||||
for (String h : headers) {
|
||||
|
||||
// split into header name and value
|
||||
String tokens[] = h.split("=");
|
||||
|
||||
// sanity check, don't allow empty string for header names
|
||||
if (tokens.length != 2 || tokens[0].equals("") || tokens[1].equals("") ) {
|
||||
LOG.error("Error parsing message headers. Header: \"" + h +
|
||||
"\". This header will be ignored.");
|
||||
} else {
|
||||
this.headerMap.put(tokens[0], tokens[1]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public Set<String> getHeaderKeys() {
|
||||
return this.headerMap.keySet();
|
||||
}
|
||||
|
||||
public Object getHeaderValue(String key) {
|
||||
return this.headerMap.get(key);
|
||||
}
|
||||
|
||||
public void clearHeaders() {
|
||||
this.headerMap.clear();
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue