diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java index aba66d6f52..5525fee84c 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java @@ -161,6 +161,11 @@ public interface Message { */ SimpleString HDR_ROUTING_TYPE = new SimpleString("_AMQ_ROUTING_TYPE"); + /** + * The original routing type of a message before getting transferred through DLQ or expiry + */ + SimpleString HDR_ORIG_ROUTING_TYPE = new SimpleString("_AMQ_ORIG_ROUTING_TYPE"); + /** * The time at which the message arrived at the broker. */ @@ -465,6 +470,9 @@ public interface Message { setBrokerProperty(Message.HDR_ORIGINAL_QUEUE, originalQueue); setBrokerProperty(Message.HDR_ORIGINAL_ADDRESS, original.getAddress()); setBrokerProperty(Message.HDR_ORIG_MESSAGE_ID, original.getMessageID()); + if (original.getRoutingType() != null) { + setBrokerProperty(Message.HDR_ORIG_ROUTING_TYPE, original.getRoutingType().getType()); + } // reset expiry setExpiration(0); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index 3e51f23b49..feaa7b1885 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -1214,6 +1214,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding message.setAddress(dlaAddress); + message.setRoutingType(null); + message.reencode(); route(message, new RoutingContextImpl(context.getTransaction()), false, true, null, true); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index fef2d4b5e9..af3766d857 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -2688,8 +2688,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { String originalMessageAddress = ref.getMessage().getAnnotationString(Message.HDR_ORIGINAL_ADDRESS); String originalMessageQueue = ref.getMessage().getAnnotationString(Message.HDR_ORIGINAL_QUEUE); + Binding binding = null; - if (originalMessageAddress != null) { + if (originalMessageQueue != null) { + binding = postOffice.getBinding(SimpleString.toSimpleString(originalMessageQueue)); + } + + if (originalMessageAddress != null && binding != null) { incDelivering(ref); @@ -2697,9 +2702,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { if (originalMessageQueue != null && !originalMessageQueue.equals(originalMessageAddress)) { targetQueue = queues.get(originalMessageQueue); if (targetQueue == null) { - Binding binding = postOffice.getBinding(SimpleString.toSimpleString(originalMessageQueue)); - - if (binding != null && binding instanceof LocalQueueBinding) { + if (binding instanceof LocalQueueBinding) { targetQueue = ((LocalQueueBinding) binding).getID(); queues.put(originalMessageQueue, targetQueue); } @@ -2715,6 +2718,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { return true; } + ActiveMQServerLogger.LOGGER.unableToFindTargetQueue(originalMessageQueue); return false; } }); @@ -3387,6 +3391,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { copyMessage.setAddress(toAddress); + if (ref.getMessage().getAnnotationString(Message.HDR_ORIG_ROUTING_TYPE) != null) { + copyMessage.setRoutingType(RoutingType.getType(ref.getMessage().getByteProperty(Message.HDR_ORIG_ROUTING_TYPE))); + } + if (queueIDs != null && queueIDs.length > 0) { ByteBuffer buffer = ByteBuffer.allocate(8 * queueIDs.length); for (long id : queueIDs) { @@ -3543,6 +3551,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } copy.setExpiration(0); + copy.setRoutingType(null); if (expiry) { copy.setBrokerProperty(Message.HDR_ACTUAL_EXPIRY_TIME, System.currentTimeMillis()); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java index 9260971a81..d45bcffb06 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java @@ -3637,6 +3637,124 @@ public class QueueControlTest extends ManagementTestBase { session.deleteQueue(queue); } + /** + * Test retry - get a message from auto-created DLA/DLQ with HDR_ORIG_RoutingType set and put on original queue. + */ + @Test + public void testRetryMessageWithAutoCreatedResourcesAndOrigRoutingType() throws Exception { + final SimpleString dla = new SimpleString("DLA"); + final SimpleString qName = new SimpleString("q1"); + final SimpleString adName = new SimpleString("ad1"); + final String sampleText = "Put me on DLQ"; + + AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch(adName.toString()); + final SimpleString dlq = addressSettings.getDeadLetterQueuePrefix().concat(adName).concat(addressSettings.getDeadLetterQueueSuffix()); + + server.getAddressSettingsRepository().addMatch(adName.toString(), new AddressSettings().setMaxDeliveryAttempts(1).setDeadLetterAddress(dla).setAutoCreateDeadLetterResources(true)); + session.createQueue(new QueueConfiguration(qName).setAddress(adName).setDurable(durable).setRoutingType(RoutingType.ANYCAST)); + + // Send message to queue. + ClientProducer producer = session.createProducer(adName); + ClientMessage m = createTextMessage(session, sampleText); + + // Set ORIG RoutingType header + m.putByteProperty(Message.HDR_ORIG_ROUTING_TYPE, (byte) 1); + producer.send(m); + session.start(); + + ClientConsumer clientConsumer = session.createConsumer(qName); + ClientMessage clientMessage = clientConsumer.receive(500); + clientMessage.acknowledge(); + Assert.assertNotNull(clientMessage); + + Assert.assertEquals(clientMessage.getBodyBuffer().readString(), sampleText); + + // force a rollback to DLQ + session.rollback(); + clientMessage = clientConsumer.receiveImmediate(); + Assert.assertNull(clientMessage); + + QueueControl queueControl = createManagementControl(dla, dlq); + assertMessageMetrics(queueControl, 1, true); + final long messageID = getFirstMessageId(queueControl); + + // Retry the message - i.e. it should go from DLQ to original Queue. + Assert.assertTrue(queueControl.retryMessage(messageID)); + + // Assert DLQ is empty... + Assert.assertEquals(0, getMessageCount(queueControl)); + assertMessageMetrics(queueControl, 0, durable); + + // .. and that the message is now on the original queue with ORIG RoutingType set as RoutingType + clientMessage = clientConsumer.receive(500); + clientMessage.acknowledge(); + assertTrue(clientMessage.getRoutingType() == RoutingType.ANYCAST); + Assert.assertNotNull(clientMessage); + + Assert.assertEquals(sampleText, clientMessage.getBodyBuffer().readString()); + + clientConsumer.close(); + } + + /** + * Test retry - get a message from auto-created DLA/DLQ and put on original queue. + */ + @Test + public void testRetryMessageReturnedWhenNoOrigQueue() throws Exception { + final SimpleString dla = new SimpleString("DLA"); + final SimpleString qName = new SimpleString("q1"); + final SimpleString adName = new SimpleString("ad1"); + final String sampleText = "Put me on DLQ"; + + AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch(adName.toString()); + final SimpleString dlq = addressSettings.getDeadLetterQueuePrefix().concat(adName).concat(addressSettings.getDeadLetterQueueSuffix()); + + server.getAddressSettingsRepository().addMatch(adName.toString(), new AddressSettings().setMaxDeliveryAttempts(1).setDeadLetterAddress(dla).setAutoCreateDeadLetterResources(true)); + + session.createQueue(new QueueConfiguration(qName).setAddress(adName).setDurable(durable)); + + // Send message to queue. + ClientProducer producer = session.createProducer(adName); + producer.send(createTextMessage(session, sampleText)); + session.start(); + + ClientConsumer clientConsumer = session.createConsumer(qName); + ClientMessage clientMessage = clientConsumer.receive(500); + clientMessage.acknowledge(); + Assert.assertNotNull(clientMessage); + + Assert.assertEquals(clientMessage.getBodyBuffer().readString(), sampleText); + + // force a rollback to DLQ + session.rollback(); + clientMessage = clientConsumer.receiveImmediate(); + Assert.assertNull(clientMessage); + clientConsumer.close(); + + QueueControl queueControl = createManagementControl(dla, dlq); + assertMessageMetrics(queueControl, 1, true); + final long messageID = getFirstMessageId(queueControl); + + //Delete original queue + session.deleteQueue(qName); + // Retry the message + queueControl.retryMessage(messageID); + Thread.sleep(100); + + // Assert DLQ is not empty... + Assert.assertEquals(1, getMessageCount(queueControl)); + + // .. and that the message is still intact on DLQ + clientConsumer = session.createConsumer(dlq); + clientMessage = clientConsumer.receive(500); + clientMessage.acknowledge(); + Assert.assertNotNull(clientMessage); + + Assert.assertEquals(sampleText, clientMessage.getBodyBuffer().readString()); + + clientConsumer.close(); + } + // Package protected --------------------------------------------- // Protected ----------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java index 16cf9c974d..ba6f74850d 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownTest.java @@ -735,4 +735,65 @@ public class ScaleDownTest extends ClusterTestBase { removeConsumer(0); removeConsumer(1); } + + @Test + public void testScaleDownMessageWithAutoCreatedDLAResources() throws Exception { + final SimpleString dla = new SimpleString("DLA"); + final SimpleString queueName = new SimpleString("q1"); + final SimpleString addressName = new SimpleString("q1"); + final String sampleText = "Put me on DLA"; + + //Set up resources for Auto-created DLAs + AddressSettings addressSettings = servers[0].getAddressSettingsRepository().getMatch(addressName.toString()); + servers[0].getAddressSettingsRepository().addMatch(addressName.toString(), new AddressSettings().setMaxDeliveryAttempts(1).setDeadLetterAddress(dla).setAutoCreateDeadLetterResources(true)); + final SimpleString dlq = addressSettings.getDeadLetterQueuePrefix().concat(addressName).concat(addressSettings.getDeadLetterQueueSuffix()); + + createQueue(0, addressName.toString(), queueName.toString(), null, false, null, null, RoutingType.ANYCAST); + + ClientSessionFactory sf = sfs[0]; + ClientSession session = addClientSession(sf.createSession(true, false)); + ClientProducer producer = addClientProducer(session.createProducer(addressName)); + + // Send message to queue with RoutingType header + ClientMessage m = createTextMessage(session, sampleText); + m.putByteProperty(Message.HDR_ROUTING_TYPE, (byte) 1); + producer.send(m); + session.start(); + + // Get message + ClientConsumer consumer = session.createConsumer(queueName); + ClientMessage message = consumer.receive(1000); + Assert.assertNotNull(message); + Assert.assertEquals(message.getBodyBuffer().readString(), sampleText); + assertTrue(message.getRoutingType() == RoutingType.ANYCAST); + message.acknowledge(); + + // force a rollback to DLA + session.rollback(); + message = consumer.receiveImmediate(); + Assert.assertNull(message); + + //Make sure it ends up on DLA + consumer.close(); + consumer = session.createConsumer(dlq.toString()); + message = consumer.receive(1000); + Assert.assertNotNull(message); + assertTrue(message.getRoutingType() == null); + + //Scale-Down + servers[0].stop(); + + //Get message on seconds node + sf = sfs[1]; + session = addClientSession(sf.createSession(false, true, true)); + consumer = session.createConsumer(dlq.toString()); + session.start(); + + message = consumer.receive(1000); + Assert.assertNotNull(message); + message.acknowledge(); + Assert.assertEquals(sampleText, message.getBodyBuffer().readString()); + + consumer.close(); + } }