This closes #1096
This commit is contained in:
commit
dca124f9aa
|
@ -23,9 +23,6 @@ import java.util.HashSet;
|
|||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.PooledByteBufAllocator;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
|
||||
import org.apache.activemq.artemis.api.core.ICoreMessage;
|
||||
|
@ -52,6 +49,10 @@ import org.apache.qpid.proton.codec.WritableBuffer;
|
|||
import org.apache.qpid.proton.message.Message;
|
||||
import org.apache.qpid.proton.message.impl.MessageImpl;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.PooledByteBufAllocator;
|
||||
import io.netty.buffer.Unpooled;
|
||||
|
||||
// see https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format
|
||||
public class AMQPMessage extends RefCountMessage {
|
||||
|
||||
|
@ -79,6 +80,7 @@ public class AMQPMessage extends RefCountMessage {
|
|||
this.data = Unpooled.wrappedBuffer(data);
|
||||
this.messageFormat = messageFormat;
|
||||
this.bufferValid = true;
|
||||
parseHeaders();
|
||||
|
||||
}
|
||||
|
||||
|
@ -320,7 +322,6 @@ public class AMQPMessage extends RefCountMessage {
|
|||
} else {
|
||||
section = null;
|
||||
}
|
||||
|
||||
}
|
||||
if (section instanceof MessageAnnotations) {
|
||||
_messageAnnotations = (MessageAnnotations) section;
|
||||
|
@ -330,11 +331,14 @@ public class AMQPMessage extends RefCountMessage {
|
|||
} else {
|
||||
section = null;
|
||||
}
|
||||
|
||||
}
|
||||
if (section instanceof Properties) {
|
||||
_properties = (Properties) section;
|
||||
|
||||
if (_properties.getAbsoluteExpiryTime() != null) {
|
||||
this.expiration = _properties.getAbsoluteExpiryTime().getTime();
|
||||
}
|
||||
|
||||
if (buffer.hasRemaining()) {
|
||||
section = (Section) decoder.readObject();
|
||||
} else {
|
||||
|
@ -894,5 +898,6 @@ public class AMQPMessage extends RefCountMessage {
|
|||
record.readBytes(recordArray);
|
||||
this.data = Unpooled.wrappedBuffer(recordArray);
|
||||
this.bufferValid = true;
|
||||
parseHeaders();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -426,12 +426,65 @@ public class AmqpMessage {
|
|||
}
|
||||
|
||||
/**
|
||||
* Sets the priority header on the outgoing message.
|
||||
* Gets the priority header on the message.
|
||||
*/
|
||||
public short getPriority() {
|
||||
return getWrappedMessage().getPriority();
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the ttl header on the outgoing message.
|
||||
*
|
||||
* @param timeToLive the ttl value to set.
|
||||
*/
|
||||
public void setTimeToLive(long timeToLive) {
|
||||
checkReadOnly();
|
||||
lazyCreateHeader();
|
||||
getWrappedMessage().setTtl(timeToLive);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the ttl header on the outgoing message.
|
||||
*/
|
||||
public long getTimeToLive() {
|
||||
return getWrappedMessage().getTtl();
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the absolute expiration time property on the message.
|
||||
*
|
||||
* @param absoluteExpiryTime the expiration time value to set.
|
||||
*/
|
||||
public void setAbsoluteExpiryTime(long absoluteExpiryTime) {
|
||||
checkReadOnly();
|
||||
lazyCreateProperties();
|
||||
getWrappedMessage().setExpiryTime(absoluteExpiryTime);
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the absolute expiration time property on the message.
|
||||
*/
|
||||
public long getAbsoluteExpiryTime() {
|
||||
return getWrappedMessage().getExpiryTime();
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the creation time property on the message.
|
||||
*
|
||||
* @param absoluteExpiryTime the expiration time value to set.
|
||||
*/
|
||||
public void setCreationTime(long creationTime) {
|
||||
checkReadOnly();
|
||||
lazyCreateProperties();
|
||||
getWrappedMessage().setCreationTime(creationTime);
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the absolute expiration time property on the message.
|
||||
*/
|
||||
public long getCreationTime() {
|
||||
return getWrappedMessage().getCreationTime();
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets a given application property on an outbound message.
|
||||
|
@ -597,21 +650,21 @@ public class AmqpMessage {
|
|||
|
||||
private void lazyCreateMessageAnnotations() {
|
||||
if (messageAnnotationsMap == null) {
|
||||
messageAnnotationsMap = new HashMap<Symbol, Object>();
|
||||
messageAnnotationsMap = new HashMap<>();
|
||||
message.setMessageAnnotations(new MessageAnnotations(messageAnnotationsMap));
|
||||
}
|
||||
}
|
||||
|
||||
private void lazyCreateDeliveryAnnotations() {
|
||||
if (deliveryAnnotationsMap == null) {
|
||||
deliveryAnnotationsMap = new HashMap<Symbol, Object>();
|
||||
deliveryAnnotationsMap = new HashMap<>();
|
||||
message.setDeliveryAnnotations(new DeliveryAnnotations(deliveryAnnotationsMap));
|
||||
}
|
||||
}
|
||||
|
||||
private void lazyCreateApplicationProperties() {
|
||||
if (applicationPropertiesMap == null) {
|
||||
applicationPropertiesMap = new HashMap<String, Object>();
|
||||
applicationPropertiesMap = new HashMap<>();
|
||||
message.setApplicationProperties(new ApplicationProperties(applicationPropertiesMap));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,225 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.amqp;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpClient;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpConnection;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpMessage;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpSender;
|
||||
import org.apache.activemq.transport.amqp.client.AmqpSession;
|
||||
import org.junit.Test;
|
||||
|
||||
public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testSendMessageThatIsAlreadyExpiredUsingAbsoluteTime() throws Exception {
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = addConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpSender sender = session.createSender(getTestName());
|
||||
|
||||
// Get the Queue View early to avoid racing the delivery.
|
||||
final Queue queueView = getProxyToQueue(getTestName());
|
||||
assertNotNull(queueView);
|
||||
|
||||
AmqpMessage message = new AmqpMessage();
|
||||
message.setAbsoluteExpiryTime(System.currentTimeMillis() - 5000);
|
||||
message.setText("Test-Message");
|
||||
sender.send(message);
|
||||
sender.close();
|
||||
|
||||
assertEquals(1, queueView.getMessageCount());
|
||||
|
||||
// Now try and get the message
|
||||
AmqpReceiver receiver = session.createReceiver(getTestName());
|
||||
receiver.flow(1);
|
||||
AmqpMessage received = receiver.receive(1, TimeUnit.SECONDS);
|
||||
assertNull(received);
|
||||
|
||||
assertEquals(1, queueView.getMessagesExpired());
|
||||
|
||||
connection.close();
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testSendMessageThatIsNotExpiredUsingAbsoluteTime() throws Exception {
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = addConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpSender sender = session.createSender(getTestName());
|
||||
|
||||
// Get the Queue View early to avoid racing the delivery.
|
||||
final Queue queueView = getProxyToQueue(getTestName());
|
||||
assertNotNull(queueView);
|
||||
|
||||
AmqpMessage message = new AmqpMessage();
|
||||
message.setAbsoluteExpiryTime(System.currentTimeMillis() + 5000);
|
||||
message.setText("Test-Message");
|
||||
sender.send(message);
|
||||
sender.close();
|
||||
|
||||
assertEquals(1, queueView.getMessageCount());
|
||||
|
||||
// Now try and get the message
|
||||
AmqpReceiver receiver = session.createReceiver(getTestName());
|
||||
receiver.flow(1);
|
||||
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
|
||||
assertNotNull(received);
|
||||
|
||||
assertEquals(0, queueView.getMessagesExpired());
|
||||
|
||||
connection.close();
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testSendMessageThatIsExiredUsingAbsoluteTimeWithLongTTL() throws Exception {
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = addConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpSender sender = session.createSender(getTestName());
|
||||
|
||||
// Get the Queue View early to avoid racing the delivery.
|
||||
final Queue queueView = getProxyToQueue(getTestName());
|
||||
assertNotNull(queueView);
|
||||
|
||||
AmqpMessage message = new AmqpMessage();
|
||||
message.setAbsoluteExpiryTime(System.currentTimeMillis() - 5000);
|
||||
// AET should override any TTL set
|
||||
message.setTimeToLive(60000);
|
||||
message.setText("Test-Message");
|
||||
sender.send(message);
|
||||
sender.close();
|
||||
|
||||
assertEquals(1, queueView.getMessageCount());
|
||||
|
||||
// Now try and get the message
|
||||
AmqpReceiver receiver = session.createReceiver(getTestName());
|
||||
receiver.flow(1);
|
||||
AmqpMessage received = receiver.receive(1, TimeUnit.SECONDS);
|
||||
assertNull(received);
|
||||
|
||||
assertEquals(1, queueView.getMessagesExpired());
|
||||
|
||||
connection.close();
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testSendMessageThatIsNotExpiredUsingAbsoluteTimeWithElspsedTTL() throws Exception {
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = addConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpSender sender = session.createSender(getTestName());
|
||||
|
||||
// Get the Queue View early to avoid racing the delivery.
|
||||
final Queue queueView = getProxyToQueue(getTestName());
|
||||
assertNotNull(queueView);
|
||||
|
||||
AmqpMessage message = new AmqpMessage();
|
||||
message.setAbsoluteExpiryTime(System.currentTimeMillis() + 5000);
|
||||
// AET should override any TTL set
|
||||
message.setTimeToLive(10);
|
||||
message.setText("Test-Message");
|
||||
sender.send(message);
|
||||
sender.close();
|
||||
|
||||
Thread.sleep(50);
|
||||
|
||||
assertEquals(1, queueView.getMessageCount());
|
||||
|
||||
// Now try and get the message
|
||||
AmqpReceiver receiver = session.createReceiver(getTestName());
|
||||
receiver.flow(1);
|
||||
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
|
||||
assertNotNull(received);
|
||||
|
||||
assertEquals(0, queueView.getMessagesExpired());
|
||||
|
||||
connection.close();
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testSendMessageThatIsNotExpiredUsingTimeToLive() throws Exception {
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = addConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpSender sender = session.createSender(getTestName());
|
||||
|
||||
// Get the Queue View early to avoid racing the delivery.
|
||||
final Queue queueView = getProxyToQueue(getTestName());
|
||||
assertNotNull(queueView);
|
||||
|
||||
AmqpMessage message = new AmqpMessage();
|
||||
message.setTimeToLive(5000);
|
||||
message.setText("Test-Message");
|
||||
sender.send(message);
|
||||
sender.close();
|
||||
|
||||
assertEquals(1, queueView.getMessageCount());
|
||||
|
||||
// Now try and get the message
|
||||
AmqpReceiver receiver = session.createReceiver(getTestName());
|
||||
receiver.flow(1);
|
||||
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
|
||||
assertNotNull(received);
|
||||
|
||||
assertEquals(0, queueView.getMessagesExpired());
|
||||
|
||||
connection.close();
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testSendMessageThenAllowToExpiredUsingTimeToLive() throws Exception {
|
||||
AmqpClient client = createAmqpClient();
|
||||
AmqpConnection connection = addConnection(client.connect());
|
||||
AmqpSession session = connection.createSession();
|
||||
|
||||
AmqpSender sender = session.createSender(getTestName());
|
||||
|
||||
// Get the Queue View early to avoid racing the delivery.
|
||||
final Queue queueView = getProxyToQueue(getTestName());
|
||||
assertNotNull(queueView);
|
||||
|
||||
AmqpMessage message = new AmqpMessage();
|
||||
message.setTimeToLive(10);
|
||||
message.setText("Test-Message");
|
||||
sender.send(message);
|
||||
sender.close();
|
||||
|
||||
Thread.sleep(50);
|
||||
|
||||
assertEquals(1, queueView.getMessageCount());
|
||||
|
||||
// Now try and get the message
|
||||
AmqpReceiver receiver = session.createReceiver(getTestName());
|
||||
receiver.flow(1);
|
||||
AmqpMessage received = receiver.receive(1, TimeUnit.SECONDS);
|
||||
assertNull(received);
|
||||
|
||||
assertEquals(1, queueView.getMessagesExpired());
|
||||
|
||||
connection.close();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue