fix for breakage caused by earlier fix to: https://issues.apache.org/jira/browse/AMQ-4062

Only reconfigure when keepDurableSubsActive=false

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1409489 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2012-11-14 22:43:15 +00:00
parent cf70ce97ee
commit 21b2f84399
2 changed files with 29 additions and 11 deletions

View File

@ -51,8 +51,8 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
private final ConcurrentHashMap<ActiveMQDestination, Destination> durableDestinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
private final SubscriptionKey subscriptionKey;
private final boolean keepDurableSubsActive;
private AtomicBoolean active = new AtomicBoolean();
private AtomicLong offlineTimestamp = new AtomicLong(-1);
private final AtomicBoolean active = new AtomicBoolean();
private final AtomicLong offlineTimestamp = new AtomicLong(-1);
public DurableTopicSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive)
throws JMSException {
@ -76,10 +76,12 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
offlineTimestamp.set(timestamp);
}
@Override
public boolean isFull() {
return !active.get() || super.isFull();
}
@Override
public void gc() {
}
@ -87,6 +89,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
* store will have a pending ack for all durables, irrespective of the
* selector so we need to ack if node is un-matched
*/
@Override
public void unmatched(MessageReference node) throws IOException {
MessageAck ack = new MessageAck();
ack.setAckType(MessageAck.UNMATCHED_ACK_TYPE);
@ -100,6 +103,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
// statically configured via maxPageSize
}
@Override
public void add(ConnectionContext context, Destination destination) throws Exception {
if (!destinations.contains(destination)) {
super.add(context, destination);
@ -135,15 +139,6 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
this.context = context;
this.info = info;
// On Activation we should update the configuration based on our new consumer info.
ActiveMQDestination dest = this.info.getDestination();
if (dest != null && regionBroker.getDestinationPolicy() != null) {
PolicyEntry entry = regionBroker.getDestinationPolicy().getEntryFor(dest);
if (entry != null) {
entry.configure(broker, usageManager, this);
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Activating " + this);
}
@ -153,7 +148,17 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
add(context, topic);
topic.activate(context, this);
}
// On Activation we should update the configuration based on our new consumer info.
ActiveMQDestination dest = this.info.getDestination();
if (dest != null && regionBroker.getDestinationPolicy() != null) {
PolicyEntry entry = regionBroker.getDestinationPolicy().getEntryFor(dest);
if (entry != null) {
entry.configure(broker, usageManager, this);
}
}
}
synchronized (pendingLock) {
pending.setSystemUsage(memoryManager);
pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
@ -234,6 +239,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
prefetchExtension.set(0);
}
@Override
protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
MessageDispatch md = super.createMessageDispatch(node, message);
if (node != QueueMessageReference.NULL_MESSAGE) {
@ -245,6 +251,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
return md;
}
@Override
public void add(MessageReference node) throws Exception {
if (!active.get() && !keepDurableSubsActive) {
return;
@ -252,6 +259,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
super.add(node);
}
@Override
protected void dispatchPending() throws IOException {
if (isActive()) {
super.dispatchPending();
@ -262,12 +270,14 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
pending.remove(node);
}
@Override
protected void doAddRecoveredMessage(MessageReference message) throws Exception {
synchronized (pending) {
pending.addRecoveredMessage(message);
}
}
@Override
public int getPendingQueueSize() {
if (active.get() || keepDurableSubsActive) {
return super.getPendingQueueSize();
@ -276,14 +286,17 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
return 0;
}
@Override
public void setSelector(String selector) throws InvalidSelectorException {
throw new UnsupportedOperationException("You cannot dynamically change the selector for durable topic subscriptions");
}
@Override
protected boolean canDispatch(MessageReference node) {
return isActive();
}
@Override
protected void acknowledge(ConnectionContext context, MessageAck ack, MessageReference node) throws IOException {
Destination regionDestination = (Destination) node.getRegionDestination();
regionDestination.acknowledge(context, this, ack, node);
@ -291,6 +304,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
node.decrementReferenceCount();
}
@Override
public synchronized String toString() {
return "DurableTopicSubscription-" + getSubscriptionKey() + ", id=" + info.getConsumerId() + ", active=" + isActive() + ", destinations="
+ durableDestinations.size() + ", total=" + enqueueCounter + ", pending=" + getPendingQueueSize() + ", dispatched=" + dispatchCounter
@ -304,6 +318,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
/**
* Release any references that we are holding.
*/
@Override
public void destroy() {
synchronized (pendingLock) {
try {
@ -327,6 +342,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
setSlowConsumer(false);
}
@Override
public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
if (oldPercentUsage > newPercentUsage && oldPercentUsage >= 90) {
try {
@ -337,6 +353,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
}
}
@Override
protected boolean isDropped(MessageReference node) {
return false;
}

View File

@ -96,6 +96,7 @@ public class AMQ4062Test {
service=new BrokerService();
service.setPersistent(true);
service.setUseJmx(false);
service.setKeepDurableSubsActive(false);
KahaDBPersistenceAdapter pa=new KahaDBPersistenceAdapter();
File dataFile=new File("createData");