Robert Davies 2007-07-20 17:08:10 +00:00
parent cfdc5e3de3
commit a414c20dcb
20 changed files with 367 additions and 127 deletions

View File

@ -17,6 +17,7 @@
*/ */
package org.apache.activemq.advisory; package org.apache.activemq.advisory;
import java.io.IOException;
import java.util.Iterator; import java.util.Iterator;
import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.Broker;
@ -24,6 +25,7 @@ import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQMessage;
@ -38,6 +40,8 @@ import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.util.IdGenerator; import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.LongSequenceGenerator; import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -49,7 +53,7 @@ import java.util.concurrent.ConcurrentHashMap;
*/ */
public class AdvisoryBroker extends BrokerFilter { public class AdvisoryBroker extends BrokerFilter {
//private static final Log log = LogFactory.getLog(AdvisoryBroker.class); private static final Log log = LogFactory.getLog(AdvisoryBroker.class);
protected final ConcurrentHashMap connections = new ConcurrentHashMap(); protected final ConcurrentHashMap connections = new ConcurrentHashMap();
protected final ConcurrentHashMap consumers = new ConcurrentHashMap(); protected final ConcurrentHashMap consumers = new ConcurrentHashMap();
@ -229,6 +233,16 @@ public class AdvisoryBroker extends BrokerFilter {
} }
} }
public void messageExpired(ConnectionContext context,MessageReference messageReference){
next.messageExpired(context,messageReference);
try{
ActiveMQTopic topic=AdvisorySupport.getExpiredMessageTopic(messageReference.getMessage().getDestination());
fireAdvisory(context,topic,messageReference.getMessage());
}catch(Exception e){
log.warn("Failed to fire message expired advisory");
}
}
protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command) throws Exception { protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command) throws Exception {
fireAdvisory(context, topic, command, null); fireAdvisory(context, topic, command, null);
} }

View File

