This commit is contained in:
Clebert Suconic 2018-01-31 18:07:12 -05:00
commit 3d79a08963
2 changed files with 72 additions and 8 deletions

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.server.impl;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.persistence.StorageManager;
@ -24,7 +25,6 @@ import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.server.Divert;
import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.jboss.logging.Logger;
@ -98,7 +98,7 @@ public class DivertImpl implements Divert {
copy = message.copy(id);
// This will set the original MessageId, and the original address
copy.referenceOriginalMessage(message, null);
copy.referenceOriginalMessage(message, this.getUniqueName().toString());
copy.setAddress(forwardAddress);

View File

@ -16,10 +16,8 @@
*/
package org.apache.activemq.artemis.tests.integration.management;
import javax.json.JsonArray;
import javax.json.JsonObject;
import javax.management.Notification;
import javax.management.openmbean.CompositeData;
import static org.apache.activemq.artemis.core.management.impl.openmbean.CompositeDataConstants.BODY;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
@ -28,6 +26,11 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.json.JsonArray;
import javax.json.JsonObject;
import javax.management.Notification;
import javax.management.openmbean.CompositeData;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.JsonUtil;
import org.apache.activemq.artemis.api.core.Message;
@ -47,6 +50,7 @@ import org.apache.activemq.artemis.api.core.management.MessageCounterInfo;
import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterManagerImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
@ -59,8 +63,6 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import static org.apache.activemq.artemis.core.management.impl.openmbean.CompositeDataConstants.BODY;
public class QueueControlTest extends ManagementTestBase {
private ActiveMQServer server;
@ -887,6 +889,68 @@ public class QueueControlTest extends ManagementTestBase {
clientConsumer.close();
}
/**
* Test retry - get a diverted message from DLQ and put on original queue.
*/
@Test
public void testRetryDivertedMessage() throws Exception {
final SimpleString dla = new SimpleString("DLA");
final SimpleString dlq = new SimpleString("DLQ");
final SimpleString forwardingQueue = new SimpleString("forwardingQueue");
final SimpleString forwardingAddress = new SimpleString("forwardingAddress");
final SimpleString myTopic = new SimpleString("myTopic");
final String sampleText = "Put me on DLQ";
AddressSettings addressSettings = new AddressSettings().setMaxDeliveryAttempts(1).setDeadLetterAddress(dla);
server.getAddressSettingsRepository().addMatch(forwardingAddress.toString(), addressSettings);
// create target queue, DLQ and source topic
session.createQueue(dla, RoutingType.ANYCAST, dlq, null, false);
session.createQueue(forwardingAddress, RoutingType.ANYCAST, forwardingQueue, null, false);
session.createAddress(myTopic, RoutingType.MULTICAST, false);
DivertConfiguration divert = new DivertConfiguration().setName("local-divert")
.setRoutingName("some-name").setAddress(myTopic.toString())
.setForwardingAddress(forwardingAddress.toString()).setExclusive(false);
server.deployDivert(divert);
// Send message to topic.
ClientProducer producer = session.createProducer(myTopic);
producer.send(createTextMessage(session, sampleText));
session.start();
ClientConsumer clientConsumer = session.createConsumer(forwardingQueue);
ClientMessage clientMessage = clientConsumer.receive(500);
clientMessage.acknowledge();
Assert.assertNotNull(clientMessage);
Assert.assertEquals(clientMessage.getBodyBuffer().readString(), sampleText);
// force a rollback to DLQ
session.rollback();
clientMessage = clientConsumer.receiveImmediate();
Assert.assertNull(clientMessage);
QueueControl queueControl = createManagementControl(dla, dlq, RoutingType.ANYCAST);
Assert.assertEquals(1, getMessageCount(queueControl));
final long messageID = getFirstMessageId(queueControl);
// Retry the message - i.e. it should go from DLQ to original Queue.
Assert.assertTrue(queueControl.retryMessage(messageID));
// Assert DLQ is empty...
Assert.assertEquals(0, getMessageCount(queueControl));
// .. and that the message is now on the original queue once more.
clientMessage = clientConsumer.receive(500);
Assert.assertNotNull(clientMessage); // fails because of AMQ222196 !!!
clientMessage.acknowledge();
Assert.assertEquals(sampleText, clientMessage.getBodyBuffer().readString());
clientConsumer.close();
}
/**
* Test retry multiple messages from DLQ to original queue.
*/