diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index a158799dce..56ac1f5aac 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -23,9 +23,6 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; -import io.netty.buffer.Unpooled; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; import org.apache.activemq.artemis.api.core.ICoreMessage; @@ -52,6 +49,10 @@ import org.apache.qpid.proton.codec.WritableBuffer; import org.apache.qpid.proton.message.Message; import org.apache.qpid.proton.message.impl.MessageImpl; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.buffer.Unpooled; + // see https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format public class AMQPMessage extends RefCountMessage { @@ -320,7 +321,6 @@ public class AMQPMessage extends RefCountMessage { } else { section = null; } - } if (section instanceof MessageAnnotations) { _messageAnnotations = (MessageAnnotations) section; @@ -330,11 +330,14 @@ public class AMQPMessage extends RefCountMessage { } else { section = null; } - } if (section instanceof Properties) { _properties = (Properties) section; + if (_properties.getAbsoluteExpiryTime() != null) { + this.expiration = _properties.getAbsoluteExpiryTime().getTime(); + } + if (buffer.hasRemaining()) { section = (Section) decoder.readObject(); } else { diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java index 0c4250db52..8599fa9656 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java @@ -426,12 +426,65 @@ public class AmqpMessage { } /** - * Sets the priority header on the outgoing message. + * Gets the priority header on the message. */ public short getPriority() { return getWrappedMessage().getPriority(); } + /** + * Sets the ttl header on the outgoing message. + * + * @param timeToLive the ttl value to set. + */ + public void setTimeToLive(long timeToLive) { + checkReadOnly(); + lazyCreateHeader(); + getWrappedMessage().setTtl(timeToLive); + } + + /** + * Sets the ttl header on the outgoing message. + */ + public long getTimeToLive() { + return getWrappedMessage().getTtl(); + } + + /** + * Sets the absolute expiration time property on the message. + * + * @param absoluteExpiryTime the expiration time value to set. + */ + public void setAbsoluteExpiryTime(long absoluteExpiryTime) { + checkReadOnly(); + lazyCreateProperties(); + getWrappedMessage().setExpiryTime(absoluteExpiryTime); + } + + /** + * Gets the absolute expiration time property on the message. + */ + public long getAbsoluteExpiryTime() { + return getWrappedMessage().getExpiryTime(); + } + + /** + * Sets the creation time property on the message. + * + * @param absoluteExpiryTime the expiration time value to set. + */ + public void setCreationTime(long creationTime) { + checkReadOnly(); + lazyCreateProperties(); + getWrappedMessage().setCreationTime(creationTime); + } + + /** + * Gets the absolute expiration time property on the message. + */ + public long getCreationTime() { + return getWrappedMessage().getCreationTime(); + } /** * Sets a given application property on an outbound message. @@ -597,21 +650,21 @@ public class AmqpMessage { private void lazyCreateMessageAnnotations() { if (messageAnnotationsMap == null) { - messageAnnotationsMap = new HashMap(); + messageAnnotationsMap = new HashMap<>(); message.setMessageAnnotations(new MessageAnnotations(messageAnnotationsMap)); } } private void lazyCreateDeliveryAnnotations() { if (deliveryAnnotationsMap == null) { - deliveryAnnotationsMap = new HashMap(); + deliveryAnnotationsMap = new HashMap<>(); message.setDeliveryAnnotations(new DeliveryAnnotations(deliveryAnnotationsMap)); } } private void lazyCreateApplicationProperties() { if (applicationPropertiesMap == null) { - applicationPropertiesMap = new HashMap(); + applicationPropertiesMap = new HashMap<>(); message.setApplicationProperties(new ApplicationProperties(applicationPropertiesMap)); } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java new file mode 100644 index 0000000000..0a1f382b91 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSender; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.junit.Test; + +public class AmqpExpiredMessageTest extends AmqpClientTestSupport { + + @Test(timeout = 60000) + public void testSendMessageThatIsAlreadyExpiredUsingAbsoluteTime() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(getTestName()); + + // Get the Queue View early to avoid racing the delivery. + final Queue queueView = getProxyToQueue(getTestName()); + assertNotNull(queueView); + + AmqpMessage message = new AmqpMessage(); + message.setAbsoluteExpiryTime(System.currentTimeMillis() - 5000); + message.setText("Test-Message"); + sender.send(message); + sender.close(); + + assertEquals(1, queueView.getMessageCount()); + + // Now try and get the message + AmqpReceiver receiver = session.createReceiver(getTestName()); + receiver.flow(1); + AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); + assertNull(received); + + assertEquals(1, queueView.getMessagesExpired()); + + connection.close(); + } + + @Test(timeout = 60000) + public void testSendMessageThatIsNotExpiredUsingAbsoluteTime() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(getTestName()); + + // Get the Queue View early to avoid racing the delivery. + final Queue queueView = getProxyToQueue(getTestName()); + assertNotNull(queueView); + + AmqpMessage message = new AmqpMessage(); + message.setAbsoluteExpiryTime(System.currentTimeMillis() + 5000); + message.setText("Test-Message"); + sender.send(message); + sender.close(); + + assertEquals(1, queueView.getMessageCount()); + + // Now try and get the message + AmqpReceiver receiver = session.createReceiver(getTestName()); + receiver.flow(1); + AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(received); + + assertEquals(0, queueView.getMessagesExpired()); + + connection.close(); + } + + @Test(timeout = 60000) + public void testSendMessageThatIsExiredUsingAbsoluteTimeWithLongTTL() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(getTestName()); + + // Get the Queue View early to avoid racing the delivery. + final Queue queueView = getProxyToQueue(getTestName()); + assertNotNull(queueView); + + AmqpMessage message = new AmqpMessage(); + message.setAbsoluteExpiryTime(System.currentTimeMillis() - 5000); + // AET should override any TTL set + message.setTimeToLive(60000); + message.setText("Test-Message"); + sender.send(message); + sender.close(); + + assertEquals(1, queueView.getMessageCount()); + + // Now try and get the message + AmqpReceiver receiver = session.createReceiver(getTestName()); + receiver.flow(1); + AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); + assertNull(received); + + assertEquals(1, queueView.getMessagesExpired()); + + connection.close(); + } + + @Test(timeout = 60000) + public void testSendMessageThatIsNotExpiredUsingAbsoluteTimeWithElspsedTTL() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(getTestName()); + + // Get the Queue View early to avoid racing the delivery. + final Queue queueView = getProxyToQueue(getTestName()); + assertNotNull(queueView); + + AmqpMessage message = new AmqpMessage(); + message.setAbsoluteExpiryTime(System.currentTimeMillis() + 5000); + // AET should override any TTL set + message.setTimeToLive(10); + message.setText("Test-Message"); + sender.send(message); + sender.close(); + + Thread.sleep(50); + + assertEquals(1, queueView.getMessageCount()); + + // Now try and get the message + AmqpReceiver receiver = session.createReceiver(getTestName()); + receiver.flow(1); + AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(received); + + assertEquals(0, queueView.getMessagesExpired()); + + connection.close(); + } + + @Test(timeout = 60000) + public void testSendMessageThatIsNotExpiredUsingTimeToLive() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(getTestName()); + + // Get the Queue View early to avoid racing the delivery. + final Queue queueView = getProxyToQueue(getTestName()); + assertNotNull(queueView); + + AmqpMessage message = new AmqpMessage(); + message.setTimeToLive(5000); + message.setText("Test-Message"); + sender.send(message); + sender.close(); + + assertEquals(1, queueView.getMessageCount()); + + // Now try and get the message + AmqpReceiver receiver = session.createReceiver(getTestName()); + receiver.flow(1); + AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(received); + + assertEquals(0, queueView.getMessagesExpired()); + + connection.close(); + } + + @Test(timeout = 60000) + public void testSendMessageThenAllowToExpiredUsingTimeToLive() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(getTestName()); + + // Get the Queue View early to avoid racing the delivery. + final Queue queueView = getProxyToQueue(getTestName()); + assertNotNull(queueView); + + AmqpMessage message = new AmqpMessage(); + message.setTimeToLive(10); + message.setText("Test-Message"); + sender.send(message); + sender.close(); + + Thread.sleep(50); + + assertEquals(1, queueView.getMessageCount()); + + // Now try and get the message + AmqpReceiver receiver = session.createReceiver(getTestName()); + receiver.flow(1); + AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); + assertNull(received); + + assertEquals(1, queueView.getMessagesExpired()); + + connection.close(); + } +}