@ -64,6 +64,13 @@ public class AdvisorySupport {
return new ActiveMQTopic(TOPIC_PRODUCER_ADVISORY_TOPIC_PREFIX+destination.getPhysicalName()); return new ActiveMQTopic(TOPIC_PRODUCER_ADVISORY_TOPIC_PREFIX+destination.getPhysicalName());
} }
public static ActiveMQTopic getExpiredMessageTopic(ActiveMQDestination destination) {
if (destination.isQueue()) {
return getExpiredQueueMessageAdvisoryTopic(destination);
}
return getExpiredTopicMessageAdvisoryTopic(destination);
}
public static ActiveMQTopic getExpiredTopicMessageAdvisoryTopic(ActiveMQDestination destination) { public static ActiveMQTopic getExpiredTopicMessageAdvisoryTopic(ActiveMQDestination destination) {
String name = EXPIRED_TOPIC_MESSAGES_TOPIC_PREFIX+destination.getPhysicalName(); String name = EXPIRED_TOPIC_MESSAGES_TOPIC_PREFIX+destination.getPhysicalName();
return new ActiveMQTopic(name); return new ActiveMQTopic(name);

View File

@ -20,19 +20,15 @@ package org.apache.activemq.broker;
import java.net.URI; import java.net.URI;
import java.util.Set; import java.util.Set;
import org.apache.activemq.Service; import org.apache.activemq.Service;
import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Region; import org.apache.activemq.broker.region.Region;
import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId; import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.ConnectionInfo; import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.DestinationInfo; import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo; import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.TransactionId;
import org.apache.activemq.kaha.Store; import org.apache.activemq.kaha.Store;
@ -135,6 +131,8 @@ public interface Broker extends Region, Service {
/** /**
* Gets a list of all the prepared xa transactions. * Gets a list of all the prepared xa transactions.
* @param context transaction ids
* @return
* @throws Exception TODO * @throws Exception TODO
*/ */
public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception; public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception;
@ -151,7 +149,7 @@ public interface Broker extends Region, Service {
* Prepares a transaction. Only valid for xa transactions. * Prepares a transaction. Only valid for xa transactions.
* @param context * @param context
* @param xid * @param xid
* @return * @return id
* @throws Exception TODO * @throws Exception TODO
*/ */
public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception; public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception;
@ -176,6 +174,9 @@ public interface Broker extends Region, Service {
/** /**
* Forgets a transaction. * Forgets a transaction.
* @param context
* @param transactionId
* @throws Exception
*/ */
public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception; public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception;
@ -246,7 +247,35 @@ public interface Broker extends Region, Service {
*/ */
public URI getVmConnectorURI(); public URI getVmConnectorURI();
/**
* called when the brokerService starts
*/
public void brokerServiceStarted(); public void brokerServiceStarted();
/**
* @return the BrokerService
*/
BrokerService getBrokerService(); BrokerService getBrokerService();
/**
* Ensure we get the Broker at the top of the Stack
* @return the broker at the top of the Stack
*/
Broker getRoot();
/**
* A Message has Expired
* @param context
* @param messageReference
* @throws Exception
*/
public void messageExpired(ConnectionContext context, MessageReference messageReference);
/**
* A message needs to go the a DLQ
* @param context
* @param messageReference
* @throws Exception
*/
public void sendToDeadLetterQueue(ConnectionContext context,MessageReference messageReference);
} }

View File

@ -17,9 +17,12 @@
*/ */
package org.apache.activemq.broker; package org.apache.activemq.broker;
import java.net.URI;
import java.util.Map;
import java.util.Set;
import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId; import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.BrokerInfo;
@ -38,10 +41,6 @@ import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.TransactionId;
import org.apache.activemq.kaha.Store; import org.apache.activemq.kaha.Store;
import java.net.URI;
import java.util.Map;
import java.util.Set;
/** /**
* Allows you to intercept broker operation so that features such as security can be * Allows you to intercept broker operation so that features such as security can be
* implemented as a pluggable filter. * implemented as a pluggable filter.
@ -246,4 +245,16 @@ public class BrokerFilter implements Broker {
public BrokerService getBrokerService(){ public BrokerService getBrokerService(){
return next.getBrokerService(); return next.getBrokerService();
} }
public void messageExpired(ConnectionContext context,MessageReference message){
next.messageExpired(context,message);
}
public void sendToDeadLetterQueue(ConnectionContext context,MessageReference messageReference){
next.sendToDeadLetterQueue(context,messageReference);
}
public Broker getRoot() {
return next.getRoot();
}
} }

View File

@ -17,9 +17,13 @@
*/ */
package org.apache.activemq.broker; package org.apache.activemq.broker;
import java.net.URI;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId; import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.BrokerInfo;
@ -38,11 +42,6 @@ import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.TransactionId;
import org.apache.activemq.kaha.Store; import org.apache.activemq.kaha.Store;
import java.net.URI;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
/** /**
* Dumb implementation - used to be overriden by listeners * Dumb implementation - used to be overriden by listeners
* *
@ -245,4 +244,14 @@ public class EmptyBroker implements Broker {
public BrokerService getBrokerService(){ public BrokerService getBrokerService(){
return null; return null;
} }
public void messageExpired(ConnectionContext context,MessageReference message){
}
public void sendToDeadLetterQueue(ConnectionContext context,MessageReference messageReference){
}
public Broker getRoot(){
return null;
}
} }

View File

@ -21,10 +21,9 @@ import java.net.URI;
import java.util.Collections; import java.util.Collections;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId; import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.BrokerInfo;
@ -245,4 +244,16 @@ public class ErrorBroker implements Broker {
public BrokerService getBrokerService(){ public BrokerService getBrokerService(){
throw new BrokerStoppedException(this.message); throw new BrokerStoppedException(this.message);
} }
public void messageExpired(ConnectionContext context,MessageReference message){
throw new BrokerStoppedException(this.message);
}
public void sendToDeadLetterQueue(ConnectionContext context,MessageReference messageReference){
throw new BrokerStoppedException(this.message);
}
public Broker getRoot(){
throw new BrokerStoppedException(this.message);
}
} }

View File

@ -17,9 +17,12 @@
*/ */
package org.apache.activemq.broker; package org.apache.activemq.broker;
import java.net.URI;
import java.util.Map;
import java.util.Set;
import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId; import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.BrokerInfo;
@ -38,10 +41,6 @@ import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.TransactionId;
import org.apache.activemq.kaha.Store; import org.apache.activemq.kaha.Store;
import java.net.URI;
import java.util.Map;
import java.util.Set;
/** /**
* Like a BrokerFilter but it allows you to switch the getNext().broker. This has more * Like a BrokerFilter but it allows you to switch the getNext().broker. This has more
* overhead than a BrokerFilter since access to the getNext().broker has to synchronized * overhead than a BrokerFilter since access to the getNext().broker has to synchronized
@ -260,4 +259,17 @@ public class MutableBrokerFilter implements Broker {
return getNext().getBrokerService(); return getNext().getBrokerService();
} }
public void messageExpired(ConnectionContext context,MessageReference message){
getNext().messageExpired(context,message);
}
public void sendToDeadLetterQueue(ConnectionContext context,MessageReference messageReference) {
getNext().sendToDeadLetterQueue(context,messageReference);
}
public Broker getRoot(){
return getNext().getRoot();
}
} }

View File

@ -332,14 +332,15 @@ abstract public class AbstractRegion implements Region {
// Try to auto create the destination... re-invoke broker from the // Try to auto create the destination... re-invoke broker from the
// top so that the proper security checks are performed. // top so that the proper security checks are performed.
try { try {
context.getBroker().addDestination(context,destination);
dest = addDestination(context, destination); dest = addDestination(context, destination);
//context.getBroker().addDestination(context,destination);
} }
catch (DestinationAlreadyExistsException e) { catch (DestinationAlreadyExistsException e) {
// if the destination already exists then lets ignore this error // if the destination already exists then lets ignore this error
} }
// We should now have the dest created. // We should now have the dest created.
//dest=(Destination) destinations.get(destination); dest=(Destination) destinations.get(destination);
} }
if(dest==null){ if(dest==null){
throw new JMSException("The destination "+destination+" does not exist."); throw new JMSException("The destination "+destination+" does not exist.");

View File

@ -42,118 +42,121 @@ import org.apache.activemq.thread.TaskRunnerFactory;
* @author fateev@amazon.com * @author fateev@amazon.com
* @version $Revision$ * @version $Revision$
*/ */
public class DestinationFactoryImpl extends DestinationFactory { public class DestinationFactoryImpl extends DestinationFactory{
protected final UsageManager memoryManager; protected final UsageManager memoryManager;
protected final TaskRunnerFactory taskRunnerFactory; protected final TaskRunnerFactory taskRunnerFactory;
protected final PersistenceAdapter persistenceAdapter; protected final PersistenceAdapter persistenceAdapter;
protected RegionBroker broker; protected RegionBroker broker;
public DestinationFactoryImpl(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, public DestinationFactoryImpl(UsageManager memoryManager,TaskRunnerFactory taskRunnerFactory,
PersistenceAdapter persistenceAdapter) { PersistenceAdapter persistenceAdapter){
this.memoryManager = memoryManager; this.memoryManager=memoryManager;
this.taskRunnerFactory = taskRunnerFactory; this.taskRunnerFactory=taskRunnerFactory;
if (persistenceAdapter == null) { if(persistenceAdapter==null){
throw new IllegalArgumentException("null persistenceAdapter"); throw new IllegalArgumentException("null persistenceAdapter");
} }
this.persistenceAdapter = persistenceAdapter; this.persistenceAdapter=persistenceAdapter;
} }
public void setRegionBroker(RegionBroker broker) { public void setRegionBroker(RegionBroker broker){
if (broker == null) { if(broker==null){
throw new IllegalArgumentException("null broker"); throw new IllegalArgumentException("null broker");
} }
this.broker = broker; this.broker=broker;
} }
public Set getDestinations() { public Set getDestinations(){
return persistenceAdapter.getDestinations(); return persistenceAdapter.getDestinations();
} }
/** /**
* @return instance of {@link Queue} or {@link Topic} * @return instance of {@link Queue} or {@link Topic}
*/ */
public Destination createDestination(ConnectionContext context, ActiveMQDestination destination, DestinationStatistics destinationStatistics) throws Exception { public Destination createDestination(ConnectionContext context,ActiveMQDestination destination,
if (destination.isQueue()) { DestinationStatistics destinationStatistics) throws Exception{
if (destination.isTemporary()) { if(destination.isQueue()){
final ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destination; if(destination.isTemporary()){
return new Queue(destination, memoryManager, null, destinationStatistics, taskRunnerFactory,broker.getTempDataStore()) { final ActiveMQTempDestination tempDest=(ActiveMQTempDestination)destination;
return new Queue(broker.getRoot(),destination,memoryManager,null,
destinationStatistics,taskRunnerFactory,broker.getTempDataStore()){
public void addSubscription(ConnectionContext context,Subscription sub) throws Exception { public void addSubscription(ConnectionContext context,Subscription sub) throws Exception{
// Only consumers on the same connection can consume from // Only consumers on the same connection can consume from
// the temporary destination // the temporary destination
if( !tempDest.getConnectionId().equals( sub.getConsumerInfo().getConsumerId().getConnectionId() ) ) { if(!tempDest.getConnectionId().equals(sub.getConsumerInfo().getConsumerId().getConnectionId())){
throw new JMSException("Cannot subscribe to remote temporary destination: "+tempDest); throw new JMSException("Cannot subscribe to remote temporary destination: "+tempDest);
} }
super.addSubscription(context, sub); super.addSubscription(context,sub);
}; };
}; };
} else { }else{
MessageStore store = persistenceAdapter.createQueueMessageStore((ActiveMQQueue) destination); MessageStore store=persistenceAdapter.createQueueMessageStore((ActiveMQQueue)destination);
Queue queue = new Queue(destination, memoryManager, store, destinationStatistics, taskRunnerFactory,broker.getTempDataStore()); Queue queue=new Queue(broker.getRoot(),destination,memoryManager,store,
configureQueue(queue, destination); destinationStatistics,taskRunnerFactory,broker.getTempDataStore());
configureQueue(queue,destination);
queue.initialize(); queue.initialize();
return queue; return queue;
} }
} else if (destination.isTemporary()){ }else if(destination.isTemporary()){
final ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destination; final ActiveMQTempDestination tempDest=(ActiveMQTempDestination)destination;
return new Topic(destination, null, memoryManager, destinationStatistics, taskRunnerFactory) { return new Topic(broker.getRoot(),destination,null,memoryManager,
public void addSubscription(ConnectionContext context,Subscription sub) throws Exception { destinationStatistics,taskRunnerFactory){
public void addSubscription(ConnectionContext context,Subscription sub) throws Exception{
// Only consumers on the same connection can consume from // Only consumers on the same connection can consume from
// the temporary destination // the temporary destination
if( !tempDest.getConnectionId().equals( sub.getConsumerInfo().getConsumerId().getConnectionId() ) ) { if(!tempDest.getConnectionId().equals(sub.getConsumerInfo().getConsumerId().getConnectionId())){
throw new JMSException("Cannot subscribe to remote temporary destination: "+tempDest); throw new JMSException("Cannot subscribe to remote temporary destination: "+tempDest);
} }
super.addSubscription(context, sub); super.addSubscription(context,sub);
}; };
}; };
} else { }else{
TopicMessageStore store = null; TopicMessageStore store=null;
if (!AdvisorySupport.isAdvisoryTopic(destination)) { if(!AdvisorySupport.isAdvisoryTopic(destination)){
store = persistenceAdapter.createTopicMessageStore((ActiveMQTopic) destination); store=persistenceAdapter.createTopicMessageStore((ActiveMQTopic)destination);
} }
Topic topic=new Topic(broker.getRoot(),destination,store,memoryManager,
Topic topic = new Topic(destination, store, memoryManager, destinationStatistics, taskRunnerFactory); destinationStatistics,taskRunnerFactory);
configureTopic(topic, destination); configureTopic(topic,destination);
return topic; return topic;
} }
} }
protected void configureQueue(Queue queue, ActiveMQDestination destination) { protected void configureQueue(Queue queue,ActiveMQDestination destination){
if (broker == null) { if(broker==null){
throw new IllegalStateException("broker property is not set"); throw new IllegalStateException("broker property is not set");
} }
if (broker.getDestinationPolicy() != null) { if(broker.getDestinationPolicy()!=null){
PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination); PolicyEntry entry=broker.getDestinationPolicy().getEntryFor(destination);
if (entry != null) { if(entry!=null){
entry.configure(queue,broker.getTempDataStore()); entry.configure(queue,broker.getTempDataStore());
} }
} }
} }
protected void configureTopic(Topic topic, ActiveMQDestination destination) { protected void configureTopic(Topic topic,ActiveMQDestination destination){
if (broker == null) { if(broker==null){
throw new IllegalStateException("broker property is not set"); throw new IllegalStateException("broker property is not set");
} }
if (broker.getDestinationPolicy() != null) { if(broker.getDestinationPolicy()!=null){
PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination); PolicyEntry entry=broker.getDestinationPolicy().getEntryFor(destination);
if (entry != null) { if(entry!=null){
entry.configure(topic); entry.configure(topic);
} }
} }
} }
public long getLastMessageBrokerSequenceId() throws IOException { public long getLastMessageBrokerSequenceId() throws IOException{
return persistenceAdapter.getLastMessageBrokerSequenceId(); return persistenceAdapter.getLastMessageBrokerSequenceId();
} }
public PersistenceAdapter getPersistenceAdapter() { public PersistenceAdapter getPersistenceAdapter(){
return persistenceAdapter; return persistenceAdapter;
} }
public SubscriptionInfo[] getAllDurableSubscriptions(ActiveMQTopic topic) throws IOException { public SubscriptionInfo[] getAllDurableSubscriptions(ActiveMQTopic topic) throws IOException{
return persistenceAdapter.createTopicMessageStore(topic).getAllSubscriptions(); return persistenceAdapter.createTopicMessageStore(topic).getAllSubscriptions();
} }
} }

View File

@ -276,17 +276,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
* @throws Exception * @throws Exception
*/ */
protected void sendToDLQ(final ConnectionContext context,final MessageReference node) throws IOException,Exception{ protected void sendToDLQ(final ConnectionContext context,final MessageReference node) throws IOException,Exception{
// Send the message to the DLQ broker.sendToDeadLetterQueue(context,node);
Message message=node.getMessage();
if(message!=null){
// The original destination and transaction id do not get filled when the message is first
// sent,
// it is only populated if the message is routed to another destination like the DLQ
DeadLetterStrategy deadLetterStrategy=node.getRegionDestination().getDeadLetterStrategy();
ActiveMQDestination deadLetterDestination=deadLetterStrategy
.getDeadLetterQueueFor(message.getDestination());
BrokerSupport.resend(context,message,deadLetterDestination);
}
} }
/** /**
@ -393,7 +383,9 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
// Message may have been sitting in the pending list a while // Message may have been sitting in the pending list a while
// waiting for the consumer to ak the message. // waiting for the consumer to ak the message.
if(node!=QueueMessageReference.NULL_MESSAGE&&node.isExpired()){ if(node!=QueueMessageReference.NULL_MESSAGE&&node.isExpired()){
continue; // just drop it. broker.messageExpired(getContext(),node);
dequeueCounter++;
continue;
} }
dispatch(node); dispatch(node);
count++; count++;

View File

@ -27,6 +27,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
import javax.jms.InvalidSelectorException; import javax.jms.InvalidSelectorException;
import javax.jms.JMSException; import javax.jms.JMSException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor; import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
@ -72,7 +73,6 @@ import org.apache.commons.logging.LogFactory;
public class Queue implements Destination, Task { public class Queue implements Destination, Task {
private final Log log; private final Log log;
private final ActiveMQDestination destination; private final ActiveMQDestination destination;
private final List consumers = new CopyOnWriteArrayList(); private final List consumers = new CopyOnWriteArrayList();
private final Valve dispatchValve = new Valve(true); private final Valve dispatchValve = new Valve(true);
@ -96,9 +96,11 @@ public class Queue implements Destination, Task {
private final Object doDispatchMutex = new Object(); private final Object doDispatchMutex = new Object();
private TaskRunner taskRunner; private TaskRunner taskRunner;
private boolean started = false; private boolean started = false;
final Broker broker;
public Queue(ActiveMQDestination destination, final UsageManager memoryManager, MessageStore store, DestinationStatistics parentStats, public Queue(Broker broker,ActiveMQDestination destination, final UsageManager memoryManager, MessageStore store, DestinationStatistics parentStats,
TaskRunnerFactory taskFactory, Store tmpStore) throws Exception { TaskRunnerFactory taskFactory, Store tmpStore) throws Exception {
this.broker=broker;
this.destination = destination; this.destination = destination;
this.usageManager = new UsageManager(memoryManager,destination.toString()); this.usageManager = new UsageManager(memoryManager,destination.toString());
this.usageManager.setUsagePortion(1.0f); this.usageManager.setUsagePortion(1.0f);
@ -136,7 +138,8 @@ public class Queue implements Destination, Task {
public void recoverMessage(Message message){ public void recoverMessage(Message message){
// Message could have expired while it was being loaded.. // Message could have expired while it was being loaded..
if(message.isExpired()){ if(message.isExpired()){
// TODO remove from store broker.messageExpired(createConnectionContext(),message);
destinationStatistics.getMessages().decrement();
return; return;
} }
message.setRegionDestination(Queue.this); message.setRegionDestination(Queue.this);
@ -342,9 +345,8 @@ public class Queue implements Destination, Task {
// There is delay between the client sending it and it arriving at the // There is delay between the client sending it and it arriving at the
// destination.. it may have expired. // destination.. it may have expired.
if(message.isExpired()){ if(message.isExpired()){
if (log.isDebugEnabled()) { broker.messageExpired(context,message);
log.debug("Expired message: " + message); destinationStatistics.getMessages().decrement();
}
if( ( !message.isResponseRequired() || producerExchange.getProducerState().getInfo().getWindowSize() > 0 ) && !context.isInRecoveryMode() ) { if( ( !message.isResponseRequired() || producerExchange.getProducerState().getInfo().getWindowSize() > 0 ) && !context.isInRecoveryMode() ) {
ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), message.getSize()); ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), message.getSize());
context.getConnection().dispatchAsync(ack); context.getConnection().dispatchAsync(ack);
@ -365,9 +367,8 @@ public class Queue implements Destination, Task {
// While waiting for space to free up... the message may have expired. // While waiting for space to free up... the message may have expired.
if(message.isExpired()){ if(message.isExpired()){
if (log.isDebugEnabled()) { broker.messageExpired(context,message);
log.debug("Expired message: " + message); destinationStatistics.getMessages().decrement();
}
if( !message.isResponseRequired() && !context.isInRecoveryMode() ) { if( !message.isResponseRequired() && !context.isInRecoveryMode() ) {
ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), message.getSize()); ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), message.getSize());
@ -440,10 +441,8 @@ public class Queue implements Destination, Task {
// It could take while before we receive the commit // It could take while before we receive the commit
// op, by that time the message could have expired.. // op, by that time the message could have expired..
if(message.isExpired()){ if(message.isExpired()){
// TODO: remove message from store. broker.messageExpired(context,message);
if (log.isDebugEnabled()) { destinationStatistics.getMessages().decrement();
log.debug("Expired message: " + message);
}
return; return;
} }
sendMessage(context,message); sendMessage(context,message);
@ -1011,9 +1010,8 @@ public class Queue implements Destination, Task {
result.add(node); result.add(node);
count++; count++;
}else{ }else{
if (log.isDebugEnabled()) { broker.messageExpired(createConnectionContext(),node);
log.debug("Expired message: " + node); destinationStatistics.getMessages().decrement();
}
} }
} }
}finally{ }finally{

View File

@ -37,6 +37,7 @@ import org.apache.activemq.broker.ConsumerBrokerExchange;
import org.apache.activemq.broker.DestinationAlreadyExistsException; import org.apache.activemq.broker.DestinationAlreadyExistsException;
import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.TransactionBroker; import org.apache.activemq.broker.TransactionBroker;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy; import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.VMPendingDurableSubscriberMessageStoragePolicy; import org.apache.activemq.broker.region.policy.VMPendingDurableSubscriberMessageStoragePolicy;
@ -62,6 +63,7 @@ import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.memory.MemoryPersistenceAdapter; import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.util.BrokerSupport;
import org.apache.activemq.util.IdGenerator; import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.LongSequenceGenerator; import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.activemq.util.ServiceStopper; import org.apache.activemq.util.ServiceStopper;
@ -626,5 +628,51 @@ public class RegionBroker implements Broker {
return brokerService; return brokerService;
} }
public void messageExpired(ConnectionContext context,MessageReference node){
if(log.isDebugEnabled()){
log.debug("Message expired "+node);
}
getRoot().sendToDeadLetterQueue(context,node);
}
public void sendToDeadLetterQueue(ConnectionContext context,MessageReference node){
try{
if(node!=null){
Message message=node.getMessage();
if(message!=null){
DeadLetterStrategy deadLetterStrategy=node.getRegionDestination().getDeadLetterStrategy();
if(deadLetterStrategy!=null){
if(deadLetterStrategy.isSendToDeadLetterQueue(message)){
long expiration=message.getExpiration();
message.setExpiration(0);
message.setProperty("originalExpiration",new Long(expiration));
if(!message.isPersistent()){
message.setPersistent(true);
message.setProperty("originalDeliveryMode","NON_PERSISTENT");
}
// The original destination and transaction id do not get filled when the message is first
// sent,
// it is only populated if the message is routed to another destination like the DLQ
ActiveMQDestination deadLetterDestination=deadLetterStrategy.getDeadLetterQueueFor(message
.getDestination());
BrokerSupport.resend(context,message,deadLetterDestination);
}
}
}else{
log.warn("Null message for node: "+node);
}
}
}catch(Exception e){
log.warn("Failed to pass expired message to dead letter queue");
}
}
public Broker getRoot(){
try{
return getBrokerService().getBroker();
}catch(Exception e){
log.fatal("Trying to get Root Broker "+e);
throw new RuntimeException("The broker from the BrokerService should not throw an exception");
}
}
} }

View File

@ -41,7 +41,7 @@ public class TempQueueRegion extends AbstractRegion {
protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception { protected Destination createDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
final ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destination; final ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destination;
return new Queue(destination, memoryManager, null, destinationStatistics, taskRunnerFactory, null) { return new Queue(broker.getRoot(),destination, memoryManager, null, destinationStatistics, taskRunnerFactory, null) {
public void addSubscription(ConnectionContext context,Subscription sub) throws Exception { public void addSubscription(ConnectionContext context,Subscription sub) throws Exception {

View File

@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CopyOnWriteArraySet;
import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy; import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
@ -72,10 +73,11 @@ public class Topic implements Destination {
private boolean sendAdvisoryIfNoConsumers; private boolean sendAdvisoryIfNoConsumers;
private DeadLetterStrategy deadLetterStrategy = new SharedDeadLetterStrategy(); private DeadLetterStrategy deadLetterStrategy = new SharedDeadLetterStrategy();
private final ConcurrentHashMap durableSubcribers = new ConcurrentHashMap(); private final ConcurrentHashMap durableSubcribers = new ConcurrentHashMap();
final Broker broker;
public Topic(ActiveMQDestination destination, TopicMessageStore store, UsageManager memoryManager, DestinationStatistics parentStats, public Topic(Broker broker,ActiveMQDestination destination, TopicMessageStore store, UsageManager memoryManager, DestinationStatistics parentStats,
TaskRunnerFactory taskFactory) { TaskRunnerFactory taskFactory) {
this.broker=broker;
this.destination = destination; this.destination = destination;
this.store = store; //this could be NULL! (If an advsiory) this.store = store; //this could be NULL! (If an advsiory)
this.usageManager = new UsageManager(memoryManager,destination.toString()); this.usageManager = new UsageManager(memoryManager,destination.toString());
@ -261,9 +263,8 @@ public class Topic implements Destination {
// There is delay between the client sending it and it arriving at the // There is delay between the client sending it and it arriving at the
// destination.. it may have expired. // destination.. it may have expired.
if( message.isExpired() ) { if( message.isExpired() ) {
if (log.isDebugEnabled()) { broker.messageExpired(context,message);
log.debug("Expired message: " + message); destinationStatistics.getMessages().decrement();
}
if( ( !message.isResponseRequired() || producerExchange.getProducerState().getInfo().getWindowSize() > 0 ) && !context.isInRecoveryMode() ) { if( ( !message.isResponseRequired() || producerExchange.getProducerState().getInfo().getWindowSize() > 0 ) && !context.isInRecoveryMode() ) {
ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), message.getSize()); ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), message.getSize());
context.getConnection().dispatchAsync(ack); context.getConnection().dispatchAsync(ack);
@ -285,9 +286,8 @@ public class Topic implements Destination {
// While waiting for space to free up... the message may have expired. // While waiting for space to free up... the message may have expired.
if(message.isExpired()){ if(message.isExpired()){
if (log.isDebugEnabled()) { broker.messageExpired(context,message);
log.debug("Expired message: " + message); destinationStatistics.getMessages().decrement();
}
if( !message.isResponseRequired() && !context.isInRecoveryMode() ) { if( !message.isResponseRequired() && !context.isInRecoveryMode() ) {
ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), message.getSize()); ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), message.getSize());
@ -357,7 +357,9 @@ public class Topic implements Destination {
// It could take while before we receive the commit // It could take while before we receive the commit
// operration.. by that time the message could have expired.. // operration.. by that time the message could have expired..
if( message.isExpired() ) { if( message.isExpired() ) {
// TODO: remove message from store. broker.messageExpired(context,message);
message.decrementReferenceCount();
destinationStatistics.getMessages().decrement();
return; return;
} }
dispatch(context, message); dispatch(context, message);

View File

@ -103,12 +103,7 @@ public class TopicSubscription extends AbstractSubscription{
int messagesToEvict=oldMessages.length; int messagesToEvict=oldMessages.length;
for(int i=0;i<messagesToEvict;i++){ for(int i=0;i<messagesToEvict;i++){
MessageReference oldMessage=oldMessages[i]; MessageReference oldMessage=oldMessages[i];
oldMessage.decrementReferenceCount(); discard(oldMessage);
matched.remove(oldMessage);
discarded++;
if(log.isDebugEnabled()){
log.debug("Discarding message "+oldMessages[i]);
}
} }
// lets avoid an infinite loop if we are given a bad eviction strategy // lets avoid an infinite loop if we are given a bad eviction strategy
// for a bad strategy lets just not evict // for a bad strategy lets just not evict
@ -138,6 +133,7 @@ public class TopicSubscription extends AbstractSubscription{
matched.remove(); matched.remove();
dispatchedCounter.incrementAndGet(); dispatchedCounter.incrementAndGet();
node.decrementReferenceCount(); node.decrementReferenceCount();
broker.messageExpired(getContext(),node);
break; break;
} }
} }
@ -367,6 +363,8 @@ public class TopicSubscription extends AbstractSubscription{
// waiting for the consumer to ak the message. // waiting for the consumer to ak the message.
if(message.isExpired()){ if(message.isExpired()){
message.decrementReferenceCount(); message.decrementReferenceCount();
broker.messageExpired(getContext(),message);
dequeueCounter.incrementAndGet();
continue; // just drop it. continue; // just drop it.
} }
dispatch(message); dispatch(message);
@ -411,6 +409,17 @@ public class TopicSubscription extends AbstractSubscription{
} }
} }
private void discard(MessageReference message) {
message.decrementReferenceCount();
matched.remove(message);
discarded++;
dequeueCounter.incrementAndGet();
if(log.isDebugEnabled()){
log.debug("Discarding message "+message);
}
broker.getRoot().sendToDeadLetterQueue(getContext(),message);
}
public String toString(){ public String toString(){
return "TopicSubscription:"+" consumer="+info.getConsumerId()+", destinations="+destinations.size() return "TopicSubscription:"+" consumer="+info.getConsumerId()+", destinations="+destinations.size()
+", dispatched="+getDispatchedQueueSize()+", delivered="+getDequeueCounter()+", matched="+matched() +", dispatched="+getDispatchedQueueSize()+", delivered="+getDequeueCounter()+", matched="+matched()

View File

@ -0,0 +1,74 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region.policy;
import org.apache.activemq.command.Message;
/**
* A strategy for choosing which destination is used for dead letter queue messages.
*
* @version $Revision: 426366 $
*/
public abstract class AbstractDeadLetterStrategy implements DeadLetterStrategy {
private boolean processNonPersistent=true;
private boolean processExpired=true;
public boolean isSendToDeadLetterQueue(Message message){
boolean result=false;
if(message!=null){
result=true;
if(message.isPersistent()==false&&processNonPersistent==false){
result=false;
}
if(message.isExpired()&&processExpired==false){
result=false;
}
}
return result;
}
/**
* @return the processExpired
*/
public boolean isProcessExpired(){
return this.processExpired;
}
/**
* @param processExpired the processExpired to set
*/
public void setProcessExpired(boolean processExpired){
this.processExpired=processExpired;
}
/**
* @return the processNonPersistent
*/
public boolean isProcessNonPersistent(){
return this.processNonPersistent;
}
/**
* @param processNonPersistent the processNonPersistent to set
*/
public void setProcessNonPersistent(boolean processNonPersistent){
this.processNonPersistent=processNonPersistent;
}
}

View File

@ -18,6 +18,7 @@
package org.apache.activemq.broker.region.policy; package org.apache.activemq.broker.region.policy;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
/** /**
* A strategy for choosing which destination is used for dead letter queue messages. * A strategy for choosing which destination is used for dead letter queue messages.
@ -26,6 +27,14 @@ import org.apache.activemq.command.ActiveMQDestination;
*/ */
public interface DeadLetterStrategy { public interface DeadLetterStrategy {
/**
* Allow pluggable strategy for deciding if message should be sent to a dead letter queue
* for example, you might not want to ignore expired or non-persistent messages
* @param message
* @return true if message should be sent to a dead letter queue
*/
public boolean isSendToDeadLetterQueue(Message message);
/** /**
* Returns the dead letter queue for the given destination. * Returns the dead letter queue for the given destination.
*/ */

View File

@ -29,7 +29,7 @@ import org.apache.activemq.command.ActiveMQTopic;
* *
* @version $Revision$ * @version $Revision$
*/ */
public class IndividualDeadLetterStrategy implements DeadLetterStrategy { public class IndividualDeadLetterStrategy extends AbstractDeadLetterStrategy {
private String topicPrefix = "ActiveMQ.DLQ.Topic."; private String topicPrefix = "ActiveMQ.DLQ.Topic.";
private String queuePrefix = "ActiveMQ.DLQ.Queue."; private String queuePrefix = "ActiveMQ.DLQ.Queue.";

View File

@ -29,7 +29,7 @@ import org.apache.activemq.command.ActiveMQQueue;
* *
* @version $Revision$ * @version $Revision$
*/ */
public class SharedDeadLetterStrategy implements DeadLetterStrategy { public class SharedDeadLetterStrategy extends AbstractDeadLetterStrategy {
private ActiveMQDestination deadLetterQueue = new ActiveMQQueue("ActiveMQ.DLQ"); private ActiveMQDestination deadLetterQueue = new ActiveMQQueue("ActiveMQ.DLQ");

View File

@ -19,6 +19,7 @@
package org.apache.activemq.broker; package org.apache.activemq.broker;
import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy; import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
@ -244,4 +245,14 @@ public class StubBroker implements Broker {
public BrokerService getBrokerService(){ public BrokerService getBrokerService(){
return null; return null;
} }
public void messageExpired(ConnectionContext context,MessageReference messageReference){
}
public void sendToDeadLetterQueue(ConnectionContext context,MessageReference messageReference) {
}
public Broker getRoot(){
return this;
}
} }