This commit is contained in:
Timothy Bish 2013-10-18 11:46:56 -04:00
parent 1896d27409
commit d4da85f39b
13 changed files with 136 additions and 137 deletions

View File

@ -16,9 +16,6 @@
*/
package org.apache.activemq.camel;
import java.util.Timer;
import java.util.TimerTask;
import junit.framework.TestCase;
import org.apache.activemq.broker.BrokerService;
@ -34,21 +31,11 @@ import org.slf4j.LoggerFactory;
public class AMQ2611Test extends TestCase {
private static final String BROKER_URL = "tcp://localhost:61616";
private static final String QUEUE_NAME = "test.queue";
private static final Logger log = LoggerFactory.getLogger(AMQ2611Test.class);
private BrokerService brokerService = null;
private Timer statisticsTimer = null;
private CamelContext camelContext = null;
public AMQ2611Test() {
}
private void createBroker() throws Exception {
brokerService = new BrokerService();
brokerService.addConnector(BROKER_URL);
@ -64,8 +51,7 @@ public class AMQ2611Test extends TestCase {
private void createCamelContext() throws Exception {
log.info("creating context and sending message");
camelContext = new DefaultCamelContext();
camelContext.addComponent("activemq", ActiveMQComponent
.activeMQComponent(BROKER_URL));
camelContext.addComponent("activemq", ActiveMQComponent.activeMQComponent(BROKER_URL));
final String queueEndpointName = "activemq:queue" + QUEUE_NAME;
camelContext.addRoutes(new RouteBuilder() {
@Override
@ -74,8 +60,7 @@ public class AMQ2611Test extends TestCase {
}
});
camelContext.start();
final ProducerTemplate producerTemplate = camelContext
.createProducerTemplate();
final ProducerTemplate producerTemplate = camelContext.createProducerTemplate();
producerTemplate.sendBody(queueEndpointName, "message");
}
@ -100,5 +85,4 @@ public class AMQ2611Test extends TestCase {
log.warn("run", e);
}
}
}

View File

@ -20,15 +20,18 @@ package org.apache.activemq.camel;
import org.apache.camel.CamelContext;
import org.apache.camel.EndpointInject;
import org.apache.camel.component.mock.MockEndpoint;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit38.AbstractJUnit38SpringContextTests;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
*
*/
@ContextConfiguration
public class CamelDestinationExclusiveConsumerTest extends AbstractJUnit38SpringContextTests {
@RunWith(SpringJUnit4ClassRunner.class)
public class CamelDestinationExclusiveConsumerTest {
@Autowired
protected CamelContext camelContext;
@ -36,6 +39,7 @@ public class CamelDestinationExclusiveConsumerTest extends AbstractJUnit38Spring
@EndpointInject(uri = "mock:results")
protected MockEndpoint expectedEndpoint;
@Test
public void testMocksAreValid() throws Exception {
expectedEndpoint.expectedMessageCount(1);
MockEndpoint.assertIsSatisfied(camelContext);

View File

@ -23,22 +23,21 @@ import javax.naming.Context;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.util.jndi.JndiContext;
/**
* A helper class for test cases which use an embedded broker and use Camel to do the routing
* A helper class for test cases which use an embedded broker and use Camel to
* do the routing
*
*
*/
public abstract class CamelEmbeddedBrokerTestSupport extends EmbeddedBrokerTestSupport {
protected CamelContext camelContext;
protected ProducerTemplate template;
@Override
protected void setUp() throws Exception {
bindAddress = "tcp://localhost:61616";
@ -63,11 +62,11 @@ public abstract class CamelEmbeddedBrokerTestSupport extends EmbeddedBrokerTestS
}
}
protected CamelContext createCamelContext() throws Exception {
return new DefaultCamelContext(createJndiContext());
}
@SuppressWarnings({ "unchecked", "rawtypes" })
protected Context createJndiContext() throws Exception {
return new JndiContext(new Hashtable());
}
@ -75,12 +74,12 @@ public abstract class CamelEmbeddedBrokerTestSupport extends EmbeddedBrokerTestS
protected void addCamelRoutes(CamelContext camelContext) throws Exception {
}
/**
* Resolves a mandatory endpoint for the given URI or an exception is thrown
*
* @param uri the Camel <a href="">URI</a> to use to create or resolve an endpoint
* @param uri
* the Camel <a href="">URI</a> to use to create or resolve an
* endpoint
* @return the endpoint
*/
protected Endpoint resolveMandatoryEndpoint(String uri) {
@ -88,16 +87,18 @@ public abstract class CamelEmbeddedBrokerTestSupport extends EmbeddedBrokerTestS
}
/**
* Resolves a mandatory endpoint for the given URI and expected type or an exception is thrown
* Resolves a mandatory endpoint for the given URI and expected type or an
* exception is thrown
*
* @param uri the Camel <a href="">URI</a> to use to create or resolve an endpoint
* @param uri
* the Camel <a href="">URI</a> to use to create or resolve an
* endpoint
* @return the endpoint
*/
protected <T extends Endpoint> T resolveMandatoryEndpoint(String uri, Class<T> endpointType) {
return resolveMandatoryEndpoint(camelContext, uri, endpointType);
}
/**
* Resolves an endpoint and asserts that it is found
*/
@ -112,8 +113,7 @@ public abstract class CamelEmbeddedBrokerTestSupport extends EmbeddedBrokerTestS
/**
* Resolves an endpoint and asserts that it is found
*/
protected <T extends Endpoint> T resolveMandatoryEndpoint(CamelContext context, String uri,
Class<T> endpointType) {
protected <T extends Endpoint> T resolveMandatoryEndpoint(CamelContext context, String uri, Class<T> endpointType) {
T endpoint = context.getEndpoint(uri, endpointType);
assertNotNull("No endpoint found for URI: " + uri, endpoint);
@ -122,16 +122,18 @@ public abstract class CamelEmbeddedBrokerTestSupport extends EmbeddedBrokerTestS
}
/**
* Resolves the mandatory Mock endpoint using a URI of the form <code>mock:someName</code>
* Resolves the mandatory Mock endpoint using a URI of the form
* <code>mock:someName</code>
*
* @param uri the URI which typically starts with "mock:" and has some name
* @return the mandatory mock endpoint or an exception is thrown if it could not be resolved
* @param uri
* the URI which typically starts with "mock:" and has some name
* @return the mandatory mock endpoint or an exception is thrown if it could
* not be resolved
*/
protected MockEndpoint getMockEndpoint(String uri) {
return resolveMandatoryEndpoint(uri, MockEndpoint.class);
}
/**
* Asserts that all the expectations of the Mock endpoints are valid
*/

View File

@ -28,7 +28,7 @@ import javax.jms.TextMessage;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.test.junit4.CamelSpringTestSupport;
import org.apache.camel.test.spring.CamelSpringTestSupport;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

View File

@ -31,22 +31,32 @@ import org.apache.activemq.command.ActiveMQQueue;
import org.apache.camel.CamelContext;
import org.apache.camel.Handler;
import org.apache.camel.RecipientList;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit38.AbstractJUnit38SpringContextTests;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import static org.junit.Assert.*;
/**
*
*/
@ContextConfiguration
public class CamelRedeliveryTest extends AbstractJUnit38SpringContextTests {
@RunWith(SpringJUnit4ClassRunner.class)
public class CamelRedeliveryTest {
private static final transient Logger LOG = LoggerFactory.getLogger(CamelRedeliveryTest.class);
@Autowired
protected CamelContext camelContext;
@Autowired
protected ApplicationContext applicationContext;
@Test
public void testRedeliveryViaCamel() throws Exception {

View File

@ -34,9 +34,9 @@ import org.apache.activemq.util.ThreadTracker;
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.junit.Assert;
// see: https://issues.apache.org/activemq/browse/AMQ-2966
public class CamelVMTransportRoutingTest extends TestCase {
@ -55,39 +55,40 @@ public class CamelVMTransportRoutingTest extends TestCase {
private final String SENDER_TOPIC = "A";
private final String RECEIVER_TOPIC = "B";
@SuppressWarnings("unused")
public void testSendReceiveWithCamelRouteIntercepting() throws Exception {
final int MSG_COUNT = 1000;
final int MSG_COUNT = 1000;
Session sendSession = senderConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session receiverSession1 = receiverConnection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session receiverSession2 = receiverConnection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session sendSession = senderConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session receiverSession1 = receiverConnection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session receiverSession2 = receiverConnection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination sendTo = sendSession.createTopic(SENDER_TOPIC);
Destination receiveFrom = receiverSession1.createTopic(RECEIVER_TOPIC);
Destination sendTo = sendSession.createTopic(SENDER_TOPIC);
Destination receiveFrom = receiverSession1.createTopic(RECEIVER_TOPIC);
TextMessage message = sendSession.createTextMessage(MSG_STRING);
TextMessage message = sendSession.createTextMessage(MSG_STRING);
MessageConsumer receiver1 = receiverSession1.createConsumer(receiveFrom);
MessageConsumer receiver2 = receiverSession2.createConsumer(receiveFrom);
MessageConsumer receiver1 = receiverSession1.createConsumer(receiveFrom);
MessageConsumer receiver2 = receiverSession2.createConsumer(receiveFrom);
MessageProducer sender = sendSession.createProducer(sendTo);
for( int i = 0; i < MSG_COUNT; ++i ) {
sender.send(message);
}
MessageProducer sender = sendSession.createProducer(sendTo);
for( int i = 0; i < MSG_COUNT; ++i ) {
sender.send(message);
}
for( int i = 0; i < MSG_COUNT; ++i ) {
for( int i = 0; i < MSG_COUNT; ++i ) {
log.debug("Attempting Received for Message #" + i);
TextMessage received1 = (TextMessage) receiver1.receive(5000);
Assert.assertNotNull(received1);
Assert.assertEquals(MSG_STRING, received1.getText());
log.debug("Attempting Received for Message #" + i);
TextMessage received1 = (TextMessage) receiver1.receive(5000);
Assert.assertNotNull(received1);
Assert.assertEquals(MSG_STRING, received1.getText());
}
}
protected BrokerService createBroker() throws Exception {
BrokerService service = new BrokerService();
BrokerService service = new BrokerService();
service.setPersistent(false);
service.setUseJmx(false);
connector = service.addConnector("tcp://localhost:0");
@ -95,15 +96,16 @@ public class CamelVMTransportRoutingTest extends TestCase {
return service;
}
@Override
public void setUp() throws Exception {
broker = createBroker();
broker.start();
broker.waitUntilStarted();
broker = createBroker();
broker.start();
broker.waitUntilStarted();
Thread.sleep(1000);
Thread.sleep(1000);
createCamelContext();
createCamelContext();
ActiveMQConnectionFactory connFactory = new ActiveMQConnectionFactory(connector.getConnectUri());
senderConnection = connFactory.createConnection();
@ -114,22 +116,23 @@ public class CamelVMTransportRoutingTest extends TestCase {
receiverConnection2.start();
}
@Override
public void tearDown() throws Exception {
if( senderConnection != null ) {
senderConnection.close();
}
if( senderConnection != null ) {
senderConnection.close();
}
if( receiverConnection1 != null ) {
receiverConnection1.close();
}
if( receiverConnection1 != null ) {
receiverConnection1.close();
}
if( receiverConnection2 != null ) {
receiverConnection2.close();
}
if( receiverConnection2 != null ) {
receiverConnection2.close();
}
camelContext.stop();
broker.stop();
camelContext.stop();
broker.stop();
ThreadTracker.result();
}
@ -139,10 +142,10 @@ public class CamelVMTransportRoutingTest extends TestCase {
final String fromEndpoint = "activemq:topic:" + SENDER_TOPIC;
final String toEndpoint = "activemq:topic:" + RECEIVER_TOPIC;
log.info("creating context and sending message");
log.info("creating context and sending message");
camelContext = new DefaultCamelContext();
camelContext.addComponent("activemq",
ActiveMQComponent.activeMQComponent("vm://localhost?create=false&waitForStart=10000"));
ActiveMQComponent.activeMQComponent("vm://localhost?create=false&waitForStart=10000"));
camelContext.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {

View File

@ -25,7 +25,7 @@ import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.util.Wait;
import org.apache.camel.test.junit4.CamelSpringTestSupport;
import org.apache.camel.test.spring.CamelSpringTestSupport;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

View File

@ -16,40 +16,34 @@
*/
package org.apache.activemq.camel;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.util.Wait;
import org.apache.camel.test.junit4.CamelSpringTestSupport;
import org.apache.commons.dbcp.BasicDataSource;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.support.AbstractXmlApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import java.sql.ResultSet;
import java.sql.SQLException;
import javax.jms.Connection;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.util.Wait;
import org.apache.camel.test.spring.CamelSpringTestSupport;
import org.apache.commons.dbcp.BasicDataSource;
import org.junit.Ignore;
import org.junit.Test;
import org.springframework.context.support.AbstractXmlApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
@Ignore("Test hangs")
public class JmsJdbcXALoadTest extends CamelSpringTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(JmsJdbcXATest.class);
BrokerService broker = null;
int messageCount;
public java.sql.Connection initDb() throws Exception {
String createStatement =
"CREATE TABLE SCP_INPUT_MESSAGES (" +
"id int NOT NULL GENERATED ALWAYS AS IDENTITY, " +
"messageId varchar(96) NOT NULL, " +
"messageCorrelationId varchar(96) NOT NULL, " +
"messageContent varchar(2048) NOT NULL, " +
"PRIMARY KEY (id) )";
String createStatement = "CREATE TABLE SCP_INPUT_MESSAGES (" + "id int NOT NULL GENERATED ALWAYS AS IDENTITY, " + "messageId varchar(96) NOT NULL, "
+ "messageCorrelationId varchar(96) NOT NULL, " + "messageContent varchar(2048) NOT NULL, " + "PRIMARY KEY (id) )";
java.sql.Connection conn = getJDBCConnection();
try {
@ -82,6 +76,7 @@ public class JmsJdbcXALoadTest extends CamelSpringTestSupport {
return count;
}
@SuppressWarnings("unused")
@Test
public void testRecoveryCommit() throws Exception {
java.sql.Connection jdbcConn = initDb();
@ -89,14 +84,13 @@ public class JmsJdbcXALoadTest extends CamelSpringTestSupport {
sendJMSMessageToKickOffRoute(count);
final java.sql.Connection freshConnection = getJDBCConnection();
assertTrue("did not get replay", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return count == dumpDb(freshConnection);
}
}, 20*60*1000));
}, 20 * 60 * 1000));
assertEquals("still one message in db", count, dumpDb(freshConnection));
}
@ -142,8 +136,6 @@ public class JmsJdbcXALoadTest extends CamelSpringTestSupport {
return new ClassPathXmlApplicationContext("org/apache/activemq/camel/jmsXajdbc.xml");
}
@Override
public void tearDown() throws Exception {
super.tearDown();

View File

@ -19,10 +19,12 @@ package org.apache.activemq.camel;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.concurrent.Executors;
import javax.jms.Connection;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerPluginSupport;
@ -31,7 +33,7 @@ import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.util.Wait;
import org.apache.camel.test.junit4.CamelSpringTestSupport;
import org.apache.camel.test.spring.CamelSpringTestSupport;
import org.apache.commons.dbcp.BasicDataSource;
import org.junit.Ignore;
import org.junit.Test;
@ -164,6 +166,7 @@ public class JmsJdbcXATest extends CamelSpringTestSupport {
return brokerService;
}
@SuppressWarnings("unchecked")
@Override
protected AbstractXmlApplicationContext createApplicationContext() {
@ -184,6 +187,7 @@ public class JmsJdbcXATest extends CamelSpringTestSupport {
// so commit will hang as if reply is lost
context.setDontSendReponse(true);
Executors.newSingleThreadExecutor().execute(new Runnable() {
@Override
public void run() {
LOG.info("Stopping broker post commit...");
try {

View File

@ -24,17 +24,21 @@ import org.apache.camel.EndpointInject;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.util.ObjectHelper;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit38.AbstractJUnit38SpringContextTests;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
*
*/
@ContextConfiguration
public class SetHeaderTest extends AbstractJUnit38SpringContextTests {
@RunWith(SpringJUnit4ClassRunner.class)
public class SetHeaderTest {
private static final transient Logger LOG = LoggerFactory.getLogger(SetHeaderTest.class);
@Autowired
@ -43,6 +47,7 @@ public class SetHeaderTest extends AbstractJUnit38SpringContextTests {
@EndpointInject(uri = "mock:results")
protected MockEndpoint expectedEndpoint;
@Test
public void testMocksAreValid() throws Exception {
// lets add more expectations
expectedEndpoint.expectedMessageCount(1);

View File

@ -32,7 +32,7 @@ import org.apache.activemq.util.Wait;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.component.jms.JmsMessage;
import org.apache.camel.test.junit4.CamelSpringTestSupport;
import org.apache.camel.test.spring.CamelSpringTestSupport;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

View File

@ -23,7 +23,7 @@ import org.apache.camel.component.jms.JmsConfiguration;
import org.apache.camel.component.jms.JmsConsumer;
import org.apache.camel.component.jms.JmsEndpoint;
import org.apache.camel.component.jms.JmsProducer;
import org.apache.camel.processor.CamelLogger;
import org.apache.camel.processor.CamelLogProcessor;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;
import org.springframework.jms.core.JmsTemplate;
@ -83,7 +83,7 @@ public class ActiveMQConfigureTest extends CamelTestSupport {
@Test
public void testListenerContainerUsesSpringConnectionFactory() throws Exception {
JmsEndpoint endpoint = resolveMandatoryEndpoint("activemq:topic:test.foo");
JmsConsumer consumer = endpoint.createConsumer(new CamelLogger());
JmsConsumer consumer = endpoint.createConsumer(new CamelLogProcessor());
AbstractMessageListenerContainer listenerContainer = consumer.getListenerContainer();
assertEquals("pubSubDomain", true, listenerContainer.isPubSubDomain());

View File

@ -16,6 +16,9 @@
*/
package org.apache.activemq.camel.component.broker;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -25,6 +28,7 @@ import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerRegistry;
import org.apache.activemq.broker.BrokerService;
@ -33,18 +37,13 @@ import org.apache.activemq.xbean.BrokerFactoryBean;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class BrokerComponentXMLConfigTest {
protected static final String CONF_ROOT = "src/test/resources/org/apache/activemq/camel/component/broker/";
private static final Logger LOG = LoggerFactory.getLogger(BrokerComponentXMLConfigTest.class);
protected static final String TOPIC_NAME = "test.broker.component.topic";
protected static final String QUEUE_NAME = "test.broker.component.queue";
protected static final String ROUTE_QUEUE_NAME = "test.broker.component.route";
@ -65,15 +64,14 @@ public class BrokerComponentXMLConfigTest {
public void setUp() throws Exception {
brokerService = createBroker(new FileSystemResource(CONF_ROOT + "broker-camel.xml"));
factory = new ActiveMQConnectionFactory(BrokerRegistry.getInstance().findFirst().getVmConnectorURI());
factory = new ActiveMQConnectionFactory(BrokerRegistry.getInstance().findFirst().getVmConnectorURI());
consumerConnection = factory.createConnection();
consumerConnection.start();
producerConnection = factory.createConnection();
producerConnection.start();
consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
producerSession = producerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE);
producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
protected BrokerService createBroker(String resource) throws Exception {
@ -96,10 +94,10 @@ public class BrokerComponentXMLConfigTest {
@After
public void tearDown() throws Exception {
if (producerConnection != null){
if (producerConnection != null) {
producerConnection.close();
}
if (consumerConnection != null){
if (consumerConnection != null) {
consumerConnection.close();
}
if (brokerService != null) {
@ -114,13 +112,13 @@ public class BrokerComponentXMLConfigTest {
Topic topic = consumerSession.createTopic(TOPIC_NAME);
final CountDownLatch latch = new CountDownLatch(messageCount);
MessageConsumer consumer = consumerSession.createConsumer(queue);
MessageConsumer consumer = consumerSession.createConsumer(queue);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(javax.jms.Message message) {
try {
assertEquals(9,message.getJMSPriority());
latch.countDown();
assertEquals(9, message.getJMSPriority());
latch.countDown();
} catch (Throwable e) {
e.printStackTrace();
}
@ -128,13 +126,13 @@ public class BrokerComponentXMLConfigTest {
});
MessageProducer producer = producerSession.createProducer(topic);
for (int i = 0; i < messageCount; i++){
for (int i = 0; i < messageCount; i++) {
javax.jms.Message message = producerSession.createTextMessage("test: " + i);
producer.send(message);
}
latch.await(timeOutInSeconds, TimeUnit.SECONDS);
assertEquals(0,latch.getCount());
assertEquals(0, latch.getCount());
}
@ -142,9 +140,8 @@ public class BrokerComponentXMLConfigTest {
public void testRouteWithDestinationLimit() throws Exception {
final ActiveMQQueue routeQueue = new ActiveMQQueue(ROUTE_QUEUE_NAME);
final CountDownLatch routeLatch = new CountDownLatch(DIVERT_COUNT);
MessageConsumer messageConsumer = consumerSession.createConsumer(routeQueue);
MessageConsumer messageConsumer = consumerSession.createConsumer(routeQueue);
messageConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(javax.jms.Message message) {
@ -156,8 +153,8 @@ public class BrokerComponentXMLConfigTest {
}
});
final CountDownLatch divertLatch = new CountDownLatch(messageCount-DIVERT_COUNT);
MessageConsumer divertConsumer = consumerSession.createConsumer(new ActiveMQQueue(DIVERTED_QUEUE_NAME));
final CountDownLatch divertLatch = new CountDownLatch(messageCount - DIVERT_COUNT);
MessageConsumer divertConsumer = consumerSession.createConsumer(new ActiveMQQueue(DIVERTED_QUEUE_NAME));
divertConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(javax.jms.Message message) {
@ -169,19 +166,17 @@ public class BrokerComponentXMLConfigTest {
}
});
MessageProducer producer = producerSession.createProducer(routeQueue);
for (int i = 0; i < messageCount; i++){
for (int i = 0; i < messageCount; i++) {
javax.jms.Message message = producerSession.createTextMessage("test: " + i);
producer.send(message);
}
routeLatch.await(timeOutInSeconds, TimeUnit.SECONDS);
divertLatch.await(timeOutInSeconds,TimeUnit.SECONDS);
assertEquals(0,routeLatch.getCount());
assertEquals(0,divertLatch.getCount());
divertLatch.await(timeOutInSeconds, TimeUnit.SECONDS);
assertEquals(0, routeLatch.getCount());
assertEquals(0, divertLatch.getCount());
}
}