mirror of https://github.com/apache/activemq.git
Add support for temp topics and queues to perf tests. This closes #49
This commit is contained in:
parent
51566104ab
commit
46bc26cea5
|
@ -19,11 +19,7 @@ package org.apache.activemq.tool;
|
|||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.*;
|
||||
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.tool.properties.JmsClientProperties;
|
||||
|
@ -36,6 +32,8 @@ public abstract class AbstractJmsClient {
|
|||
|
||||
private static final String QUEUE_SCHEME = "queue://";
|
||||
private static final String TOPIC_SCHEME = "topic://";
|
||||
private static final String TEMP_QUEUE_SCHEME = "temp-queue://";
|
||||
private static final String TEMP_TOPIC_SCHEME = "temp-topic://";
|
||||
public static final String DESTINATION_SEPARATOR = ",";
|
||||
|
||||
protected ConnectionFactory factory;
|
||||
|
@ -185,8 +183,18 @@ public abstract class AbstractJmsClient {
|
|||
}
|
||||
|
||||
LOG.info("Creating composite destination: {}", compDestName);
|
||||
return (destinationType == ActiveMQDestination.TOPIC_TYPE) ?
|
||||
getSession().createTopic(compDestName) : getSession().createQueue(compDestName);
|
||||
Destination destination;
|
||||
Session session = getSession();
|
||||
if (destinationType == ActiveMQDestination.TOPIC_TYPE) {
|
||||
destination = session.createTopic(compDestName);
|
||||
} else if (destinationType == ActiveMQDestination.QUEUE_TYPE) {
|
||||
destination = session.createQueue(compDestName);
|
||||
} else {
|
||||
throw new UnsupportedOperationException(
|
||||
"Cannot create composite destinations using temporary queues or topics.");
|
||||
}
|
||||
assert (destination != null);
|
||||
return destination;
|
||||
}
|
||||
|
||||
private String[] mapToSimpleNames(String[] destNames) {
|
||||
|
@ -198,22 +206,30 @@ public abstract class AbstractJmsClient {
|
|||
return simpleNames;
|
||||
}
|
||||
|
||||
private String getSimpleName(String destName) {
|
||||
protected String getSimpleName(String destName) {
|
||||
String simpleName;
|
||||
if (destName.startsWith(QUEUE_SCHEME)) {
|
||||
simpleName = destName.substring(QUEUE_SCHEME.length());
|
||||
} else if (destName.startsWith(TOPIC_SCHEME)) {
|
||||
simpleName = destName.substring(TOPIC_SCHEME.length());
|
||||
} else if (destName.startsWith(TEMP_QUEUE_SCHEME)) {
|
||||
simpleName = destName.substring(TEMP_QUEUE_SCHEME.length());
|
||||
} else if (destName.startsWith(TEMP_TOPIC_SCHEME)) {
|
||||
simpleName = destName.substring(TEMP_TOPIC_SCHEME.length());
|
||||
} else {
|
||||
simpleName = destName;
|
||||
}
|
||||
return simpleName;
|
||||
}
|
||||
|
||||
private byte getDestinationType(String destName) {
|
||||
protected byte getDestinationType(String destName) {
|
||||
assert (destName != null);
|
||||
if (destName.startsWith(QUEUE_SCHEME)) {
|
||||
return ActiveMQDestination.QUEUE_TYPE;
|
||||
} else if (destName.startsWith(TEMP_QUEUE_SCHEME)) {
|
||||
return ActiveMQDestination.TEMP_QUEUE_TYPE;
|
||||
} else if (destName.startsWith(TEMP_TOPIC_SCHEME)) {
|
||||
return ActiveMQDestination.TEMP_TOPIC_TYPE;
|
||||
} else {
|
||||
return ActiveMQDestination.TOPIC_TYPE;
|
||||
}
|
||||
|
@ -221,10 +237,34 @@ public abstract class AbstractJmsClient {
|
|||
|
||||
protected Destination createDestination(String destName) throws JMSException {
|
||||
String simpleName = getSimpleName(destName);
|
||||
if (getDestinationType(destName) == ActiveMQDestination.QUEUE_TYPE) {
|
||||
byte destinationType = getDestinationType(destName);
|
||||
|
||||
if (destinationType == ActiveMQDestination.QUEUE_TYPE) {
|
||||
LOG.info("Creating queue: {}", destName);
|
||||
return getSession().createQueue(simpleName);
|
||||
} else {
|
||||
} else if (destinationType == ActiveMQDestination.TOPIC_TYPE) {
|
||||
LOG.info("Creating topic: {}", destName);
|
||||
return getSession().createTopic(simpleName);
|
||||
} else {
|
||||
return createTemporaryDestination(destName);
|
||||
}
|
||||
}
|
||||
|
||||
protected Destination createTemporaryDestination(String destName) throws JMSException {
|
||||
byte destinationType = getDestinationType(destName);
|
||||
|
||||
if (destinationType == ActiveMQDestination.TEMP_QUEUE_TYPE) {
|
||||
LOG.warn("Creating temporary queue. Requested name ({}) ignored.", destName);
|
||||
TemporaryQueue temporaryQueue = getSession().createTemporaryQueue();
|
||||
LOG.info("Temporary queue created: {}", temporaryQueue.getQueueName());
|
||||
return temporaryQueue;
|
||||
} else if (destinationType == ActiveMQDestination.TEMP_TOPIC_TYPE) {
|
||||
LOG.warn("Creating temporary topic. Requested name ({}) ignored.", destName);
|
||||
TemporaryTopic temporaryTopic = getSession().createTemporaryTopic();
|
||||
LOG.info("Temporary topic created: {}", temporaryTopic.getTopicName());
|
||||
return temporaryTopic;
|
||||
} else {
|
||||
throw new IllegalArgumentException("Unrecognized destination type: " + destinationType);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -237,7 +277,6 @@ public abstract class AbstractJmsClient {
|
|||
* @throws JMSException in case the call to JMS Session.commit() fails.
|
||||
*/
|
||||
public boolean commitTxIfNecessary() throws JMSException {
|
||||
|
||||
internalTxCounter++;
|
||||
if (getClient().isSessTransacted()) {
|
||||
if ((internalTxCounter % getClient().getCommitAfterXMsgs()) == 0) {
|
||||
|
|
|
@ -23,6 +23,7 @@ import javax.jms.ConnectionFactory;
|
|||
import org.apache.activemq.tool.sampler.MeasurableClient;
|
||||
|
||||
public abstract class AbstractJmsMeasurableClient extends AbstractJmsClient implements MeasurableClient {
|
||||
|
||||
protected AtomicLong throughput = new AtomicLong(0);
|
||||
|
||||
public AbstractJmsMeasurableClient(ConnectionFactory factory) {
|
||||
|
|
|
@ -31,6 +31,7 @@ import javax.jms.JMSException;
|
|||
import javax.jms.MessageProducer;
|
||||
import javax.jms.TextMessage;
|
||||
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.tool.properties.JmsClientProperties;
|
||||
import org.apache.activemq.tool.properties.JmsProducerProperties;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -322,6 +323,24 @@ public class JmsProducerClient extends AbstractJmsMeasurableClient {
|
|||
client = (JmsProducerProperties)clientProps;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Destination createTemporaryDestination(String destName) throws JMSException {
|
||||
String simpleName = getSimpleName(destName);
|
||||
byte destinationType = getDestinationType(destName);
|
||||
|
||||
// when we produce to temp destinations, we publish to them as
|
||||
// though they were normal queues or topics
|
||||
if (destinationType == ActiveMQDestination.TEMP_QUEUE_TYPE) {
|
||||
LOG.info("Creating queue: {}", destName);
|
||||
return getSession().createQueue(simpleName);
|
||||
} else if (destinationType == ActiveMQDestination.TEMP_TOPIC_TYPE) {
|
||||
LOG.info("Creating topic: {}", destName);
|
||||
return getSession().createTopic(simpleName);
|
||||
} else {
|
||||
throw new IllegalArgumentException("Unrecognized destination type: " + destinationType);
|
||||
}
|
||||
}
|
||||
|
||||
protected String buildText(String text, int size) {
|
||||
byte[] data = new byte[size - text.length()];
|
||||
Arrays.fill(data, (byte) 0);
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package org.apache.activemq.tool;
|
||||
|
||||
import static org.apache.activemq.command.ActiveMQDestination.QUEUE_TYPE;
|
||||
import static org.apache.activemq.command.ActiveMQDestination.TOPIC_TYPE;
|
||||
import static org.apache.activemq.command.ActiveMQDestination.*;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import java.net.URI;
|
||||
|
@ -85,6 +84,18 @@ public class AbstractJmsClientTest {
|
|||
asAmqDest(jmsClient.createDestination("queue://dest")));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateDestination_tempQueue() throws JMSException {
|
||||
assertDestinationType(TEMP_QUEUE_TYPE,
|
||||
asAmqDest(jmsClient.createDestination("temp-queue://dest")));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateDestination_tempTopic() throws JMSException {
|
||||
assertDestinationType(TEMP_TOPIC_TYPE,
|
||||
asAmqDest(jmsClient.createDestination("temp-topic://dest")));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateDestinations_commaSeparated() throws JMSException {
|
||||
clientProperties.setDestName("queue://foo,topic://cheese");
|
||||
|
@ -169,6 +180,10 @@ public class AbstractJmsClientTest {
|
|||
assertEquals(physicalName, destination.getPhysicalName());
|
||||
}
|
||||
|
||||
private void assertDestinationType(byte destinationType, ActiveMQDestination destination) {
|
||||
assertEquals(destinationType, destination.getDestinationType());
|
||||
}
|
||||
|
||||
private ActiveMQDestination asAmqDest(Destination destination) {
|
||||
return (ActiveMQDestination) destination;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,69 @@
|
|||
package org.apache.activemq.tool;
|
||||
|
||||
import static org.apache.activemq.command.ActiveMQDestination.*;
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.broker.BrokerFactory;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.tool.properties.JmsProducerProperties;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.JMSException;
|
||||
import java.net.URI;
|
||||
|
||||
public class JmsProducerClientTest {
|
||||
|
||||
private final String DEFAULT_DEST = "TEST.FOO";
|
||||
private static BrokerService brokerService;
|
||||
private static ActiveMQConnectionFactory connectionFactory;
|
||||
|
||||
private AbstractJmsClient jmsClient;
|
||||
private JmsProducerProperties producerProperties;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBrokerAndConnectionFactory() throws Exception {
|
||||
brokerService = BrokerFactory.createBroker(new URI("broker://()/localhost?persistent=false"));
|
||||
brokerService.start();
|
||||
connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDownBroker() throws Exception {
|
||||
brokerService.stop();
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
jmsClient = new JmsProducerClient(connectionFactory);
|
||||
producerProperties = new JmsProducerProperties();
|
||||
producerProperties.setDestName(DEFAULT_DEST);
|
||||
jmsClient.setClient(producerProperties);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateDestination_tempQueue() throws JMSException {
|
||||
assertDestinationNameType("dest", QUEUE_TYPE,
|
||||
asAmqDest(jmsClient.createDestination("temp-queue://dest")));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateDestination_tempTopic() throws JMSException {
|
||||
assertDestinationNameType("dest", TOPIC_TYPE,
|
||||
asAmqDest(jmsClient.createDestination("temp-topic://dest")));
|
||||
}
|
||||
|
||||
private void assertDestinationNameType(String physicalName, byte destinationType, ActiveMQDestination destination) {
|
||||
assertEquals(destinationType, destination.getDestinationType());
|
||||
assertEquals(physicalName, destination.getPhysicalName());
|
||||
}
|
||||
|
||||
private ActiveMQDestination asAmqDest(Destination destination) {
|
||||
return (ActiveMQDestination) destination;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue