ARTEMIS-2649 always over-write ORIG message props
ORIG message propertes like _AMQ_ORIG_ADDRESS are added to messages during various broker operations (e.g. diverting a message, expiring a message, etc.). However, if multiple operations try to set these properties on the same message (e.g. administratively moving a message which eventually gets sent to a dead-letter address) then important details can be lost. This is particularly problematic when using auto-created dead-letter or expiry resources which use filters based on _AMQ_ORIG_ADDRESS and can lead to message loss. This commit simply over-writes the existing ORIG properties rather than preserving them so that the most recent information is available.
This commit is contained in:
parent
86360a2846
commit
7096bc187a
|
@ -463,25 +463,9 @@ public interface Message {
|
|||
}
|
||||
|
||||
default void referenceOriginalMessage(final Message original, String originalQueue) {
|
||||
Object queueOnMessage = original.getBrokerProperty(Message.HDR_ORIGINAL_QUEUE);
|
||||
|
||||
if (queueOnMessage != null) {
|
||||
setBrokerProperty(Message.HDR_ORIGINAL_QUEUE, queueOnMessage);
|
||||
} else if (originalQueue != null) {
|
||||
setBrokerProperty(Message.HDR_ORIGINAL_QUEUE, originalQueue);
|
||||
}
|
||||
|
||||
Object originalID = original.getBrokerProperty(Message.HDR_ORIG_MESSAGE_ID);
|
||||
|
||||
if (originalID != null) {
|
||||
setBrokerProperty(Message.HDR_ORIGINAL_ADDRESS, original.getBrokerProperty(Message.HDR_ORIGINAL_ADDRESS));
|
||||
|
||||
setBrokerProperty(Message.HDR_ORIG_MESSAGE_ID, originalID);
|
||||
} else {
|
||||
setBrokerProperty(Message.HDR_ORIGINAL_ADDRESS, original.getAddress());
|
||||
|
||||
setBrokerProperty(Message.HDR_ORIG_MESSAGE_ID, original.getMessageID());
|
||||
}
|
||||
setBrokerProperty(Message.HDR_ORIGINAL_QUEUE, originalQueue);
|
||||
setBrokerProperty(Message.HDR_ORIGINAL_ADDRESS, original.getAddress());
|
||||
setBrokerProperty(Message.HDR_ORIG_MESSAGE_ID, original.getMessageID());
|
||||
|
||||
// reset expiry
|
||||
setExpiration(0);
|
||||
|
|
|
@ -1674,6 +1674,11 @@ public interface ActiveMQServerLogger extends BasicLogger {
|
|||
format = Message.Format.MESSAGE_FORMAT)
|
||||
void pageLookupError(int pageNr, int messageNr, int offset, int startNr);
|
||||
|
||||
@LogMessage(level = Logger.Level.WARN)
|
||||
@Message(id = 222289, value = "Did not route to any matching bindings on dead-letter-address {0} and auto-create-dead-letter-resources is true; dropping message: {1}",
|
||||
format = Message.Format.MESSAGE_FORMAT)
|
||||
void noMatchingBindingsOnDLAWithAutoCreateDLAResources(SimpleString address, String message);
|
||||
|
||||
@LogMessage(level = Logger.Level.ERROR)
|
||||
@Message(id = 224000, value = "Failure in initialisation", format = Message.Format.MESSAGE_FORMAT)
|
||||
void initializationError(@Cause Throwable e);
|
||||
|
|
|
@ -70,6 +70,7 @@ import org.apache.activemq.artemis.core.postoffice.Binding;
|
|||
import org.apache.activemq.artemis.core.postoffice.Bindings;
|
||||
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
|
||||
import org.apache.activemq.artemis.core.postoffice.PostOffice;
|
||||
import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
|
||||
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
|
||||
import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
|
||||
import org.apache.activemq.artemis.core.remoting.server.RemotingService;
|
||||
|
@ -3477,7 +3478,12 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
ref.acknowledge(tx, AckReason.KILLED, null);
|
||||
} else {
|
||||
ActiveMQServerLogger.LOGGER.messageExceededMaxDeliverySendtoDLA(ref, deadLetterAddress, name);
|
||||
move(tx, deadLetterAddress, null, ref, false, AckReason.KILLED, null);
|
||||
RoutingStatus status = move(tx, deadLetterAddress, null, ref, false, AckReason.KILLED, null);
|
||||
|
||||
// this shouldn't happen, but in case it does it's better to log a message than just drop the message silently
|
||||
if (status.equals(RoutingStatus.NO_BINDINGS) && server.getAddressSettingsRepository().getMatch(getAddress().toString()).isAutoCreateDeadLetterResources()) {
|
||||
ActiveMQServerLogger.LOGGER.noMatchingBindingsOnDLAWithAutoCreateDLAResources(deadLetterAddress, ref.toString());
|
||||
}
|
||||
return true;
|
||||
}
|
||||
} else {
|
||||
|
@ -3511,7 +3517,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
}
|
||||
}
|
||||
|
||||
private void move(final Transaction originalTX,
|
||||
private RoutingStatus move(final Transaction originalTX,
|
||||
final SimpleString address,
|
||||
final Binding binding,
|
||||
final MessageReference ref,
|
||||
|
@ -3531,13 +3537,15 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
|
|||
|
||||
copyMessage.setAddress(address);
|
||||
|
||||
postOffice.route(copyMessage, tx, false, rejectDuplicate, binding);
|
||||
RoutingStatus routingStatus = postOffice.route(copyMessage, tx, false, rejectDuplicate, binding);
|
||||
|
||||
acknowledge(tx, ref, reason, consumer);
|
||||
|
||||
if (originalTX == null) {
|
||||
tx.commit();
|
||||
}
|
||||
|
||||
return routingStatus;
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
|
@ -0,0 +1,37 @@
|
|||
# Properties for Copied Messages
|
||||
|
||||
There are several operations within the broker that result in copying a
|
||||
message. These include:
|
||||
|
||||
- Diverting a message from one address to another.
|
||||
- Moving an expired message from a queue to the configured `expiry-address`
|
||||
- Moving a message which has exceeded its `max-delivery-attempts` from a queue
|
||||
to the configured `dead-letter-address`
|
||||
- Using the management API to administratively move messages from one queue to
|
||||
another
|
||||
|
||||
When this happens the body and properties of the original message are copied to
|
||||
a new message. However, the copying process removes some potentially important
|
||||
pieces of data so those are preserved in the following special message
|
||||
properties:
|
||||
|
||||
- `_AMQ_ORIG_ADDRESS`
|
||||
|
||||
a String property containing the *original address* of the message
|
||||
|
||||
- `_AMQ_ORIG_QUEUE`
|
||||
|
||||
a String property containing the *original queue* of the message
|
||||
|
||||
- `_AMQ_ORIG_MESSAGE_ID`
|
||||
|
||||
a String property containing the *original message ID* of the message
|
||||
|
||||
It's possible for the aforementioned operations to be combined. For example, a
|
||||
message may be diverted from one address to another where it lands in a queue
|
||||
and a consumer tries & fails to consume it such that the message is then sent
|
||||
to a dead-letter address. Or a message may be administratively moved from one
|
||||
queue to another where it then expires.
|
||||
|
||||
In cases like these the `ORIG` properties will contain the information from the
|
||||
_last_ (i.e. most recent) operation.
|
|
@ -57,12 +57,7 @@ geographically distributed servers, creating your global messaging mesh.
|
|||
Diverts are defined as xml in the `broker.xml` file at the `core` attribute level.
|
||||
There can be zero or more diverts in the file.
|
||||
|
||||
Diverted message gets a new message ID, and its address is set to a forward
|
||||
address. To access original values, use message properties: original destination
|
||||
is stored in a String property `_AMQ_ORIG_ADDRESS` (`Message.HDR_ORIGINAL_ADDRESS`
|
||||
constant from the Core API), and the original message ID in a Long property
|
||||
`_AMQ_ORIG_MESSAGE_ID` (`Message.HDR_ORIG_MESSAGE_ID` constant from the
|
||||
Core API).
|
||||
Diverted messages get [special properties](copied-message-properties.md).
|
||||
|
||||
Please see the examples for a full working example showing you how to
|
||||
configure and use diverts.
|
||||
|
|
|
@ -28,18 +28,8 @@ JMS MessageProducer allows to set a TimeToLive for the messages it sent:
|
|||
producer.setTimeToLive(5000);
|
||||
```
|
||||
|
||||
Expired messages which are consumed from an expiry address have the following
|
||||
properties:
|
||||
|
||||
- `_AMQ_ORIG_ADDRESS`
|
||||
|
||||
a String property containing the *original address* of the expired
|
||||
message
|
||||
|
||||
- `_AMQ_ORIG_QUEUE`
|
||||
|
||||
a String property containing the *original queue* of the expired
|
||||
message
|
||||
Expired messages get [special properties](copied-message-properties.md) plus this
|
||||
additional property:
|
||||
|
||||
- `_AMQ_ACTUAL_EXPIRY`
|
||||
|
||||
|
@ -123,21 +113,20 @@ an `address-setting` to configure the `expiry-address` much less
|
|||
the actual `address` and `queue` to hold the expired messages.
|
||||
|
||||
The solution to this problem is to set the `auto-create-expiry-resources`
|
||||
`address-setting` to `true` (it's `false` by default) so that the
|
||||
broker will create the `address` and `queue` to deal with the
|
||||
expired messages automatically. The `address` created will be the
|
||||
one defined by the `expiry-address`. A `MULTICAST` `queue` will be
|
||||
created on that `address`. It will be named by the `address` to which
|
||||
the message was originally sent, and it will have a filter defined using
|
||||
the aforementioned `_AMQ_ORIG_ADDRESS` property so that it will only
|
||||
receive messages sent to the relevant `address`. The `queue` name can be
|
||||
configured with a prefix and suffix. See the relevant settings in the
|
||||
table below:
|
||||
`address-setting` to `true` (it's `false` by default) so that the broker will
|
||||
create the `address` and `queue` to deal with the expired messages
|
||||
automatically. The `address` created will be the one defined by the
|
||||
`expiry-address`. A `MULTICAST` `queue` will be created on that `address`.
|
||||
It will be named by the `address` to which the message was previously sent, and
|
||||
it will have a filter defined using the property `_AMQ_ORIG_ADDRESS` so that it
|
||||
will only receive messages sent to the relevant `address`. The `queue` name can
|
||||
be configured with a prefix and suffix. See the relevant settings in the table
|
||||
below:
|
||||
|
||||
`address-setting`|default
|
||||
---|---
|
||||
`expiry-queue-prefix`|`EXP.`
|
||||
`expiry-queue-suffix`|`` (empty string)
|
||||
`expiry-queue-suffix`|(empty string)
|
||||
|
||||
Here is an example configuration:
|
||||
|
||||
|
|
|
@ -165,18 +165,7 @@ set of addresses (see [Understanding the Wildcard Syntax](wildcard-syntax.md)).
|
|||
|
||||
### Dead Letter Properties
|
||||
|
||||
Dead letter messages which are consumed from a dead letter address have
|
||||
the following properties:
|
||||
|
||||
- `_AMQ_ORIG_ADDRESS`
|
||||
|
||||
a String property containing the *original address* of the dead
|
||||
letter message
|
||||
|
||||
- `_AMQ_ORIG_QUEUE`
|
||||
|
||||
a String property containing the *original queue* of the dead letter
|
||||
message
|
||||
Dead letter messages get [special properties](copied-message-properties.md).
|
||||
|
||||
### Automatically Creating Dead Letter Resources
|
||||
|
||||
|
@ -194,21 +183,20 @@ an `address-setting` to configure the `dead-letter-address` much less
|
|||
the actual `address` and `queue` to hold the undelivered messages.
|
||||
|
||||
The solution to this problem is to set the `auto-create-dead-letter-resources`
|
||||
`address-setting` to `true` (it's `false` by default) so that the
|
||||
broker will create the `address` and `queue` to deal with the
|
||||
undelivered messages automatically. The `address` created will be the
|
||||
one defined by the `dead-letter-address`. A `MULTICAST` `queue` will be
|
||||
created on that `address`. It will be named by the `address` to which
|
||||
the message was originally sent, and it will have a filter defined using
|
||||
the aforementioned `_AMQ_ORIG_ADDRESS` property so that it will only
|
||||
receive messages sent to the relevant `address`. The `queue` name can be
|
||||
configured with a prefix and suffix. See the relevant settings in the
|
||||
table below:
|
||||
`address-setting` to `true` (it's `false` by default) so that the broker will
|
||||
create the `address` and `queue` to deal with the undelivered messages
|
||||
automatically. The `address` created will be the one defined by the
|
||||
`dead-letter-address`. A `MULTICAST` `queue` will be created on that `address`.
|
||||
It will be named by the `address` to which the message was previously sent, and
|
||||
it will have a filter defined using the property `_AMQ_ORIG_ADDRESS` so that it
|
||||
will only receive messages sent to the relevant `address`. The `queue` name
|
||||
can be configured with a prefix and suffix. See the relevant settings in the
|
||||
table below:
|
||||
|
||||
`address-setting`|default
|
||||
---|---
|
||||
`dead-letter-queue-prefix`|`DLQ.`
|
||||
`dead-letter-queue-suffix`|`` (empty string)
|
||||
`dead-letter-queue-suffix`|(empty string)
|
||||
|
||||
Here is an example configuration:
|
||||
|
||||
|
|
|
@ -137,18 +137,12 @@ public class DLQAfterExpiredMessageTest extends AmqpClientTestSupport {
|
|||
|
||||
|
||||
// Redo the selection
|
||||
receiverDLQ = session.createReceiver(getDeadLetterAddress(), "\"m." + AMQPMessageSupport.HDR_ORIGINAL_ADDRESS_ANNOTATION + "\"='" + getQueueName() + "'");
|
||||
receiverDLQ = session.createReceiver(getDeadLetterAddress(), "\"m." + AMQPMessageSupport.HDR_ORIGINAL_ADDRESS_ANNOTATION + "\"='" + getExpiryQueue() + "'");
|
||||
receiverDLQ.flow(1);
|
||||
received = receiverDLQ.receive(5, TimeUnit.SECONDS);
|
||||
Assert.assertNotNull(received);
|
||||
received.accept();
|
||||
|
||||
/** When moving to DLQ, the original headers shoudln't be touched. */
|
||||
for (Map.Entry<String, Object> entry : annotations.entrySet()) {
|
||||
log.debug("Checking " + entry.getKey() + " = " + entry.getValue());
|
||||
Assert.assertEquals(entry.getKey() + " should be = " + entry.getValue(), entry.getValue(), received.getMessageAnnotation(entry.getKey()));
|
||||
}
|
||||
|
||||
assertEquals(0, received.getTimeToLive());
|
||||
assertNotNull(received);
|
||||
assertEquals("Value1", received.getApplicationProperty("key1"));
|
||||
|
|
|
@ -40,6 +40,7 @@ import org.apache.activemq.artemis.api.core.client.MessageHandler;
|
|||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||
import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal;
|
||||
import org.apache.activemq.artemis.core.config.Configuration;
|
||||
import org.apache.activemq.artemis.core.config.DivertConfiguration;
|
||||
import org.apache.activemq.artemis.core.config.StoreConfiguration;
|
||||
import org.apache.activemq.artemis.core.paging.PagingStore;
|
||||
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
|
||||
|
@ -62,7 +63,6 @@ import org.junit.Test;
|
|||
|
||||
public class LargeMessageTest extends LargeMessageTestBase {
|
||||
|
||||
|
||||
private static final Logger log = Logger.getLogger(LargeMessageTest.class);
|
||||
|
||||
private static final int RECEIVE_WAIT_TIME = 10000;
|
||||
|
@ -228,6 +228,62 @@ public class LargeMessageTest extends LargeMessageTestBase {
|
|||
validateNoFilesOnLargeDir();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDivertAndExpire() throws Exception {
|
||||
final int messageSize = (int) (3.5 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
|
||||
final String DIVERTED = "diverted";
|
||||
|
||||
ClientSession session = null;
|
||||
|
||||
ActiveMQServer server = createServer(true, isNetty(), storeType);
|
||||
server.getConfiguration().setMessageExpiryScanPeriod(100);
|
||||
|
||||
server.start();
|
||||
|
||||
server.createQueue(new QueueConfiguration(DIVERTED));
|
||||
|
||||
server.getAddressSettingsRepository().addMatch(DIVERTED, new AddressSettings().setExpiryDelay(250L).setExpiryAddress(SimpleString.toSimpleString(DIVERTED + "Expiry")).setAutoCreateExpiryResources(true));
|
||||
|
||||
server.deployDivert(new DivertConfiguration().setName("myDivert").setAddress(ADDRESS.toString()).setForwardingAddress(DIVERTED).setExclusive(true));
|
||||
|
||||
ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator));
|
||||
|
||||
session = addClientSession(sf.createSession(false, false, false));
|
||||
|
||||
session.createQueue(new QueueConfiguration(ADDRESS).setDurable(false).setTemporary(true));
|
||||
|
||||
ClientProducer producer = session.createProducer(ADDRESS);
|
||||
|
||||
Message clientFile = createLargeClientMessageStreaming(session, messageSize, true);
|
||||
|
||||
producer.send(clientFile);
|
||||
|
||||
session.commit();
|
||||
|
||||
session.start();
|
||||
|
||||
Wait.waitFor(() -> server.locateQueue(AddressSettings.DEFAULT_EXPIRY_QUEUE_PREFIX + DIVERTED) != null, 1000, 100);
|
||||
|
||||
ClientConsumer consumer = session.createConsumer(AddressSettings.DEFAULT_EXPIRY_QUEUE_PREFIX + DIVERTED);
|
||||
ClientMessage msg1 = consumer.receive(1000);
|
||||
msg1.acknowledge();
|
||||
session.commit();
|
||||
Assert.assertNotNull(msg1);
|
||||
|
||||
consumer.close();
|
||||
|
||||
try {
|
||||
msg1.getBodyBuffer().readByte();
|
||||
Assert.fail("Exception was expected");
|
||||
} catch (final Exception ignored) {
|
||||
// empty on purpose
|
||||
}
|
||||
|
||||
session.close();
|
||||
|
||||
validateNoFilesOnLargeDir();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDeleteOnNoBinding() throws Exception {
|
||||
final int messageSize = (int) (3.5 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
|
||||
|
|
|
@ -485,7 +485,7 @@ public class DivertTest extends ActiveMQTestBase {
|
|||
instanceLog.debug("Received message " + message);
|
||||
assertNotNull(message);
|
||||
|
||||
if (message.getStringProperty(Message.HDR_ORIGINAL_QUEUE).equals("divert1")) {
|
||||
if (message.getStringProperty(Message.HDR_ORIGINAL_QUEUE).equals("queue1")) {
|
||||
countOriginal1++;
|
||||
} else if (message.getStringProperty(Message.HDR_ORIGINAL_QUEUE).equals("queue2")) {
|
||||
countOriginal2++;
|
||||
|
@ -1465,4 +1465,38 @@ public class DivertTest extends ActiveMQTestBase {
|
|||
server.destroyDivert(SimpleString.toSimpleString(DIVERT));
|
||||
assertNull(serviceRegistry.getDivertTransformer(DIVERT, null));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testProperties() throws Exception {
|
||||
final String testAddress = "testAddress";
|
||||
final SimpleString queue = SimpleString.toSimpleString("queue");
|
||||
final int COUNT = 25;
|
||||
|
||||
ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(createDefaultInVMConfig(), false));
|
||||
server.start();
|
||||
|
||||
server.createQueue(new QueueConfiguration(queue).setAddress(testAddress + (COUNT)).setRoutingType(RoutingType.ANYCAST));
|
||||
for (int i = 0; i < COUNT; i++) {
|
||||
server.deployDivert(new DivertConfiguration()
|
||||
.setName("divert" + i)
|
||||
.setAddress(testAddress + i)
|
||||
.setForwardingAddress(testAddress + (i + 1)));
|
||||
}
|
||||
|
||||
ServerLocator locator = createInVMNonHALocator();
|
||||
ClientSessionFactory sf = createSessionFactory(locator);
|
||||
ClientSession session = sf.createSession(false, true, true);
|
||||
session.start();
|
||||
|
||||
ClientProducer producer = session.createProducer(new SimpleString(testAddress + "0"));
|
||||
ClientConsumer consumer1 = session.createConsumer(queue);
|
||||
ClientMessage message = session.createMessage(false);
|
||||
producer.send(message);
|
||||
|
||||
message = consumer1.receive(DivertTest.TIMEOUT);
|
||||
Assert.assertNotNull(message);
|
||||
message.acknowledge();
|
||||
Assert.assertEquals("testAddress" + COUNT, message.getAddress());
|
||||
Assert.assertEquals("testAddress" + (COUNT - 1), message.getStringProperty(Message.HDR_ORIGINAL_ADDRESS));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1685,6 +1685,47 @@ public class QueueControlTest extends ManagementTestBase {
|
|||
session.deleteQueue(queue);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCopiedMessageProperties() throws Exception {
|
||||
final String testAddress = "testAddress";
|
||||
final SimpleString queue = SimpleString.toSimpleString("queue");
|
||||
final int COUNT = 5;
|
||||
|
||||
for (int i = 0; i < COUNT; i++) {
|
||||
server.createQueue(new QueueConfiguration(queue.concat(Integer.toString(i))).setAddress(testAddress + i).setRoutingType(RoutingType.ANYCAST));
|
||||
}
|
||||
|
||||
ServerLocator locator = createInVMNonHALocator();
|
||||
ClientSessionFactory sf = createSessionFactory(locator);
|
||||
ClientSession session = sf.createSession(false, true, true);
|
||||
session.start();
|
||||
|
||||
ClientProducer producer = session.createProducer(new SimpleString(testAddress + "0"));
|
||||
ClientMessage message = session.createMessage(durable);
|
||||
producer.send(message);
|
||||
producer.close();
|
||||
|
||||
for (int i = 0; i < COUNT - 1; i++) {
|
||||
QueueControl queueControl = createManagementControl(SimpleString.toSimpleString(testAddress + i), queue.concat(Integer.toString(i)), RoutingType.ANYCAST);
|
||||
QueueControl otherQueueControl = createManagementControl(SimpleString.toSimpleString(testAddress + (i + 1)), queue.concat(Integer.toString(i + 1)), RoutingType.ANYCAST);
|
||||
assertMessageMetrics(queueControl, 1, durable);
|
||||
assertMessageMetrics(otherQueueControl, 0, durable);
|
||||
|
||||
int moved = queueControl.moveMessages(null, queue.concat(Integer.toString(i + 1)).toString());
|
||||
Assert.assertEquals(1, moved);
|
||||
assertMessageMetrics(queueControl, 0, durable);
|
||||
assertMessageMetrics(otherQueueControl, 1, durable);
|
||||
}
|
||||
|
||||
ClientConsumer consumer1 = session.createConsumer(queue.concat(Integer.toString(COUNT - 1)));
|
||||
message = consumer1.receive(1000);
|
||||
Assert.assertNotNull(message);
|
||||
message.acknowledge();
|
||||
System.out.println(message);
|
||||
Assert.assertEquals(testAddress + (COUNT - 1), message.getAddress());
|
||||
Assert.assertEquals(testAddress + (COUNT - 2), message.getStringProperty(Message.HDR_ORIGINAL_ADDRESS));
|
||||
}
|
||||
|
||||
/**
|
||||
* <ol>
|
||||
* <li>send 2 message to queue</li>
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.activemq.artemis.api.core.client.ClientProducer;
|
|||
import org.apache.activemq.artemis.api.core.client.ClientSession;
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
|
||||
import org.apache.activemq.artemis.api.core.client.ServerLocator;
|
||||
import org.apache.activemq.artemis.core.config.DivertConfiguration;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
|
||||
|
@ -196,8 +197,71 @@ public class AutoCreateDeadLetterResourcesTest extends ActiveMQTestBase {
|
|||
assertNotNull(context.createConsumer(context.createQueue(fqqn)).receive(2000));
|
||||
}
|
||||
|
||||
private void triggerDlaDelivery() throws Exception {
|
||||
@Test
|
||||
public void testDivertedMessage() throws Exception {
|
||||
SimpleString dlqName = AddressSettings.DEFAULT_DEAD_LETTER_QUEUE_PREFIX.concat(addressA).concat(AddressSettings.DEFAULT_DEAD_LETTER_QUEUE_SUFFIX);
|
||||
String divertAddress = "divertAddress";
|
||||
|
||||
server.deployDivert(new DivertConfiguration().setName("testDivert").setAddress(divertAddress).setForwardingAddress(addressA.toString()));
|
||||
|
||||
server.createQueue(new QueueConfiguration(queueA).setAddress(addressA).setRoutingType(RoutingType.ANYCAST));
|
||||
|
||||
ServerLocator locator = createInVMNonHALocator();
|
||||
ClientSessionFactory sessionFactory = createSessionFactory(locator);
|
||||
ClientSession session = addClientSession(sessionFactory.createSession(true, true));
|
||||
ClientProducer producer = addClientProducer(session.createProducer(divertAddress));
|
||||
producer.send(session.createMessage(true));
|
||||
producer.close();
|
||||
|
||||
Wait.assertEquals(1L, () -> server.locateQueue(queueA).getMessageCount(), 2000, 100);
|
||||
|
||||
triggerDlaDelivery();
|
||||
|
||||
Wait.assertTrue(() -> server.locateQueue(dlqName).getMessageCount() == 1, 2000, 100);
|
||||
|
||||
ClientConsumer consumer = session.createConsumer(dlqName);
|
||||
session.start();
|
||||
ClientMessage message = consumer.receive(1000);
|
||||
assertNotNull(message);
|
||||
message.acknowledge();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMovedMessage() throws Exception {
|
||||
SimpleString dlqName = AddressSettings.DEFAULT_DEAD_LETTER_QUEUE_PREFIX.concat(addressA).concat(AddressSettings.DEFAULT_DEAD_LETTER_QUEUE_SUFFIX);
|
||||
final SimpleString moveFromAddress = new SimpleString("moveFromAddress");
|
||||
final SimpleString moveFromQueue = new SimpleString("moveFromQueue");
|
||||
server.createQueue(new QueueConfiguration(moveFromQueue).setAddress(moveFromAddress).setRoutingType(RoutingType.ANYCAST));
|
||||
server.createQueue(new QueueConfiguration(queueA).setAddress(addressA).setRoutingType(RoutingType.ANYCAST));
|
||||
|
||||
ServerLocator locator = createInVMNonHALocator();
|
||||
ClientSessionFactory sessionFactory = createSessionFactory(locator);
|
||||
ClientSession session = addClientSession(sessionFactory.createSession(true, true));
|
||||
ClientProducer producer = addClientProducer(session.createProducer(moveFromAddress));
|
||||
producer.send(session.createMessage(true));
|
||||
producer.close();
|
||||
|
||||
server.locateQueue(moveFromQueue).moveReferences(null, addressA, null);
|
||||
|
||||
Wait.assertEquals(1L, () -> server.locateQueue(queueA).getMessageCount(), 2000, 100);
|
||||
|
||||
triggerDlaDelivery();
|
||||
|
||||
Wait.assertTrue(() -> server.locateQueue(dlqName).getMessageCount() == 1, 2000, 100);
|
||||
|
||||
ClientConsumer consumer = session.createConsumer(dlqName);
|
||||
session.start();
|
||||
ClientMessage message = consumer.receive(1000);
|
||||
assertNotNull(message);
|
||||
message.acknowledge();
|
||||
}
|
||||
|
||||
private void triggerDlaDelivery() throws Exception {
|
||||
try {
|
||||
server.createQueue(new QueueConfiguration(queueA).setAddress(addressA).setRoutingType(RoutingType.ANYCAST));
|
||||
} catch (Exception e) {
|
||||
// ignore
|
||||
}
|
||||
ServerLocator locator = createInVMNonHALocator();
|
||||
ClientSessionFactory sessionFactory = createSessionFactory(locator);
|
||||
ClientSession session = addClientSession(sessionFactory.createSession(true, false));
|
||||
|
|
Loading…
Reference in New Issue