ARTEMIS-252 retryMessages retrying to topic subscriptions + some ammends to #193
This commit is contained in:
parent
8848c9681c
commit
2a81a5f146
|
@ -21,7 +21,6 @@ import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
@ -37,11 +36,12 @@ import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterHelper
|
||||||
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
||||||
import org.apache.activemq.artemis.core.postoffice.Binding;
|
import org.apache.activemq.artemis.core.postoffice.Binding;
|
||||||
import org.apache.activemq.artemis.core.postoffice.PostOffice;
|
import org.apache.activemq.artemis.core.postoffice.PostOffice;
|
||||||
import org.apache.activemq.artemis.core.server.Consumer;
|
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
|
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
|
||||||
|
import org.apache.activemq.artemis.core.server.Consumer;
|
||||||
import org.apache.activemq.artemis.core.server.MessageReference;
|
import org.apache.activemq.artemis.core.server.MessageReference;
|
||||||
import org.apache.activemq.artemis.core.server.Queue;
|
import org.apache.activemq.artemis.core.server.Queue;
|
||||||
import org.apache.activemq.artemis.core.server.ServerConsumer;
|
import org.apache.activemq.artemis.core.server.ServerConsumer;
|
||||||
|
import org.apache.activemq.artemis.core.server.ServerMessage;
|
||||||
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
|
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
|
||||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||||
import org.apache.activemq.artemis.utils.LinkedListIterator;
|
import org.apache.activemq.artemis.utils.LinkedListIterator;
|
||||||
|
@ -558,16 +558,19 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
|
||||||
clearIO();
|
clearIO();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
MessageReference message = queue.getReference(messageID);
|
Filter singleMessageFilter = new Filter() {
|
||||||
if ( message == null ) {
|
@Override
|
||||||
return false;
|
public boolean match(ServerMessage message) {
|
||||||
}
|
return message.getMessageID() == messageID;
|
||||||
else {
|
|
||||||
final String originalAddress = message.getMessage().getStringProperty(Message.HDR_ORIGINAL_ADDRESS);
|
|
||||||
if (originalAddress != null) {
|
|
||||||
return queue.moveReference(messageID, new SimpleString(originalAddress));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SimpleString getFilterString() {
|
||||||
|
return new SimpleString("custom filter for MESSAGEID= messageID");
|
||||||
}
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
queue.retryMessages(singleMessageFilter);
|
||||||
}
|
}
|
||||||
finally {
|
finally {
|
||||||
blockOnIO();
|
blockOnIO();
|
||||||
|
@ -580,25 +583,12 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
|
||||||
checkStarted();
|
checkStarted();
|
||||||
clearIO();
|
clearIO();
|
||||||
|
|
||||||
int retriedMessages = 0;
|
|
||||||
try {
|
try {
|
||||||
Iterator<MessageReference> messageIterator = queue.totalIterator();
|
return queue.retryMessages(null);
|
||||||
while (messageIterator.hasNext()) {
|
|
||||||
MessageReference message = messageIterator.next();
|
|
||||||
// Will only try messages with Message.HDR_ORIGINAL_ADDRESS set.
|
|
||||||
final String originalAddress = message.getMessage().getStringProperty(Message.HDR_ORIGINAL_ADDRESS);
|
|
||||||
final long messageID = message.getMessage().getMessageID();
|
|
||||||
if ( originalAddress != null) {
|
|
||||||
if ( queue.moveReference(messageID, new SimpleString(originalAddress))) {
|
|
||||||
retriedMessages++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
finally {
|
finally {
|
||||||
blockOnIO();
|
blockOnIO();
|
||||||
}
|
}
|
||||||
return retriedMessages;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean moveMessage(final long messageID, final String otherQueueName) throws Exception {
|
public boolean moveMessage(final long messageID, final String otherQueueName) throws Exception {
|
||||||
|
|
|
@ -164,6 +164,8 @@ public interface Queue extends Bindable {
|
||||||
SimpleString toAddress,
|
SimpleString toAddress,
|
||||||
boolean rejectDuplicates) throws Exception;
|
boolean rejectDuplicates) throws Exception;
|
||||||
|
|
||||||
|
int retryMessages(Filter filter) throws Exception;
|
||||||
|
|
||||||
void addRedistributor(long delay);
|
void addRedistributor(long delay);
|
||||||
|
|
||||||
void cancelRedistributor() throws Exception;
|
void cancelRedistributor() throws Exception;
|
||||||
|
|
|
@ -55,6 +55,7 @@ import org.apache.activemq.artemis.core.postoffice.Binding;
|
||||||
import org.apache.activemq.artemis.core.postoffice.Bindings;
|
import org.apache.activemq.artemis.core.postoffice.Bindings;
|
||||||
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
|
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
|
||||||
import org.apache.activemq.artemis.core.postoffice.PostOffice;
|
import org.apache.activemq.artemis.core.postoffice.PostOffice;
|
||||||
|
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
|
||||||
import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
|
import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
|
||||||
import org.apache.activemq.artemis.core.remoting.server.RemotingService;
|
import org.apache.activemq.artemis.core.remoting.server.RemotingService;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
|
||||||
|
@ -1525,6 +1526,50 @@ public class QueueImpl implements Queue {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int retryMessages(Filter filter) throws Exception {
|
||||||
|
|
||||||
|
final HashMap<SimpleString, Long> queues = new HashMap<>();
|
||||||
|
|
||||||
|
return iterQueue(DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction() {
|
||||||
|
@Override
|
||||||
|
public void actMessage(Transaction tx, MessageReference ref) throws Exception {
|
||||||
|
|
||||||
|
SimpleString originalMessageAddress = ref.getMessage().getSimpleStringProperty(MessageImpl.HDR_ORIGINAL_ADDRESS);
|
||||||
|
SimpleString originalMessageQueue = ref.getMessage().getSimpleStringProperty(MessageImpl.HDR_ORIGINAL_QUEUE);
|
||||||
|
|
||||||
|
if (originalMessageAddress != null) {
|
||||||
|
|
||||||
|
incDelivering();
|
||||||
|
|
||||||
|
Long targetQueue = null;
|
||||||
|
if (originalMessageQueue != null && !originalMessageQueue.equals(originalMessageAddress)) {
|
||||||
|
targetQueue = queues.get(originalMessageQueue);
|
||||||
|
if (targetQueue == null) {
|
||||||
|
Binding binding = postOffice.getBinding(originalMessageQueue);
|
||||||
|
|
||||||
|
if (binding != null && binding instanceof LocalQueueBinding) {
|
||||||
|
targetQueue = ((LocalQueueBinding)binding).getID();
|
||||||
|
queues.put(originalMessageQueue, targetQueue);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (targetQueue != null) {
|
||||||
|
move(originalMessageAddress, tx, ref, false, false, targetQueue.longValue());
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
move(originalMessageAddress, tx, ref, false, false);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
public synchronized boolean changeReferencePriority(final long messageID, final byte newPriority) throws Exception {
|
public synchronized boolean changeReferencePriority(final long messageID, final byte newPriority) throws Exception {
|
||||||
LinkedListIterator<MessageReference> iter = iterator();
|
LinkedListIterator<MessageReference> iter = iterator();
|
||||||
|
|
||||||
|
@ -2057,11 +2102,20 @@ public class QueueImpl implements Queue {
|
||||||
final Transaction tx,
|
final Transaction tx,
|
||||||
final MessageReference ref,
|
final MessageReference ref,
|
||||||
final boolean expiry,
|
final boolean expiry,
|
||||||
final boolean rejectDuplicate) throws Exception {
|
final boolean rejectDuplicate,
|
||||||
|
final long ... queueIDs) throws Exception {
|
||||||
ServerMessage copyMessage = makeCopy(ref, expiry);
|
ServerMessage copyMessage = makeCopy(ref, expiry);
|
||||||
|
|
||||||
copyMessage.setAddress(toAddress);
|
copyMessage.setAddress(toAddress);
|
||||||
|
|
||||||
|
if (queueIDs != null && queueIDs.length > 0) {
|
||||||
|
ByteBuffer buffer = ByteBuffer.allocate(8 * queueIDs.length);
|
||||||
|
for (long id : queueIDs) {
|
||||||
|
buffer.putLong(id);
|
||||||
|
}
|
||||||
|
copyMessage.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, buffer.array());
|
||||||
|
}
|
||||||
|
|
||||||
postOffice.route(copyMessage, null, tx, false, rejectDuplicate);
|
postOffice.route(copyMessage, null, tx, false, rejectDuplicate);
|
||||||
|
|
||||||
acknowledge(tx, ref);
|
acknowledge(tx, ref);
|
||||||
|
|
|
@ -907,6 +907,11 @@ public class ScheduledDeliveryHandlerTest extends Assert {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int retryMessages(Filter filter) throws Exception {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getConsumerCount() {
|
public int getConsumerCount() {
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -16,6 +16,20 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.tests.integration.jms.server.management;
|
package org.apache.activemq.artemis.tests.integration.jms.server.management;
|
||||||
|
|
||||||
|
import javax.jms.Connection;
|
||||||
|
import javax.jms.JMSException;
|
||||||
|
import javax.jms.Message;
|
||||||
|
import javax.jms.MessageConsumer;
|
||||||
|
import javax.jms.MessageProducer;
|
||||||
|
import javax.jms.Session;
|
||||||
|
import javax.management.Notification;
|
||||||
|
import javax.naming.Context;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
import org.apache.activemq.artemis.api.core.TransportConfiguration;
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
|
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
|
||||||
|
@ -39,6 +53,7 @@ import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||||
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
|
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
|
||||||
import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory;
|
import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory;
|
||||||
import org.apache.activemq.artemis.jms.client.ActiveMQQueue;
|
import org.apache.activemq.artemis.jms.client.ActiveMQQueue;
|
||||||
|
import org.apache.activemq.artemis.jms.client.ActiveMQTopic;
|
||||||
import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
|
import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
|
||||||
import org.apache.activemq.artemis.jms.server.management.JMSNotificationType;
|
import org.apache.activemq.artemis.jms.server.management.JMSNotificationType;
|
||||||
import org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper;
|
import org.apache.activemq.artemis.tests.integration.management.ManagementControlHelper;
|
||||||
|
@ -51,20 +66,6 @@ import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import javax.jms.Connection;
|
|
||||||
import javax.jms.JMSException;
|
|
||||||
import javax.jms.Message;
|
|
||||||
import javax.jms.MessageConsumer;
|
|
||||||
import javax.jms.MessageProducer;
|
|
||||||
import javax.jms.Session;
|
|
||||||
import javax.management.Notification;
|
|
||||||
import javax.naming.Context;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Set;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A QueueControlTest
|
* A QueueControlTest
|
||||||
* <br>
|
* <br>
|
||||||
|
@ -808,6 +809,16 @@ public class JMSQueueControlTest extends ManagementTestBase {
|
||||||
return testQueue;
|
return testQueue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected ActiveMQTopic createTestTopicWithDLQ(final String queueName, final ActiveMQQueue dlq) throws Exception {
|
||||||
|
serverManager.createTopic(false, queueName);
|
||||||
|
ActiveMQTopic testQueue = (ActiveMQTopic) ActiveMQJMSClient.createTopic(queueName);
|
||||||
|
AddressSettings addressSettings = new AddressSettings();
|
||||||
|
addressSettings.setDeadLetterAddress(new SimpleString(dlq.getAddress()));
|
||||||
|
addressSettings.setMaxDeliveryAttempts(1);
|
||||||
|
server.getAddressSettingsRepository().addMatch(testQueue.getAddress(), addressSettings);
|
||||||
|
return testQueue;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test retrying all messages put on DLQ - i.e. they should appear on the original queue.
|
* Test retrying all messages put on DLQ - i.e. they should appear on the original queue.
|
||||||
* @throws Exception
|
* @throws Exception
|
||||||
|
@ -834,10 +845,64 @@ public class JMSQueueControlTest extends ManagementTestBase {
|
||||||
Assert.assertEquals(0, getMessageCount(testQueueControl));
|
Assert.assertEquals(0, getMessageCount(testQueueControl));
|
||||||
Assert.assertEquals(numMessagesToTest,getMessageCount(dlqQueueControl));
|
Assert.assertEquals(numMessagesToTest,getMessageCount(dlqQueueControl));
|
||||||
|
|
||||||
|
Assert.assertEquals(10,getMessageCount(dlqQueueControl));
|
||||||
|
|
||||||
dlqQueueControl.retryMessages();
|
dlqQueueControl.retryMessages();
|
||||||
|
|
||||||
Assert.assertEquals(numMessagesToTest, getMessageCount(testQueueControl));
|
Assert.assertEquals(numMessagesToTest, getMessageCount(testQueueControl));
|
||||||
Assert.assertEquals(0,getMessageCount(dlqQueueControl));
|
Assert.assertEquals(0,getMessageCount(dlqQueueControl));
|
||||||
|
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test retrying all messages put on DLQ - i.e. they should appear on the original queue.
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testRetryMessagesOnTopic() throws Exception {
|
||||||
|
ActiveMQQueue dlq = createDLQ(RandomUtil.randomString());
|
||||||
|
ActiveMQTopic testTopic = createTestTopicWithDLQ(RandomUtil.randomString(), dlq);
|
||||||
|
|
||||||
|
Connection connectionConsume = createConnection();
|
||||||
|
connectionConsume.setClientID("ID");
|
||||||
|
Session sessionConsume = connectionConsume.createSession(true, Session.SESSION_TRANSACTED);
|
||||||
|
MessageConsumer cons1 = sessionConsume.createDurableSubscriber(testTopic, "sub1");
|
||||||
|
MessageConsumer cons2 = sessionConsume.createDurableSubscriber(testTopic, "sub2");
|
||||||
|
|
||||||
|
|
||||||
|
final int numMessagesToTest = 10;
|
||||||
|
JMSUtil.sendMessages(testTopic, numMessagesToTest);
|
||||||
|
|
||||||
|
|
||||||
|
connectionConsume.start();
|
||||||
|
for (int i = 0; i < numMessagesToTest; i++) {
|
||||||
|
Assert.assertNotNull(cons1.receive(500));
|
||||||
|
}
|
||||||
|
sessionConsume.commit();
|
||||||
|
|
||||||
|
Assert.assertNull(cons1.receiveNoWait());
|
||||||
|
|
||||||
|
connectionConsume.start();
|
||||||
|
for (int i = 0; i < numMessagesToTest; i++) {
|
||||||
|
cons2.receive(500);
|
||||||
|
}
|
||||||
|
sessionConsume.rollback();
|
||||||
|
Assert.assertNull(cons2.receiveNoWait());
|
||||||
|
|
||||||
|
JMSQueueControl dlqQueueControl = createManagementControl(dlq);
|
||||||
|
dlqQueueControl.retryMessages();
|
||||||
|
|
||||||
|
Assert.assertNull("Retry is sending back to cons1 even though it succeeded", cons1.receiveNoWait());
|
||||||
|
|
||||||
|
for (int i = 0; i < numMessagesToTest; i++) {
|
||||||
|
Assert.assertNotNull(cons2.receive(500));
|
||||||
|
}
|
||||||
|
sessionConsume.commit();
|
||||||
|
Assert.assertNull(cons1.receiveNoWait());
|
||||||
|
|
||||||
|
connectionConsume.close();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -49,6 +49,11 @@ public class FakeQueue implements Queue {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int retryMessages(Filter filter) throws Exception {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setConsumersRefCount(ReferenceCounter referenceCounter) {
|
public void setConsumersRefCount(ReferenceCounter referenceCounter) {
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue