Ensure that connections get closed to allow the vm transport resources

to get cleaned up.
This commit is contained in:
Timothy Bish 2016-06-07 09:29:41 -04:00
parent 9079678c3a
commit 5fd307cd9f
3 changed files with 24 additions and 8 deletions

View File

@ -82,7 +82,7 @@ public class JmsCronSchedulerTest extends JobSchedulerTestSupport {
producer.send(message); producer.send(message);
LOG.info("Message {} sent at {}", i, new Date().toString()); LOG.info("Message {} sent at {}", i, new Date().toString());
producer.close(); producer.close();
//wait a couple sec so cron start time is different for next message // wait a couple sec so cron start time is different for next message
Thread.sleep(2000); Thread.sleep(2000);
} }
SchedulerBroker sb = (SchedulerBroker) this.broker.getBroker().getAdaptor(SchedulerBroker.class); SchedulerBroker sb = (SchedulerBroker) this.broker.getBroker().getAdaptor(SchedulerBroker.class);
@ -90,8 +90,10 @@ public class JmsCronSchedulerTest extends JobSchedulerTestSupport {
List<Job> list = js.getAllJobs(); List<Job> list = js.getAllJobs();
assertEquals(COUNT, list.size()); assertEquals(COUNT, list.size());
latch.await(2, TimeUnit.MINUTES); latch.await(2, TimeUnit.MINUTES);
//All should messages should have been received by now // All should messages should have been received by now
assertEquals(COUNT, count.get()); assertEquals(COUNT, count.get());
connection.close();
} }
@Test @Test
@ -114,5 +116,7 @@ public class JmsCronSchedulerTest extends JobSchedulerTestSupport {
assertNotNull(consumer.receiveNoWait()); assertNotNull(consumer.receiveNoWait());
assertNull(consumer.receiveNoWait()); assertNull(consumer.receiveNoWait());
connection.close();
} }
} }

View File

@ -220,7 +220,6 @@ public class JmsSchedulerTest extends JobSchedulerTestSupport {
broker.start(); broker.start();
broker.waitUntilStarted(); broker.waitUntilStarted();
// consume the message // consume the message
connection = createConnection(); connection = createConnection();
connection.start(); connection.start();
@ -244,9 +243,9 @@ public class JmsSchedulerTest extends JobSchedulerTestSupport {
broker.getSystemUsage().getJobSchedulerUsage().setLimit(10 * 1024); broker.getSystemUsage().getJobSchedulerUsage().setLimit(10 * 1024);
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost"); ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
Connection conn = factory.createConnection(); Connection connection = factory.createConnection();
conn.start(); connection.start();
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); Session sess = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final long time = 5000; final long time = 5000;
final ProducerThread producer = new ProducerThread(sess, destination) { final ProducerThread producer = new ProducerThread(sess, destination) {
@Override @Override
@ -295,6 +294,6 @@ public class JmsSchedulerTest extends JobSchedulerTestSupport {
latch.await(20000l, TimeUnit.MILLISECONDS); latch.await(20000l, TimeUnit.MILLISECONDS);
assertEquals("Consumer did not receive all messages.", 0, latch.getCount()); assertEquals("Consumer did not receive all messages.", 0, latch.getCount());
conn.close(); connection.close();
} }
} }

View File

@ -78,6 +78,8 @@ public class JobSchedulerManagementTest extends JobSchedulerTestSupport {
// Now wait and see if any get delivered, none should. // Now wait and see if any get delivered, none should.
latch.await(10, TimeUnit.SECONDS); latch.await(10, TimeUnit.SECONDS);
assertEquals(latch.getCount(), COUNT); assertEquals(latch.getCount(), COUNT);
connection.close();
} }
@Test @Test
@ -144,6 +146,8 @@ public class JobSchedulerManagementTest extends JobSchedulerTestSupport {
// Now wait and see if any get delivered, none should. // Now wait and see if any get delivered, none should.
latch.await(10, TimeUnit.SECONDS); latch.await(10, TimeUnit.SECONDS);
assertEquals(2, latch.getCount()); assertEquals(2, latch.getCount());
connection.close();
} }
@Test @Test
@ -203,6 +207,8 @@ public class JobSchedulerManagementTest extends JobSchedulerTestSupport {
// now check that they all got delivered // now check that they all got delivered
latch.await(10, TimeUnit.SECONDS); latch.await(10, TimeUnit.SECONDS);
assertEquals(latch.getCount(), 0); assertEquals(latch.getCount(), 0);
connection.close();
} }
@Test @Test
@ -270,6 +276,8 @@ public class JobSchedulerManagementTest extends JobSchedulerTestSupport {
// destination. // destination.
latch.await(20, TimeUnit.SECONDS); latch.await(20, TimeUnit.SECONDS);
assertEquals(0, latch.getCount()); assertEquals(0, latch.getCount());
connection.close();
} }
@Test @Test
@ -327,6 +335,8 @@ public class JobSchedulerManagementTest extends JobSchedulerTestSupport {
// now check that they all got removed and are not delivered. // now check that they all got removed and are not delivered.
latch.await(11, TimeUnit.SECONDS); latch.await(11, TimeUnit.SECONDS);
assertEquals(COUNT, latch.getCount()); assertEquals(COUNT, latch.getCount());
connection.close();
} }
@Test @Test
@ -341,7 +351,6 @@ public class JobSchedulerManagementTest extends JobSchedulerTestSupport {
MessageProducer producer = session.createProducer(management); MessageProducer producer = session.createProducer(management);
try { try {
// Send the remove request // Send the remove request
Message remove = session.createMessage(); Message remove = session.createMessage();
remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL); remove.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL);
@ -349,6 +358,8 @@ public class JobSchedulerManagementTest extends JobSchedulerTestSupport {
producer.send(remove); producer.send(remove);
} catch (Exception e) { } catch (Exception e) {
fail("Caught unexpected exception during remove of unscheduled message."); fail("Caught unexpected exception during remove of unscheduled message.");
} finally {
connection.close();
} }
} }
@ -388,6 +399,8 @@ public class JobSchedulerManagementTest extends JobSchedulerTestSupport {
// Now check if there are anymore, there shouldn't be // Now check if there are anymore, there shouldn't be
message = browser.receive(5000); message = browser.receive(5000);
assertNull(message); assertNull(message);
connection.close();
} }
protected void scheduleMessage(Connection connection, long delay) throws Exception { protected void scheduleMessage(Connection connection, long delay) throws Exception {