From 6dd47bb63fb37a4d1ff491188e75f613b6b81935 Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Mon, 4 Aug 2014 20:33:16 -0400 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-5306 This closes #39 --- .../activemq/tool/AbstractJmsClient.java | 123 +++++++++--- .../activemq/tool/JmsConsumerClient.java | 18 +- .../activemq/tool/JmsProducerClient.java | 4 +- .../activemq/tool/AbstractJmsClientTest.java | 175 ++++++++++++++++++ 4 files changed, 284 insertions(+), 36 deletions(-) create mode 100644 activemq-tooling/activemq-perf-maven-plugin/src/test/java/org/apache/activemq/tool/AbstractJmsClientTest.java diff --git a/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/AbstractJmsClient.java b/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/AbstractJmsClient.java index 07e7c2fd0d..d2e38ab88c 100644 --- a/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/AbstractJmsClient.java +++ b/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/AbstractJmsClient.java @@ -16,12 +16,16 @@ */ package org.apache.activemq.tool; +import java.util.ArrayList; +import java.util.List; + import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Session; +import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.tool.properties.JmsClientProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,6 +34,10 @@ public abstract class AbstractJmsClient { private static final Logger LOG = LoggerFactory.getLogger(AbstractJmsClient.class); + private static final String QUEUE_SCHEME = "queue://"; + private static final String TOPIC_SCHEME = "topic://"; + public static final String DESTINATION_SEPARATOR = ","; + protected ConnectionFactory factory; protected Connection jmsConnection; protected Session jmsSession; @@ -108,18 +116,52 @@ public abstract class AbstractJmsClient { return jmsSession; } - public Destination[] createDestination(int destIndex, int destCount) throws JMSException { + public Destination[] createDestinations(int destCount) throws JMSException { + final String destName = getClient().getDestName(); + ArrayList destinations = new ArrayList<>(); + if (destName.contains(DESTINATION_SEPARATOR)) { + if (getClient().isDestComposite() && (destCount == 1)) { + // user was explicit about which destinations to make composite + String[] simpleNames = mapToSimpleNames(destName.split(DESTINATION_SEPARATOR)); + String joinedSimpleNames = join(simpleNames, DESTINATION_SEPARATOR); - if (getClient().isDestComposite()) { - return new Destination[] { - createCompositeDestination(getClient().getDestName(), destIndex, destCount) - }; - } else { - Destination[] dest = new Destination[destCount]; - for (int i = 0; i < destCount; i++) { - dest[i] = createDestination(withDestinationSuffix(getClient().getDestName(), i, destCount)); + // use the type of the 1st destination for the Destination instance + byte destinationType = getDestinationType(destName); + destinations.add(createCompositeDestination(destinationType, joinedSimpleNames, 1)); + } else { + LOG.info("User requested multiple destinations, splitting: {}", destName); + // either composite with multiple destinations to be suffixed + // or multiple non-composite destinations + String[] destinationNames = destName.split(DESTINATION_SEPARATOR); + for (String splitDestName : destinationNames) { + addDestinations(destinations, splitDestName, destCount); + } + } + } else { + addDestinations(destinations, destName, destCount); + } + return destinations.toArray(new Destination[] {}); + } + + private String join(String[] stings, String separator) { + StringBuffer sb = new StringBuffer(); + for (int i = 0; i < stings.length; i++) { + if (i > 0) { + sb.append(separator); + } + sb.append(stings[i]); + } + return sb.toString(); + } + + private void addDestinations(List destinations, String destName, int destCount) throws JMSException { + boolean destComposite = getClient().isDestComposite(); + if ((destComposite) && (destCount > 1)) { + destinations.add(createCompositeDestination(destName, destCount)); + } else { + for (int i = 0; i < destCount; i++) { + destinations.add(createDestination(withDestinationSuffix(destName, i, destCount))); } - return dest; } } @@ -127,20 +169,12 @@ public abstract class AbstractJmsClient { return (destCount == 1) ? name : name + "." + destIndex; } - public Destination createCompositeDestination(int destIndex, int destCount) throws JMSException { - return createCompositeDestination(getClient().getDestName(), destIndex, destCount); + protected Destination createCompositeDestination(String destName, int destCount) throws JMSException { + return createCompositeDestination(getDestinationType(destName), destName, destCount); } - protected Destination createCompositeDestination(String name, int destIndex, int destCount) throws JMSException { - String simpleName; - - if (name.startsWith("queue://")) { - simpleName = name.substring("queue://".length()); - } else if (name.startsWith("topic://")) { - simpleName = name.substring("topic://".length()); - } else { - simpleName = name; - } + protected Destination createCompositeDestination(byte destinationType, String destName, int destCount) throws JMSException { + String simpleName = getSimpleName(destName); String compDestName = ""; for (int i = 0; i < destCount; i++) { @@ -150,16 +184,47 @@ public abstract class AbstractJmsClient { compDestName += withDestinationSuffix(simpleName, i, destCount); } - return createDestination(compDestName); + LOG.info("Creating composite destination: {}", compDestName); + return (destinationType == ActiveMQDestination.TOPIC_TYPE) ? + getSession().createTopic(compDestName) : getSession().createQueue(compDestName); } - protected Destination createDestination(String name) throws JMSException { - if (name.startsWith("queue://")) { - return getSession().createQueue(name.substring("queue://".length())); - } else if (name.startsWith("topic://")) { - return getSession().createTopic(name.substring("topic://".length())); + private String[] mapToSimpleNames(String[] destNames) { + assert (destNames != null); + String[] simpleNames = new String[destNames.length]; + for (int i = 0; i < destNames.length; i++) { + simpleNames[i] = getSimpleName(destNames[i]); + } + return simpleNames; + } + + private String getSimpleName(String destName) { + String simpleName; + if (destName.startsWith(QUEUE_SCHEME)) { + simpleName = destName.substring(QUEUE_SCHEME.length()); + } else if (destName.startsWith(TOPIC_SCHEME)) { + simpleName = destName.substring(TOPIC_SCHEME.length()); } else { - return getSession().createTopic(name); + simpleName = destName; + } + return simpleName; + } + + private byte getDestinationType(String destName) { + assert (destName != null); + if (destName.startsWith(QUEUE_SCHEME)) { + return ActiveMQDestination.QUEUE_TYPE; + } else { + return ActiveMQDestination.TOPIC_TYPE; + } + } + + protected Destination createDestination(String destName) throws JMSException { + String simpleName = getSimpleName(destName); + if (getDestinationType(destName) == ActiveMQDestination.QUEUE_TYPE) { + return getSession().createQueue(simpleName); + } else { + return getSession().createTopic(simpleName); } } diff --git a/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java b/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java index 7351d0294a..d6608832b1 100644 --- a/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java +++ b/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/JmsConsumerClient.java @@ -26,6 +26,7 @@ import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Topic; +import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.tool.properties.JmsClientProperties; import org.apache.activemq.tool.properties.JmsConsumerProperties; import org.slf4j.Logger; @@ -208,12 +209,19 @@ public class JmsConsumerClient extends AbstractJmsMeasurableClient { } public MessageConsumer createJmsConsumer() throws JMSException { - Destination[] dest = createDestination(destIndex, destCount); + Destination[] dest = createDestinations(destCount); - if (this.client.getMessageSelector() == null) - return createJmsConsumer(dest[0]); - else - return createJmsConsumer(dest[0], this.client.getMessageSelector(), false); + Destination consumedDestination = dest[0]; + if (dest.length > 1) { + String destinationName = ((ActiveMQDestination) consumedDestination).getPhysicalName(); + LOG.warn("Multiple destinations requested for consumer; using only first: {}", destinationName); + } + + if (this.client.getMessageSelector() == null) { + return createJmsConsumer(consumedDestination); + } else { + return createJmsConsumer(consumedDestination, this.client.getMessageSelector(), false); + } } public MessageConsumer createJmsConsumer(Destination dest) throws JMSException { diff --git a/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/JmsProducerClient.java b/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/JmsProducerClient.java index e59a857bf6..510d92d697 100644 --- a/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/JmsProducerClient.java +++ b/activemq-tooling/activemq-perf-maven-plugin/src/main/java/org/apache/activemq/tool/JmsProducerClient.java @@ -77,7 +77,7 @@ public class JmsProducerClient extends AbstractJmsMeasurableClient { public void sendCountBasedMessages(long messageCount) throws JMSException { // Parse through different ways to send messages // Avoided putting the condition inside the loop to prevent effect on performance - Destination[] dest = createDestination(destIndex, destCount); + Destination[] dest = createDestinations(destCount); // Create a producer, if none is created. if (getJmsProducer() == null) { @@ -165,7 +165,7 @@ public class JmsProducerClient extends AbstractJmsMeasurableClient { // Parse through different ways to send messages // Avoided putting the condition inside the loop to prevent effect on performance - Destination[] dest = createDestination(destIndex, destCount); + Destination[] dest = createDestinations(destCount); // Create a producer, if none is created. if (getJmsProducer() == null) { diff --git a/activemq-tooling/activemq-perf-maven-plugin/src/test/java/org/apache/activemq/tool/AbstractJmsClientTest.java b/activemq-tooling/activemq-perf-maven-plugin/src/test/java/org/apache/activemq/tool/AbstractJmsClientTest.java new file mode 100644 index 0000000000..5e8f7927b4 --- /dev/null +++ b/activemq-tooling/activemq-perf-maven-plugin/src/test/java/org/apache/activemq/tool/AbstractJmsClientTest.java @@ -0,0 +1,175 @@ +package org.apache.activemq.tool; + +import static org.apache.activemq.command.ActiveMQDestination.QUEUE_TYPE; +import static org.apache.activemq.command.ActiveMQDestination.TOPIC_TYPE; +import static org.junit.Assert.assertEquals; + +import java.net.URI; + +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.tool.properties.JmsClientProperties; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +public class AbstractJmsClientTest { + + public class NullJmsClient extends AbstractJmsClient { + private JmsClientProperties client; + + public NullJmsClient(ConnectionFactory factory) { + super(factory); + } + + @Override + public JmsClientProperties getClient() { + return client; + } + + @Override + public void setClient(JmsClientProperties client) { + this.client = client; + } + } + + private final String DEFAULT_DEST = "TEST.FOO"; + private static BrokerService brokerService; + private static ActiveMQConnectionFactory connectionFactory; + + private AbstractJmsClient jmsClient; + private JmsClientProperties clientProperties; + + @BeforeClass + public static void setUpBrokerAndConnectionFactory() throws Exception { + brokerService = BrokerFactory.createBroker(new URI("broker://()/localhost?persistent=false")); + brokerService.start(); + connectionFactory = new ActiveMQConnectionFactory("vm://localhost"); + } + + @AfterClass + public static void tearDownBroker() throws Exception { + brokerService.stop(); + } + + @Before + public void setUp() { + jmsClient = new NullJmsClient(connectionFactory); + clientProperties = new JmsClientProperties(); + clientProperties.setDestName(DEFAULT_DEST); + jmsClient.setClient(clientProperties); + } + + @Test + public void testCreateDestination() throws JMSException { + assertDestinationNameType("dest", TOPIC_TYPE, + asAmqDest(jmsClient.createDestination("dest"))); + } + + @Test + public void testCreateDestination_topic() throws JMSException { + assertDestinationNameType("dest", TOPIC_TYPE, + asAmqDest(jmsClient.createDestination("topic://dest"))); + } + + @Test + public void testCreateDestination_queue() throws JMSException { + assertDestinationNameType("dest", QUEUE_TYPE, + asAmqDest(jmsClient.createDestination("queue://dest"))); + } + + @Test + public void testCreateDestinations_commaSeparated() throws JMSException { + clientProperties.setDestName("queue://foo,topic://cheese"); + Destination[] destinations = jmsClient.createDestinations(1); + assertEquals(2, destinations.length); + assertDestinationNameType("foo", QUEUE_TYPE, asAmqDest(destinations[0])); + assertDestinationNameType("cheese", TOPIC_TYPE, asAmqDest(destinations[1])); + } + + @Test + public void testCreateDestinations_multipleComposite() throws JMSException { + clientProperties.setDestComposite(true); + clientProperties.setDestName("queue://foo,queue://cheese"); + Destination[] destinations = jmsClient.createDestinations(1); + assertEquals(1, destinations.length); + // suffixes should be added + assertDestinationNameType("foo,cheese", QUEUE_TYPE, asAmqDest(destinations[0])); + } + + @Test + public void testCreateDestinations() throws JMSException { + Destination[] destinations = jmsClient.createDestinations(1); + assertEquals(1, destinations.length); + assertDestinationNameType(DEFAULT_DEST, TOPIC_TYPE, asAmqDest(destinations[0])); + } + + @Test + public void testCreateDestinations_multiple() throws JMSException { + Destination[] destinations = jmsClient.createDestinations(2); + assertEquals(2, destinations.length); + // suffixes should be added + assertDestinationNameType(DEFAULT_DEST + ".0", TOPIC_TYPE, asAmqDest(destinations[0])); + assertDestinationNameType(DEFAULT_DEST + ".1", TOPIC_TYPE, asAmqDest(destinations[1])); + } + + @Test + public void testCreateDestinations_multipleCommaSeparated() throws JMSException { + clientProperties.setDestName("queue://foo,topic://cheese"); + Destination[] destinations = jmsClient.createDestinations(2); + assertEquals(4, destinations.length); + // suffixes should be added + assertDestinationNameType("foo.0", QUEUE_TYPE, asAmqDest(destinations[0])); + assertDestinationNameType("foo.1", QUEUE_TYPE, asAmqDest(destinations[1])); + assertDestinationNameType("cheese.0", TOPIC_TYPE, asAmqDest(destinations[2])); + assertDestinationNameType("cheese.1", TOPIC_TYPE, asAmqDest(destinations[3])); + } + + @Test + public void testCreateDestinations_composite() throws JMSException { + clientProperties.setDestComposite(true); + Destination[] destinations = jmsClient.createDestinations(2); + assertEquals(1, destinations.length); + // suffixes should be added + String expectedDestName = DEFAULT_DEST + ".0," + DEFAULT_DEST + ".1"; + assertDestinationNameType(expectedDestName, TOPIC_TYPE, asAmqDest(destinations[0])); + } + + @Test + public void testCreateDestinations_compositeQueue() throws JMSException { + clientProperties.setDestComposite(true); + clientProperties.setDestName("queue://" + DEFAULT_DEST); + Destination[] destinations = jmsClient.createDestinations(2); + assertEquals(1, destinations.length); + // suffixes should be added + String expectedDestName = DEFAULT_DEST + ".0," + DEFAULT_DEST + ".1"; + assertDestinationNameType(expectedDestName, QUEUE_TYPE, asAmqDest(destinations[0])); + } + + @Test + public void testCreateDestinations_compositeCommaSeparated() throws JMSException { + clientProperties.setDestComposite(true); + clientProperties.setDestName("queue://foo,topic://cheese"); + Destination[] destinations = jmsClient.createDestinations(2); + assertEquals(2, destinations.length); + + assertDestinationNameType("foo.0,foo.1", QUEUE_TYPE, asAmqDest(destinations[0])); + assertDestinationNameType("cheese.0,cheese.1", TOPIC_TYPE, asAmqDest(destinations[1])); + } + + private void assertDestinationNameType(String physicalName, byte destinationType, ActiveMQDestination destination) { + assertEquals(destinationType, destination.getDestinationType()); + assertEquals(physicalName, destination.getPhysicalName()); + } + + private ActiveMQDestination asAmqDest(Destination destination) { + return (ActiveMQDestination) destination; + } +}