diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index 1490edc2ed..4c43401fac 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -705,7 +705,7 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. encodedHeaderSize = data.position() - constructorPos; if (header.getTtl() != null) { if (!expirationReload) { - expiration = System.currentTimeMillis() + header.getTtl().intValue(); + expiration = System.currentTimeMillis() + header.getTtl().longValue(); } } } else if (DeliveryAnnotations.class.equals(constructor.getTypeClass())) { diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java index 8ed39125c2..3c917fe349 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java @@ -1027,6 +1027,18 @@ public class AMQPMessageTest { assertTrue(decoded.getExpiration() > System.currentTimeMillis()); } + @Test + public void testGetExpirationFromMessageWithMaxUIntTTL() { + final long ttl = UnsignedInteger.MAX_VALUE.longValue(); + + MessageImpl protonMessage = (MessageImpl) Message.Factory.create(); + protonMessage.setHeader(new Header()); + protonMessage.setTtl(ttl); + AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage); + + assertTrue(decoded.getExpiration() > System.currentTimeMillis()); + } + @Test public void testGetExpirationFromCoreMessageUsingTTL() { final long ttl = 100000; @@ -1094,6 +1106,18 @@ public class AMQPMessageTest { assertEquals(expirationTime.getTime(), decoded.getExpiration()); } + @Test + public void testSetExpirationMaxUInt() { + final Date expirationTime = new Date(UnsignedInteger.MAX_VALUE.longValue()); + + MessageImpl protonMessage = (MessageImpl) Message.Factory.create(); + AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage); + + assertEquals(0, decoded.getExpiration()); + decoded.setExpiration(expirationTime.getTime()); + assertEquals(expirationTime.getTime(), decoded.getExpiration()); + } + @Test public void testSetExpirationUpdatesProperties() { final Date originalExpirationTime = new Date(System.currentTimeMillis()); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java index 1429700209..604a9882da 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java @@ -53,6 +53,7 @@ import org.apache.activemq.transport.amqp.client.AmqpMessage; import org.apache.activemq.transport.amqp.client.AmqpReceiver; import org.apache.activemq.transport.amqp.client.AmqpSender; import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.qpid.proton.amqp.UnsignedInteger; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.slf4j.Logger; @@ -477,6 +478,55 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport { connection.close(); } + @Test + @Timeout(60) + public void testSendMessageThatIsNotExpiredUsingTimeToLiveOfMaxUInt() throws Exception { + doTestSendMessageThatIsNotExpiredUsingTimeToLive(UnsignedInteger.MAX_VALUE); + } + + @Test + @Timeout(60) + public void testSendMessageThatIsNotExpiredUsingTimeToLiveOfMaxIntValue() throws Exception { + doTestSendMessageThatIsNotExpiredUsingTimeToLive(UnsignedInteger.valueOf(Integer.MAX_VALUE)); + } + + @Test + @Timeout(60) + public void testSendMessageThatIsNotExpiredUsingTimeToLiveOfMinusOne() throws Exception { + doTestSendMessageThatIsNotExpiredUsingTimeToLive(UnsignedInteger.valueOf(-1)); + } + + private void doTestSendMessageThatIsNotExpiredUsingTimeToLive(UnsignedInteger ttl) throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(getQueueName()); + + // Get the Queue View early to avoid racing the delivery. + final Queue queueView = getProxyToQueue(getQueueName()); + assertNotNull(queueView); + + AmqpMessage message = new AmqpMessage(); + message.setTimeToLive(ttl.longValue()); + message.setText("Test-Message"); + sender.send(message); + sender.close(); + + Wait.assertEquals(1, queueView::getMessageCount); + + // Now try and get the message + AmqpReceiver receiver = session.createReceiver(getQueueName()); + receiver.flow(1); + AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(received, "Should have read message but it seems to have timed out."); + assertEquals(ttl.longValue(), received.getTimeToLive()); + + Wait.assertEquals(0, queueView::getMessagesExpired); + + connection.close(); + } + @Test @Timeout(60) public void testSendMessageThenAllowToExpiredUsingTimeToLive() throws Exception { @@ -829,13 +879,10 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport { MessageReference ref = linkedListIterator.next(); String idUsed = ref.getMessage().getStringProperty("id"); long originalExpiration = dataSet.get(idUsed); - System.out.println("original Expiration = " + originalExpiration + " while this expiration = " + ref.getMessage().getExpiration()); + logger.info("original Expiration = {} while this expiration = {}", originalExpiration, ref.getMessage().getExpiration()); assertEquals(originalExpiration, ref.getMessage().getExpiration()); } assertEquals(2, count); linkedListIterator.close(); - - } - }