Prevent multiple start / stop operations on a queue from having any
effect.
This commit is contained in:
Timothy Bish 2014-01-20 10:43:44 -05:00
parent 48d25adfc8
commit f7cbe9fa17
1 changed files with 66 additions and 38 deletions

View File

@ -17,7 +17,18 @@
package org.apache.activemq.broker.region;
import java.io.IOException;
import java.util.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
@ -27,6 +38,7 @@ import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@ -36,6 +48,7 @@ import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import javax.jms.ResourceAllocationException;
import javax.transaction.xa.XAException;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
@ -52,7 +65,17 @@ import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.DispatchPolicy;
import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
import org.apache.activemq.broker.util.InsertionCountList;
import org.apache.activemq.command.*;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerAck;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
@ -108,6 +131,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
private CountDownLatch consumersBeforeStartsLatch;
private final AtomicLong pendingWakeups = new AtomicLong();
private boolean allConsumersExclusiveByDefault = false;
private final AtomicBoolean started = new AtomicBoolean();
private boolean resetNeeded;
@ -717,7 +741,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
}
final ConcurrentHashMap<Transaction, SendSync> sendSyncs = new ConcurrentHashMap<Transaction, SendSync>();
private LinkedList<Transaction> orderIndexUpdates = new LinkedList<Transaction>();
private final LinkedList<Transaction> orderIndexUpdates = new LinkedList<Transaction>();
// roll up all message sends
class SendSync extends Synchronization {
@ -995,51 +1019,55 @@ public class Queue extends BaseDestination implements Task, UsageListener {
@Override
public void start() throws Exception {
if (memoryUsage != null) {
memoryUsage.start();
if (started.compareAndSet(false, true)) {
if (memoryUsage != null) {
memoryUsage.start();
}
if (systemUsage.getStoreUsage() != null) {
systemUsage.getStoreUsage().start();
}
systemUsage.getMemoryUsage().addUsageListener(this);
messages.start();
if (getExpireMessagesPeriod() > 0) {
scheduler.executePeriodically(expireMessagesTask, getExpireMessagesPeriod());
}
doPageIn(false);
}
if (systemUsage.getStoreUsage() != null) {
systemUsage.getStoreUsage().start();
}
systemUsage.getMemoryUsage().addUsageListener(this);
messages.start();
if (getExpireMessagesPeriod() > 0) {
scheduler.schedualPeriodically(expireMessagesTask, getExpireMessagesPeriod());
}
doPageIn(false);
}
@Override
public void stop() throws Exception {
if (taskRunner != null) {
taskRunner.shutdown();
}
if (this.executor != null) {
ThreadPoolUtils.shutdownNow(executor);
executor = null;
}
if (started.compareAndSet(true, false)) {
if (taskRunner != null) {
taskRunner.shutdown();
}
if (this.executor != null) {
ThreadPoolUtils.shutdownNow(executor);
executor = null;
}
scheduler.cancel(expireMessagesTask);
scheduler.cancel(expireMessagesTask);
if (flowControlTimeoutTask.isAlive()) {
flowControlTimeoutTask.interrupt();
}
if (flowControlTimeoutTask.isAlive()) {
flowControlTimeoutTask.interrupt();
}
if (messages != null) {
messages.stop();
}
if (messages != null) {
messages.stop();
}
for (MessageReference messageReference : pagedInMessages.values()) {
messageReference.decrementReferenceCount();
}
pagedInMessages.clear();
for (MessageReference messageReference : pagedInMessages.values()) {
messageReference.decrementReferenceCount();
}
pagedInMessages.clear();
systemUsage.getMemoryUsage().removeUsageListener(this);
if (memoryUsage != null) {
memoryUsage.stop();
}
if (store != null) {
store.stop();
systemUsage.getMemoryUsage().removeUsageListener(this);
if (memoryUsage != null) {
memoryUsage.stop();
}
if (store != null) {
store.stop();
}
}
}