mirror of https://github.com/apache/activemq.git
This closes #39
This commit is contained in:
parent
7c04ead460
commit
6dd47bb63f
|
@ -16,12 +16,16 @@
|
|||
*/
|
||||
package org.apache.activemq.tool;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Session;
|
||||
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.tool.properties.JmsClientProperties;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -30,6 +34,10 @@ public abstract class AbstractJmsClient {
|
|||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(AbstractJmsClient.class);
|
||||
|
||||
private static final String QUEUE_SCHEME = "queue://";
|
||||
private static final String TOPIC_SCHEME = "topic://";
|
||||
public static final String DESTINATION_SEPARATOR = ",";
|
||||
|
||||
protected ConnectionFactory factory;
|
||||
protected Connection jmsConnection;
|
||||
protected Session jmsSession;
|
||||
|
@ -108,18 +116,52 @@ public abstract class AbstractJmsClient {
|
|||
return jmsSession;
|
||||
}
|
||||
|
||||
public Destination[] createDestination(int destIndex, int destCount) throws JMSException {
|
||||
public Destination[] createDestinations(int destCount) throws JMSException {
|
||||
final String destName = getClient().getDestName();
|
||||
ArrayList<Destination> destinations = new ArrayList<>();
|
||||
if (destName.contains(DESTINATION_SEPARATOR)) {
|
||||
if (getClient().isDestComposite() && (destCount == 1)) {
|
||||
// user was explicit about which destinations to make composite
|
||||
String[] simpleNames = mapToSimpleNames(destName.split(DESTINATION_SEPARATOR));
|
||||
String joinedSimpleNames = join(simpleNames, DESTINATION_SEPARATOR);
|
||||
|
||||
if (getClient().isDestComposite()) {
|
||||
return new Destination[] {
|
||||
createCompositeDestination(getClient().getDestName(), destIndex, destCount)
|
||||
};
|
||||
} else {
|
||||
Destination[] dest = new Destination[destCount];
|
||||
for (int i = 0; i < destCount; i++) {
|
||||
dest[i] = createDestination(withDestinationSuffix(getClient().getDestName(), i, destCount));
|
||||
// use the type of the 1st destination for the Destination instance
|
||||
byte destinationType = getDestinationType(destName);
|
||||
destinations.add(createCompositeDestination(destinationType, joinedSimpleNames, 1));
|
||||
} else {
|
||||
LOG.info("User requested multiple destinations, splitting: {}", destName);
|
||||
// either composite with multiple destinations to be suffixed
|
||||
// or multiple non-composite destinations
|
||||
String[] destinationNames = destName.split(DESTINATION_SEPARATOR);
|
||||
for (String splitDestName : destinationNames) {
|
||||
addDestinations(destinations, splitDestName, destCount);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
addDestinations(destinations, destName, destCount);
|
||||
}
|
||||
return destinations.toArray(new Destination[] {});
|
||||
}
|
||||
|
||||
private String join(String[] stings, String separator) {
|
||||
StringBuffer sb = new StringBuffer();
|
||||
for (int i = 0; i < stings.length; i++) {
|
||||
if (i > 0) {
|
||||
sb.append(separator);
|
||||
}
|
||||
sb.append(stings[i]);
|
||||
}
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
private void addDestinations(List<Destination> destinations, String destName, int destCount) throws JMSException {
|
||||
boolean destComposite = getClient().isDestComposite();
|
||||
if ((destComposite) && (destCount > 1)) {
|
||||
destinations.add(createCompositeDestination(destName, destCount));
|
||||
} else {
|
||||
for (int i = 0; i < destCount; i++) {
|
||||
destinations.add(createDestination(withDestinationSuffix(destName, i, destCount)));
|
||||
}
|
||||
return dest;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -127,20 +169,12 @@ public abstract class AbstractJmsClient {
|
|||
return (destCount == 1) ? name : name + "." + destIndex;
|
||||
}
|
||||
|
||||
public Destination createCompositeDestination(int destIndex, int destCount) throws JMSException {
|
||||
return createCompositeDestination(getClient().getDestName(), destIndex, destCount);
|
||||
protected Destination createCompositeDestination(String destName, int destCount) throws JMSException {
|
||||
return createCompositeDestination(getDestinationType(destName), destName, destCount);
|
||||
}
|
||||
|
||||
protected Destination createCompositeDestination(String name, int destIndex, int destCount) throws JMSException {
|
||||
String simpleName;
|
||||
|
||||
if (name.startsWith("queue://")) {
|
||||
simpleName = name.substring("queue://".length());
|
||||
} else if (name.startsWith("topic://")) {
|
||||
simpleName = name.substring("topic://".length());
|
||||
} else {
|
||||
simpleName = name;
|
||||
}
|
||||
protected Destination createCompositeDestination(byte destinationType, String destName, int destCount) throws JMSException {
|
||||
String simpleName = getSimpleName(destName);
|
||||
|
||||
String compDestName = "";
|
||||
for (int i = 0; i < destCount; i++) {
|
||||
|
@ -150,16 +184,47 @@ public abstract class AbstractJmsClient {
|
|||
compDestName += withDestinationSuffix(simpleName, i, destCount);
|
||||
}
|
||||
|
||||
return createDestination(compDestName);
|
||||
LOG.info("Creating composite destination: {}", compDestName);
|
||||
return (destinationType == ActiveMQDestination.TOPIC_TYPE) ?
|
||||
getSession().createTopic(compDestName) : getSession().createQueue(compDestName);
|
||||
}
|
||||
|
||||
protected Destination createDestination(String name) throws JMSException {
|
||||
if (name.startsWith("queue://")) {
|
||||
return getSession().createQueue(name.substring("queue://".length()));
|
||||
} else if (name.startsWith("topic://")) {
|
||||
return getSession().createTopic(name.substring("topic://".length()));
|
||||
private String[] mapToSimpleNames(String[] destNames) {
|
||||
assert (destNames != null);
|
||||
String[] simpleNames = new String[destNames.length];
|
||||
for (int i = 0; i < destNames.length; i++) {
|
||||
simpleNames[i] = getSimpleName(destNames[i]);
|
||||
}
|
||||
return simpleNames;
|
||||
}
|
||||
|
||||
private String getSimpleName(String destName) {
|
||||
String simpleName;
|
||||
if (destName.startsWith(QUEUE_SCHEME)) {
|
||||
simpleName = destName.substring(QUEUE_SCHEME.length());
|
||||
} else if (destName.startsWith(TOPIC_SCHEME)) {
|
||||
simpleName = destName.substring(TOPIC_SCHEME.length());
|
||||
} else {
|
||||
return getSession().createTopic(name);
|
||||
simpleName = destName;
|
||||
}
|
||||
return simpleName;
|
||||
}
|
||||
|
||||
private byte getDestinationType(String destName) {
|
||||
assert (destName != null);
|
||||
if (destName.startsWith(QUEUE_SCHEME)) {
|
||||
return ActiveMQDestination.QUEUE_TYPE;
|
||||
} else {
|
||||
return ActiveMQDestination.TOPIC_TYPE;
|
||||
}
|
||||
}
|
||||
|
||||
protected Destination createDestination(String destName) throws JMSException {
|
||||
String simpleName = getSimpleName(destName);
|
||||
if (getDestinationType(destName) == ActiveMQDestination.QUEUE_TYPE) {
|
||||
return getSession().createQueue(simpleName);
|
||||
} else {
|
||||
return getSession().createTopic(simpleName);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -26,6 +26,7 @@ import javax.jms.MessageConsumer;
|
|||
import javax.jms.MessageListener;
|
||||
import javax.jms.Topic;
|
||||
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.tool.properties.JmsClientProperties;
|
||||
import org.apache.activemq.tool.properties.JmsConsumerProperties;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -208,12 +209,19 @@ public class JmsConsumerClient extends AbstractJmsMeasurableClient {
|
|||
}
|
||||
|
||||
public MessageConsumer createJmsConsumer() throws JMSException {
|
||||
Destination[] dest = createDestination(destIndex, destCount);
|
||||
Destination[] dest = createDestinations(destCount);
|
||||
|
||||
if (this.client.getMessageSelector() == null)
|
||||
return createJmsConsumer(dest[0]);
|
||||
else
|
||||
return createJmsConsumer(dest[0], this.client.getMessageSelector(), false);
|
||||
Destination consumedDestination = dest[0];
|
||||
if (dest.length > 1) {
|
||||
String destinationName = ((ActiveMQDestination) consumedDestination).getPhysicalName();
|
||||
LOG.warn("Multiple destinations requested for consumer; using only first: {}", destinationName);
|
||||
}
|
||||
|
||||
if (this.client.getMessageSelector() == null) {
|
||||
return createJmsConsumer(consumedDestination);
|
||||
} else {
|
||||
return createJmsConsumer(consumedDestination, this.client.getMessageSelector(), false);
|
||||
}
|
||||
}
|
||||
|
||||
public MessageConsumer createJmsConsumer(Destination dest) throws JMSException {
|
||||
|
|
|
@ -77,7 +77,7 @@ public class JmsProducerClient extends AbstractJmsMeasurableClient {
|
|||
public void sendCountBasedMessages(long messageCount) throws JMSException {
|
||||
// Parse through different ways to send messages
|
||||
// Avoided putting the condition inside the loop to prevent effect on performance
|
||||
Destination[] dest = createDestination(destIndex, destCount);
|
||||
Destination[] dest = createDestinations(destCount);
|
||||
|
||||
// Create a producer, if none is created.
|
||||
if (getJmsProducer() == null) {
|
||||
|
@ -165,7 +165,7 @@ public class JmsProducerClient extends AbstractJmsMeasurableClient {
|
|||
// Parse through different ways to send messages
|
||||
// Avoided putting the condition inside the loop to prevent effect on performance
|
||||
|
||||
Destination[] dest = createDestination(destIndex, destCount);
|
||||
Destination[] dest = createDestinations(destCount);
|
||||
|
||||
// Create a producer, if none is created.
|
||||
if (getJmsProducer() == null) {
|
||||
|
|
|
@ -0,0 +1,175 @@
|
|||
package org.apache.activemq.tool;
|
||||
|
||||
import static org.apache.activemq.command.ActiveMQDestination.QUEUE_TYPE;
|
||||
import static org.apache.activemq.command.ActiveMQDestination.TOPIC_TYPE;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import java.net.URI;
|
||||
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.JMSException;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.broker.BrokerFactory;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.tool.properties.JmsClientProperties;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
public class AbstractJmsClientTest {
|
||||
|
||||
public class NullJmsClient extends AbstractJmsClient {
|
||||
private JmsClientProperties client;
|
||||
|
||||
public NullJmsClient(ConnectionFactory factory) {
|
||||
super(factory);
|
||||
}
|
||||
|
||||
@Override
|
||||
public JmsClientProperties getClient() {
|
||||
return client;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setClient(JmsClientProperties client) {
|
||||
this.client = client;
|
||||
}
|
||||
}
|
||||
|
||||
private final String DEFAULT_DEST = "TEST.FOO";
|
||||
private static BrokerService brokerService;
|
||||
private static ActiveMQConnectionFactory connectionFactory;
|
||||
|
||||
private AbstractJmsClient jmsClient;
|
||||
private JmsClientProperties clientProperties;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBrokerAndConnectionFactory() throws Exception {
|
||||
brokerService = BrokerFactory.createBroker(new URI("broker://()/localhost?persistent=false"));
|
||||
brokerService.start();
|
||||
connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDownBroker() throws Exception {
|
||||
brokerService.stop();
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
jmsClient = new NullJmsClient(connectionFactory);
|
||||
clientProperties = new JmsClientProperties();
|
||||
clientProperties.setDestName(DEFAULT_DEST);
|
||||
jmsClient.setClient(clientProperties);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateDestination() throws JMSException {
|
||||
assertDestinationNameType("dest", TOPIC_TYPE,
|
||||
asAmqDest(jmsClient.createDestination("dest")));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateDestination_topic() throws JMSException {
|
||||
assertDestinationNameType("dest", TOPIC_TYPE,
|
||||
asAmqDest(jmsClient.createDestination("topic://dest")));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateDestination_queue() throws JMSException {
|
||||
assertDestinationNameType("dest", QUEUE_TYPE,
|
||||
asAmqDest(jmsClient.createDestination("queue://dest")));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateDestinations_commaSeparated() throws JMSException {
|
||||
clientProperties.setDestName("queue://foo,topic://cheese");
|
||||
Destination[] destinations = jmsClient.createDestinations(1);
|
||||
assertEquals(2, destinations.length);
|
||||
assertDestinationNameType("foo", QUEUE_TYPE, asAmqDest(destinations[0]));
|
||||
assertDestinationNameType("cheese", TOPIC_TYPE, asAmqDest(destinations[1]));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateDestinations_multipleComposite() throws JMSException {
|
||||
clientProperties.setDestComposite(true);
|
||||
clientProperties.setDestName("queue://foo,queue://cheese");
|
||||
Destination[] destinations = jmsClient.createDestinations(1);
|
||||
assertEquals(1, destinations.length);
|
||||
// suffixes should be added
|
||||
assertDestinationNameType("foo,cheese", QUEUE_TYPE, asAmqDest(destinations[0]));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateDestinations() throws JMSException {
|
||||
Destination[] destinations = jmsClient.createDestinations(1);
|
||||
assertEquals(1, destinations.length);
|
||||
assertDestinationNameType(DEFAULT_DEST, TOPIC_TYPE, asAmqDest(destinations[0]));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateDestinations_multiple() throws JMSException {
|
||||
Destination[] destinations = jmsClient.createDestinations(2);
|
||||
assertEquals(2, destinations.length);
|
||||
// suffixes should be added
|
||||
assertDestinationNameType(DEFAULT_DEST + ".0", TOPIC_TYPE, asAmqDest(destinations[0]));
|
||||
assertDestinationNameType(DEFAULT_DEST + ".1", TOPIC_TYPE, asAmqDest(destinations[1]));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateDestinations_multipleCommaSeparated() throws JMSException {
|
||||
clientProperties.setDestName("queue://foo,topic://cheese");
|
||||
Destination[] destinations = jmsClient.createDestinations(2);
|
||||
assertEquals(4, destinations.length);
|
||||
// suffixes should be added
|
||||
assertDestinationNameType("foo.0", QUEUE_TYPE, asAmqDest(destinations[0]));
|
||||
assertDestinationNameType("foo.1", QUEUE_TYPE, asAmqDest(destinations[1]));
|
||||
assertDestinationNameType("cheese.0", TOPIC_TYPE, asAmqDest(destinations[2]));
|
||||
assertDestinationNameType("cheese.1", TOPIC_TYPE, asAmqDest(destinations[3]));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateDestinations_composite() throws JMSException {
|
||||
clientProperties.setDestComposite(true);
|
||||
Destination[] destinations = jmsClient.createDestinations(2);
|
||||
assertEquals(1, destinations.length);
|
||||
// suffixes should be added
|
||||
String expectedDestName = DEFAULT_DEST + ".0," + DEFAULT_DEST + ".1";
|
||||
assertDestinationNameType(expectedDestName, TOPIC_TYPE, asAmqDest(destinations[0]));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateDestinations_compositeQueue() throws JMSException {
|
||||
clientProperties.setDestComposite(true);
|
||||
clientProperties.setDestName("queue://" + DEFAULT_DEST);
|
||||
Destination[] destinations = jmsClient.createDestinations(2);
|
||||
assertEquals(1, destinations.length);
|
||||
// suffixes should be added
|
||||
String expectedDestName = DEFAULT_DEST + ".0," + DEFAULT_DEST + ".1";
|
||||
assertDestinationNameType(expectedDestName, QUEUE_TYPE, asAmqDest(destinations[0]));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateDestinations_compositeCommaSeparated() throws JMSException {
|
||||
clientProperties.setDestComposite(true);
|
||||
clientProperties.setDestName("queue://foo,topic://cheese");
|
||||
Destination[] destinations = jmsClient.createDestinations(2);
|
||||
assertEquals(2, destinations.length);
|
||||
|
||||
assertDestinationNameType("foo.0,foo.1", QUEUE_TYPE, asAmqDest(destinations[0]));
|
||||
assertDestinationNameType("cheese.0,cheese.1", TOPIC_TYPE, asAmqDest(destinations[1]));
|
||||
}
|
||||
|
||||
private void assertDestinationNameType(String physicalName, byte destinationType, ActiveMQDestination destination) {
|
||||
assertEquals(destinationType, destination.getDestinationType());
|
||||
assertEquals(physicalName, destination.getPhysicalName());
|
||||
}
|
||||
|
||||
private ActiveMQDestination asAmqDest(Destination destination) {
|
||||
return (ActiveMQDestination) destination;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue