diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java index 13bf74aabe..3dbed3a10e 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java @@ -507,7 +507,9 @@ public abstract class AbstractRegion implements Region { Subscription sub = subscriptions.get(control.getConsumerId()); if (sub != null && sub instanceof AbstractSubscription) { ((AbstractSubscription)sub).setPrefetchSize(control.getPrefetch()); - LOG.info("setting prefetch: " + control.getPrefetch() + ", on subscription: " + control.getConsumerId()); + if (LOG.isDebugEnabled()) { + LOG.debug("setting prefetch: " + control.getPrefetch() + ", on subscription: " + control.getConsumerId()); + } try { lookup(consumerExchange.getConnectionContext(), control.getDestination()).wakeup(); } catch (Exception e) { diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index 1dddef7963..2fc149ec54 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -429,7 +429,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { } } if( entry!=null ) { - sd.subscriptionCursors.put(subscriptionKey, cursorPos+1); + sd.subscriptionCursors.put(subscriptionKey, entry.getKey() + 1); } } }); diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java index 8a0353f263..c19359febc 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/TempKahaDBStore.java @@ -400,7 +400,7 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA } } if( entry!=null ) { - sd.subscriptionCursors.put(subscriptionKey, cursorPos+1); + sd.subscriptionCursors.put(subscriptionKey, entry.getKey() + 1); } } }); diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java b/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java index bd9d629727..37ead174fa 100755 --- a/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/DurableConsumerTest.java @@ -206,7 +206,7 @@ public class DurableConsumerTest extends CombinationTestSupport{ final String topicName = getName(); final int numMessages = 500; int numConsumers = 1; - final CountDownLatch counsumerStarted = new CountDownLatch(0); + final CountDownLatch counsumerStarted = new CountDownLatch(numConsumers); final AtomicInteger receivedCount = new AtomicInteger(); Runnable consumer = new Runnable(){ public void run(){ @@ -232,7 +232,10 @@ public class DurableConsumerTest extends CombinationTestSupport{ msg = consumer.receive(5000); if (msg != null) { receivedCount.incrementAndGet(); - if (received++ % 2 == 0) { + if (received != 0 && received % 100 == 0) { + LOG.info("Received msg: " + msg.getJMSMessageID()); + } + if (++received % 2 == 0) { msg.acknowledge(); acked++; } @@ -277,10 +280,11 @@ public class DurableConsumerTest extends CombinationTestSupport{ Wait.waitFor(new Wait.Condition(){ public boolean isSatisified() throws Exception{ - return receivedCount.get() > numMessages; + LOG.info("receivedCount: " + receivedCount.get()); + return receivedCount.get() == numMessages; } }, 60 * 1000); - assertTrue("got some messages: " + receivedCount.get(), receivedCount.get() > numMessages); + assertEquals("got required some messages", numMessages, receivedCount.get()); assertTrue("no exceptions, but: " + exceptions, exceptions.isEmpty()); }