git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1241246 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2012-02-06 22:54:47 +00:00
parent cdba931deb
commit b567bd478b
1 changed files with 16 additions and 23 deletions

View File

@ -24,6 +24,8 @@ import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
@ -48,7 +50,6 @@ import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.thread.Valve;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.SubscriptionKey;
import org.slf4j.Logger;
@ -57,14 +58,12 @@ import org.slf4j.LoggerFactory;
/**
* The Topic is a destination that sends a copy of a message to every active
* Subscription registered.
*
*
*/
public class Topic extends BaseDestination implements Task {
protected static final Logger LOG = LoggerFactory.getLogger(Topic.class);
private final TopicMessageStore topicStore;
protected final CopyOnWriteArrayList<Subscription> consumers = new CopyOnWriteArrayList<Subscription>();
private final Valve dispatchValve = new Valve(true);
private final ReentrantReadWriteLock dispatchLock = new ReentrantReadWriteLock();
private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy();
private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
private final ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> durableSubcribers = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>();
@ -118,21 +117,17 @@ public class Topic extends BaseDestination implements Task {
// Do a retroactive recovery if needed.
if (sub.getConsumerInfo().isRetroactive() || isAlwaysRetroactive()) {
// synchronize with dispatch method so that no new messages are
// sent
// while we are recovering a subscription to avoid out of order
// messages.
dispatchValve.turnOff();
// synchronize with dispatch method so that no new messages are sent
// while we are recovering a subscription to avoid out of order messages.
dispatchLock.writeLock().lock();
try {
synchronized (consumers) {
sub.add(context, this);
consumers.add(sub);
}
subscriptionRecoveryPolicy.recover(context, this, sub);
} finally {
dispatchValve.turnOn();
dispatchLock.writeLock().unlock();
}
} else {
@ -174,9 +169,8 @@ public class Topic extends BaseDestination implements Task {
public void activate(ConnectionContext context, final DurableTopicSubscription subscription) throws Exception {
// synchronize with dispatch method so that no new messages are sent
// while
// we are recovering a subscription to avoid out of order messages.
dispatchValve.turnOff();
// while we are recovering a subscription to avoid out of order messages.
dispatchLock.writeLock().lock();
try {
if (topicStore == null) {
@ -201,6 +195,7 @@ public class Topic extends BaseDestination implements Task {
}
}
}
// Do we need to create the subscription?
if (info == null) {
info = new SubscriptionInfo();
@ -248,7 +243,7 @@ public class Topic extends BaseDestination implements Task {
});
}
} finally {
dispatchValve.turnOn();
dispatchLock.writeLock().unlock();
}
}
@ -466,6 +461,7 @@ public class Topic extends BaseDestination implements Task {
message.decrementReferenceCount();
}
}
if (result != null && !result.isCancelled()) {
try {
result.get();
@ -474,7 +470,6 @@ public class Topic extends BaseDestination implements Task {
// has already been deleted
}
}
}
private boolean canOptimizeOutPersistence() {
@ -512,7 +507,6 @@ public class Topic extends BaseDestination implements Task {
if (getExpireMessagesPeriod() > 0) {
scheduler.schedualPeriodically(expireMessagesTask, getExpireMessagesPeriod());
}
}
public void stop() throws Exception {
@ -634,8 +628,9 @@ public class Topic extends BaseDestination implements Task {
// misleading metrics.
// destinationStatistics.getMessages().increment();
destinationStatistics.getEnqueues().increment();
dispatchValve.increment();
MessageEvaluationContext msgContext = null;
dispatchLock.readLock().lock();
try {
if (!subscriptionRecoveryPolicy.add(context, message)) {
return;
@ -654,7 +649,7 @@ public class Topic extends BaseDestination implements Task {
}
} finally {
dispatchValve.decrement();
dispatchLock.readLock().unlock();
if (msgContext != null) {
msgContext.clear();
}
@ -693,6 +688,4 @@ public class Topic extends BaseDestination implements Task {
protected Logger getLog() {
return LOG;
}
}