git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@668146 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-06-16 12:57:29 +00:00
parent 5441774883
commit 0d8586903a
11 changed files with 110 additions and 58 deletions

View File

@ -20,6 +20,7 @@ import java.io.IOException;
import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.MessageStore;
@ -37,6 +38,7 @@ public abstract class BaseDestination implements Destination {
* from persistent storage * from persistent storage
*/ */
public static final int DEFAULT_PAGE_SIZE=100; public static final int DEFAULT_PAGE_SIZE=100;
protected final ActiveMQDestination destination; protected final ActiveMQDestination destination;
protected final Broker broker; protected final Broker broker;
protected final MessageStore store; protected final MessageStore store;
@ -59,6 +61,7 @@ public abstract class BaseDestination implements Destination {
protected final DestinationStatistics destinationStatistics = new DestinationStatistics(); protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
protected final BrokerService brokerService; protected final BrokerService brokerService;
protected final Broker regionBroker; protected final Broker regionBroker;
protected DeadLetterStrategy deadLetterStrategy = DEFAULT_DEAD_LETTER_STRATEGY;
/** /**
* @param broker * @param broker
@ -298,6 +301,20 @@ public abstract class BaseDestination implements Destination {
this.advisdoryForFastProducers = advisdoryForFastProducers; this.advisdoryForFastProducers = advisdoryForFastProducers;
} }
/**
* @return the dead letter strategy
*/
public DeadLetterStrategy getDeadLetterStrategy() {
return deadLetterStrategy;
}
/**
* set the dead letter strategy
* @param deadLetterStrategy
*/
public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) {
this.deadLetterStrategy = deadLetterStrategy;
}
/** /**
* called when message is consumed * called when message is consumed
* @param context * @param context

View File

@ -22,6 +22,7 @@ import org.apache.activemq.Service;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy; import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageAck;
@ -36,6 +37,7 @@ import org.apache.activemq.usage.Usage;
*/ */
public interface Destination extends Service, Task { public interface Destination extends Service, Task {
public static final DeadLetterStrategy DEFAULT_DEAD_LETTER_STRATEGY = new SharedDeadLetterStrategy();
void addSubscription(ConnectionContext context, Subscription sub) throws Exception; void addSubscription(ConnectionContext context, Subscription sub) throws Exception;
void removeSubscription(ConnectionContext context, Subscription sub) throws Exception; void removeSubscription(ConnectionContext context, Subscription sub) throws Exception;
@ -114,7 +116,14 @@ public interface Destination extends Service, Task {
*/ */
public void setLazyDispatch(boolean value); public void setLazyDispatch(boolean value);
void messageExpired(ConnectionContext context, PrefetchSubscription prefetchSubscription, MessageReference node);
/**
* Inform the Destination a message has expired
* @param context
* @param subs
* @param node
*/
void messageExpired(ConnectionContext context, Subscription subs,MessageReference node);
/** /**
* called when message is consumed * called when message is consumed

View File

@ -238,4 +238,9 @@ public class DestinationFilter implements Destination {
public void slowConsumer(ConnectionContext context, Subscription subs) { public void slowConsumer(ConnectionContext context, Subscription subs) {
next.slowConsumer(context, subs); next.slowConsumer(context, subs);
} }
public void messageExpired(ConnectionContext context, Subscription subs,MessageReference node) {
next.messageExpired(context,subs, node);
}
} }

View File

@ -285,10 +285,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext(); index++) { for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext(); index++) {
final MessageReference node = iter.next(); final MessageReference node = iter.next();
if( node.isExpired() ) { if( node.isExpired() ) {
broker.messageExpired(getContext(), node);
node.getRegionDestination().messageExpired(context, this, node); node.getRegionDestination().messageExpired(context, this, node);
node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
dispatched.remove(node); dispatched.remove(node);
} }
if (ack.getLastMessageId().equals(node.getMessageId())) { if (ack.getLastMessageId().equals(node.getMessageId())) {
@ -517,7 +514,6 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
// Message may have been sitting in the pending // Message may have been sitting in the pending
// list a while waiting for the consumer to ak the message. // list a while waiting for the consumer to ak the message.
if (node!=QueueMessageReference.NULL_MESSAGE && node.isExpired()) { if (node!=QueueMessageReference.NULL_MESSAGE && node.isExpired()) {
broker.messageExpired(getContext(), node);
//increment number to dispatch //increment number to dispatch
numberToDispatch++; numberToDispatch++;
node.getRegionDestination().messageExpired(context, this, node); node.getRegionDestination().messageExpired(context, this, node);

View File

@ -41,10 +41,8 @@ import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory;
import org.apache.activemq.broker.region.group.MessageGroupMap; import org.apache.activemq.broker.region.group.MessageGroupMap;
import org.apache.activemq.broker.region.group.MessageGroupMapFactory; import org.apache.activemq.broker.region.group.MessageGroupMapFactory;
import org.apache.activemq.broker.region.group.MessageGroupSet; import org.apache.activemq.broker.region.group.MessageGroupSet;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.DispatchPolicy; import org.apache.activemq.broker.region.policy.DispatchPolicy;
import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy; import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ExceptionResponse; import org.apache.activemq.command.ExceptionResponse;
@ -85,7 +83,6 @@ public class Queue extends BaseDestination implements Task {
private final LinkedHashMap<MessageId,QueueMessageReference> pagedInMessages = new LinkedHashMap<MessageId,QueueMessageReference>(); private final LinkedHashMap<MessageId,QueueMessageReference> pagedInMessages = new LinkedHashMap<MessageId,QueueMessageReference>();
private MessageGroupMap messageGroupOwners; private MessageGroupMap messageGroupOwners;
private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy(); private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy();
private DeadLetterStrategy deadLetterStrategy = new SharedDeadLetterStrategy();
private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory(); private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory();
private final Object sendLock = new Object(); private final Object sendLock = new Object();
private ExecutorService executor; private ExecutorService executor;
@ -163,8 +160,7 @@ public class Queue extends BaseDestination implements Task {
// Message could have expired while it was being // Message could have expired while it was being
// loaded.. // loaded..
if (broker.isExpired(message)) { if (broker.isExpired(message)) {
broker.messageExpired(createConnectionContext(), message); messageExpired(createConnectionContext(), message);
destinationStatistics.getMessages().decrement();
return true; return true;
} }
if (hasSpace()) { if (hasSpace()) {
@ -328,9 +324,8 @@ public class Queue extends BaseDestination implements Task {
final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo(); final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0 && !context.isInRecoveryMode(); final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0 && !context.isInRecoveryMode();
if (message.isExpired()) { if (message.isExpired()) {
//message not stored - or added to stats yet - so chuck here
broker.getRoot().messageExpired(context, message); broker.getRoot().messageExpired(context, message);
//message not added to stats yet
//destinationStatistics.getMessages().decrement();
if (sendProducerAck) { if (sendProducerAck) {
ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize()); ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
context.getConnection().dispatchAsync(ack); context.getConnection().dispatchAsync(ack);
@ -357,10 +352,8 @@ public class Queue extends BaseDestination implements Task {
// While waiting for space to free up... the // While waiting for space to free up... the
// message may have expired. // message may have expired.
if (broker.isExpired(message)) { if (message.isExpired()) {
broker.messageExpired(context, message); broker.messageExpired(context, message);
//message not added to stats yet
//destinationStatistics.getMessages().decrement();
} else { } else {
doMessageSend(producerExchange, message); doMessageSend(producerExchange, message);
} }
@ -570,14 +563,6 @@ public class Queue extends BaseDestination implements Task {
this.dispatchPolicy = dispatchPolicy; this.dispatchPolicy = dispatchPolicy;
} }
public DeadLetterStrategy getDeadLetterStrategy() {
return deadLetterStrategy;
}
public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) {
this.deadLetterStrategy = deadLetterStrategy;
}
public MessageGroupMapFactory getMessageGroupMapFactory() { public MessageGroupMapFactory getMessageGroupMapFactory() {
return messageGroupMapFactory; return messageGroupMapFactory;
} }
@ -1005,11 +990,15 @@ public class Queue extends BaseDestination implements Task {
} }
protected void removeMessage(ConnectionContext c, QueueMessageReference r) throws IOException { protected void removeMessage(ConnectionContext c, QueueMessageReference r) throws IOException {
removeMessage(c, null, r);
}
protected void removeMessage(ConnectionContext c, Subscription subs,QueueMessageReference r) throws IOException {
MessageAck ack = new MessageAck(); MessageAck ack = new MessageAck();
ack.setAckType(MessageAck.STANDARD_ACK_TYPE); ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
ack.setDestination(destination); ack.setDestination(destination);
ack.setMessageID(r.getMessageId()); ack.setMessageID(r.getMessageId());
removeMessage(c, null, r, ack); removeMessage(c, subs, r, ack);
} }
protected void removeMessage(ConnectionContext context,Subscription sub,final QueueMessageReference reference,MessageAck ack) throws IOException { protected void removeMessage(ConnectionContext context,Subscription sub,final QueueMessageReference reference,MessageAck ack) throws IOException {
@ -1044,11 +1033,19 @@ public class Queue extends BaseDestination implements Task {
} }
public void messageExpired(ConnectionContext context, PrefetchSubscription prefetchSubscription, MessageReference reference) { public void messageExpired(ConnectionContext context,MessageReference reference) {
((QueueMessageReference)reference).drop(); messageExpired(context,null,reference);
// Not sure.. perhaps we should forge an ack to remove the message from the store. }
// acknowledge(context, sub, ack, reference);
destinationStatistics.getMessages().decrement(); public void messageExpired(ConnectionContext context,Subscription subs, MessageReference reference) {
broker.messageExpired(context, reference);
destinationStatistics.getDequeues().increment();
destinationStatistics.getInflight().decrement();
try {
removeMessage(context,subs,(QueueMessageReference)reference);
} catch (IOException e) {
LOG.error("Failed to remove expired Message from the store ",e);
}
synchronized(pagedInMessages) { synchronized(pagedInMessages) {
pagedInMessages.remove(reference.getMessageId()); pagedInMessages.remove(reference.getMessageId());
} }
@ -1113,9 +1110,7 @@ public class Queue extends BaseDestination implements Task {
result.add(ref); result.add(ref);
count++; count++;
} else { } else {
broker.messageExpired(createConnectionContext(), messageExpired(createConnectionContext(), node);
node);
destinationStatistics.getMessages().decrement();
} }
} }
} finally { } finally {

View File

@ -71,7 +71,6 @@ public class Topic extends BaseDestination implements Task{
private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy(); private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy();
private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy; private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
private boolean sendAdvisoryIfNoConsumers; private boolean sendAdvisoryIfNoConsumers;
private DeadLetterStrategy deadLetterStrategy = new SharedDeadLetterStrategy();
private final ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> durableSubcribers = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>(); private final ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> durableSubcribers = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>();
private final TaskRunner taskRunner; private final TaskRunner taskRunner;
private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>(); private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
@ -266,9 +265,8 @@ public class Topic extends BaseDestination implements Task{
// There is delay between the client sending it and it arriving at the // There is delay between the client sending it and it arriving at the
// destination.. it may have expired. // destination.. it may have expired.
if (broker.isExpired(message)) { if (message.isExpired()) {
broker.messageExpired(context, message); broker.messageExpired(context, message);
destinationStatistics.getMessages().decrement();
if (sendProducerAck) { if (sendProducerAck) {
ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize()); ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
context.getConnection().dispatchAsync(ack); context.getConnection().dispatchAsync(ack);
@ -296,10 +294,8 @@ public class Topic extends BaseDestination implements Task{
// While waiting for space to free up... the // While waiting for space to free up... the
// message may have expired. // message may have expired.
if (broker.isExpired(message)) { if (message.isExpired()) {
broker.messageExpired(context, message); broker.messageExpired(context, message);
//destinationStatistics.getEnqueues().increment();
//destinationStatistics.getMessages().decrement();
} else { } else {
doMessageSend(producerExchange, message); doMessageSend(producerExchange, message);
} }
@ -413,8 +409,6 @@ public class Topic extends BaseDestination implements Task{
if (broker.isExpired(message)) { if (broker.isExpired(message)) {
broker.messageExpired(context, message); broker.messageExpired(context, message);
message.decrementReferenceCount(); message.decrementReferenceCount();
//destinationStatistics.getEnqueues().increment();
//destinationStatistics.getMessages().decrement();
return; return;
} }
try { try {
@ -555,14 +549,6 @@ public class Topic extends BaseDestination implements Task{
this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers; this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers;
} }
public DeadLetterStrategy getDeadLetterStrategy() {
return deadLetterStrategy;
}
public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) {
this.deadLetterStrategy = deadLetterStrategy;
}
// Implementation methods // Implementation methods
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
@ -595,6 +581,21 @@ public class Topic extends BaseDestination implements Task{
dispatchValve.decrement(); dispatchValve.decrement();
} }
} }
public void messageExpired(ConnectionContext context,Subscription subs, MessageReference reference) {
broker.messageExpired(context, reference);
destinationStatistics.getMessages().decrement();
destinationStatistics.getEnqueues().decrement();
MessageAck ack = new MessageAck();
ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
ack.setDestination(destination);
ack.setMessageID(reference.getMessageId());
try {
acknowledge(context, subs, ack, reference);
} catch (IOException e) {
LOG.error("Failed to remove expired Message from the store ",e);
}
}
/** /**
* Provides a hook to allow messages with no consumer to be processed in * Provides a hook to allow messages with no consumer to be processed in
@ -640,10 +641,4 @@ public class Topic extends BaseDestination implements Task{
} }
} }
} }
public void messageExpired(ConnectionContext context, PrefetchSubscription prefetchSubscription, MessageReference node) {
// TODO Auto-generated method stub
}
} }

View File

@ -38,5 +38,25 @@ public interface DeadLetterStrategy {
* Returns the dead letter queue for the given destination. * Returns the dead letter queue for the given destination.
*/ */
ActiveMQDestination getDeadLetterQueueFor(ActiveMQDestination originalDestination); ActiveMQDestination getDeadLetterQueueFor(ActiveMQDestination originalDestination);
/**
* @return true if processes expired messages
*/
public boolean isProcessExpired() ;
/**
* @param processExpired the processExpired to set
*/
public void setProcessExpired(boolean processExpired);
/**
* @return the processNonPersistent
*/
public boolean isProcessNonPersistent();
/**
* @param processNonPersistent the processNonPersistent to set
*/
public void setProcessNonPersistent(boolean processNonPersistent);
} }

View File

@ -18,6 +18,7 @@ package org.apache.activemq.broker.region.policy;
import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.region.BaseDestination; import org.apache.activemq.broker.region.BaseDestination;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DurableTopicSubscription; import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.Queue; import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.Topic; import org.apache.activemq.broker.region.Topic;
@ -43,7 +44,7 @@ public class PolicyEntry extends DestinationMapEntry {
private DispatchPolicy dispatchPolicy; private DispatchPolicy dispatchPolicy;
private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy; private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
private boolean sendAdvisoryIfNoConsumers; private boolean sendAdvisoryIfNoConsumers;
private DeadLetterStrategy deadLetterStrategy; private DeadLetterStrategy deadLetterStrategy = Destination.DEFAULT_DEAD_LETTER_STRATEGY;
private PendingMessageLimitStrategy pendingMessageLimitStrategy; private PendingMessageLimitStrategy pendingMessageLimitStrategy;
private MessageEvictionStrategy messageEvictionStrategy; private MessageEvictionStrategy messageEvictionStrategy;
private long memoryLimit; private long memoryLimit;

View File

@ -29,6 +29,9 @@ import javax.jms.Topic;
import org.apache.activemq.TestSupport; import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -78,6 +81,14 @@ public abstract class DeadLetterTestSupport extends TestSupport {
protected BrokerService createBroker() throws Exception { protected BrokerService createBroker() throws Exception {
BrokerService broker = new BrokerService(); BrokerService broker = new BrokerService();
broker.setPersistent(false); broker.setPersistent(false);
PolicyEntry policy = new PolicyEntry();
DeadLetterStrategy defaultDeadLetterStrategy = policy.getDeadLetterStrategy();
if(defaultDeadLetterStrategy!=null) {
defaultDeadLetterStrategy.setProcessNonPersistent(true);
}
PolicyMap pMap = new PolicyMap();
pMap.setDefaultEntry(policy);
broker.setDestinationPolicy(pMap);
return broker; return broker;
} }

View File

@ -19,6 +19,7 @@ package org.apache.activemq.broker.policy;
import javax.jms.Destination; import javax.jms.Destination;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy; import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.broker.region.policy.PolicyMap;
@ -33,7 +34,9 @@ public class IndividualDeadLetterTest extends DeadLetterTest {
BrokerService broker = super.createBroker(); BrokerService broker = super.createBroker();
PolicyEntry policy = new PolicyEntry(); PolicyEntry policy = new PolicyEntry();
policy.setDeadLetterStrategy(new IndividualDeadLetterStrategy()); DeadLetterStrategy strategy = new IndividualDeadLetterStrategy();
strategy.setProcessNonPersistent(true);
policy.setDeadLetterStrategy(strategy);
PolicyMap pMap = new PolicyMap(); PolicyMap pMap = new PolicyMap();
pMap.setDefaultEntry(policy); pMap.setDefaultEntry(policy);

View File

@ -32,7 +32,7 @@
<strictOrderDispatchPolicy /> <strictOrderDispatchPolicy />
</dispatchPolicy> </dispatchPolicy>
<deadLetterStrategy> <deadLetterStrategy>
<individualDeadLetterStrategy topicPrefix="Test.DLQ." /> <individualDeadLetterStrategy topicPrefix="Test.DLQ." processNonPersistent="true" />
</deadLetterStrategy> </deadLetterStrategy>
</policyEntry> </policyEntry>
@ -41,7 +41,7 @@
<strictOrderDispatchPolicy /> <strictOrderDispatchPolicy />
</dispatchPolicy> </dispatchPolicy>
<deadLetterStrategy> <deadLetterStrategy>
<individualDeadLetterStrategy queuePrefix="Test.DLQ."/> <individualDeadLetterStrategy queuePrefix="Test.DLQ." processNonPersistent="true"/>
</deadLetterStrategy> </deadLetterStrategy>
</policyEntry> </policyEntry>