This commit is contained in:
Clebert Suconic 2020-06-01 15:19:34 -04:00
commit 2d08e45786
12 changed files with 279 additions and 84 deletions

View File

@ -463,25 +463,9 @@ public interface Message {
}
default void referenceOriginalMessage(final Message original, String originalQueue) {
Object queueOnMessage = original.getBrokerProperty(Message.HDR_ORIGINAL_QUEUE);
if (queueOnMessage != null) {
setBrokerProperty(Message.HDR_ORIGINAL_QUEUE, queueOnMessage);
} else if (originalQueue != null) {
setBrokerProperty(Message.HDR_ORIGINAL_QUEUE, originalQueue);
}
Object originalID = original.getBrokerProperty(Message.HDR_ORIG_MESSAGE_ID);
if (originalID != null) {
setBrokerProperty(Message.HDR_ORIGINAL_ADDRESS, original.getBrokerProperty(Message.HDR_ORIGINAL_ADDRESS));
setBrokerProperty(Message.HDR_ORIG_MESSAGE_ID, originalID);
} else {
setBrokerProperty(Message.HDR_ORIGINAL_ADDRESS, original.getAddress());
setBrokerProperty(Message.HDR_ORIG_MESSAGE_ID, original.getMessageID());
}
setBrokerProperty(Message.HDR_ORIGINAL_QUEUE, originalQueue);
setBrokerProperty(Message.HDR_ORIGINAL_ADDRESS, original.getAddress());
setBrokerProperty(Message.HDR_ORIG_MESSAGE_ID, original.getMessageID());
// reset expiry
setExpiration(0);

View File

@ -1674,6 +1674,11 @@ public interface ActiveMQServerLogger extends BasicLogger {
format = Message.Format.MESSAGE_FORMAT)
void pageLookupError(int pageNr, int messageNr, int offset, int startNr);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 222289, value = "Did not route to any matching bindings on dead-letter-address {0} and auto-create-dead-letter-resources is true; dropping message: {1}",
format = Message.Format.MESSAGE_FORMAT)
void noMatchingBindingsOnDLAWithAutoCreateDLAResources(SimpleString address, String message);
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT)
void initializationError(@Cause Throwable e);

View File

