ARTEMIS-3313 routing-type conflict during import/export

This commit is contained in:
Justin Bertram 2021-10-07 20:13:38 -05:00
parent d125109784
commit c3d93f5590
5 changed files with 202 additions and 4 deletions

View File

@ -161,6 +161,11 @@ public interface Message {
*/ */
SimpleString HDR_ROUTING_TYPE = new SimpleString("_AMQ_ROUTING_TYPE"); SimpleString HDR_ROUTING_TYPE = new SimpleString("_AMQ_ROUTING_TYPE");
/**
* The original routing type of a message before getting transferred through DLQ or expiry
*/
SimpleString HDR_ORIG_ROUTING_TYPE = new SimpleString("_AMQ_ORIG_ROUTING_TYPE");
/** /**
* The time at which the message arrived at the broker. * The time at which the message arrived at the broker.
*/ */
@ -465,6 +470,9 @@ public interface Message {
setBrokerProperty(Message.HDR_ORIGINAL_QUEUE, originalQueue); setBrokerProperty(Message.HDR_ORIGINAL_QUEUE, originalQueue);
setBrokerProperty(Message.HDR_ORIGINAL_ADDRESS, original.getAddress()); setBrokerProperty(Message.HDR_ORIGINAL_ADDRESS, original.getAddress());
setBrokerProperty(Message.HDR_ORIG_MESSAGE_ID, original.getMessageID()); setBrokerProperty(Message.HDR_ORIG_MESSAGE_ID, original.getMessageID());
if (original.getRoutingType() != null) {
setBrokerProperty(Message.HDR_ORIG_ROUTING_TYPE, original.getRoutingType().getType());
}
// reset expiry // reset expiry
setExpiration(0); setExpiration(0);

View File

@ -1214,6 +1214,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
message.setAddress(dlaAddress); message.setAddress(dlaAddress);
message.setRoutingType(null);
message.reencode(); message.reencode();
route(message, new RoutingContextImpl(context.getTransaction()), false, true, null, true); route(message, new RoutingContextImpl(context.getTransaction()), false, true, null, true);

View File

@ -2688,8 +2688,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
String originalMessageAddress = ref.getMessage().getAnnotationString(Message.HDR_ORIGINAL_ADDRESS); String originalMessageAddress = ref.getMessage().getAnnotationString(Message.HDR_ORIGINAL_ADDRESS);
String originalMessageQueue = ref.getMessage().getAnnotationString(Message.HDR_ORIGINAL_QUEUE); String originalMessageQueue = ref.getMessage().getAnnotationString(Message.HDR_ORIGINAL_QUEUE);
Binding binding = null;
if (originalMessageAddress != null) { if (originalMessageQueue != null) {
binding = postOffice.getBinding(SimpleString.toSimpleString(originalMessageQueue));
}
if (originalMessageAddress != null && binding != null) {
incDelivering(ref); incDelivering(ref);
@ -2697,9 +2702,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (originalMessageQueue != null && !originalMessageQueue.equals(originalMessageAddress)) { if (originalMessageQueue != null && !originalMessageQueue.equals(originalMessageAddress)) {
targetQueue = queues.get(originalMessageQueue); targetQueue = queues.get(originalMessageQueue);
if (targetQueue == null) { if (targetQueue == null) {
Binding binding = postOffice.getBinding(SimpleString.toSimpleString(originalMessageQueue)); if (binding instanceof LocalQueueBinding) {
if (binding != null && binding instanceof LocalQueueBinding) {
targetQueue = ((LocalQueueBinding) binding).getID(); targetQueue = ((LocalQueueBinding) binding).getID();
queues.put(originalMessageQueue, targetQueue); queues.put(originalMessageQueue, targetQueue);
} }
@ -2715,6 +2718,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
return true; return true;
} }
ActiveMQServerLogger.LOGGER.unableToFindTargetQueue(originalMessageQueue);
return false; return false;
} }
}); });
@ -3387,6 +3391,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
copyMessage.setAddress(toAddress); copyMessage.setAddress(toAddress);
if (ref.getMessage().getAnnotationString(Message.HDR_ORIG_ROUTING_TYPE) != null) {
copyMessage.setRoutingType(RoutingType.getType(ref.getMessage().getByteProperty(Message.HDR_ORIG_ROUTING_TYPE)));
}
if (queueIDs != null && queueIDs.length > 0) { if (queueIDs != null && queueIDs.length > 0) {
ByteBuffer buffer = ByteBuffer.allocate(8 * queueIDs.length); ByteBuffer buffer = ByteBuffer.allocate(8 * queueIDs.length);
for (long id : queueIDs) { for (long id : queueIDs) {
@ -3543,6 +3551,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
} }
copy.setExpiration(0); copy.setExpiration(0);
copy.setRoutingType(null);
if (expiry) { if (expiry) {
copy.setBrokerProperty(Message.HDR_ACTUAL_EXPIRY_TIME, System.currentTimeMillis()); copy.setBrokerProperty(Message.HDR_ACTUAL_EXPIRY_TIME, System.currentTimeMillis());

View File

@ -3637,6 +3637,124 @@ public class QueueControlTest extends ManagementTestBase {
session.deleteQueue(queue); session.deleteQueue(queue);
} }
/**
* Test retry - get a message from auto-created DLA/DLQ with HDR_ORIG_RoutingType set and put on original queue.
*/
@Test
public void testRetryMessageWithAutoCreatedResourcesAndOrigRoutingType() throws Exception {
final SimpleString dla = new SimpleString("DLA");
final SimpleString qName = new SimpleString("q1");
final SimpleString adName = new SimpleString("ad1");
final String sampleText = "Put me on DLQ";
AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch(adName.toString());
final SimpleString dlq = addressSettings.getDeadLetterQueuePrefix().concat(adName).concat(addressSettings.getDeadLetterQueueSuffix());
server.getAddressSettingsRepository().addMatch(adName.toString(), new AddressSettings().setMaxDeliveryAttempts(1).setDeadLetterAddress(dla).setAutoCreateDeadLetterResources(true));
session.createQueue(new QueueConfiguration(qName).setAddress(adName).setDurable(durable).setRoutingType(RoutingType.ANYCAST));
// Send message to queue.
ClientProducer producer = session.createProducer(adName);
ClientMessage m = createTextMessage(session, sampleText);
// Set ORIG RoutingType header
m.putByteProperty(Message.HDR_ORIG_ROUTING_TYPE, (byte) 1);
producer.send(m);
session.start();
ClientConsumer clientConsumer = session.createConsumer(qName);
ClientMessage clientMessage = clientConsumer.receive(500);
clientMessage.acknowledge();
Assert.assertNotNull(clientMessage);
Assert.assertEquals(clientMessage.getBodyBuffer().readString(), sampleText);
// force a rollback to DLQ
session.rollback();
clientMessage = clientConsumer.receiveImmediate();
Assert.assertNull(clientMessage);
QueueControl queueControl = createManagementControl(dla, dlq);
assertMessageMetrics(queueControl, 1, true);
final long messageID = getFirstMessageId(queueControl);
// Retry the message - i.e. it should go from DLQ to original Queue.
Assert.assertTrue(queueControl.retryMessage(messageID));
// Assert DLQ is empty...
Assert.assertEquals(0, getMessageCount(queueControl));
assertMessageMetrics(queueControl, 0, durable);
// .. and that the message is now on the original queue with ORIG RoutingType set as RoutingType
clientMessage = clientConsumer.receive(500);
clientMessage.acknowledge();
assertTrue(clientMessage.getRoutingType() == RoutingType.ANYCAST);
Assert.assertNotNull(clientMessage);
Assert.assertEquals(sampleText, clientMessage.getBodyBuffer().readString());
clientConsumer.close();
}
/**
* Test retry - get a message from auto-created DLA/DLQ and put on original queue.
*/
@Test
public void testRetryMessageReturnedWhenNoOrigQueue() throws Exception {
final SimpleString dla = new SimpleString("DLA");
final SimpleString qName = new SimpleString("q1");
final SimpleString adName = new SimpleString("ad1");
final String sampleText = "Put me on DLQ";
AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch(adName.toString());
final SimpleString dlq = addressSettings.getDeadLetterQueuePrefix().concat(adName).concat(addressSettings.getDeadLetterQueueSuffix());
server.getAddressSettingsRepository().addMatch(adName.toString(), new AddressSettings().setMaxDeliveryAttempts(1).setDeadLetterAddress(dla).setAutoCreateDeadLetterResources(true));
session.createQueue(new QueueConfiguration(qName).setAddress(adName).setDurable(durable));
// Send message to queue.
ClientProducer producer = session.createProducer(adName);
producer.send(createTextMessage(session, sampleText));
session.start();
ClientConsumer clientConsumer = session.createConsumer(qName);
ClientMessage clientMessage = clientConsumer.receive(500);
clientMessage.acknowledge();
Assert.assertNotNull(clientMessage);
Assert.assertEquals(clientMessage.getBodyBuffer().readString(), sampleText);
// force a rollback to DLQ
session.rollback();
clientMessage = clientConsumer.receiveImmediate();
Assert.assertNull(clientMessage);
clientConsumer.close();
QueueControl queueControl = createManagementControl(dla, dlq);
assertMessageMetrics(queueControl, 1, true);
final long messageID = getFirstMessageId(queueControl);
//Delete original queue
session.deleteQueue(qName);
// Retry the message
queueControl.retryMessage(messageID);
Thread.sleep(100);
// Assert DLQ is not empty...
Assert.assertEquals(1, getMessageCount(queueControl));
// .. and that the message is still intact on DLQ
clientConsumer = session.createConsumer(dlq);
clientMessage = clientConsumer.receive(500);
clientMessage.acknowledge();
Assert.assertNotNull(clientMessage);
Assert.assertEquals(sampleText, clientMessage.getBodyBuffer().readString());
clientConsumer.close();
}
// Package protected --------------------------------------------- // Package protected ---------------------------------------------
// Protected ----------------------------------------------------- // Protected -----------------------------------------------------

View File

@ -735,4 +735,65 @@ public class ScaleDownTest extends ClusterTestBase {
removeConsumer(0); removeConsumer(0);
removeConsumer(1); removeConsumer(1);
} }
@Test
public void testScaleDownMessageWithAutoCreatedDLAResources() throws Exception {
final SimpleString dla = new SimpleString("DLA");
final SimpleString queueName = new SimpleString("q1");
final SimpleString addressName = new SimpleString("q1");
final String sampleText = "Put me on DLA";
//Set up resources for Auto-created DLAs
AddressSettings addressSettings = servers[0].getAddressSettingsRepository().getMatch(addressName.toString());
servers[0].getAddressSettingsRepository().addMatch(addressName.toString(), new AddressSettings().setMaxDeliveryAttempts(1).setDeadLetterAddress(dla).setAutoCreateDeadLetterResources(true));
final SimpleString dlq = addressSettings.getDeadLetterQueuePrefix().concat(addressName).concat(addressSettings.getDeadLetterQueueSuffix());
createQueue(0, addressName.toString(), queueName.toString(), null, false, null, null, RoutingType.ANYCAST);
ClientSessionFactory sf = sfs[0];
ClientSession session = addClientSession(sf.createSession(true, false));
ClientProducer producer = addClientProducer(session.createProducer(addressName));
// Send message to queue with RoutingType header
ClientMessage m = createTextMessage(session, sampleText);
m.putByteProperty(Message.HDR_ROUTING_TYPE, (byte) 1);
producer.send(m);
session.start();
// Get message
ClientConsumer consumer = session.createConsumer(queueName);
ClientMessage message = consumer.receive(1000);
Assert.assertNotNull(message);
Assert.assertEquals(message.getBodyBuffer().readString(), sampleText);
assertTrue(message.getRoutingType() == RoutingType.ANYCAST);
message.acknowledge();
// force a rollback to DLA
session.rollback();
message = consumer.receiveImmediate();
Assert.assertNull(message);
//Make sure it ends up on DLA
consumer.close();
consumer = session.createConsumer(dlq.toString());
message = consumer.receive(1000);
Assert.assertNotNull(message);
assertTrue(message.getRoutingType() == null);
//Scale-Down
servers[0].stop();
//Get message on seconds node
sf = sfs[1];
session = addClientSession(sf.createSession(false, true, true));
consumer = session.createConsumer(dlq.toString());
session.start();
message = consumer.receive(1000);
Assert.assertNotNull(message);
message.acknowledge();
Assert.assertEquals(sampleText, message.getBodyBuffer().readString());
consumer.close();
}
} }