Allow purge to disable message expiration check when paging in Messages to be purged.  Avoids attempts at sending messages to a DLQ during a purge operation and in firing advisory messages for expired messages which are being thrown out by request. 

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1480731 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2013-05-09 17:50:26 +00:00
parent 25356f2695
commit 540b1c6a89
2 changed files with 165 additions and 7 deletions

View File

@ -131,11 +131,13 @@ public class Queue extends BaseDestination implements Task, UsageListener {
private boolean allConsumersExclusiveByDefault = false;
private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
@Override
public void run() {
asyncWakeup();
}
};
private final Runnable expireMessagesTask = new Runnable() {
@Override
public void run() {
expireMessages();
}
@ -155,11 +157,13 @@ public class Queue extends BaseDestination implements Task, UsageListener {
this.trigger = System.currentTimeMillis() + delay;
}
@Override
public long getDelay(TimeUnit unit) {
long n = trigger - System.currentTimeMillis();
return unit.convert(n, TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed delayed) {
long other = ((TimeoutMessage) delayed).trigger;
int returnValue;
@ -214,6 +218,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
private final Comparator<Subscription> orderedCompare = new Comparator<Subscription>() {
@Override
public int compare(Subscription s1, Subscription s2) {
// We want the list sorted in descending order
int val = s2.getConsumerInfo().getPriority() - s1.getConsumerInfo().getPriority();
@ -235,6 +240,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
this.dispatchSelector = new QueueDispatchSelector(destination);
}
@Override
public List<Subscription> getConsumers() {
consumersLock.readLock().lock();
try {
@ -266,6 +272,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
currentBatchCount = recoveredAccumulator;
}
@Override
public boolean recoverMessage(Message message) {
recoveredAccumulator++;
if (LOG.isInfoEnabled() && (recoveredAccumulator % 10000) == 0) {
@ -297,14 +304,17 @@ public class Queue extends BaseDestination implements Task, UsageListener {
return false;
}
@Override
public boolean recoverMessageReference(MessageId messageReference) throws Exception {
throw new RuntimeException("Should not be called.");
}
@Override
public boolean hasSpace() {
return true;
}
@Override
public boolean isDuplicate(MessageId id) {
return false;
}
@ -417,6 +427,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
ConcurrentLinkedQueue<BrowserDispatch> browserDispatches = new ConcurrentLinkedQueue<BrowserDispatch>();
@Override
public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug(getActiveMQDestination().getQualifiedName() + " add sub: " + sub + ", dequeues: "
@ -487,6 +498,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
}
}
@Override
public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeiveredSequenceId)
throws Exception {
super.removeSubscription(context, sub, lastDeiveredSequenceId);
@ -594,6 +606,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
}
}
@Override
public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception {
final ConnectionContext context = producerExchange.getConnectionContext();
// There is delay between the client sending it and it arriving at the
@ -654,6 +667,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
flowControlTimeoutTask.start();
}
messagesWaitingForSpace.put(message.getMessageId(), new Runnable() {
@Override
public void run() {
try {
@ -933,9 +947,11 @@ public class Queue extends BaseDestination implements Task, UsageListener {
}
}
@Override
public void gc() {
}
@Override
public void acknowledge(ConnectionContext context, Subscription sub, MessageAck ack, MessageReference node)
throws IOException {
messageConsumed(context, node);
@ -969,6 +985,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
+ messageGroupOwners;
}
@Override
public void start() throws Exception {
if (memoryUsage != null) {
memoryUsage.start();
@ -984,6 +1001,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
doPageIn(false);
}
@Override
public void stop() throws Exception {
if (taskRunner != null) {
taskRunner.shutdown();
@ -1106,6 +1124,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
return result;
}
@Override
public Message[] browse() {
List<Message> browseList = new ArrayList<Message>();
doBrowse(browseList, getMaxBrowsePageSize());
@ -1241,7 +1260,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
ConnectionContext c = createConnectionContext();
List<MessageReference> list = null;
do {
doPageIn(true);
doPageIn(true, false); // signal no expiry processing needed.
pagedInMessagesLock.readLock().lock();
try {
list = new ArrayList<MessageReference>(pagedInMessages.values());
@ -1269,6 +1288,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
getMessages().clear();
}
@Override
public void clearPendingMessages() {
messagesLock.writeLock().lock();
try {
@ -1530,6 +1550,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
* @return true if we would like to iterate again
* @see org.apache.activemq.thread.Task#iterate()
*/
@Override
public boolean iterate() {
MDC.put("activemq.destination", getName());
boolean pageInMoreMessages = false;
@ -1672,6 +1693,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
protected MessageReferenceFilter createMessageIdFilter(final String messageId) {
return new MessageReferenceFilter() {
@Override
public boolean evaluate(ConnectionContext context, MessageReference r) {
return messageId.equals(r.getMessageId().toString());
}
@ -1698,6 +1720,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
final BooleanExpression selectorExpression = SelectorParser.parse(selector);
return new MessageReferenceFilter() {
@Override
public boolean evaluate(ConnectionContext context, MessageReference r) throws JMSException {
MessageEvaluationContext messageEvaluationContext = context.getMessageEvaluationContext();
@ -1786,6 +1809,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
messageExpired(context, null, reference);
}
@Override
public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) {
if (LOG.isDebugEnabled()) {
LOG.debug("message expired: " + reference);
@ -1832,6 +1856,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
wakeup();
}
@Override
public void wakeup() {
if (optimizedDispatch && !iterationRunning) {
iterate();
@ -1851,7 +1876,11 @@ public class Queue extends BaseDestination implements Task, UsageListener {
}
private void doPageIn(boolean force) throws Exception {
PendingList newlyPaged = doPageInForDispatch(force);
doPageIn(force, true);
}
private void doPageIn(boolean force, boolean processExpired) throws Exception {
PendingList newlyPaged = doPageInForDispatch(force, processExpired);
pagedInPendingDispatchLock.writeLock().lock();
try {
if (pagedInPendingDispatch.isEmpty()) {
@ -1869,7 +1898,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
}
}
private PendingList doPageInForDispatch(boolean force) throws Exception {
private PendingList doPageInForDispatch(boolean force, boolean processExpired) throws Exception {
List<QueueMessageReference> result = null;
PendingList resultList = null;
@ -1906,7 +1935,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
messages.remove();
QueueMessageReference ref = createMessageReference(node.getMessage());
if (ref.isExpired()) {
if (processExpired && ref.isExpired()) {
if (broker.isExpired(ref)) {
messageExpired(createConnectionContext(), ref);
} else {
@ -2020,7 +2049,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
for (Iterator<MessageReference> iterator = list.iterator(); iterator.hasNext();) {
MessageReference node = (MessageReference) iterator.next();
MessageReference node = iterator.next();
Subscription target = null;
int interestCount = 0;
for (Subscription s : consumers) {
@ -2052,7 +2081,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
if ((target == null && interestCount > 0) || consumers.size() == 0) {
// This means all subs were full or that there are no
// consumers...
rc.addMessageLast((QueueMessageReference) node);
rc.addMessageLast(node);
}
// If it got dispatched, rotate the consumer list to get round robin
@ -2128,7 +2157,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
}
protected void pageInMessages(boolean force) throws Exception {
doDispatch(doPageInForDispatch(force));
doDispatch(doPageInForDispatch(force, true));
}
private void addToConsumerList(Subscription sub) {
@ -2273,6 +2302,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
return sub;
}
@Override
public void onUsageChanged(@SuppressWarnings("rawtypes") Usage usage, int oldPercentUsage, int newPercentUsage) {
if (oldPercentUsage > newPercentUsage) {
asyncWakeup();

View File

@ -0,0 +1,128 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.bugs;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class AMQ4518Test {
private BrokerService brokerService;
private String connectionUri;
@Before
public void setup() throws Exception {
brokerService = new BrokerService();
connectionUri = brokerService.addConnector("tcp://localhost:0").getPublishableConnectString();
// Configure Dead Letter Strategy
DeadLetterStrategy strategy = new IndividualDeadLetterStrategy();
((IndividualDeadLetterStrategy)strategy).setUseQueueForQueueMessages(true);
((IndividualDeadLetterStrategy)strategy).setQueuePrefix("DLQ.");
strategy.setProcessNonPersistent(false);
strategy.setProcessExpired(false);
// Add policy and individual DLQ strategy
PolicyEntry policy = new PolicyEntry();
policy.setTimeBeforeDispatchStarts(3000);
policy.setDeadLetterStrategy(strategy);
PolicyMap pMap = new PolicyMap();
pMap.setDefaultEntry(policy);
brokerService.setDestinationPolicy(pMap);
brokerService.setPersistent(false);
brokerService.start();
}
@After
public void stop() throws Exception {
brokerService.stop();
}
@Test(timeout=360000)
public void test() throws Exception {
final ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(connectionUri);
final AtomicBoolean advised = new AtomicBoolean(false);
Connection connection = cf.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination dlqDestination = session.createTopic(AdvisorySupport.EXPIRED_QUEUE_MESSAGES_TOPIC_PREFIX + ">");
MessageConsumer consumer = session.createConsumer(dlqDestination);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
advised.set(true);
}
});
connection.start();
ExecutorService service = Executors.newSingleThreadExecutor();
service.execute(new Runnable() {
@Override
public void run() {
try {
ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTemporaryQueue();
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
producer.setTimeToLive(400);
producer.send(session.createTextMessage());
producer.send(session.createTextMessage());
TimeUnit.MILLISECONDS.sleep(500);
connection.close();
} catch (Exception e) {
}
}
});
service.shutdown();
assertTrue(service.awaitTermination(1, TimeUnit.MINUTES));
assertFalse("Should not get any Advisories for Expired Messages", advised.get());
}
}