ARTEMIS-4820 Read Header TTL as unsigned integer to set expiration
When setting expiration on the AMQPMessage the AMQP header TTL value should be read as an unsigned integer and as such should use the longValue API of UnsignedInteger to get the right value to set expiration.
This commit is contained in:
parent
a6ff05ecd7
commit
c6f227cbb8
|
@ -705,7 +705,7 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
|
|||
encodedHeaderSize = data.position() - constructorPos;
|
||||
if (header.getTtl() != null) {
|
||||
if (!expirationReload) {
|
||||
expiration = System.currentTimeMillis() + header.getTtl().intValue();
|
||||
expiration = System.currentTimeMillis() + header.getTtl().longValue();
|
||||
}
|
||||
}
|
||||
} else if (DeliveryAnnotations.class.equals(constructor.getTypeClass())) {
|
||||
|
|
|
@ -1027,6 +1027,18 @@ public class AMQPMessageTest {
|
|||
assertTrue(decoded.getExpiration() > System.currentTimeMillis());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetExpirationFromMessageWithMaxUIntTTL() {
|
||||
final long ttl = UnsignedInteger.MAX_VALUE.longValue();
|
||||
|
||||
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
|
||||
protonMessage.setHeader(new Header());
|
||||
protonMessage.setTtl(ttl);
|
||||
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
|
||||
|
||||
assertTrue(decoded.getExpiration() > System.currentTimeMillis());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetExpirationFromCoreMessageUsingTTL() {
|
||||
final long ttl = 100000;
|
||||
|
@ -1094,6 +1106,18 @@ public class AMQPMessageTest {
|
|||
assertEquals(expirationTime.getTime(), decoded.getExpiration());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSetExpirationMaxUInt() {
|
||||
final Date expirationTime = new Date(UnsignedInteger.MAX_VALUE.longValue());
|
||||
|
||||
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
|
||||
AMQPStandardMessage decoded = encodeAndDecodeMessage(protonMessage);
|
||||
|
||||
assertEquals(0, decoded.getExpiration());
|
||||
decoded.setExpiration(expirationTime.getTime());
|
||||
assertEquals(expirationTime.getTime(), decoded.getExpiration());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSetExpirationUpdatesProperties() {
|
||||
final Date originalExpirationTime = new Date(System.currentTimeMillis());
|
||||
|
|
|
@ -53,6 +53,7 @@ import org.apache.activemq.transport.amqp.client.AmqpMessage;
|
|||
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpSender;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpSession;
|
||||
import org.apache.qpid.proton.amqp.UnsignedInteger;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.Timeout;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -477,6 +478,55 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
|
|||
connection.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
@Timeout(60)
|
||||
public void testSendMessageThatIsNotExpiredUsingTimeToLiveOfMaxUInt() throws Exception {
|
||||
doTestSendMessageThatIsNotExpiredUsingTimeToLive(UnsignedInteger.MAX_VALUE);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Timeout(60)
|
||||
public void testSendMessageThatIsNotExpiredUsingTimeToLiveOfMaxIntValue() throws Exception {
|
||||
doTestSendMessageThatIsNotExpiredUsingTimeToLive(UnsignedInteger.valueOf(Integer.MAX_VALUE));
|
||||
}
|
||||
|
||||
@Test
|
||||
@Timeout(60)
|
||||
public void testSendMessageThatIsNotExpiredUsingTimeToLiveOfMinusOne() throws Exception {
|
||||
doTestSendMessageThatIsNotExpiredUsingTimeToLive(UnsignedInteger.valueOf(-1));
|
||||
}
|
||||
|
||||
private void doTestSendMessageThatIsNotExpiredUsingTimeToLive(UnsignedInteger ttl) throws Exception {
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = addConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpSender sender = session.createSender(getQueueName());
|
||||
|
||||
// Get the Queue View early to avoid racing the delivery.
|
||||
final Queue queueView = getProxyToQueue(getQueueName());
|
||||
assertNotNull(queueView);
|
||||
|
||||
AmqpMessage message = new AmqpMessage();
|
||||
message.setTimeToLive(ttl.longValue());
|
||||
message.setText("Test-Message");
|
||||
sender.send(message);
|
||||
sender.close();
|
||||
|
||||
Wait.assertEquals(1, queueView::getMessageCount);
|
||||
|
||||
// Now try and get the message
|
||||
AmqpReceiver receiver = session.createReceiver(getQueueName());
|
||||
receiver.flow(1);
|
||||
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
|
||||
assertNotNull(received, "Should have read message but it seems to have timed out.");
|
||||
assertEquals(ttl.longValue(), received.getTimeToLive());
|
||||
|
||||
Wait.assertEquals(0, queueView::getMessagesExpired);
|
||||
|
||||
connection.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
@Timeout(60)
|
||||
public void testSendMessageThenAllowToExpiredUsingTimeToLive() throws Exception {
|
||||
|
@ -829,13 +879,10 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
|
|||
MessageReference ref = linkedListIterator.next();
|
||||
String idUsed = ref.getMessage().getStringProperty("id");
|
||||
long originalExpiration = dataSet.get(idUsed);
|
||||
System.out.println("original Expiration = " + originalExpiration + " while this expiration = " + ref.getMessage().getExpiration());
|
||||
logger.info("original Expiration = {} while this expiration = {}", originalExpiration, ref.getMessage().getExpiration());
|
||||
assertEquals(originalExpiration, ref.getMessage().getExpiration());
|
||||
}
|
||||
assertEquals(2, count);
|
||||
linkedListIterator.close();
|
||||
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue