Hiram R. Chirino 2006-11-20 21:40:28 +00:00
parent 0607226831
commit ecb50234ec
2 changed files with 7 additions and 8 deletions

View File

@ -877,9 +877,6 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
} }
} }
} }
// yeild here so that a thread trying to stop unconsumedMessages has
// a chance of getting prioritized head of this thread that is in a dispatch loop.
Thread.yield();
} catch (Exception e) { } catch (Exception e) {
session.connection.onAsyncException(e); session.connection.onAsyncException(e);
} }

View File

@ -287,7 +287,8 @@ public class JMSConsumerTest extends JmsTestSupport {
public void testMessageListenerUnackedWithPrefetch1StayInQueue() throws Exception { public void testMessageListenerUnackedWithPrefetch1StayInQueue() throws Exception {
final AtomicInteger counter = new AtomicInteger(0); final AtomicInteger counter = new AtomicInteger(0);
final CountDownLatch done = new CountDownLatch(1); final CountDownLatch sendDone = new CountDownLatch(1);
final CountDownLatch got2Done = new CountDownLatch(1);
// Set prefetch to 1 // Set prefetch to 1
connection.getPrefetchPolicy().setAll(1); connection.getPrefetchPolicy().setAll(1);
@ -309,8 +310,9 @@ public class JMSConsumerTest extends JmsTestSupport {
counter.incrementAndGet(); counter.incrementAndGet();
m.acknowledge(); m.acknowledge();
if( counter.get()==2 ) { if( counter.get()==2 ) {
done.countDown(); sendDone.await();
Thread.sleep(500); connection.close();
got2Done.countDown();
} }
} catch (Throwable e) { } catch (Throwable e) {
e.printStackTrace(); e.printStackTrace();
@ -320,10 +322,10 @@ public class JMSConsumerTest extends JmsTestSupport {
// Send the messages // Send the messages
sendMessages(session, destination, 4); sendMessages(session, destination, 4);
sendDone.countDown();
// Wait for first 2 messages to arrive. // Wait for first 2 messages to arrive.
assertTrue(done.await(100000, TimeUnit.MILLISECONDS)); assertTrue(got2Done.await(100000, TimeUnit.MILLISECONDS));
connection.close();
// Re-start connection. // Re-start connection.
connection = (ActiveMQConnection) factory.createConnection(); connection = (ActiveMQConnection) factory.createConnection();