mirror of https://github.com/apache/activemq.git
resolve hudson failure with DurableConsumerTest related to kahaDB under the FilePendingMessageCursor - fixes the cursor got duplicates error and improves the test now that there are no more duplicates. relates to https://issues.apache.org/activemq/browse/AMQ-2575
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@903741 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
8b1e16de48
commit
19bfd9d3b5
|
@ -507,7 +507,9 @@ public abstract class AbstractRegion implements Region {
|
|||
Subscription sub = subscriptions.get(control.getConsumerId());
|
||||
if (sub != null && sub instanceof AbstractSubscription) {
|
||||
((AbstractSubscription)sub).setPrefetchSize(control.getPrefetch());
|
||||
LOG.info("setting prefetch: " + control.getPrefetch() + ", on subscription: " + control.getConsumerId());
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("setting prefetch: " + control.getPrefetch() + ", on subscription: " + control.getConsumerId());
|
||||
}
|
||||
try {
|
||||
lookup(consumerExchange.getConnectionContext(), control.getDestination()).wakeup();
|
||||
} catch (Exception e) {
|
||||
|
|
|
@ -429,7 +429,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
}
|
||||
}
|
||||
if( entry!=null ) {
|
||||
sd.subscriptionCursors.put(subscriptionKey, cursorPos+1);
|
||||
sd.subscriptionCursors.put(subscriptionKey, entry.getKey() + 1);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
|
|
@ -400,7 +400,7 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
|
|||
}
|
||||
}
|
||||
if( entry!=null ) {
|
||||
sd.subscriptionCursors.put(subscriptionKey, cursorPos+1);
|
||||
sd.subscriptionCursors.put(subscriptionKey, entry.getKey() + 1);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
|
|
@ -206,7 +206,7 @@ public class DurableConsumerTest extends CombinationTestSupport{
|
|||
final String topicName = getName();
|
||||
final int numMessages = 500;
|
||||
int numConsumers = 1;
|
||||
final CountDownLatch counsumerStarted = new CountDownLatch(0);
|
||||
final CountDownLatch counsumerStarted = new CountDownLatch(numConsumers);
|
||||
final AtomicInteger receivedCount = new AtomicInteger();
|
||||
Runnable consumer = new Runnable(){
|
||||
public void run(){
|
||||
|
@ -232,7 +232,10 @@ public class DurableConsumerTest extends CombinationTestSupport{
|
|||
msg = consumer.receive(5000);
|
||||
if (msg != null) {
|
||||
receivedCount.incrementAndGet();
|
||||
if (received++ % 2 == 0) {
|
||||
if (received != 0 && received % 100 == 0) {
|
||||
LOG.info("Received msg: " + msg.getJMSMessageID());
|
||||
}
|
||||
if (++received % 2 == 0) {
|
||||
msg.acknowledge();
|
||||
acked++;
|
||||
}
|
||||
|
@ -277,10 +280,11 @@ public class DurableConsumerTest extends CombinationTestSupport{
|
|||
|
||||
Wait.waitFor(new Wait.Condition(){
|
||||
public boolean isSatisified() throws Exception{
|
||||
return receivedCount.get() > numMessages;
|
||||
LOG.info("receivedCount: " + receivedCount.get());
|
||||
return receivedCount.get() == numMessages;
|
||||
}
|
||||
}, 60 * 1000);
|
||||
assertTrue("got some messages: " + receivedCount.get(), receivedCount.get() > numMessages);
|
||||
assertEquals("got required some messages", numMessages, receivedCount.get());
|
||||
assertTrue("no exceptions, but: " + exceptions, exceptions.isEmpty());
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue