This commit is contained in:
Justin Bertram 2021-10-08 08:09:21 -05:00
commit 7109d0d45f
5 changed files with 202 additions and 4 deletions

View File

@ -161,6 +161,11 @@ public interface Message {
*/
SimpleString HDR_ROUTING_TYPE = new SimpleString("_AMQ_ROUTING_TYPE");
/**
* The original routing type of a message before getting transferred through DLQ or expiry
*/
SimpleString HDR_ORIG_ROUTING_TYPE = new SimpleString("_AMQ_ORIG_ROUTING_TYPE");
/**
* The time at which the message arrived at the broker.
*/
@ -465,6 +470,9 @@ public interface Message {
setBrokerProperty(Message.HDR_ORIGINAL_QUEUE, originalQueue);
setBrokerProperty(Message.HDR_ORIGINAL_ADDRESS, original.getAddress());
setBrokerProperty(Message.HDR_ORIG_MESSAGE_ID, original.getMessageID());
if (original.getRoutingType() != null) {
setBrokerProperty(Message.HDR_ORIG_ROUTING_TYPE, original.getRoutingType().getType());
}
// reset expiry
setExpiration(0);

View File

@ -1214,6 +1214,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
message.setAddress(dlaAddress);
message.setRoutingType(null);
message.reencode();
route(message, new RoutingContextImpl(context.getTransaction()), false, true, null, true);

View File

@ -2688,8 +2688,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
String originalMessageAddress = ref.getMessage().getAnnotationString(Message.HDR_ORIGINAL_ADDRESS);
String originalMessageQueue = ref.getMessage().getAnnotationString(Message.HDR_ORIGINAL_QUEUE);
Binding binding = null;
if (originalMessageAddress != null) {
if (originalMessageQueue != null) {
binding = postOffice.getBinding(SimpleString.toSimpleString(originalMessageQueue));
}
if (originalMessageAddress != null && binding != null) {
incDelivering(ref);
@ -2697,9 +2702,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (originalMessageQueue != null && !originalMessageQueue.equals(originalMessageAddress)) {
targetQueue = queues.get(originalMessageQueue);
if (targetQueue == null) {
Binding binding = postOffice.getBinding(SimpleString.toSimpleString(originalMessageQueue));
if (binding != null && binding instanceof LocalQueueBinding) {
if (binding instanceof LocalQueueBinding) {
targetQueue = ((LocalQueueBinding) binding).getID();
queues.put(originalMessageQueue, targetQueue);
}
@ -2715,6 +2718,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
return true;
}
ActiveMQServerLogger.LOGGER.unableToFindTargetQueue(originalMessageQueue);
return false;
}
});
@ -3387,6 +3391,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
copyMessage.setAddress(toAddress);
if (ref.getMessage().getAnnotationString(Message.HDR_ORIG_ROUTING_TYPE) != null) {
copyMessage.setRoutingType(RoutingType.getType(ref.getMessage().getByteProperty(Message.HDR_ORIG_ROUTING_TYPE)));
}
if (queueIDs != null && queueIDs.length > 0) {
ByteBuffer buffer = ByteBuffer.allocate(8 * queueIDs.length);
for (long id : queueIDs) {
@ -3543,6 +3551,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
copy.setExpiration(0);
copy.setRoutingType(null);
if (expiry) {
copy.setBrokerProperty(Message.HDR_ACTUAL_EXPIRY_TIME, System.currentTimeMillis());

View File

@ -3637,6 +3637,124 @@ public class QueueControlTest extends ManagementTestBase {
session.deleteQueue(queue);
}
/**
* Test retry - get a message from auto-created DLA/DLQ with HDR_ORIG_RoutingType set and put on original queue.
*/
@Test
public void testRetryMessageWithAutoCreatedResourcesAndOrigRoutingType() throws Exception {
final SimpleString dla = new SimpleString("DLA");
final SimpleString qName = new SimpleString("q1");
final SimpleString adName = new SimpleString("ad1");
final String sampleText = "Put me on DLQ";
AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch(adName.toString());
final SimpleString dlq = addressSettings.getDeadLetterQueuePrefix().concat(adName).concat(addressSettings.getDeadLetterQueueSuffix());
server.getAddressSettingsRepository().addMatch(adName.toString(), new AddressSettings().setMaxDeliveryAttempts(1).setDeadLetterAddress(dla).setAutoCreateDeadLetterResources(true));
session.createQueue(new QueueConfiguration(qName).setAddress(adName).setDurable(durable).setRoutingType(RoutingType.ANYCAST));
// Send message to queue.
ClientProducer producer = session.createProducer(adName);
ClientMessage m = createTextMessage(session, sampleText);
// Set ORIG RoutingType header
m.putByteProperty(Message.HDR_ORIG_ROUTING_TYPE, (byte) 1);
producer.send(m);
session.start();
ClientConsumer clientConsumer = session.createConsumer(qName);
ClientMessage clientMessage = clientConsumer.receive(500);
clientMessage.acknowledge();
Assert.assertNotNull(clientMessage);
Assert.assertEquals(clientMessage.getBodyBuffer().readString(), sampleText);
// force a rollback to DLQ
session.rollback();
clientMessage = clientConsumer.receiveImmediate();
Assert.assertNull(clientMessage);
QueueControl queueControl = createManagementControl(dla, dlq);
assertMessageMetrics(queueControl, 1, true);
final long messageID = getFirstMessageId(queueControl);
// Retry the message - i.e. it should go from DLQ to original Queue.
Assert.assertTrue(queueControl.retryMessage(messageID));
// Assert DLQ is empty...
Assert.assertEquals(0, getMessageCount(queueControl));
assertMessageMetrics(queueControl, 0, durable);
// .. and that the message is now on the original queue with ORIG RoutingType set as RoutingType
clientMessage = clientConsumer.receive(500);
clientMessage.acknowledge();
assertTrue(clientMessage.getRoutingType() == RoutingType.ANYCAST);
Assert.assertNotNull(clientMessage);
Assert.assertEquals(sampleText, clientMessage.getBodyBuffer().readString());
clientConsumer.close();
}
/**
* Test retry - get a message from auto-created DLA/DLQ and put on original queue.
*/
@Test
public void testRetryMessageReturnedWhenNoOrigQueue() throws Exception {
final SimpleString dla = new SimpleString("DLA");
final SimpleString qName = new SimpleString("q1");
final SimpleString adName = new SimpleString("ad1");
final String sampleText = "Put me on DLQ";
AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch(adName.toString());
final SimpleString dlq = addressSettings.getDeadLetterQueuePrefix().concat(adName).concat(addressSettings.getDeadLetterQueueSuffix());
server.getAddressSettingsRepository().addMatch(adName.toString(), new AddressSettings().setMaxDeliveryAttempts(1).setDeadLetterAddress(dla).setAutoCreateDeadLetterResources(true));
session.createQueue(new QueueConfiguration(qName).setAddress(adName).setDurable(durable));
// Send message to queue.
ClientProducer producer = session.createProducer(adName);
producer.send(createTextMessage(session, sampleText));
session.start();
ClientConsumer clientConsumer = session.createConsumer(qName);
ClientMessage clientMessage = clientConsumer.receive(500);
clientMessage.acknowledge();
Assert.assertNotNull(clientMessage);
Assert.assertEquals(clientMessage.getBodyBuffer().readString(), sampleText);
// force a rollback to DLQ
session.rollback();
clientMessage = clientConsumer.receiveImmediate();
Assert.assertNull(clientMessage);
clientConsumer.close();
QueueControl queueControl = createManagementControl(dla, dlq);
assertMessageMetrics(queueControl, 1, true);
final long messageID = getFirstMessageId(queueControl);
//Delete original queue
session.deleteQueue(qName);
// Retry the message
queueControl.retryMessage(messageID);
Thread.sleep(100);
// Assert DLQ is not empty...
Assert.assertEquals(1, getMessageCount(queueControl));
// .. and that the message is still intact on DLQ
clientConsumer = session.createConsumer(dlq);
clientMessage = clientConsumer.receive(500);
clientMessage.acknowledge();
Assert.assertNotNull(clientMessage);
Assert.assertEquals(sampleText, clientMessage.getBodyBuffer().readString());
clientConsumer.close();
}
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------

View File

@ -735,4 +735,65 @@ public class ScaleDownTest extends ClusterTestBase {
removeConsumer(0);
removeConsumer(1);
}
@Test
public void testScaleDownMessageWithAutoCreatedDLAResources() throws Exception {
final SimpleString dla = new SimpleString("DLA");
final SimpleString queueName = new SimpleString("q1");
final SimpleString addressName = new SimpleString("q1");
final String sampleText = "Put me on DLA";
//Set up resources for Auto-created DLAs
AddressSettings addressSettings = servers[0].getAddressSettingsRepository().getMatch(addressName.toString());
servers[0].getAddressSettingsRepository().addMatch(addressName.toString(), new AddressSettings().setMaxDeliveryAttempts(1).setDeadLetterAddress(dla).setAutoCreateDeadLetterResources(true));
final SimpleString dlq = addressSettings.getDeadLetterQueuePrefix().concat(addressName).concat(addressSettings.getDeadLetterQueueSuffix());
createQueue(0, addressName.toString(), queueName.toString(), null, false, null, null, RoutingType.ANYCAST);
ClientSessionFactory sf = sfs[0];
ClientSession session = addClientSession(sf.createSession(true, false));
ClientProducer producer = addClientProducer(session.createProducer(addressName));
// Send message to queue with RoutingType header
ClientMessage m = createTextMessage(session, sampleText);
m.putByteProperty(Message.HDR_ROUTING_TYPE, (byte) 1);
producer.send(m);
session.start();
// Get message
ClientConsumer consumer = session.createConsumer(queueName);
ClientMessage message = consumer.receive(1000);
Assert.assertNotNull(message);
Assert.assertEquals(message.getBodyBuffer().readString(), sampleText);
assertTrue(message.getRoutingType() == RoutingType.ANYCAST);
message.acknowledge();
// force a rollback to DLA
session.rollback();
message = consumer.receiveImmediate();
Assert.assertNull(message);
//Make sure it ends up on DLA
consumer.close();
consumer = session.createConsumer(dlq.toString());
message = consumer.receive(1000);
Assert.assertNotNull(message);
assertTrue(message.getRoutingType() == null);
//Scale-Down
servers[0].stop();
//Get message on seconds node
sf = sfs[1];
session = addClientSession(sf.createSession(false, true, true));
consumer = session.createConsumer(dlq.toString());
session.start();
message = consumer.receive(1000);
Assert.assertNotNull(message);
message.acknowledge();
Assert.assertEquals(sampleText, message.getBodyBuffer().readString());
consumer.close();
}
}