Fixing a few intermittent failures

These tests were not as broken as on 2.3.x from the old hornetq branch where this fix originated.
However I will play safe here as I believe the race could be after the exception is raised and before the counter was added
(on ActiveMQMessageHandlerTest)
This commit is contained in:
Clebert Suconic 2015-01-21 18:38:27 -05:00
parent 0eb1e332d3
commit 4ebacc9d87
2 changed files with 39 additions and 21 deletions

View File

@ -134,16 +134,14 @@ public class ConsumerStuckTest extends ServiceTestBase
long timeout = System.currentTimeMillis() + 20000;
while (System.currentTimeMillis() < timeout && server.getSessions().size() != 0)
long timeStart = System.currentTimeMillis();
while (timeout > System.currentTimeMillis() && server.getSessions().size() != 0 && server.getConnectionCount() != 0)
{
Thread.sleep(10);
}
System.out.println("Size = " + server.getConnectionCount());
System.out.println("sessions = " + server.getSessions().size());
System.out.println("Time = " + System.currentTimeMillis() + " time diff = " + (System.currentTimeMillis() - timeStart) + ", connections Size = " + server.getConnectionCount() + " sessions = " + server.getSessions().size());
if (server.getSessions().size() != 0)
{
@ -151,14 +149,16 @@ public class ConsumerStuckTest extends ServiceTestBase
fail("The cleanup wasn't able to finish cleaning the session. It's probably stuck, look at the thread dump generated by the test for more information");
}
System.out.println("Size = " + server.getConnectionCount());
timeout = System.currentTimeMillis() + 20000;
System.out.println("sessions = " + server.getSessions().size());
while (System.currentTimeMillis() < timeout && server.getConnectionCount() != 0)
if (server.getSessions().size() != 0)
{
Thread.sleep(10);
System.out.println(threadDump("Thread dump"));
fail("The cleanup wasn't able to finish cleaning the session. It's probably stuck, look at the thread dump generated by the test for more information");
}
assertEquals(0, server.getConnectionCount());
}
finally

View File

@ -99,7 +99,7 @@ public class ActiveMQMessageHandlerTest extends ActiveMQRATestBase
spec.setDestination(MDBQUEUE);
qResourceAdapter.setConnectorClassName(INVM_CONNECTOR_FACTORY);
CountDownLatch latch = new CountDownLatch(15);
MultipleEndpoints endpoint = new MultipleEndpoints(latch, false);
MultipleEndpoints endpoint = new MultipleEndpoints(latch, null, false);
DummyMessageEndpointFactory endpointFactory = new DummyMessageEndpointFactory(endpoint, false);
qResourceAdapter.endpointActivation(endpointFactory, spec);
ClientSession session = locator.createSessionFactory().createSession();
@ -132,7 +132,8 @@ public class ActiveMQMessageHandlerTest extends ActiveMQRATestBase
spec.setDestination(MDBQUEUE);
qResourceAdapter.setConnectorClassName(INVM_CONNECTOR_FACTORY);
CountDownLatch latch = new CountDownLatch(SIZE);
MultipleEndpoints endpoint = new MultipleEndpoints(latch, true);
CountDownLatch latchDone = new CountDownLatch(SIZE);
MultipleEndpoints endpoint = new MultipleEndpoints(latch, latchDone, true);
DummyMessageEndpointFactory endpointFactory = new DummyMessageEndpointFactory(endpoint, false);
qResourceAdapter.endpointActivation(endpointFactory, spec);
ClientSession session = locator.createSessionFactory().createSession();
@ -148,6 +149,8 @@ public class ActiveMQMessageHandlerTest extends ActiveMQRATestBase
qResourceAdapter.endpointDeactivation(endpointFactory, spec);
latchDone.await(5, TimeUnit.SECONDS);
assertEquals(SIZE, endpoint.messages.intValue());
assertEquals(0, endpoint.interrupted.intValue());
@ -169,7 +172,8 @@ public class ActiveMQMessageHandlerTest extends ActiveMQRATestBase
spec.setDestination(MDBQUEUE);
qResourceAdapter.setConnectorClassName(INVM_CONNECTOR_FACTORY);
CountDownLatch latch = new CountDownLatch(SIZE);
MultipleEndpoints endpoint = new MultipleEndpoints(latch, true);
CountDownLatch latchDone = new CountDownLatch(SIZE);
MultipleEndpoints endpoint = new MultipleEndpoints(latch, latchDone, true);
DummyMessageEndpointFactory endpointFactory = new DummyMessageEndpointFactory(endpoint, false);
qResourceAdapter.endpointActivation(endpointFactory, spec);
ClientSession session = locator.createSessionFactory().createSession();
@ -185,6 +189,8 @@ public class ActiveMQMessageHandlerTest extends ActiveMQRATestBase
qResourceAdapter.endpointDeactivation(endpointFactory, spec);
latchDone.await(5, TimeUnit.SECONDS);
assertEquals(SIZE, endpoint.messages.intValue());
//half onmessage interrupted
assertEquals(SIZE / 2, endpoint.interrupted.intValue());
@ -856,14 +862,16 @@ public class ActiveMQMessageHandlerTest extends ActiveMQRATestBase
class MultipleEndpoints extends DummyMessageEndpoint
{
private final CountDownLatch latch;
private final CountDownLatch latchDone;
private final boolean pause;
AtomicInteger messages = new AtomicInteger(0);
AtomicInteger interrupted = new AtomicInteger(0);
public MultipleEndpoints(CountDownLatch latch, boolean pause)
public MultipleEndpoints(CountDownLatch latch, CountDownLatch latchDone, boolean pause)
{
super(latch);
this.latch = latch;
this.latchDone = latchDone;
this.pause = pause;
}
@ -887,18 +895,28 @@ public class ActiveMQMessageHandlerTest extends ActiveMQRATestBase
@Override
public void onMessage(Message message)
{
try
{
latch.countDown();
if (pause && messages.getAndIncrement() % 2 == 0)
{
try
{
IntegrationTestLogger.LOGGER.info("pausing for 2 secs");
System.out.println("pausing for 2 secs");
Thread.sleep(2000);
}
catch (InterruptedException e)
{
interrupted.getAndIncrement();
interrupted.incrementAndGet();
}
}
}
finally
{
if (latchDone != null)
{
latchDone.countDown();
}
}
}