Add additional tests for connection loss exception handling.

This commit is contained in:
Timothy Bish 2014-01-30 15:24:19 -05:00
parent bcd89e8d15
commit 69e6ad8cf3
1 changed files with 171 additions and 15 deletions

View File

@ -239,9 +239,76 @@ public class JMSClientTest extends AmqpTestSupport {
connection.close();
}
//should through exception IllegalStateException:The session is closed
@Test(timeout=30000)
public void testBrokerRestartPersistentQueueException() throws Exception {
public void testProducerThrowsWhenBrokerStops() throws Exception {
Connection connection = createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(name.toString());
connection.start();
MessageProducer producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
Message m = session.createTextMessage("Sample text");
Thread stopper = new Thread(new Runnable() {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(5);
stopBroker();
} catch (Exception ex) {}
}
});
stopper.start();
try {
for (int i = 0; i < 10; ++i) {
producer.send(m);
TimeUnit.SECONDS.sleep(1);
}
fail("Should have thrown an IllegalStateException");
} catch (Exception ex) {
LOG.info("Caught exception on send: {}", ex);
}
}
@Test(timeout=30000)
public void testProducerCreateThrowsWhenBrokerStops() throws Exception {
Connection connection = createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(name.toString());
connection.start();
Thread stopper = new Thread(new Runnable() {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(5);
stopBroker();
} catch (Exception ex) {}
}
});
stopper.start();
try {
for (int i = 0; i < 10; ++i) {
MessageProducer producer = session.createProducer(queue);
assertNotNull(producer);
TimeUnit.SECONDS.sleep(1);
}
fail("Should have thrown an IllegalStateException");
} catch (Exception ex) {
LOG.info("Caught exception on create producer: {}", ex);
}
}
@Test(timeout=30000)
public void testConsumerCreateThrowsWhenBrokerStops() throws Exception {
Connection connection = createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@ -254,8 +321,7 @@ public class JMSClientTest extends AmqpTestSupport {
Message m = session.createTextMessage("Sample text");
producer.send(m);
restartBroker();
stopBroker();
try {
session.createConsumer(queue);
fail("Should have thrown an IllegalStateException");
@ -265,38 +331,94 @@ public class JMSClientTest extends AmqpTestSupport {
}
@Test(timeout=30000)
public void testProducerThrowsWhenBrokerRestarted() throws Exception {
public void testConsumerReceiveNoWaitThrowsWhenBrokerStops() throws Exception {
Connection connection = createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(name.toString());
connection.start();
MessageProducer producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
Message m = session.createTextMessage("Sample text");
Thread restart = new Thread(new Runnable() {
MessageConsumer consumer=session.createConsumer(queue);
Thread stopper = new Thread(new Runnable() {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(5);
restartBroker();
stopBroker();
} catch (Exception ex) {}
}
});
restart.start();
stopper.start();
try {
for (int i = 0; i < 10; ++i) {
producer.send(m);
consumer.receiveNoWait();
TimeUnit.SECONDS.sleep(1);
}
fail("Should have thrown an IllegalStateException");
} catch (Exception ex) {
LOG.info("Caught exception on send: {}", ex);
LOG.info("Caught exception on receiveNoWait: {}", ex);
}
}
@Test(timeout=30000)
public void testConsumerReceiveTimedThrowsWhenBrokerStops() throws Exception {
Connection connection = createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(name.toString());
connection.start();
MessageConsumer consumer=session.createConsumer(queue);
Thread stopper = new Thread(new Runnable() {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(5);
stopBroker();
} catch (Exception ex) {}
}
});
stopper.start();
try {
for (int i = 0; i < 10; ++i) {
consumer.receive(1000);
}
fail("Should have thrown an IllegalStateException");
} catch (Exception ex) {
LOG.info("Caught exception on receive(1000): {}", ex);
}
}
@Test(timeout=30000)
public void testConsumerReceiveReturnsBrokerStops() throws Exception {
Connection connection = createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(name.toString());
connection.start();
MessageConsumer consumer=session.createConsumer(queue);
Thread stopper = new Thread(new Runnable() {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(5);
stopBroker();
} catch (Exception ex) {}
}
});
stopper.start();
try {
Message m = consumer.receive();
assertNull(m);
} catch (Exception ex) {
LOG.info("Caught exception on receive(1000): {}", ex);
}
}
@ -538,6 +660,40 @@ public class JMSClientTest extends AmqpTestSupport {
}));
}
@Test(timeout=30000)
public void testExecptionListenerCalledOnBrokerStop() throws Exception {
Connection connection = createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(name.toString());
connection.start();
final CountDownLatch called = new CountDownLatch(1);
connection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException exception) {
LOG.info("Exception listener called: ", exception);
called.countDown();
}
});
Thread stopper = new Thread(new Runnable() {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(5);
stopBroker();
} catch (Exception ex) {}
}
});
stopper.start();
assertTrue("No exception listener event fired.", called.await(15, TimeUnit.SECONDS));
}
private Connection createConnection() throws JMSException {
return createConnection(name.toString(), false);
}