https://issues.apache.org/jira/browse/AMQ-4656 - first stab at improving keepDurableSubsActive feature, by not stoping/starting cursor on subscription (de)activating

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1511333 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2013-08-07 14:36:40 +00:00
parent 45c399d810
commit a7533ba922
7 changed files with 33 additions and 17 deletions

View File

@ -162,17 +162,19 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
}
synchronized (pendingLock) {
pending.setSystemUsage(memoryManager);
pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
pending.setMaxAuditDepth(getMaxAuditDepth());
pending.setMaxProducersToAudit(getMaxProducersToAudit());
pending.start();
// If nothing was in the persistent store, then try to use the
// recovery policy.
if (pending.isEmpty()) {
for (Destination destination : durableDestinations.values()) {
Topic topic = (Topic) destination;
topic.recoverRetroactiveMessages(context, this);
if (!((StoreDurableSubscriberCursor) pending).isStarted() || !keepDurableSubsActive) {
pending.setSystemUsage(memoryManager);
pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
pending.setMaxAuditDepth(getMaxAuditDepth());
pending.setMaxProducersToAudit(getMaxProducersToAudit());
pending.start();
// If nothing was in the persistent store, then try to use the
// recovery policy.
if (pending.isEmpty()) {
for (Destination destination : durableDestinations.values()) {
Topic topic = (Topic) destination;
topic.recoverRetroactiveMessages(context, this);
}
}
}
}
@ -195,7 +197,9 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
List<MessageReference> savedDispateched = null;
synchronized (pendingLock) {
pending.stop();
if (!keepDurableSubsActive) {
pending.stop();
}
synchronized (dispatchLock) {
for (Destination destination : durableDestinations.values()) {

View File

@ -309,7 +309,7 @@ public abstract class AbstractPendingMessageCursor implements PendingMessageCurs
}
}
protected synchronized boolean isStarted() {
public synchronized boolean isStarted() {
return started;
}

View File

@ -211,7 +211,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
}
public final synchronized void addMessageFirst(MessageReference node) throws Exception {
public synchronized void addMessageFirst(MessageReference node) throws Exception {
setCacheEnabled(false);
size++;
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.broker.region.cursors;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.Message;
@ -59,6 +60,11 @@ class TopicStorePrefetch extends AbstractStoreCursor {
// shouldn't get called
throw new RuntimeException("Not supported");
}
public synchronized void addMessageFirst(MessageReference node) throws Exception {
batchList.addMessageFirst(node);
size++;
}
@Override

View File

@ -213,6 +213,7 @@ public class DurableConsumerTest extends CombinationTestSupport{
public void testConcurrentDurableConsumer() throws Exception{
broker.start();
broker.waitUntilStarted();
factory = createConnectionFactory();
final String topicName = getName();
@ -253,7 +254,7 @@ public class DurableConsumerTest extends CombinationTestSupport{
}
}
} while (msg == null);
consumerConnection.close();
}
assertTrue(received >= acked);
@ -408,6 +409,7 @@ public class DurableConsumerTest extends CombinationTestSupport{
super.tearDown();
if (broker != null) {
broker.stop();
broker.waitUntilStopped();
broker = null;
}
}

View File

@ -225,8 +225,10 @@ public class DurableSubSelectorDelayWithRestartTest {
} while (true);
} finally {
sess.close();
con.close();
try {
sess.close();
con.close();
} catch (Exception e) {}
LOG.info(toString() + " OFFLINE.");
}

View File

@ -130,6 +130,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
((KahaDBPersistenceAdapter)broker.getPersistenceAdapter()).setJournalMaxFileLength(journalMaxFileLength);
}
broker.start();
broker.waitUntilStarted();
}
private void destroyBroker() throws Exception {
@ -1120,6 +1121,7 @@ public class DurableSubscriptionOfflineTest extends org.apache.activemq.TestSupp
LOG.info("Iteration: " + i);
doTestOrderOnActivateDeactivate();
broker.stop();
broker.waitUntilStopped();
createBroker(true /*deleteAllMessages*/);
}
}