@ -70,6 +70,7 @@ import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
import org.apache.activemq.artemis.core.remoting.server.RemotingService;
@ -3477,7 +3478,12 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
ref.acknowledge(tx, AckReason.KILLED, null);
} else {
ActiveMQServerLogger.LOGGER.messageExceededMaxDeliverySendtoDLA(ref, deadLetterAddress, name);
move(tx, deadLetterAddress, null, ref, false, AckReason.KILLED, null);
RoutingStatus status = move(tx, deadLetterAddress, null, ref, false, AckReason.KILLED, null);
// this shouldn't happen, but in case it does it's better to log a message than just drop the message silently
if (status.equals(RoutingStatus.NO_BINDINGS) && server.getAddressSettingsRepository().getMatch(getAddress().toString()).isAutoCreateDeadLetterResources()) {
ActiveMQServerLogger.LOGGER.noMatchingBindingsOnDLAWithAutoCreateDLAResources(deadLetterAddress, ref.toString());
}
return true;
}
} else {
@ -3511,7 +3517,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
}
private void move(final Transaction originalTX,
private RoutingStatus move(final Transaction originalTX,
final SimpleString address,
final Binding binding,
final MessageReference ref,
@ -3531,13 +3537,15 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
copyMessage.setAddress(address);
postOffice.route(copyMessage, tx, false, rejectDuplicate, binding);
RoutingStatus routingStatus = postOffice.route(copyMessage, tx, false, rejectDuplicate, binding);
acknowledge(tx, ref, reason, consumer);
if (originalTX == null) {
tx.commit();
}
return routingStatus;
}
/*

View File

@ -0,0 +1,37 @@
# Properties for Copied Messages
There are several operations within the broker that result in copying a
message. These include:
- Diverting a message from one address to another.
- Moving an expired message from a queue to the configured `expiry-address`
- Moving a message which has exceeded its `max-delivery-attempts` from a queue
to the configured `dead-letter-address`
- Using the management API to administratively move messages from one queue to
another
When this happens the body and properties of the original message are copied to
a new message. However, the copying process removes some potentially important
pieces of data so those are preserved in the following special message
properties:
- `_AMQ_ORIG_ADDRESS`
a String property containing the *original address* of the message
- `_AMQ_ORIG_QUEUE`
a String property containing the *original queue* of the message
- `_AMQ_ORIG_MESSAGE_ID`
a String property containing the *original message ID* of the message
It's possible for the aforementioned operations to be combined. For example, a
message may be diverted from one address to another where it lands in a queue
and a consumer tries & fails to consume it such that the message is then sent
to a dead-letter address. Or a message may be administratively moved from one
queue to another where it then expires.
In cases like these the `ORIG` properties will contain the information from the
_last_ (i.e. most recent) operation.

View File

@ -57,12 +57,7 @@ geographically distributed servers, creating your global messaging mesh.
Diverts are defined as xml in the `broker.xml` file at the `core` attribute level.
There can be zero or more diverts in the file.
Diverted message gets a new message ID, and its address is set to a forward
address. To access original values, use message properties: original destination
is stored in a String property `_AMQ_ORIG_ADDRESS` (`Message.HDR_ORIGINAL_ADDRESS`
constant from the Core API), and the original message ID in a Long property
`_AMQ_ORIG_MESSAGE_ID` (`Message.HDR_ORIG_MESSAGE_ID` constant from the
Core API).
Diverted messages get [special properties](copied-message-properties.md).
Please see the examples for a full working example showing you how to
configure and use diverts.

View File

@ -28,18 +28,8 @@ JMS MessageProducer allows to set a TimeToLive for the messages it sent:
producer.setTimeToLive(5000);
```
Expired messages which are consumed from an expiry address have the following
properties:
- `_AMQ_ORIG_ADDRESS`
a String property containing the *original address* of the expired
message
- `_AMQ_ORIG_QUEUE`
a String property containing the *original queue* of the expired
message
Expired messages get [special properties](copied-message-properties.md) plus this
additional property:
- `_AMQ_ACTUAL_EXPIRY`
@ -123,21 +113,20 @@ an `address-setting` to configure the `expiry-address` much less
the actual `address` and `queue` to hold the expired messages.
The solution to this problem is to set the `auto-create-expiry-resources`
`address-setting` to `true` (it's `false` by default) so that the
broker will create the `address` and `queue` to deal with the
expired messages automatically. The `address` created will be the
one defined by the `expiry-address`. A `MULTICAST` `queue` will be
created on that `address`. It will be named by the `address` to which
the message was originally sent, and it will have a filter defined using
the aforementioned `_AMQ_ORIG_ADDRESS` property so that it will only
receive messages sent to the relevant `address`. The `queue` name can be
configured with a prefix and suffix. See the relevant settings in the
table below:
`address-setting` to `true` (it's `false` by default) so that the broker will
create the `address` and `queue` to deal with the expired messages
automatically. The `address` created will be the one defined by the
`expiry-address`. A `MULTICAST` `queue` will be created on that `address`.
It will be named by the `address` to which the message was previously sent, and
it will have a filter defined using the property `_AMQ_ORIG_ADDRESS` so that it
will only receive messages sent to the relevant `address`. The `queue` name can
be configured with a prefix and suffix. See the relevant settings in the table
below:
`address-setting`|default
---|---
`expiry-queue-prefix`|`EXP.`
`expiry-queue-suffix`|`` (empty string)
`expiry-queue-suffix`|(empty string)
Here is an example configuration:

View File

@ -165,18 +165,7 @@ set of addresses (see [Understanding the Wildcard Syntax](wildcard-syntax.md)).
### Dead Letter Properties
Dead letter messages which are consumed from a dead letter address have
the following properties:
- `_AMQ_ORIG_ADDRESS`
a String property containing the *original address* of the dead
letter message
- `_AMQ_ORIG_QUEUE`
a String property containing the *original queue* of the dead letter
message
Dead letter messages get [special properties](copied-message-properties.md).
### Automatically Creating Dead Letter Resources
@ -194,21 +183,20 @@ an `address-setting` to configure the `dead-letter-address` much less
the actual `address` and `queue` to hold the undelivered messages.
The solution to this problem is to set the `auto-create-dead-letter-resources`
`address-setting` to `true` (it's `false` by default) so that the
broker will create the `address` and `queue` to deal with the
undelivered messages automatically. The `address` created will be the
one defined by the `dead-letter-address`. A `MULTICAST` `queue` will be
created on that `address`. It will be named by the `address` to which
the message was originally sent, and it will have a filter defined using
the aforementioned `_AMQ_ORIG_ADDRESS` property so that it will only
receive messages sent to the relevant `address`. The `queue` name can be
configured with a prefix and suffix. See the relevant settings in the
table below:
`address-setting` to `true` (it's `false` by default) so that the broker will
create the `address` and `queue` to deal with the undelivered messages
automatically. The `address` created will be the one defined by the
`dead-letter-address`. A `MULTICAST` `queue` will be created on that `address`.
It will be named by the `address` to which the message was previously sent, and
it will have a filter defined using the property `_AMQ_ORIG_ADDRESS` so that it
will only receive messages sent to the relevant `address`. The `queue` name
can be configured with a prefix and suffix. See the relevant settings in the
table below:
`address-setting`|default
---|---
`dead-letter-queue-prefix`|`DLQ.`
`dead-letter-queue-suffix`|`` (empty string)
`dead-letter-queue-suffix`|(empty string)
Here is an example configuration:

View File

@ -137,18 +137,12 @@ public class DLQAfterExpiredMessageTest extends AmqpClientTestSupport {
// Redo the selection
receiverDLQ = session.createReceiver(getDeadLetterAddress(), "\"m." + AMQPMessageSupport.HDR_ORIGINAL_ADDRESS_ANNOTATION + "\"='" + getQueueName() + "'");
receiverDLQ = session.createReceiver(getDeadLetterAddress(), "\"m." + AMQPMessageSupport.HDR_ORIGINAL_ADDRESS_ANNOTATION + "\"='" + getExpiryQueue() + "'");
receiverDLQ.flow(1);
received = receiverDLQ.receive(5, TimeUnit.SECONDS);
Assert.assertNotNull(received);
received.accept();
/** When moving to DLQ, the original headers shoudln't be touched. */
for (Map.Entry<String, Object> entry : annotations.entrySet()) {
log.debug("Checking " + entry.getKey() + " = " + entry.getValue());
Assert.assertEquals(entry.getKey() + " should be = " + entry.getValue(), entry.getValue(), received.getMessageAnnotation(entry.getKey()));
}
assertEquals(0, received.getTimeToLive());
assertNotNull(received);
assertEquals("Value1", received.getApplicationProperty("key1"));

View File

@ -40,6 +40,7 @@ import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.config.StoreConfiguration;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
@ -62,7 +63,6 @@ import org.junit.Test;
public class LargeMessageTest extends LargeMessageTestBase {
private static final Logger log = Logger.getLogger(LargeMessageTest.class);
private static final int RECEIVE_WAIT_TIME = 10000;
@ -228,6 +228,62 @@ public class LargeMessageTest extends LargeMessageTestBase {
validateNoFilesOnLargeDir();
}
@Test
public void testDivertAndExpire() throws Exception {
final int messageSize = (int) (3.5 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
final String DIVERTED = "diverted";
ClientSession session = null;
ActiveMQServer server = createServer(true, isNetty(), storeType);
server.getConfiguration().setMessageExpiryScanPeriod(100);
server.start();
server.createQueue(new QueueConfiguration(DIVERTED));
server.getAddressSettingsRepository().addMatch(DIVERTED, new AddressSettings().setExpiryDelay(250L).setExpiryAddress(SimpleString.toSimpleString(DIVERTED + "Expiry")).setAutoCreateExpiryResources(true));
server.deployDivert(new DivertConfiguration().setName("myDivert").setAddress(ADDRESS.toString()).setForwardingAddress(DIVERTED).setExclusive(true));
ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator));
session = addClientSession(sf.createSession(false, false, false));
session.createQueue(new QueueConfiguration(ADDRESS).setDurable(false).setTemporary(true));
ClientProducer producer = session.createProducer(ADDRESS);
Message clientFile = createLargeClientMessageStreaming(session, messageSize, true);
producer.send(clientFile);
session.commit();
session.start();
Wait.waitFor(() -> server.locateQueue(AddressSettings.DEFAULT_EXPIRY_QUEUE_PREFIX + DIVERTED) != null, 1000, 100);
ClientConsumer consumer = session.createConsumer(AddressSettings.DEFAULT_EXPIRY_QUEUE_PREFIX + DIVERTED);
ClientMessage msg1 = consumer.receive(1000);
msg1.acknowledge();
session.commit();
Assert.assertNotNull(msg1);
consumer.close();
try {
msg1.getBodyBuffer().readByte();
Assert.fail("Exception was expected");
} catch (final Exception ignored) {
// empty on purpose
}
session.close();
validateNoFilesOnLargeDir();
}
@Test
public void testDeleteOnNoBinding() throws Exception {
final int messageSize = (int) (3.5 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);

View File

@ -485,7 +485,7 @@ public class DivertTest extends ActiveMQTestBase {
instanceLog.debug("Received message " + message);
assertNotNull(message);
if (message.getStringProperty(Message.HDR_ORIGINAL_QUEUE).equals("divert1")) {
if (message.getStringProperty(Message.HDR_ORIGINAL_QUEUE).equals("queue1")) {
countOriginal1++;
} else if (message.getStringProperty(Message.HDR_ORIGINAL_QUEUE).equals("queue2")) {
countOriginal2++;
@ -1465,4 +1465,38 @@ public class DivertTest extends ActiveMQTestBase {
server.destroyDivert(SimpleString.toSimpleString(DIVERT));
assertNull(serviceRegistry.getDivertTransformer(DIVERT, null));
}
@Test
public void testProperties() throws Exception {
final String testAddress = "testAddress";
final SimpleString queue = SimpleString.toSimpleString("queue");
final int COUNT = 25;
ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(createDefaultInVMConfig(), false));
server.start();
server.createQueue(new QueueConfiguration(queue).setAddress(testAddress + (COUNT)).setRoutingType(RoutingType.ANYCAST));
for (int i = 0; i < COUNT; i++) {
server.deployDivert(new DivertConfiguration()
.setName("divert" + i)
.setAddress(testAddress + i)
.setForwardingAddress(testAddress + (i + 1)));
}
ServerLocator locator = createInVMNonHALocator();
ClientSessionFactory sf = createSessionFactory(locator);
ClientSession session = sf.createSession(false, true, true);
session.start();
ClientProducer producer = session.createProducer(new SimpleString(testAddress + "0"));
ClientConsumer consumer1 = session.createConsumer(queue);
ClientMessage message = session.createMessage(false);
producer.send(message);
message = consumer1.receive(DivertTest.TIMEOUT);
Assert.assertNotNull(message);
message.acknowledge();
Assert.assertEquals("testAddress" + COUNT, message.getAddress());
Assert.assertEquals("testAddress" + (COUNT - 1), message.getStringProperty(Message.HDR_ORIGINAL_ADDRESS));
}
}

View File

@ -1685,6 +1685,47 @@ public class QueueControlTest extends ManagementTestBase {
session.deleteQueue(queue);
}
@Test
public void testCopiedMessageProperties() throws Exception {
final String testAddress = "testAddress";
final SimpleString queue = SimpleString.toSimpleString("queue");
final int COUNT = 5;
for (int i = 0; i < COUNT; i++) {
server.createQueue(new QueueConfiguration(queue.concat(Integer.toString(i))).setAddress(testAddress + i).setRoutingType(RoutingType.ANYCAST));
}
ServerLocator locator = createInVMNonHALocator();
ClientSessionFactory sf = createSessionFactory(locator);
ClientSession session = sf.createSession(false, true, true);
session.start();
ClientProducer producer = session.createProducer(new SimpleString(testAddress + "0"));
ClientMessage message = session.createMessage(durable);
producer.send(message);
producer.close();
for (int i = 0; i < COUNT - 1; i++) {
QueueControl queueControl = createManagementControl(SimpleString.toSimpleString(testAddress + i), queue.concat(Integer.toString(i)), RoutingType.ANYCAST);
QueueControl otherQueueControl = createManagementControl(SimpleString.toSimpleString(testAddress + (i + 1)), queue.concat(Integer.toString(i + 1)), RoutingType.ANYCAST);
assertMessageMetrics(queueControl, 1, durable);
assertMessageMetrics(otherQueueControl, 0, durable);
int moved = queueControl.moveMessages(null, queue.concat(Integer.toString(i + 1)).toString());
Assert.assertEquals(1, moved);
assertMessageMetrics(queueControl, 0, durable);
assertMessageMetrics(otherQueueControl, 1, durable);
}
ClientConsumer consumer1 = session.createConsumer(queue.concat(Integer.toString(COUNT - 1)));
message = consumer1.receive(1000);
Assert.assertNotNull(message);
message.acknowledge();
System.out.println(message);
Assert.assertEquals(testAddress + (COUNT - 1), message.getAddress());
Assert.assertEquals(testAddress + (COUNT - 2), message.getStringProperty(Message.HDR_ORIGINAL_ADDRESS));
}
/**
* <ol>
* <li>send 2 message to queue</li>

View File

@ -28,6 +28,7 @@ import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@ -196,8 +197,71 @@ public class AutoCreateDeadLetterResourcesTest extends ActiveMQTestBase {
assertNotNull(context.createConsumer(context.createQueue(fqqn)).receive(2000));
}
private void triggerDlaDelivery() throws Exception {
@Test
public void testDivertedMessage() throws Exception {
SimpleString dlqName = AddressSettings.DEFAULT_DEAD_LETTER_QUEUE_PREFIX.concat(addressA).concat(AddressSettings.DEFAULT_DEAD_LETTER_QUEUE_SUFFIX);
String divertAddress = "divertAddress";
server.deployDivert(new DivertConfiguration().setName("testDivert").setAddress(divertAddress).setForwardingAddress(addressA.toString()));
server.createQueue(new QueueConfiguration(queueA).setAddress(addressA).setRoutingType(RoutingType.ANYCAST));
ServerLocator locator = createInVMNonHALocator();
ClientSessionFactory sessionFactory = createSessionFactory(locator);
ClientSession session = addClientSession(sessionFactory.createSession(true, true));
ClientProducer producer = addClientProducer(session.createProducer(divertAddress));
producer.send(session.createMessage(true));
producer.close();
Wait.assertEquals(1L, () -> server.locateQueue(queueA).getMessageCount(), 2000, 100);
triggerDlaDelivery();
Wait.assertTrue(() -> server.locateQueue(dlqName).getMessageCount() == 1, 2000, 100);
ClientConsumer consumer = session.createConsumer(dlqName);
session.start();
ClientMessage message = consumer.receive(1000);
assertNotNull(message);
message.acknowledge();
}
@Test
public void testMovedMessage() throws Exception {
SimpleString dlqName = AddressSettings.DEFAULT_DEAD_LETTER_QUEUE_PREFIX.concat(addressA).concat(AddressSettings.DEFAULT_DEAD_LETTER_QUEUE_SUFFIX);
final SimpleString moveFromAddress = new SimpleString("moveFromAddress");
final SimpleString moveFromQueue = new SimpleString("moveFromQueue");
server.createQueue(new QueueConfiguration(moveFromQueue).setAddress(moveFromAddress).setRoutingType(RoutingType.ANYCAST));
server.createQueue(new QueueConfiguration(queueA).setAddress(addressA).setRoutingType(RoutingType.ANYCAST));
ServerLocator locator = createInVMNonHALocator();
ClientSessionFactory sessionFactory = createSessionFactory(locator);
ClientSession session = addClientSession(sessionFactory.createSession(true, true));
ClientProducer producer = addClientProducer(session.createProducer(moveFromAddress));
producer.send(session.createMessage(true));
producer.close();
server.locateQueue(moveFromQueue).moveReferences(null, addressA, null);
Wait.assertEquals(1L, () -> server.locateQueue(queueA).getMessageCount(), 2000, 100);
triggerDlaDelivery();
Wait.assertTrue(() -> server.locateQueue(dlqName).getMessageCount() == 1, 2000, 100);
ClientConsumer consumer = session.createConsumer(dlqName);
session.start();
ClientMessage message = consumer.receive(1000);
assertNotNull(message);
message.acknowledge();
}
private void triggerDlaDelivery() throws Exception {
try {
server.createQueue(new QueueConfiguration(queueA).setAddress(addressA).setRoutingType(RoutingType.ANYCAST));
} catch (Exception e) {
// ignore
}
ServerLocator locator = createInVMNonHALocator();
ClientSessionFactory sessionFactory = createSessionFactory(locator);
ClientSession session = addClientSession(sessionFactory.createSession(true, false));