diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java index 4e5e60cf19..af3f4d3bb8 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java @@ -267,9 +267,13 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage applicationProperties = (ApplicationProperties)TLSEncode.getDecoder().readObject(); if (properties != null && properties.getAbsoluteExpiryTime() != null && properties.getAbsoluteExpiryTime().getTime() > 0) { - expiration = properties.getAbsoluteExpiryTime().getTime(); + if (!expirationReload) { + expiration = properties.getAbsoluteExpiryTime().getTime(); + } } else if (header != null && header.getTtl() != null) { - expiration = System.currentTimeMillis() + header.getTtl().intValue(); + if (!expirationReload) { + expiration = System.currentTimeMillis() + header.getTtl().intValue(); + } } @@ -327,7 +331,9 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage if (Header.class.equals(constructor.getTypeClass())) { header = (Header) constructor.readValue(); if (header.getTtl() != null) { - expiration = System.currentTimeMillis() + header.getTtl().intValue(); + if (!expirationReload) { + expiration = System.currentTimeMillis() + header.getTtl().intValue(); + } } } } finally { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessagePersister.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessagePersister.java index 0a059ce760..56ed3ff6b2 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessagePersister.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessagePersister.java @@ -60,7 +60,8 @@ public class AMQPLargeMessagePersister extends MessagePersister { try { int encodeSize = DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + DataConstants.SIZE_LONG + DataConstants.SIZE_LONG + SimpleString.sizeofNullableString(record.getAddressSimpleString()) + DataConstants.SIZE_BOOLEAN + buf.writerIndex() + - DataConstants.SIZE_BOOLEAN; // this last one is for is Reencoded + DataConstants.SIZE_LONG + // expiredTime + DataConstants.SIZE_BOOLEAN; // reencoded TypedProperties properties = ((AMQPMessage) record).getExtraProperties(); @@ -93,6 +94,7 @@ public class AMQPLargeMessagePersister extends MessagePersister { ByteBuf savedEncodeBuffer = msgEncode.getSavedEncodeBuffer(); buffer.writeBytes(savedEncodeBuffer, 0, savedEncodeBuffer.writerIndex()); + buffer.writeLong(record.getExpiration()); buffer.writeBoolean(msgEncode.isReencoded()); msgEncode.releaseEncodedBufferAfterWrite(); // we need two releases, as getSavedEncodedBuffer will keep 1 for himself until encoding has happened // which this is the expected event where we need to release the extra refCounter @@ -126,6 +128,10 @@ public class AMQPLargeMessagePersister extends MessagePersister { largeMessage.readSavedEncoding(buffer.byteBuf()); + if (buffer.readableBytes() >= DataConstants.SIZE_LONG) { + largeMessage.reloadExpiration(buffer.readLong()); + } + if (buffer.readable()) { boolean reEncoded = buffer.readBoolean(); largeMessage.setReencoded(reEncoded); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index c10f1a7fb3..5f86edb552 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -192,6 +192,7 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. protected SimpleString address; protected volatile int memoryEstimate = -1; protected long expiration; + protected boolean expirationReload = false; protected long scheduledTime = -1; // The Proton based AMQP message section that are retained in memory, these are the @@ -579,7 +580,9 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. messageAnnotations = null; properties = null; applicationProperties = null; - expiration = 0; + if (!expirationReload) { + expiration = 0; + } encodedHeaderSize = 0; memoryEstimate = -1; scheduledTime = -1; @@ -610,7 +613,9 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. headerPosition = constructorPos; encodedHeaderSize = data.position(); if (header.getTtl() != null) { - expiration = System.currentTimeMillis() + header.getTtl().intValue(); + if (!expirationReload) { + expiration = System.currentTimeMillis() + header.getTtl().intValue(); + } } } else if (DeliveryAnnotations.class.equals(constructor.getTypeClass())) { deliveryAnnotationsPosition = constructorPos; @@ -624,7 +629,9 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. properties = (Properties) constructor.readValue(); if (properties.getAbsoluteExpiryTime() != null && properties.getAbsoluteExpiryTime().getTime() > 0) { - expiration = properties.getAbsoluteExpiryTime().getTime(); + if (!expirationReload) { + expiration = properties.getAbsoluteExpiryTime().getTime(); + } } } else if (ApplicationProperties.class.equals(constructor.getTypeClass())) { // Lazy decoding will start at the TypeConstructor of these ApplicationProperties @@ -927,6 +934,11 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. return expiration; } + public void reloadExpiration(long expiration) { + this.expiration = expiration; + this.expirationReload = true; + } + @Override public final AMQPMessage setExpiration(long expiration) { if (properties != null) { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersisterV2.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersisterV2.java index bd7ba0ed21..638244df5f 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersisterV2.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessagePersisterV2.java @@ -50,7 +50,8 @@ public class AMQPMessagePersisterV2 extends AMQPMessagePersister { @Override public int getEncodeSize(Message record) { - int encodeSize = super.getEncodeSize(record) + DataConstants.SIZE_INT; + int encodeSize = super.getEncodeSize(record) + DataConstants.SIZE_INT + + DataConstants.SIZE_LONG; // expiration TypedProperties properties = ((AMQPMessage)record).getExtraProperties(); @@ -70,6 +71,8 @@ public class AMQPMessagePersisterV2 extends AMQPMessagePersister { buffer.writeInt(properties.getEncodeSize()); properties.encode(buffer.byteBuf()); } + + buffer.writeLong(record.getExpiration()); } @Override @@ -97,6 +100,10 @@ public class AMQPMessagePersisterV2 extends AMQPMessagePersister { extraProperties.decode(buffer.byteBuf(), pool != null ? pool.getPropertiesDecoderPools() : null); } record.reloadAddress(address); + + if (buffer.readableBytes() >= DataConstants.SIZE_LONG) { + record.reloadExpiration(buffer.readLong()); + } return record; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java index 38d324f6ac..52697dca0f 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java @@ -23,12 +23,18 @@ import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; +import java.util.HashMap; import java.util.UUID; import java.util.concurrent.TimeUnit; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.tests.util.CFUtil; import org.apache.activemq.artemis.tests.util.Wait; +import org.apache.activemq.artemis.utils.collections.LinkedListIterator; import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpConnection; import org.apache.activemq.transport.amqp.client.AmqpMessage; @@ -584,4 +590,149 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport { connection.close(); } } + + @Test(timeout = 60000) + public void testExpireThorughAddressSettings() throws Exception { + testExpireThorughAddressSettings(false); + } + + @Test(timeout = 60000) + public void testExpireThorughAddressSettingsRebootServer() throws Exception { + testExpireThorughAddressSettings(true); + } + + private void testExpireThorughAddressSettings(boolean reboot) throws Exception { + + // Address configuration + AddressSettings addressSettings = new AddressSettings(); + + addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE); + addressSettings.setAutoCreateQueues(isAutoCreateQueues()); + addressSettings.setAutoCreateAddresses(isAutoCreateAddresses()); + addressSettings.setDeadLetterAddress(SimpleString.toSimpleString(getDeadLetterAddress())); + addressSettings.setExpiryAddress(SimpleString.toSimpleString(getDeadLetterAddress())); + addressSettings.setExpiryDelay(1000L); + + server.getAddressSettingsRepository().clear(); + server.getAddressSettingsRepository().addMatch("#", addressSettings); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(getQueueName()); + + // Get the Queue View early to avoid racing the delivery. + final Queue queueView = getProxyToQueue(getQueueName()); + assertNotNull(queueView); + + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message"); + message.setDurable(true); + message.setApplicationProperty("key1", "Value1"); + sender.send(message); + + message = new AmqpMessage(); + message.setBytes(new byte[500 * 1024]); + message.setDurable(true); + sender.send(message); + sender.close(); + connection.close(); + + if (reboot) { + server.stop(); + server.getConfiguration().setMessageExpiryScanPeriod(100); + server.start(); + } + + final Queue dlqView = getProxyToQueue(getDeadLetterAddress()); + + Wait.assertEquals(2, dlqView::getMessageCount); + } + + @Test + public void testPreserveExpirationOnTTL() throws Exception { + + // Address configuration + AddressSettings addressSettings = new AddressSettings(); + + addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE); + addressSettings.setAutoCreateQueues(isAutoCreateQueues()); + addressSettings.setAutoCreateAddresses(isAutoCreateAddresses()); + addressSettings.setDeadLetterAddress(SimpleString.toSimpleString(getDeadLetterAddress())); + addressSettings.setExpiryAddress(SimpleString.toSimpleString(getDeadLetterAddress())); + addressSettings.setExpiryDelay(1000L); + + server.getAddressSettingsRepository().clear(); + server.getAddressSettingsRepository().addMatch("#", addressSettings); + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(getQueueName()); + + // Get the Queue View early to avoid racing the delivery. + final Queue queueView = getProxyToQueue(getQueueName()); + assertNotNull(queueView); + + AmqpMessage message = new AmqpMessage(); + message.setText("Test-Message"); + message.setDurable(true); + message.setTimeToLive(3600 * 1000); + message.setApplicationProperty("id", "0"); + sender.send(message); + + message = new AmqpMessage(); + message.setBytes(new byte[500 * 1024]); + message.setDurable(true); + message.setTimeToLive(3600 * 1000); + message.setApplicationProperty("id", "1"); + sender.send(message); + + Wait.assertEquals(2, queueView::getMessageCount); + LinkedListIterator linkedListIterator = queueView.iterator(); + HashMap dataSet = new HashMap<>(); + int count = 0; + while (linkedListIterator.hasNext()) { + count++; + MessageReference ref = linkedListIterator.next(); + String idUsed = ref.getMessage().getStringProperty("id"); + dataSet.put(idUsed, ref.getMessage().getExpiration()); + } + + Assert.assertEquals(2, count); + linkedListIterator.close(); + + server.stop(); + + Thread.sleep(500); // we need some time passing, as the TTL can't be recalculated here + server.getConfiguration().setMessageExpiryScanPeriod(100); + server.start(); + + final Queue queueViewAfterRestart = getProxyToQueue(getQueueName()); + + Wait.assertEquals(2, queueViewAfterRestart::getMessageCount); + + Thread.sleep(1000); + + linkedListIterator = queueViewAfterRestart.iterator(); + count = 0; + while (linkedListIterator.hasNext()) { + count++; + MessageReference ref = linkedListIterator.next(); + String idUsed = ref.getMessage().getStringProperty("id"); + long originalExpiration = dataSet.get(idUsed); + System.out.println("original Expiration = " + originalExpiration + " while this expiration = " + ref.getMessage().getExpiration()); + Assert.assertEquals(originalExpiration, ref.getMessage().getExpiration()); + } + Assert.assertEquals(2, count); + linkedListIterator.close(); + + + } + + + + }