git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1404732 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2012-11-01 19:44:37 +00:00
parent 76a9c30be6
commit 566039fa5b
3 changed files with 23 additions and 6 deletions

View File

@ -72,6 +72,10 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
return offlineTimestamp.get(); return offlineTimestamp.get();
} }
public void setOfflineTimestamp(long timestamp) {
offlineTimestamp.set(timestamp);
}
public boolean isFull() { public boolean isFull() {
return !active.get() || super.isFull(); return !active.get() || super.isFull();
} }
@ -139,7 +143,9 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
} }
} }
LOG.debug("Activating " + this); if (LOG.isDebugEnabled()) {
LOG.debug("Activating " + this);
}
if (!keepDurableSubsActive) { if (!keepDurableSubsActive) {
for (Destination destination : durableDestinations.values()) { for (Destination destination : durableDestinations.values()) {
Topic topic = (Topic) destination; Topic topic = (Topic) destination;
@ -170,7 +176,9 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
} }
public void deactivate(boolean keepDurableSubsActive) throws Exception { public void deactivate(boolean keepDurableSubsActive) throws Exception {
LOG.debug("Deactivating keepActive=" + keepDurableSubsActive + ", " + this); if (LOG.isDebugEnabled()) {
LOG.debug("Deactivating keepActive=" + keepDurableSubsActive + ", " + this);
}
active.set(false); active.set(false);
offlineTimestamp.set(System.currentTimeMillis()); offlineTimestamp.set(System.currentTimeMillis());
this.usageManager.getMemoryUsage().removeUsageListener(this); this.usageManager.getMemoryUsage().removeUsageListener(this);

View File

@ -86,7 +86,7 @@ public class TopicRegion extends AbstractRegion {
for (Map.Entry<SubscriptionKey, DurableTopicSubscription> entry : durableSubscriptions.entrySet()) { for (Map.Entry<SubscriptionKey, DurableTopicSubscription> entry : durableSubscriptions.entrySet()) {
DurableTopicSubscription sub = entry.getValue(); DurableTopicSubscription sub = entry.getValue();
if (!sub.isActive()) { if (!sub.isActive()) {
long offline = sub.getOfflineTimestamp(); long offline = sub.getOfflineTimestamp();
if (offline != -1 && now - offline >= broker.getBrokerService().getOfflineDurableSubscriberTimeout()) { if (offline != -1 && now - offline >= broker.getBrokerService().getOfflineDurableSubscriberTimeout()) {
LOG.info("Destroying durable subscriber due to inactivity: " + sub); LOG.info("Destroying durable subscriber due to inactivity: " + sub);
try { try {
@ -243,6 +243,7 @@ public class TopicRegion extends AbstractRegion {
c.setClientId(key.getClientId()); c.setClientId(key.getClientId());
c.setConnectionId(consumerInfo.getConsumerId().getParentId().getParentId()); c.setConnectionId(consumerInfo.getConsumerId().getParentId().getParentId());
sub = (DurableTopicSubscription)createSubscription(c, consumerInfo); sub = (DurableTopicSubscription)createSubscription(c, consumerInfo);
sub.setOfflineTimestamp(System.currentTimeMillis());
} }
if (dupChecker.contains(sub)) { if (dupChecker.contains(sub)) {
@ -258,7 +259,7 @@ public class TopicRegion extends AbstractRegion {
// that would match this destination.. // that would match this destination..
durableSubscriptions.values(); durableSubscriptions.values();
for (DurableTopicSubscription sub : durableSubscriptions.values()) { for (DurableTopicSubscription sub : durableSubscriptions.values()) {
// Skip over subscriptions that we allready added.. // Skip over subscriptions that we already added..
if (dupChecker.contains(sub)) { if (dupChecker.contains(sub)) {
continue; continue;
} }

View File

@ -26,9 +26,13 @@ import junit.framework.Test;
import org.apache.activemq.EmbeddedBrokerTestSupport; import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.util.Wait; import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DurableSubscriptionRemoveOfflineTest extends EmbeddedBrokerTestSupport { public class DurableSubscriptionRemoveOfflineTest extends EmbeddedBrokerTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(DurableSubscriptionRemoveOfflineTest.class);
protected void setUp() throws Exception { protected void setUp() throws Exception {
useTopic = true; useTopic = true;
super.setUp(); super.setUp();
@ -88,16 +92,20 @@ public class DurableSubscriptionRemoveOfflineTest extends EmbeddedBrokerTestSupp
subscriber.close(); subscriber.close();
connection.close(); connection.close();
LOG.info("Broker restarting, wait for inactive cleanup afterwards.");
restartBroker(); restartBroker();
LOG.info("Broker restarted, wait for inactive cleanup now.");
assertTrue(broker.getAdminView().getInactiveDurableTopicSubscribers().length == 1); assertTrue(broker.getAdminView().getInactiveDurableTopicSubscribers().length == 1);
Wait.waitFor(new Wait.Condition() { assertTrue(Wait.waitFor(new Wait.Condition() {
@Override @Override
public boolean isSatisified() throws Exception { public boolean isSatisified() throws Exception {
return broker.getAdminView().getInactiveDurableTopicSubscribers().length == 0; return broker.getAdminView().getInactiveDurableTopicSubscribers().length == 0;
} }
}, 20000); }, 20000));
} }
protected boolean isPersistent() { protected boolean isPersistent() {