git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1471420 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2013-04-24 14:01:16 +00:00
parent 705b3c5f9d
commit e1f170da6e
11 changed files with 92 additions and 0 deletions

View File

@ -473,4 +473,9 @@ public class DestinationView implements DestinationViewMBean {
return optionsString;
}
@Override
public boolean isDLQ() {
return destination.isDLQ();
}
}

View File

@ -365,4 +365,10 @@ public interface DestinationViewMBean {
@MBeanInfo("returns the destination options, name value pairs as URL queryString")
String getOptions();
/**
* @return true if this is dead letter queue
*/
@MBeanInfo("Dead Letter Queue")
boolean isDLQ();
}

View File

@ -102,6 +102,11 @@ public class QueueView extends DestinationView implements QueueViewMBean {
return ((Queue)destination).moveMatchingMessagesTo(context, selector, toDestination, maximumMessages);
}
public int retryMessages() throws Exception {
ConnectionContext context = BrokerSupport.getConnectionContext(broker.getContextBroker());
return ((Queue)destination).retryMessages(context, Integer.MAX_VALUE);
}
/**
* Moves a message back to its original destination
*/

View File

@ -129,6 +129,12 @@ public interface QueueViewMBean extends DestinationViewMBean {
*/
@MBeanInfo("Moves up to a specified number of messages based on an SQL-92 selecton on the message headers or XPATH on the body into the specified destination.")
int moveMatchingMessagesTo(@MBeanInfo("selector") String selector, @MBeanInfo("destinationName") String destinationName, @MBeanInfo("maximumMessages") int maximumMessages) throws Exception;
/**
* Retries messages sent to the DLQ
*/
@MBeanInfo("Retries messages sent to the DLQ")
public int retryMessages() throws Exception;
/**
* @return true if the message cursor has memory space available

View File

@ -779,4 +779,8 @@ public abstract class BaseDestination implements Destination {
return ack;
}
public boolean isDLQ() {
return getDeadLetterStrategy().isDLQ(this.getActiveMQDestination());
}
}

View File

@ -237,4 +237,6 @@ public interface Destination extends Service, Task, Message.MessageDestination {
void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage);
public void clearPendingMessages();
public boolean isDLQ();
}

View File

@ -382,4 +382,8 @@ public class DestinationFilter implements Destination {
next.clearPendingMessages();
}
@Override
public boolean isDLQ() {
return next.isDLQ();
}
}

View File

@ -1404,6 +1404,35 @@ public class Queue extends BaseDestination implements Task, UsageListener {
return movedCounter;
}
public int retryMessages(ConnectionContext context, int maximumMessages) throws Exception {
if (!isDLQ()) {
throw new Exception("Retry of message is only possible on Dead Letter Queues!");
}
int restoredCounter = 0;
Set<QueueMessageReference> set = new LinkedHashSet<QueueMessageReference>();
do {
doPageIn(true);
pagedInMessagesLock.readLock().lock();
try{
set.addAll(pagedInMessages.values());
}finally {
pagedInMessagesLock.readLock().unlock();
}
List<QueueMessageReference> list = new ArrayList<QueueMessageReference>(set);
for (QueueMessageReference ref : list) {
if (ref.getMessage().getOriginalDestination() != null) {
moveMessageTo(context, ref, ref.getMessage().getOriginalDestination());
set.remove(ref);
if (++restoredCounter >= maximumMessages && maximumMessages > 0) {
return restoredCounter;
}
}
}
} while (set.size() < this.destinationStatistics.getMessages().getCount() && set.size() < maximumMessages);
return restoredCounter;
}
/**
* @return true if we would like to iterate again
* @see org.apache.activemq.thread.Task#iterate()
@ -1672,6 +1701,12 @@ public class Queue extends BaseDestination implements Task, UsageListener {
destinationStatistics.getExpired().increment();
try {
removeMessage(context, subs, (QueueMessageReference) reference);
messagesLock.writeLock().lock();
try {
messages.rollback(reference.getMessageId());
} finally {
messagesLock.writeLock().unlock();
}
} catch (IOException e) {
LOG.error("Failed to remove expired Message from the store ", e);
}

View File

@ -60,4 +60,6 @@ public interface DeadLetterStrategy {
*/
public void setProcessNonPersistent(boolean processNonPersistent);
public boolean isDLQ(ActiveMQDestination destination);
}

View File

@ -167,4 +167,19 @@ public class IndividualDeadLetterStrategy extends AbstractDeadLetterStrategy {
return new ActiveMQTopic(name);
}
}
@Override
public boolean isDLQ(ActiveMQDestination destination) {
String name = destination.getPhysicalName();
if (destination.isQueue()) {
if ((queuePrefix != null && name.startsWith(queuePrefix)) || (queueSuffix != null && name.endsWith(queueSuffix))) {
return true;
}
} else {
if ((topicPrefix != null && name.startsWith(topicPrefix)) || (topicSuffix != null && name.endsWith(topicSuffix))) {
return true;
}
}
return false;
}
}

View File

@ -48,4 +48,12 @@ public class SharedDeadLetterStrategy extends AbstractDeadLetterStrategy {
this.deadLetterQueue = deadLetterQueue;
}
@Override
public boolean isDLQ(ActiveMQDestination destination) {
if (destination.equals(deadLetterQueue)) {
return true;
} else {
return false;
}
}
}