mirror of https://github.com/apache/activemq.git
Clean up some tests, reduce resources used and reduce runtime, convert
to JUnit 4 when possible and add timeouts.
This commit is contained in:
parent
05b401993b
commit
02dc6ce982
|
@ -16,14 +16,15 @@
|
|||
*/
|
||||
package org.apache.activemq;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.Destination;
|
||||
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.apache.activemq.command.ActiveMQTopic;
|
||||
import org.springframework.jms.core.JmsTemplate;
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.Destination;
|
||||
|
||||
/**
|
||||
* A useful base class which creates and closes an embedded broker
|
||||
|
@ -40,6 +41,7 @@ public abstract class EmbeddedBrokerTestSupport extends CombinationTestSupport {
|
|||
protected ActiveMQDestination destination;
|
||||
protected JmsTemplate template;
|
||||
|
||||
@Override
|
||||
protected void setUp() throws Exception {
|
||||
if (broker == null) {
|
||||
broker = createBroker();
|
||||
|
@ -56,6 +58,7 @@ public abstract class EmbeddedBrokerTestSupport extends CombinationTestSupport {
|
|||
template.afterPropertiesSet();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void tearDown() throws Exception {
|
||||
if (broker != null) {
|
||||
try {
|
||||
|
@ -119,6 +122,7 @@ public abstract class EmbeddedBrokerTestSupport extends CombinationTestSupport {
|
|||
protected BrokerService createBroker() throws Exception {
|
||||
BrokerService answer = new BrokerService();
|
||||
answer.setPersistent(isPersistent());
|
||||
answer.getManagementContext().setCreateConnector(false);
|
||||
answer.addConnector(bindAddress);
|
||||
return answer;
|
||||
}
|
||||
|
|
|
@ -16,6 +16,9 @@
|
|||
*/
|
||||
package org.apache.activemq;
|
||||
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.JMSException;
|
||||
|
@ -24,26 +27,35 @@ import javax.jms.MessageConsumer;
|
|||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
public class ExclusiveConsumerTest extends TestCase {
|
||||
public class ExclusiveConsumerTest {
|
||||
|
||||
private static final String VM_BROKER_URL = "vm://localhost?broker.persistent=false&broker.useJmx=true";
|
||||
private static final String VM_BROKER_URL = "vm://localhost";
|
||||
|
||||
public ExclusiveConsumerTest(String name) {
|
||||
super(name);
|
||||
private BrokerService brokerService;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
brokerService = new BrokerService();
|
||||
brokerService.setPersistent(false);
|
||||
brokerService.setUseJmx(false);
|
||||
brokerService.setSchedulerSupport(false);
|
||||
brokerService.setAdvisorySupport(false);
|
||||
|
||||
brokerService.start();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setUp() throws Exception {
|
||||
super.setUp();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void tearDown() throws Exception {
|
||||
super.tearDown();
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
if (brokerService != null) {
|
||||
brokerService.stop();
|
||||
brokerService = null;
|
||||
}
|
||||
}
|
||||
|
||||
private Connection createConnection(final boolean start) throws JMSException {
|
||||
|
@ -55,6 +67,7 @@ public class ExclusiveConsumerTest extends TestCase {
|
|||
return conn;
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testExclusiveConsumerSelectedCreatedFirst() throws JMSException, InterruptedException {
|
||||
Connection conn = createConnection(true);
|
||||
|
||||
|
@ -63,7 +76,6 @@ public class ExclusiveConsumerTest extends TestCase {
|
|||
Session senderSession = null;
|
||||
|
||||
try {
|
||||
|
||||
exclusiveSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
fallbackSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
senderSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
@ -87,15 +99,14 @@ public class ExclusiveConsumerTest extends TestCase {
|
|||
// Verify exclusive consumer receives the message.
|
||||
assertNotNull(exclusiveConsumer.receive(100));
|
||||
assertNull(fallbackConsumer.receive(100));
|
||||
|
||||
} finally {
|
||||
fallbackSession.close();
|
||||
senderSession.close();
|
||||
conn.close();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testExclusiveConsumerSelectedCreatedAfter() throws JMSException, InterruptedException {
|
||||
Connection conn = createConnection(true);
|
||||
|
||||
|
@ -104,7 +115,6 @@ public class ExclusiveConsumerTest extends TestCase {
|
|||
Session senderSession = null;
|
||||
|
||||
try {
|
||||
|
||||
exclusiveSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
fallbackSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
senderSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
@ -132,11 +142,10 @@ public class ExclusiveConsumerTest extends TestCase {
|
|||
senderSession.close();
|
||||
conn.close();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public void testFailoverToAnotherExclusiveConsumerCreatedFirst() throws JMSException,
|
||||
InterruptedException {
|
||||
@Test(timeout = 60000)
|
||||
public void testFailoverToAnotherExclusiveConsumerCreatedFirst() throws JMSException, InterruptedException {
|
||||
Connection conn = createConnection(true);
|
||||
|
||||
Session exclusiveSession1 = null;
|
||||
|
@ -145,14 +154,12 @@ public class ExclusiveConsumerTest extends TestCase {
|
|||
Session senderSession = null;
|
||||
|
||||
try {
|
||||
|
||||
exclusiveSession1 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
exclusiveSession2 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
fallbackSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
senderSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
||||
// This creates the exclusive consumer first which avoids AMQ-1024
|
||||
// bug.
|
||||
// This creates the exclusive consumer first which avoids AMQ-1024 bug.
|
||||
ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE2?consumer.exclusive=true");
|
||||
MessageConsumer exclusiveConsumer1 = exclusiveSession1.createConsumer(exclusiveQueue);
|
||||
MessageConsumer exclusiveConsumer2 = exclusiveSession2.createConsumer(exclusiveQueue);
|
||||
|
@ -173,8 +180,7 @@ public class ExclusiveConsumerTest extends TestCase {
|
|||
assertNull(exclusiveConsumer2.receive(100));
|
||||
assertNull(fallbackConsumer.receive(100));
|
||||
|
||||
// Close the exclusive consumer to verify the non-exclusive consumer
|
||||
// takes over
|
||||
// Close the exclusive consumer to verify the non-exclusive consumer takes over
|
||||
exclusiveConsumer1.close();
|
||||
|
||||
producer.send(msg);
|
||||
|
@ -188,11 +194,10 @@ public class ExclusiveConsumerTest extends TestCase {
|
|||
senderSession.close();
|
||||
conn.close();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public void testFailoverToAnotherExclusiveConsumerCreatedAfter() throws JMSException,
|
||||
InterruptedException {
|
||||
@Test(timeout = 60000)
|
||||
public void testFailoverToAnotherExclusiveConsumerCreatedAfter() throws JMSException, InterruptedException {
|
||||
Connection conn = createConnection(true);
|
||||
|
||||
Session exclusiveSession1 = null;
|
||||
|
@ -201,14 +206,12 @@ public class ExclusiveConsumerTest extends TestCase {
|
|||
Session senderSession = null;
|
||||
|
||||
try {
|
||||
|
||||
exclusiveSession1 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
exclusiveSession2 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
fallbackSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
senderSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
||||
// This creates the exclusive consumer first which avoids AMQ-1024
|
||||
// bug.
|
||||
// This creates the exclusive consumer first which avoids AMQ-1024 bug.
|
||||
ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE6?consumer.exclusive=true");
|
||||
MessageConsumer exclusiveConsumer1 = exclusiveSession1.createConsumer(exclusiveQueue);
|
||||
|
||||
|
@ -230,8 +233,7 @@ public class ExclusiveConsumerTest extends TestCase {
|
|||
assertNull(exclusiveConsumer2.receive(100));
|
||||
assertNull(fallbackConsumer.receive(100));
|
||||
|
||||
// Close the exclusive consumer to verify the non-exclusive consumer
|
||||
// takes over
|
||||
// Close the exclusive consumer to verify the non-exclusive consumer takes over
|
||||
exclusiveConsumer1.close();
|
||||
|
||||
producer.send(msg);
|
||||
|
@ -239,15 +241,14 @@ public class ExclusiveConsumerTest extends TestCase {
|
|||
|
||||
assertNotNull(exclusiveConsumer2.receive(1000));
|
||||
assertNull(fallbackConsumer.receive(100));
|
||||
|
||||
} finally {
|
||||
fallbackSession.close();
|
||||
senderSession.close();
|
||||
conn.close();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testFailoverToNonExclusiveConsumer() throws JMSException, InterruptedException {
|
||||
Connection conn = createConnection(true);
|
||||
|
||||
|
@ -261,8 +262,7 @@ public class ExclusiveConsumerTest extends TestCase {
|
|||
fallbackSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
senderSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
||||
// This creates the exclusive consumer first which avoids AMQ-1024
|
||||
// bug.
|
||||
// This creates the exclusive consumer first which avoids AMQ-1024 bug.
|
||||
ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE3?consumer.exclusive=true");
|
||||
MessageConsumer exclusiveConsumer = exclusiveSession.createConsumer(exclusiveQueue);
|
||||
|
||||
|
@ -281,22 +281,20 @@ public class ExclusiveConsumerTest extends TestCase {
|
|||
assertNotNull(exclusiveConsumer.receive(100));
|
||||
assertNull(fallbackConsumer.receive(100));
|
||||
|
||||
// Close the exclusive consumer to verify the non-exclusive consumer
|
||||
// takes over
|
||||
// Close the exclusive consumer to verify the non-exclusive consumer takes over
|
||||
exclusiveConsumer.close();
|
||||
|
||||
producer.send(msg);
|
||||
|
||||
assertNotNull(fallbackConsumer.receive(100));
|
||||
|
||||
} finally {
|
||||
fallbackSession.close();
|
||||
senderSession.close();
|
||||
conn.close();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testFallbackToExclusiveConsumer() throws JMSException, InterruptedException {
|
||||
Connection conn = createConnection(true);
|
||||
|
||||
|
@ -310,8 +308,7 @@ public class ExclusiveConsumerTest extends TestCase {
|
|||
fallbackSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
senderSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
||||
// This creates the exclusive consumer first which avoids AMQ-1024
|
||||
// bug.
|
||||
// This creates the exclusive consumer first which avoids AMQ-1024 bug.
|
||||
ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE4?consumer.exclusive=true");
|
||||
MessageConsumer exclusiveConsumer = exclusiveSession.createConsumer(exclusiveQueue);
|
||||
|
||||
|
@ -330,8 +327,7 @@ public class ExclusiveConsumerTest extends TestCase {
|
|||
assertNotNull(exclusiveConsumer.receive(100));
|
||||
assertNull(fallbackConsumer.receive(100));
|
||||
|
||||
// Close the exclusive consumer to verify the non-exclusive consumer
|
||||
// takes over
|
||||
// Close the exclusive consumer to verify the non-exclusive consumer takes over
|
||||
exclusiveConsumer.close();
|
||||
|
||||
producer.send(msg);
|
||||
|
@ -339,19 +335,16 @@ public class ExclusiveConsumerTest extends TestCase {
|
|||
// Verify other non-exclusive consumer receices the message.
|
||||
assertNotNull(fallbackConsumer.receive(100));
|
||||
|
||||
// Create exclusive consumer to determine if it will start receiving
|
||||
// the messages.
|
||||
// Create exclusive consumer to determine if it will start receiving the messages.
|
||||
exclusiveConsumer = exclusiveSession.createConsumer(exclusiveQueue);
|
||||
|
||||
producer.send(msg);
|
||||
assertNotNull(exclusiveConsumer.receive(100));
|
||||
assertNull(fallbackConsumer.receive(100));
|
||||
|
||||
} finally {
|
||||
fallbackSession.close();
|
||||
senderSession.close();
|
||||
conn.close();
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,6 +16,9 @@
|
|||
*/
|
||||
package org.apache.activemq;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.util.Vector;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -32,41 +35,51 @@ import javax.jms.MessageProducer;
|
|||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TestName;
|
||||
|
||||
|
||||
public class JmsConsumerResetActiveListenerTest extends TestCase {
|
||||
public class JmsConsumerResetActiveListenerTest {
|
||||
|
||||
private Connection connection;
|
||||
private ActiveMQConnectionFactory factory;
|
||||
|
||||
protected void setUp() throws Exception {
|
||||
factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
|
||||
|
||||
@Rule
|
||||
public final TestName name = new TestName();
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false&broker.useJmx=false");
|
||||
connection = factory.createConnection();
|
||||
}
|
||||
|
||||
protected void tearDown() throws Exception {
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
if (connection != null) {
|
||||
connection.close();
|
||||
connection = null;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* verify the (undefined by spec) behaviour of setting a listener while receiving a message.
|
||||
*
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test(timeout = 60000)
|
||||
public void testSetListenerFromListener() throws Exception {
|
||||
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
|
||||
Destination dest = session.createQueue("Queue-" + getName());
|
||||
Destination dest = session.createQueue("Queue-" + name.getMethodName());
|
||||
final MessageConsumer consumer = session.createConsumer(dest);
|
||||
|
||||
|
||||
final CountDownLatch latch = new CountDownLatch(2);
|
||||
final AtomicBoolean first = new AtomicBoolean(true);
|
||||
final Vector<Object> results = new Vector<Object>();
|
||||
consumer.setMessageListener(new MessageListener() {
|
||||
|
||||
@Override
|
||||
public void onMessage(Message message) {
|
||||
if (first.compareAndSet(true, false)) {
|
||||
try {
|
||||
|
@ -83,14 +96,14 @@ public class JmsConsumerResetActiveListenerTest extends TestCase {
|
|||
});
|
||||
|
||||
connection.start();
|
||||
|
||||
|
||||
MessageProducer producer = session.createProducer(dest);
|
||||
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
|
||||
producer.send(session.createTextMessage("First"));
|
||||
producer.send(session.createTextMessage("Second"));
|
||||
|
||||
|
||||
assertTrue("we did not timeout", latch.await(5, TimeUnit.SECONDS));
|
||||
|
||||
|
||||
assertEquals("we have a result", 2, results.size());
|
||||
Object result = results.get(0);
|
||||
assertTrue(result instanceof TextMessage);
|
||||
|
@ -99,22 +112,24 @@ public class JmsConsumerResetActiveListenerTest extends TestCase {
|
|||
assertTrue(result instanceof TextMessage);
|
||||
assertEquals("result is first", "Second", ((TextMessage)result).getText());
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* and a listener on a new consumer, just in case.
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test(timeout = 60000)
|
||||
public void testNewConsumerSetListenerFromListener() throws Exception {
|
||||
final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
|
||||
final Destination dest = session.createQueue("Queue-" + getName());
|
||||
final Destination dest = session.createQueue("Queue-" + name.getMethodName());
|
||||
final MessageConsumer consumer = session.createConsumer(dest);
|
||||
|
||||
|
||||
final CountDownLatch latch = new CountDownLatch(2);
|
||||
final AtomicBoolean first = new AtomicBoolean(true);
|
||||
final Vector<Object> results = new Vector<Object>();
|
||||
consumer.setMessageListener(new MessageListener() {
|
||||
|
||||
@Override
|
||||
public void onMessage(Message message) {
|
||||
if (first.compareAndSet(true, false)) {
|
||||
try {
|
||||
|
@ -132,14 +147,14 @@ public class JmsConsumerResetActiveListenerTest extends TestCase {
|
|||
});
|
||||
|
||||
connection.start();
|
||||
|
||||
|
||||
MessageProducer producer = session.createProducer(dest);
|
||||
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
|
||||
producer.send(session.createTextMessage("First"));
|
||||
producer.send(session.createTextMessage("Second"));
|
||||
|
||||
|
||||
assertTrue("we did not timeout", latch.await(5, TimeUnit.SECONDS));
|
||||
|
||||
|
||||
assertEquals("we have a result", 2, results.size());
|
||||
Object result = results.get(0);
|
||||
assertTrue(result instanceof TextMessage);
|
||||
|
|
|
@ -16,6 +16,11 @@
|
|||
*/
|
||||
package org.apache.activemq;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
|
@ -24,76 +29,67 @@ import javax.jms.MessageProducer;
|
|||
import javax.jms.Session;
|
||||
import javax.jms.Topic;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class JmsCreateConsumerInOnMessageTest extends TestSupport implements MessageListener {
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TestName;
|
||||
|
||||
public class JmsCreateConsumerInOnMessageTest {
|
||||
|
||||
private Connection connection;
|
||||
private Session publisherSession;
|
||||
private Session consumerSession;
|
||||
private MessageConsumer consumer;
|
||||
private MessageConsumer testConsumer;
|
||||
private MessageProducer producer;
|
||||
private Topic topic;
|
||||
private Object lock = new Object();
|
||||
private ActiveMQConnectionFactory factory;
|
||||
|
||||
/*
|
||||
* @see junit.framework.TestCase#setUp()
|
||||
*/
|
||||
protected void setUp() throws Exception {
|
||||
super.setUp();
|
||||
super.topic = true;
|
||||
connection = createConnection();
|
||||
connection.setClientID("connection:" + getSubject());
|
||||
publisherSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
topic = (Topic)super.createDestination("Test.Topic");
|
||||
consumer = consumerSession.createConsumer(topic);
|
||||
consumer.setMessageListener(this);
|
||||
producer = publisherSession.createProducer(topic);
|
||||
connection.start();
|
||||
@Rule
|
||||
public final TestName name = new TestName();
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false&broker.useJmx=false");
|
||||
connection = factory.createConnection();
|
||||
}
|
||||
|
||||
/*
|
||||
* @see junit.framework.TestCase#tearDown()
|
||||
*/
|
||||
protected void tearDown() throws Exception {
|
||||
super.tearDown();
|
||||
connection.close();
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
if (connection != null) {
|
||||
connection.close();
|
||||
connection = null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests if a consumer can be created asynchronusly
|
||||
*
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test(timeout = 60000)
|
||||
public void testCreateConsumer() throws Exception {
|
||||
Message msg = super.createMessage();
|
||||
producer.send(msg);
|
||||
if (testConsumer == null) {
|
||||
synchronized (lock) {
|
||||
lock.wait(3000);
|
||||
}
|
||||
}
|
||||
assertTrue(testConsumer != null);
|
||||
}
|
||||
final CountDownLatch done = new CountDownLatch(1);
|
||||
|
||||
/**
|
||||
* Use the asynchronous subscription mechanism
|
||||
*
|
||||
* @param message
|
||||
*/
|
||||
public void onMessage(Message message) {
|
||||
try {
|
||||
testConsumer = consumerSession.createConsumer(topic);
|
||||
consumerSession.createProducer(topic);
|
||||
synchronized (lock) {
|
||||
lock.notify();
|
||||
final Session publisherSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
final Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
final Topic topic = publisherSession.createTopic("Test.Topic");
|
||||
|
||||
MessageConsumer consumer = consumerSession.createConsumer(topic);
|
||||
consumer.setMessageListener(new MessageListener() {
|
||||
|
||||
@Override
|
||||
public void onMessage(Message message) {
|
||||
try {
|
||||
consumerSession.createConsumer(topic);
|
||||
consumerSession.createProducer(topic);
|
||||
done.countDown();
|
||||
} catch (Exception ex) {
|
||||
assertTrue(false);
|
||||
}
|
||||
}
|
||||
} catch (Exception ex) {
|
||||
ex.printStackTrace();
|
||||
assertTrue(false);
|
||||
}
|
||||
});
|
||||
|
||||
MessageProducer producer = publisherSession.createProducer(topic);
|
||||
connection.start();
|
||||
|
||||
producer.send(publisherSession.createTextMessage("test"));
|
||||
|
||||
assertTrue("Should have finished onMessage", done.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue