git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1297252 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2012-03-05 22:08:51 +00:00
parent 0dfbc5054d
commit 3a5b48a4db
4 changed files with 320 additions and 100 deletions

View File

@ -61,7 +61,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
this.keepDurableSubsActive = keepDurableSubsActive;
subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
}
public final boolean isActive() {
@ -178,7 +178,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
topic.getDestinationStatistics().getInflight().subtract(dispatched.size());
}
}
for (final MessageReference node : dispatched) {
// Mark the dispatched messages as redelivered for next time.
Integer count = redeliveredMessages.get(node.getMessageId());
@ -212,10 +212,10 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
}
}
}
prefetchExtension = 0;
prefetchExtension.set(0);
}
protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
MessageDispatch md = super.createMessageDispatch(node, message);
if (node != QueueMessageReference.NULL_MESSAGE) {
@ -243,7 +243,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
public void removePending(MessageReference node) throws IOException {
pending.remove(node);
}
protected void doAddRecoveredMessage(MessageReference message) throws Exception {
synchronized(pending) {
pending.addRecoveredMessage(message);
@ -272,10 +272,9 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
node.decrementReferenceCount();
}
public synchronized String toString() {
return "DurableTopicSubscription-" + getSubscriptionKey() + ", id=" + info.getConsumerId() + ", active=" + isActive() + ", destinations=" + durableDestinations.size() + ", total=" + enqueueCounter + ", pending="
+ getPendingQueueSize() + ", dispatched=" + dispatchCounter + ", inflight=" + dispatched.size() + ", prefetchExtension=" + this.prefetchExtension;
+ getPendingQueueSize() + ", dispatched=" + dispatchCounter + ", inflight=" + dispatched.size() + ", prefetchExtension=" + getPrefetchExtension();
}
public SubscriptionKey getSubscriptionKey() {
@ -301,8 +300,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
}
}
synchronized(dispatched) {
for (Iterator iter = dispatched.iterator(); iter.hasNext();) {
MessageReference node = (MessageReference) iter.next();
for (MessageReference node : dispatched) {
node.decrementReferenceCount();
}
dispatched.clear();
@ -319,7 +317,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
}
}
}
protected boolean isDropped(MessageReference node) {
return false;
}

View File

@ -23,8 +23,11 @@ import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
@ -47,17 +50,15 @@ import org.slf4j.LoggerFactory;
/**
* A subscription that honors the pre-fetch option of the ConsumerInfo.
*
*
*/
public abstract class PrefetchSubscription extends AbstractSubscription {
private static final Logger LOG = LoggerFactory.getLogger(PrefetchSubscription.class);
protected final Scheduler scheduler;
protected PendingMessageCursor pending;
protected final List<MessageReference> dispatched = new CopyOnWriteArrayList<MessageReference>();
protected int prefetchExtension;
protected final AtomicInteger prefetchExtension = new AtomicInteger();
protected boolean usePrefetchExtension = true;
protected long enqueueCounter;
protected long dispatchCounter;
@ -68,7 +69,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
protected final Object pendingLock = new Object();
private final Object dispatchLock = new Object();
private final CountDownLatch okForAckAsDispatchDone = new CountDownLatch(1);
public PrefetchSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, PendingMessageCursor cursor) throws InvalidSelectorException {
super(broker,context, info);
this.usageManager=usageManager;
@ -84,42 +85,38 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
* Allows a message to be pulled on demand by a client
*/
public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception {
// The slave should not deliver pull messages. TODO: when the slave
// becomes a master,
// He should send a NULL message to all the consumers to 'wake them up'
// in case
// they were waiting for a message.
// The slave should not deliver pull messages.
// TODO: when the slave becomes a master, He should send a NULL message to all the
// consumers to 'wake them up' in case they were waiting for a message.
if (getPrefetchSize() == 0 && !isSlave()) {
final long dispatchCounterBeforePull;
synchronized(this) {
prefetchExtension++;
dispatchCounterBeforePull = dispatchCounter;
}
// Have the destination push us some messages.
for (Destination dest : destinations) {
dest.iterate();
}
dispatchPending();
prefetchExtension.incrementAndGet();
final long dispatchCounterBeforePull = dispatchCounter;
// Have the destination push us some messages.
for (Destination dest : destinations) {
dest.iterate();
}
dispatchPending();
synchronized(this) {
// If there was nothing dispatched.. we may need to setup a timeout.
if (dispatchCounterBeforePull == dispatchCounter) {
// immediate timeout used by receiveNoWait()
if (pull.getTimeout() == -1) {
// Send a NULL message.
add(QueueMessageReference.NULL_MESSAGE);
dispatchPending();
}
if (pull.getTimeout() > 0) {
scheduler.executeAfterDelay(new Runnable() {
public void run() {
pullTimeout(dispatchCounterBeforePull);
}
}, pull.getTimeout());
}
}
// If there was nothing dispatched.. we may need to setup a timeout.
if (dispatchCounterBeforePull == dispatchCounter) {
// immediate timeout used by receiveNoWait()
if (pull.getTimeout() == -1) {
// Send a NULL message.
add(QueueMessageReference.NULL_MESSAGE);
dispatchPending();
}
if (pull.getTimeout() > 0) {
scheduler.executeAfterDelay(new Runnable() {
@Override
public void run() {
pullTimeout(dispatchCounterBeforePull);
}
}, pull.getTimeout());
}
}
}
}
return null;
@ -130,8 +127,8 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
* timeout was setup, then send the NULL message.
*/
final void pullTimeout(long dispatchCounterBeforePull) {
synchronized (pendingLock) {
if (dispatchCounterBeforePull == dispatchCounter) {
synchronized (pendingLock) {
if (dispatchCounterBeforePull == dispatchCounter) {
try {
add(QueueMessageReference.NULL_MESSAGE);
dispatchPending();
@ -144,13 +141,13 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
public void add(MessageReference node) throws Exception {
synchronized (pendingLock) {
// The destination may have just been removed...
// The destination may have just been removed...
if( !destinations.contains(node.getRegionDestination()) && node!=QueueMessageReference.NULL_MESSAGE) {
// perhaps we should inform the caller that we are no longer valid to dispatch to?
return;
}
enqueueCounter++;
pending.addMessageLast(node);
pending.addMessageLast(node);
}
dispatchPending();
}
@ -188,7 +185,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
// Handle the standard acknowledgment case.
boolean callDispatchMatched = false;
Destination destination = null;
if (!isSlave()) {
if (!okForAckAsDispatchDone.await(0l, TimeUnit.MILLISECONDS)) {
// suppress unexpected ack exception in this expected case
@ -201,10 +198,10 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
}
synchronized(dispatchLock) {
if (ack.isStandardAck()) {
// First check if the ack matches the dispatched. When using failover this might
// not be the case. We don't ever want to ack the wrong messages.
assertAckMatchesDispatched(ack);
// First check if the ack matches the dispatched. When using failover this might
// not be the case. We don't ever want to ack the wrong messages.
assertAckMatchesDispatched(ack);
// Acknowledge all dispatched messages up till the message id of
// the acknowledgment.
int index = 0;
@ -217,7 +214,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
inAckRange = true;
}
if (inAckRange) {
// Don't remove the nodes until we are committed.
// Don't remove the nodes until we are committed.
if (!context.isInTransaction()) {
dequeueCounter++;
node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
@ -227,13 +224,26 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
}
index++;
acknowledge(context, ack, node);
if (ack.getLastMessageId().equals(messageId)) {
if (ack.getLastMessageId().equals(messageId)) {
// contract prefetch if dispatch required a pull
if (getPrefetchSize() == 0) {
prefetchExtension = Math.max(0, prefetchExtension - index);
// Protect extension update against parallel updates.
while (true) {
int currentExtension = prefetchExtension.get();
int newExtension = Math.max(0, currentExtension - index);
if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
break;
}
}
} else if (usePrefetchExtension && context.isInTransaction()) {
// extend prefetch window only if not a pulling consumer
prefetchExtension = Math.max(prefetchExtension, index);
while (true) {
int currentExtension = prefetchExtension.get();
int newExtension = Math.max(currentExtension, index);
if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
break;
}
}
}
destination = node.getRegionDestination();
callDispatchMatched = true;
@ -264,7 +274,15 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
} else {
registerRemoveSync(context, node);
}
prefetchExtension = Math.max(0, prefetchExtension - 1);
// Protect extension update against parallel updates.
while (true) {
int currentExtension = prefetchExtension.get();
int newExtension = Math.max(0, currentExtension - 1);
if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
break;
}
}
acknowledge(context, ack, node);
destination = node.getRegionDestination();
callDispatchMatched = true;
@ -286,7 +304,13 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
}
if (ack.getLastMessageId().equals(node.getMessageId())) {
if (usePrefetchExtension) {
prefetchExtension = Math.max(prefetchExtension, index + 1);
while (true) {
int currentExtension = prefetchExtension.get();
int newExtension = Math.max(currentExtension, index + 1);
if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
break;
}
}
}
destination = node.getRegionDestination();
callDispatchMatched = true;
@ -351,8 +375,13 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
index++;
acknowledge(context, ack, node);
if (ack.getLastMessageId().equals(messageId)) {
prefetchExtension = Math.max(0, prefetchExtension
- (index + 1));
while (true) {
int currentExtension = prefetchExtension.get();
int newExtension = Math.max(0, currentExtension - (index + 1));
if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
break;
}
}
destination = node.getRegionDestination();
callDispatchMatched = true;
break;
@ -369,7 +398,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
}
}
}
if (callDispatchMatched && destination != null) {
if (callDispatchMatched && destination != null) {
destination.wakeup();
dispatchPending();
} else {
@ -416,11 +445,11 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
/**
* Checks an ack versus the contents of the dispatched list.
*
*
* @param ack
* @throws JMSException if it does not match
*/
protected void assertAckMatchesDispatched(MessageAck ack) throws JMSException {
protected void assertAckMatchesDispatched(MessageAck ack) throws JMSException {
MessageId firstAckedMsg = ack.getFirstMessageId();
MessageId lastAckedMsg = ack.getLastMessageId();
int checkCount = 0;
@ -468,37 +497,37 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
protected void sendToDLQ(final ConnectionContext context, final MessageReference node) throws IOException, Exception {
broker.getRoot().sendToDeadLetterQueue(context, node, this);
}
public int getInFlightSize() {
return dispatched.size();
}
/**
* Used to determine if the broker can dispatch to the consumer.
*
*
* @return
*/
public boolean isFull() {
return dispatched.size() - prefetchExtension >= info.getPrefetchSize();
return dispatched.size() - prefetchExtension.get() >= info.getPrefetchSize();
}
/**
* @return true when 60% or more room is left for dispatching messages
*/
public boolean isLowWaterMark() {
return (dispatched.size() - prefetchExtension) <= (info.getPrefetchSize() * .4);
return (dispatched.size() - prefetchExtension.get()) <= (info.getPrefetchSize() * .4);
}
/**
* @return true when 10% or less room is left for dispatching messages
*/
public boolean isHighWaterMark() {
return (dispatched.size() - prefetchExtension) >= (info.getPrefetchSize() * .9);
return (dispatched.size() - prefetchExtension.get()) >= (info.getPrefetchSize() * .9);
}
@Override
public int countBeforeFull() {
return info.getPrefetchSize() + prefetchExtension - dispatched.size();
return info.getPrefetchSize() + prefetchExtension.get() - dispatched.size();
}
public int getPendingQueueSize() {
@ -559,16 +588,16 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
// Synchronized to DispatchLock
synchronized(dispatchLock) {
ArrayList<MessageReference> references = new ArrayList<MessageReference>();
for (MessageReference r : dispatched) {
if( r.getRegionDestination() == destination) {
for (MessageReference r : dispatched) {
if( r.getRegionDestination() == destination) {
references.add(r);
}
}
}
}
rc.addAll(references);
destination.getDestinationStatistics().getDispatched().subtract(references.size());
destination.getDestinationStatistics().getInflight().subtract(references.size());
dispatched.removeAll(references);
}
}
}
return rc;
}
@ -589,7 +618,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
if (node == null) {
break;
}
// Synchronize between dispatched list and remove of message from pending list
// related to remove subscription action
synchronized(dispatchLock) {
@ -634,9 +663,9 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
if (message == null) {
return false;
}
okForAckAsDispatchDone.countDown();
// No reentrant lock - Patch needed to IndirectMessageReference on method lock
if (!isSlave()) {
@ -646,7 +675,13 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
dispatchCounter++;
dispatched.add(node);
} else {
prefetchExtension = Math.max(0, prefetchExtension - 1);
while (true) {
int currentExtension = prefetchExtension.get();
int newExtension = Math.max(0, currentExtension - 1);
if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
break;
}
}
}
if (info.isDispatchAsync()) {
md.setTransmitCallback(new Runnable() {
@ -674,14 +709,14 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
if (node.getRegionDestination() != null) {
if (node != QueueMessageReference.NULL_MESSAGE) {
node.getRegionDestination().getDestinationStatistics().getDispatched().increment();
node.getRegionDestination().getDestinationStatistics().getInflight().increment();
node.getRegionDestination().getDestinationStatistics().getInflight().increment();
if (LOG.isTraceEnabled()) {
LOG.trace(info.getConsumerId() + " dispatched: " + message.getMessageId() + " - "
+ message.getDestination() + ", dispatched: " + dispatchCounter + ", inflight: " + dispatched.size());
}
}
}
if (info.isDispatchAsync()) {
try {
dispatchPending();
@ -693,7 +728,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
/**
* inform the MessageConsumer on the client to change it's prefetch
*
*
* @param newPrefetch
*/
public void updateConsumerPrefetch(int newPrefetch) {
@ -711,42 +746,41 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
* @return MessageDispatch
*/
protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
MessageDispatch md = new MessageDispatch();
md.setConsumerId(info.getConsumerId());
if (node == QueueMessageReference.NULL_MESSAGE) {
MessageDispatch md = new MessageDispatch();
md.setMessage(null);
md.setConsumerId(info.getConsumerId());
md.setDestination(null);
return md;
} else {
MessageDispatch md = new MessageDispatch();
md.setConsumerId(info.getConsumerId());
md.setDestination(node.getRegionDestination().getActiveMQDestination());
md.setMessage(message);
md.setRedeliveryCounter(node.getRedeliveryCounter());
return md;
}
return md;
}
/**
* Use when a matched message is about to be dispatched to the client.
*
*
* @param node
* @return false if the message should not be dispatched to the client
* (another sub may have already dispatched it for example).
* @throws IOException
*/
protected abstract boolean canDispatch(MessageReference node) throws IOException;
protected abstract boolean isDropped(MessageReference node);
/**
* Used during acknowledgment to remove the message.
*
*
* @throws IOException
*/
protected abstract void acknowledge(ConnectionContext context, final MessageAck ack, final MessageReference node) throws IOException;
public int getMaxProducersToAudit() {
return maxProducersToAudit;
}
@ -762,7 +796,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
public void setMaxAuditDepth(int maxAuditDepth) {
this.maxAuditDepth = maxAuditDepth;
}
public boolean isUsePrefetchExtension() {
return usePrefetchExtension;
}
@ -770,4 +804,8 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
public void setUsePrefetchExtension(boolean usePrefetchExtension) {
this.usePrefetchExtension = usePrefetchExtension;
}
protected int getPrefetchExtension() {
return this.prefetchExtension.get();
}
}

View File

@ -60,7 +60,17 @@ import org.apache.activemq.broker.region.group.MessageGroupMapFactory;
import org.apache.activemq.broker.region.policy.DispatchPolicy;
import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
import org.apache.activemq.broker.util.InsertionCountList;
import org.apache.activemq.command.*;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerAck;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;

View File

@ -0,0 +1,174 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.bugs;
import static org.junit.Assert.assertEquals;
import java.util.Random;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.broker.BrokerService;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AMQ3732Test {
private static Logger LOG = LoggerFactory.getLogger(AMQ3529Test.class);
private ActiveMQConnectionFactory connectionFactory;
private Connection connection;
private Session session;
private BrokerService broker;
private String connectionUri;
private final Random pause = new Random();
private final long NUM_MESSAGES = 25000;
private final AtomicLong totalConsumed = new AtomicLong();
@Before
public void startBroker() throws Exception {
broker = new BrokerService();
broker.setDeleteAllMessagesOnStartup(true);
broker.setPersistent(false);
broker.setUseJmx(false);
broker.addConnector("tcp://0.0.0.0:0");
broker.start();
broker.waitUntilStarted();
connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
connectionFactory = new ActiveMQConnectionFactory(connectionUri);
connectionFactory.getPrefetchPolicy().setAll(0);
}
@After
public void stopBroker() throws Exception {
connection.close();
broker.stop();
broker.waitUntilStopped();
}
@Test(timeout = 1200000)
public void testInterruptionAffects() throws Exception {
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
Queue queue = session.createQueue("AMQ3732Test");
final LinkedBlockingQueue<Message> workQueue = new LinkedBlockingQueue<Message>();
final MessageConsumer consumer1 = session.createConsumer(queue);
final MessageConsumer consumer2 = session.createConsumer(queue);
final MessageProducer producer = session.createProducer(queue);
Thread consumer1Thread = new Thread(new Runnable() {
@Override
public void run() {
try {
while (totalConsumed.get() < NUM_MESSAGES) {
Message message = consumer1.receiveNoWait();
if (message != null) {
workQueue.add(message);
}
}
} catch(Exception e) {
LOG.error("Caught an unexpected error: ", e);
}
}
});
consumer1Thread.start();
Thread consumer2Thread = new Thread(new Runnable() {
@Override
public void run() {
try {
while (totalConsumed.get() < NUM_MESSAGES) {
Message message = consumer2.receive(50);
if (message != null) {
workQueue.add(message);
}
}
} catch(Exception e) {
LOG.error("Caught an unexpected error: ", e);
}
}
});
consumer2Thread.start();
Thread producerThread = new Thread(new Runnable() {
@Override
public void run() {
try {
for (int i = 0; i < NUM_MESSAGES; ++i) {
producer.send(session.createTextMessage("TEST"));
TimeUnit.MILLISECONDS.sleep(pause.nextInt(10));
}
} catch(Exception e) {
LOG.error("Caught an unexpected error: ", e);
}
}
});
producerThread.start();
Thread ackingThread = new Thread(new Runnable() {
@Override
public void run() {
try {
while (totalConsumed.get() < NUM_MESSAGES) {
Message message = workQueue.take();
message.acknowledge();
totalConsumed.incrementAndGet();
if ((totalConsumed.get() % 100) == 0) {
LOG.info("Consumed " + totalConsumed.get() + " messages so far.");
}
}
} catch(Exception e) {
LOG.error("Caught an unexpected error: ", e);
}
}
});
ackingThread.start();
producerThread.join();
consumer1Thread.join();
consumer2Thread.join();
ackingThread.join();
assertEquals(NUM_MESSAGES, totalConsumed.get());
}
}