This commit is contained in:
Clebert Suconic 2020-12-10 19:41:40 -05:00
commit 87e507aecb
5 changed files with 190 additions and 8 deletions

View File

@ -267,9 +267,13 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
applicationProperties = (ApplicationProperties)TLSEncode.getDecoder().readObject();
if (properties != null && properties.getAbsoluteExpiryTime() != null && properties.getAbsoluteExpiryTime().getTime() > 0) {
expiration = properties.getAbsoluteExpiryTime().getTime();
if (!expirationReload) {
expiration = properties.getAbsoluteExpiryTime().getTime();
}
} else if (header != null && header.getTtl() != null) {
expiration = System.currentTimeMillis() + header.getTtl().intValue();
if (!expirationReload) {
expiration = System.currentTimeMillis() + header.getTtl().intValue();
}
}
@ -327,7 +331,9 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
if (Header.class.equals(constructor.getTypeClass())) {
header = (Header) constructor.readValue();
if (header.getTtl() != null) {
expiration = System.currentTimeMillis() + header.getTtl().intValue();
if (!expirationReload) {
expiration = System.currentTimeMillis() + header.getTtl().intValue();
}
}
}
} finally {

View File

@ -60,7 +60,8 @@ public class AMQPLargeMessagePersister extends MessagePersister {
try {
int encodeSize = DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + DataConstants.SIZE_LONG + DataConstants.SIZE_LONG + SimpleString.sizeofNullableString(record.getAddressSimpleString()) + DataConstants.SIZE_BOOLEAN + buf.writerIndex() +
DataConstants.SIZE_BOOLEAN; // this last one is for is Reencoded
DataConstants.SIZE_LONG + // expiredTime
DataConstants.SIZE_BOOLEAN; // reencoded
TypedProperties properties = ((AMQPMessage) record).getExtraProperties();
@ -93,6 +94,7 @@ public class AMQPLargeMessagePersister extends MessagePersister {
ByteBuf savedEncodeBuffer = msgEncode.getSavedEncodeBuffer();
buffer.writeBytes(savedEncodeBuffer, 0, savedEncodeBuffer.writerIndex());
buffer.writeLong(record.getExpiration());
buffer.writeBoolean(msgEncode.isReencoded());
msgEncode.releaseEncodedBufferAfterWrite(); // we need two releases, as getSavedEncodedBuffer will keep 1 for himself until encoding has happened
// which this is the expected event where we need to release the extra refCounter
@ -126,6 +128,10 @@ public class AMQPLargeMessagePersister extends MessagePersister {
largeMessage.readSavedEncoding(buffer.byteBuf());
if (buffer.readableBytes() >= DataConstants.SIZE_LONG) {
largeMessage.reloadExpiration(buffer.readLong());
}
if (buffer.readable()) {
boolean reEncoded = buffer.readBoolean();
largeMessage.setReencoded(reEncoded);

View File

@ -192,6 +192,7 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
protected SimpleString address;
protected volatile int memoryEstimate = -1;
protected long expiration;
protected boolean expirationReload = false;
protected long scheduledTime = -1;
// The Proton based AMQP message section that are retained in memory, these are the
@ -579,7 +580,9 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
messageAnnotations = null;
properties = null;
applicationProperties = null;
expiration = 0;
if (!expirationReload) {
expiration = 0;
}
encodedHeaderSize = 0;
memoryEstimate = -1;
scheduledTime = -1;
@ -610,7 +613,9 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
headerPosition = constructorPos;
encodedHeaderSize = data.position();
if (header.getTtl() != null) {
expiration = System.currentTimeMillis() + header.getTtl().intValue();
if (!expirationReload) {
expiration = System.currentTimeMillis() + header.getTtl().intValue();
}
}
} else if (DeliveryAnnotations.class.equals(constructor.getTypeClass())) {
deliveryAnnotationsPosition = constructorPos;
@ -624,7 +629,9 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
properties = (Properties) constructor.readValue();
if (properties.getAbsoluteExpiryTime() != null && properties.getAbsoluteExpiryTime().getTime() > 0) {
expiration = properties.getAbsoluteExpiryTime().getTime();
if (!expirationReload) {
expiration = properties.getAbsoluteExpiryTime().getTime();
}
}
} else if (ApplicationProperties.class.equals(constructor.getTypeClass())) {
// Lazy decoding will start at the TypeConstructor of these ApplicationProperties
@ -927,6 +934,11 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
return expiration;
}
public void reloadExpiration(long expiration) {
this.expiration = expiration;
this.expirationReload = true;
}
@Override
public final AMQPMessage setExpiration(long expiration) {
if (properties != null) {

View File

@ -50,7 +50,8 @@ public class AMQPMessagePersisterV2 extends AMQPMessagePersister {
@Override
public int getEncodeSize(Message record) {
int encodeSize = super.getEncodeSize(record) + DataConstants.SIZE_INT;
int encodeSize = super.getEncodeSize(record) + DataConstants.SIZE_INT +
DataConstants.SIZE_LONG; // expiration
TypedProperties properties = ((AMQPMessage)record).getExtraProperties();
@ -70,6 +71,8 @@ public class AMQPMessagePersisterV2 extends AMQPMessagePersister {
buffer.writeInt(properties.getEncodeSize());
properties.encode(buffer.byteBuf());
}
buffer.writeLong(record.getExpiration());
}
@Override
@ -97,6 +100,10 @@ public class AMQPMessagePersisterV2 extends AMQPMessagePersister {
extraProperties.decode(buffer.byteBuf(), pool != null ? pool.getPropertiesDecoderPools() : null);
}
record.reloadAddress(address);
if (buffer.readableBytes() >= DataConstants.SIZE_LONG) {
record.reloadExpiration(buffer.readLong());
}
return record;
}

View File

@ -23,12 +23,18 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
@ -584,4 +590,149 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
connection.close();
}
}
@Test(timeout = 60000)
public void testExpireThorughAddressSettings() throws Exception {
testExpireThorughAddressSettings(false);
}
@Test(timeout = 60000)
public void testExpireThorughAddressSettingsRebootServer() throws Exception {
testExpireThorughAddressSettings(true);
}
private void testExpireThorughAddressSettings(boolean reboot) throws Exception {
// Address configuration
AddressSettings addressSettings = new AddressSettings();
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
addressSettings.setAutoCreateQueues(isAutoCreateQueues());
addressSettings.setAutoCreateAddresses(isAutoCreateAddresses());
addressSettings.setDeadLetterAddress(SimpleString.toSimpleString(getDeadLetterAddress()));
addressSettings.setExpiryAddress(SimpleString.toSimpleString(getDeadLetterAddress()));
addressSettings.setExpiryDelay(1000L);
server.getAddressSettingsRepository().clear();
server.getAddressSettingsRepository().addMatch("#", addressSettings);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getQueueName());
// Get the Queue View early to avoid racing the delivery.
final Queue queueView = getProxyToQueue(getQueueName());
assertNotNull(queueView);
AmqpMessage message = new AmqpMessage();
message.setText("Test-Message");
message.setDurable(true);
message.setApplicationProperty("key1", "Value1");
sender.send(message);
message = new AmqpMessage();
message.setBytes(new byte[500 * 1024]);
message.setDurable(true);
sender.send(message);
sender.close();
connection.close();
if (reboot) {
server.stop();
server.getConfiguration().setMessageExpiryScanPeriod(100);
server.start();
}
final Queue dlqView = getProxyToQueue(getDeadLetterAddress());
Wait.assertEquals(2, dlqView::getMessageCount);
}
@Test
public void testPreserveExpirationOnTTL() throws Exception {
// Address configuration
AddressSettings addressSettings = new AddressSettings();
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
addressSettings.setAutoCreateQueues(isAutoCreateQueues());
addressSettings.setAutoCreateAddresses(isAutoCreateAddresses());
addressSettings.setDeadLetterAddress(SimpleString.toSimpleString(getDeadLetterAddress()));
addressSettings.setExpiryAddress(SimpleString.toSimpleString(getDeadLetterAddress()));
addressSettings.setExpiryDelay(1000L);
server.getAddressSettingsRepository().clear();
server.getAddressSettingsRepository().addMatch("#", addressSettings);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getQueueName());
// Get the Queue View early to avoid racing the delivery.
final Queue queueView = getProxyToQueue(getQueueName());
assertNotNull(queueView);
AmqpMessage message = new AmqpMessage();
message.setText("Test-Message");
message.setDurable(true);
message.setTimeToLive(3600 * 1000);
message.setApplicationProperty("id", "0");
sender.send(message);
message = new AmqpMessage();
message.setBytes(new byte[500 * 1024]);
message.setDurable(true);
message.setTimeToLive(3600 * 1000);
message.setApplicationProperty("id", "1");
sender.send(message);
Wait.assertEquals(2, queueView::getMessageCount);
LinkedListIterator<MessageReference> linkedListIterator = queueView.iterator();
HashMap<String, Long> dataSet = new HashMap<>();
int count = 0;
while (linkedListIterator.hasNext()) {
count++;
MessageReference ref = linkedListIterator.next();
String idUsed = ref.getMessage().getStringProperty("id");
dataSet.put(idUsed, ref.getMessage().getExpiration());
}
Assert.assertEquals(2, count);
linkedListIterator.close();
server.stop();
Thread.sleep(500); // we need some time passing, as the TTL can't be recalculated here
server.getConfiguration().setMessageExpiryScanPeriod(100);
server.start();
final Queue queueViewAfterRestart = getProxyToQueue(getQueueName());
Wait.assertEquals(2, queueViewAfterRestart::getMessageCount);
Thread.sleep(1000);
linkedListIterator = queueViewAfterRestart.iterator();
count = 0;
while (linkedListIterator.hasNext()) {
count++;
MessageReference ref = linkedListIterator.next();
String idUsed = ref.getMessage().getStringProperty("id");
long originalExpiration = dataSet.get(idUsed);
System.out.println("original Expiration = " + originalExpiration + " while this expiration = " + ref.getMessage().getExpiration());
Assert.assertEquals(originalExpiration, ref.getMessage().getExpiration());
}
Assert.assertEquals(2, count);
linkedListIterator.close();
}
}