git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@389614 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2006-03-28 22:10:34 +00:00
parent 02d5f187ed
commit 449981be9d
5 changed files with 80 additions and 37 deletions

View File

@ -116,6 +116,8 @@ public class BrokerService implements Service {
private AtomicBoolean started = new AtomicBoolean(false); private AtomicBoolean started = new AtomicBoolean(false);
private BrokerPlugin[] plugins; private BrokerPlugin[] plugins;
private boolean keepDurableSubsActive;
/** /**
* Adds a new transport connector for the given bind address * Adds a new transport connector for the given bind address
* *
@ -908,6 +910,7 @@ public class BrokerService implements Service {
else { else {
regionBroker = new RegionBroker(this,getTaskRunnerFactory(), getMemoryManager(), getPersistenceAdapter()); regionBroker = new RegionBroker(this,getTaskRunnerFactory(), getMemoryManager(), getPersistenceAdapter());
} }
regionBroker.setKeepDurableSubsActive(keepDurableSubsActive);
regionBroker.setBrokerName(getBrokerName()); regionBroker.setBrokerName(getBrokerName());
return regionBroker; return regionBroker;
} }
@ -1121,4 +1124,12 @@ public class BrokerService implements Service {
public void setShutdownOnMasterFailure(boolean shutdownOnMasterFailure){ public void setShutdownOnMasterFailure(boolean shutdownOnMasterFailure){
this.shutdownOnMasterFailure=shutdownOnMasterFailure; this.shutdownOnMasterFailure=shutdownOnMasterFailure;
} }
public boolean isKeepDurableSubsActive() {
return keepDurableSubsActive;
}
public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
this.keepDurableSubsActive = keepDurableSubsActive;
}
} }

View File

@ -36,10 +36,12 @@ public class DurableTopicSubscription extends PrefetchSubscription {
private final ConcurrentHashMap redeliveredMessages = new ConcurrentHashMap(); private final ConcurrentHashMap redeliveredMessages = new ConcurrentHashMap();
private final ConcurrentHashMap destinations = new ConcurrentHashMap(); private final ConcurrentHashMap destinations = new ConcurrentHashMap();
private final SubscriptionKey subscriptionKey; private final SubscriptionKey subscriptionKey;
private final boolean keepDurableSubsActive;
private boolean active=false; private boolean active=false;
public DurableTopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException { public DurableTopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive) throws InvalidSelectorException {
super(broker,context, info); super(broker,context, info);
this.keepDurableSubsActive = keepDurableSubsActive;
subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubcriptionName()); subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubcriptionName());
} }
@ -57,10 +59,13 @@ public class DurableTopicSubscription extends PrefetchSubscription {
synchronized public void add(ConnectionContext context, Destination destination) throws Exception { synchronized public void add(ConnectionContext context, Destination destination) throws Exception {
super.add(context, destination); super.add(context, destination);
destinations.put(destination.getActiveMQDestination(), destination); destinations.put(destination.getActiveMQDestination(), destination);
if( active ) { if( active || keepDurableSubsActive ) {
Topic topic = (Topic) destination; Topic topic = (Topic) destination;
topic.activate(context, this); topic.activate(context, this);
} }
if( !isFull() ) {
dispatchMatched();
}
} }
synchronized public void activate(ConnectionContext context, ConsumerInfo info) throws Exception { synchronized public void activate(ConnectionContext context, ConsumerInfo info) throws Exception {
@ -68,22 +73,26 @@ public class DurableTopicSubscription extends PrefetchSubscription {
this.active = true; this.active = true;
this.context = context; this.context = context;
this.info = info; this.info = info;
if( !keepDurableSubsActive ) {
for (Iterator iter = destinations.values().iterator(); iter.hasNext();) { for (Iterator iter = destinations.values().iterator(); iter.hasNext();) {
Topic topic = (Topic) iter.next(); Topic topic = (Topic) iter.next();
topic.activate(context, this); topic.activate(context, this);
} }
}
if( !isFull() ) { if( !isFull() ) {
dispatchMatched(); dispatchMatched();
} }
} }
} }
synchronized public void deactivate() throws Exception { synchronized public void deactivate(boolean keepDurableSubsActive) throws Exception {
active=false; active=false;
if( !keepDurableSubsActive ) {
for (Iterator iter = destinations.values().iterator(); iter.hasNext();) { for (Iterator iter = destinations.values().iterator(); iter.hasNext();) {
Topic topic = (Topic) iter.next(); Topic topic = (Topic) iter.next();
topic.deactivate(context, this); topic.deactivate(context, this);
} }
}
for (Iterator iter = dispatched.iterator(); iter.hasNext();) { for (Iterator iter = dispatched.iterator(); iter.hasNext();) {
// Mark the dispatched messages as redelivered for next time. // Mark the dispatched messages as redelivered for next time.
@ -115,7 +124,7 @@ public class DurableTopicSubscription extends PrefetchSubscription {
} }
synchronized public void add(MessageReference node) throws Exception { synchronized public void add(MessageReference node) throws Exception {
if( !active ) { if( !active && !keepDurableSubsActive ) {
return; return;
} }
node = new IndirectMessageReference(node.getRegionDestination(), (Message) node); node = new IndirectMessageReference(node.getRegionDestination(), (Message) node);
@ -124,7 +133,7 @@ public class DurableTopicSubscription extends PrefetchSubscription {
} }
public int getPendingQueueSize() { public int getPendingQueueSize() {
if (active){ if( active || keepDurableSubsActive ) {
return super.getPendingQueueSize(); return super.getPendingQueueSize();
} }
//TODO: need to get from store //TODO: need to get from store

View File

@ -71,6 +71,7 @@ public class RegionBroker implements Broker {
private final Region tempTopicRegion; private final Region tempTopicRegion;
private BrokerService brokerService; private BrokerService brokerService;
private boolean stopped = false; private boolean stopped = false;
private boolean keepDurableSubsActive=false;
protected final DestinationStatistics destinationStatistics = new DestinationStatistics(); protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
@ -125,6 +126,7 @@ public class RegionBroker implements Broker {
public void start() throws Exception { public void start() throws Exception {
((TopicRegion)topicRegion).setKeepDurableSubsActive(keepDurableSubsActive);
queueRegion.start(); queueRegion.start();
topicRegion.start(); topicRegion.start();
tempQueueRegion.start(); tempQueueRegion.start();
@ -479,5 +481,13 @@ public class RegionBroker implements Broker {
ss.stop(tempTopicRegion); ss.stop(tempTopicRegion);
} }
public boolean isKeepDurableSubsActive() {
return keepDurableSubsActive;
}
public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
this.keepDurableSubsActive = keepDurableSubsActive;
}
} }

View File

@ -128,12 +128,6 @@ public class Topic implements Destination {
sub.remove(context, this); sub.remove(context, this);
} }
public void addInactiveSubscription(ConnectionContext context, DurableTopicSubscription sub) throws Exception {
sub.add(context, this);
destinationStatistics.getConsumers().increment();
durableSubcribers.put(sub.getSubscriptionKey(), sub);
}
public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws IOException { public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws IOException {
if (store != null) { if (store != null) {
store.deleteSubscription(key.clientId, key.subscriptionName); store.deleteSubscription(key.clientId, key.subscriptionName);

View File

@ -16,30 +16,32 @@
*/ */
package org.apache.activemq.broker.region; package org.apache.activemq.broker.region;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; import java.util.Iterator;
import java.util.Set;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo; import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.memory.UsageManager; import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.activemq.util.SubscriptionKey; import org.apache.activemq.util.SubscriptionKey;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import javax.jms.InvalidDestinationException; import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import javax.jms.JMSException;
import java.util.Iterator;
import java.util.Set;
/** /**
* *
@ -47,9 +49,10 @@ import java.util.Set;
*/ */
public class TopicRegion extends AbstractRegion { public class TopicRegion extends AbstractRegion {
private static final Log log = LogFactory.getLog(TopicRegion.class); private static final Log log = LogFactory.getLog(TopicRegion.class);
protected final ConcurrentHashMap durableSubscriptions = new ConcurrentHashMap(); protected final ConcurrentHashMap durableSubscriptions = new ConcurrentHashMap();
private final LongSequenceGenerator recoveredDurableSubIdGenerator = new LongSequenceGenerator();
private final SessionId recoveredDurableSubSessionId = new SessionId(new ConnectionId("OFFLINE"), recoveredDurableSubIdGenerator.getNextSequenceId());
private boolean keepDurableSubsActive=false;
public TopicRegion(RegionBroker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, public TopicRegion(RegionBroker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory,
PersistenceAdapter persistenceAdapter) { PersistenceAdapter persistenceAdapter) {
@ -116,7 +119,7 @@ public class TopicRegion extends AbstractRegion {
SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubcriptionName()); SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubcriptionName());
DurableTopicSubscription sub = (DurableTopicSubscription) durableSubscriptions.get(key); DurableTopicSubscription sub = (DurableTopicSubscription) durableSubscriptions.get(key);
if (sub != null) { if (sub != null) {
sub.deactivate(); sub.deactivate(keepDurableSubsActive);
} }
} }
@ -166,24 +169,32 @@ public class TopicRegion extends AbstractRegion {
// A single durable sub may be subscribing to multiple topics. so it might exist already. // A single durable sub may be subscribing to multiple topics. so it might exist already.
DurableTopicSubscription sub = (DurableTopicSubscription) durableSubscriptions.get(key); DurableTopicSubscription sub = (DurableTopicSubscription) durableSubscriptions.get(key);
ConsumerInfo consumerInfo = createInactiveConsumerInfo(info);
if( sub == null ) { if( sub == null ) {
sub = (DurableTopicSubscription) createSubscription(context, createInactiveConsumerInfo(info)); sub = (DurableTopicSubscription) createSubscription(context, consumerInfo );
} }
topic.addInactiveSubscription(context, sub);
subscriptions.put(consumerInfo.getConsumerId(), sub);
topic.addSubscription(context, sub);
} }
} }
return topic; return topic;
} }
private static ConsumerInfo createInactiveConsumerInfo(SubscriptionInfo info) { private ConsumerInfo createInactiveConsumerInfo(SubscriptionInfo info) {
ConsumerInfo rc = new ConsumerInfo(); ConsumerInfo rc = new ConsumerInfo();
rc.setSelector(info.getSelector()); rc.setSelector(info.getSelector());
rc.setSubcriptionName(info.getSubcriptionName()); rc.setSubcriptionName(info.getSubcriptionName());
rc.setDestination(info.getDestination()); rc.setDestination(info.getDestination());
rc.setConsumerId(createConsumerId());
return rc; return rc;
} }
private ConsumerId createConsumerId() {
return new ConsumerId(recoveredDurableSubSessionId,recoveredDurableSubIdGenerator.getNextSequenceId());
}
protected void configureTopic(Topic topic, ActiveMQDestination destination) { protected void configureTopic(Topic topic, ActiveMQDestination destination) {
if (broker.getDestinationPolicy() != null) { if (broker.getDestinationPolicy() != null) {
PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination); PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
@ -198,7 +209,7 @@ public class TopicRegion extends AbstractRegion {
SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubcriptionName()); SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubcriptionName());
DurableTopicSubscription sub = (DurableTopicSubscription) durableSubscriptions.get(key); DurableTopicSubscription sub = (DurableTopicSubscription) durableSubscriptions.get(key);
if (sub == null) { if (sub == null) {
sub = new DurableTopicSubscription(broker,context, info); sub = new DurableTopicSubscription(broker,context, info, keepDurableSubsActive);
durableSubscriptions.put(key, sub); durableSubscriptions.put(key, sub);
} }
else { else {
@ -241,4 +252,12 @@ public class TopicRegion extends AbstractRegion {
return inactiveDestinations; return inactiveDestinations;
} }
public boolean isKeepDurableSubsActive() {
return keepDurableSubsActive;
}
public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
this.keepDurableSubsActive = keepDurableSubsActive;
}
} }