ARTEMIS-3019 expiry changes in AMQP Large Message

This commit is contained in:
Clebert Suconic 2020-12-04 11:46:34 -05:00
parent ccefbfcfa9
commit e6a6e81b02
6 changed files with 195 additions and 3 deletions

View File

@ -302,6 +302,11 @@ public interface Message {
/** It will generate a new instance of the message encode, being a deep copy, new properties, new everything */
Message copy(long newID);
/** It will generate a new instance of the message encode, being a deep copy, new properties, new everything */
default Message copy(long newID, boolean isExpiryOrDLQ) {
return copy(newID);
}
default boolean acceptsConsumer(long uniqueConsumerID) {
return true;
}

View File

@ -158,6 +158,11 @@ public class MessageInternalImpl implements MessageInternal {
return message.copy(newID);
}
@Override
public Message copy(long newID, boolean isDLQorExpiry) {
return message.copy(newID, isDLQorExpiry);
}
/**
* Returns the messageID.
* <br>

View File

@ -18,9 +18,11 @@
package org.apache.activemq.artemis.protocol.amqp.broker;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ICoreMessage;
@ -258,6 +260,13 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
applicationProperties = (ApplicationProperties)TLSEncode.getDecoder().readObject();
if (properties != null && properties.getAbsoluteExpiryTime() != null && properties.getAbsoluteExpiryTime().getTime() > 0) {
expiration = properties.getAbsoluteExpiryTime().getTime();
} else if (header != null && header.getTtl() != null) {
expiration = System.currentTimeMillis() + header.getTtl().intValue();
}
} finally {
TLSEncode.getDecoder().setBuffer(oldBuffer);
}
@ -446,10 +455,22 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
@Override
public Message copy(final long newID) {
return copy(newID, false);
}
@Override
public Message copy(final long newID, boolean isDLQOrExpiry) {
try {
AMQPLargeMessage copy = new AMQPLargeMessage(newID, messageFormat, null, coreMessageObjectPools, storageManager);
copy.setDurable(this.isDurable());
largeBody.copyInto(copy);
final AtomicInteger place = new AtomicInteger(0);
ByteBuf bufferNewHeader = null;
if (isDLQOrExpiry) {
bufferNewHeader = newHeaderWithoutExpiry(place);
}
largeBody.copyInto(copy, bufferNewHeader, place.intValue());
copy.finishParse();
copy.releaseResources(true);
return copy;
@ -460,7 +481,46 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
}
}
protected ByteBuf newHeaderWithoutExpiry(AtomicInteger placeOutput) {
ByteBuf bufferNewHeader;
Header headerCopy = null;
if (header != null) {
headerCopy = new Header(header);
headerCopy.setTtl(null); // just in case
}
MessageAnnotations messageAnnotationsRef = this.messageAnnotations;
Properties propertiesCopy = null;
if (properties != null) {
propertiesCopy = new Properties(properties);
propertiesCopy.setAbsoluteExpiryTime(null); // just in case
}
if (applicationPropertiesPosition != VALUE_NOT_PRESENT) {
placeOutput.set(applicationPropertiesPosition);
} else {
placeOutput.set(remainingBodyPosition);
}
if (placeOutput.get() < 0) {
placeOutput.set(0);
bufferNewHeader = null;
} else {
bufferNewHeader = Unpooled.buffer(placeOutput.get());
}
if (bufferNewHeader != null) {
TLSEncode.getEncoder().setByteBuffer(new NettyWritable(bufferNewHeader));
if (headerCopy != null)
TLSEncode.getEncoder().writeObject(headerCopy);
if (messageAnnotationsRef != null)
TLSEncode.getEncoder().writeObject(messageAnnotationsRef);
if (propertiesCopy != null)
TLSEncode.getEncoder().writeObject(propertiesCopy);
}
return bufferNewHeader;
}
@Override
public void messageChanged() {

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.persistence.impl.journal;
import java.nio.ByteBuffer;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
@ -316,6 +317,10 @@ public class LargeBody {
}
public void copyInto(LargeServerMessage newMessage) throws Exception {
copyInto(newMessage, null, 0);
}
public void copyInto(LargeServerMessage newMessage, ByteBuf newHeader, int skipBytes) throws Exception {
//clone a SequentialFile to avoid concurrent access
SequentialFile cloneFile = getReadingFile();
@ -328,8 +333,11 @@ public class LargeBody {
cloneFile.open();
}
cloneFile.position(0);
cloneFile.position(skipBytes);
if (newHeader != null) {
newMessage.addBytes(new ChannelBufferWrapper(newHeader));
}
for (; ; ) {
// The buffer is reused...
// We need to make sure we clear the limits and the buffer before reusing it

View File

@ -3447,7 +3447,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
long newID = storageManager.generateID();
Message copy = message.copy(newID);
Message copy = message.copy(newID, true);
if (copyOriginalHeaders) {
copy.referenceOriginalMessage(message, ref.getQueue().getName().toString());

View File

@ -16,6 +16,12 @@
*/
package org.apache.activemq.artemis.tests.integration.client;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import java.io.File;
import org.apache.activemq.artemis.api.core.Message;
@ -33,6 +39,7 @@ import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Test;
@ -233,6 +240,113 @@ public class ExpiryLargeMessageTest extends ActiveMQTestBase {
validateNoFilesOnLargeDir();
}
@Test
public void testExpiryMessagesAMQP() throws Exception {
testExpiryMessagesAMQP(false, 300 * 1024);
}
@Test
public void testExpiryMessagesAMQPRestartBeforeExpiry() throws Exception {
testExpiryMessagesAMQP(true, 300 * 1024);
}
// this is just sanity check for the test
@Test
public void testExpiryMessagesAMQPRegularMessageStandardMessage() throws Exception {
testExpiryMessagesAMQP(false, 30);
}
// this is just sanity check for the test
@Test
public void testExpiryMessagesAMQPRestartBeforeExpiryStandardMessage() throws Exception {
testExpiryMessagesAMQP(true, 30);
}
public void testExpiryMessagesAMQP(boolean restartBefore, int bodySize) throws Exception {
ActiveMQServer server = createServer(true, true);
server.getConfiguration().setMessageExpiryScanPeriod(6000);
AddressSettings setting = new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setMaxDeliveryAttempts(5).setMaxSizeBytes(50 * 1024).setPageSizeBytes(10 * 1024).setExpiryAddress(EXPIRY).setDeadLetterAddress(DLQ);
server.getAddressSettingsRepository().addMatch(MY_QUEUE.toString(), setting);
server.getAddressSettingsRepository().addMatch(EXPIRY.toString(), setting);
server.start();
server.createQueue(new QueueConfiguration(EXPIRY).setRoutingType(RoutingType.ANYCAST));
server.createQueue(new QueueConfiguration(DLQ).setRoutingType(RoutingType.ANYCAST));
server.createQueue(new QueueConfiguration(MY_QUEUE).setRoutingType(RoutingType.ANYCAST));
ConnectionFactory connectionFactory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:61616");
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
byte[] bufferSample = new byte[bodySize];
for (int i = 0; i < bufferSample.length; i++) {
bufferSample[i] = getSamplebyte(i);
}
javax.jms.Queue jmsQueue = session.createQueue(MY_QUEUE.toString());
MessageProducer producer = session.createProducer(jmsQueue);
producer.setTimeToLive(300);
for (int i = 0; i < numberOfMessages; i++) {
BytesMessage message = session.createBytesMessage();
message.writeBytes(bufferSample);
message.setIntProperty("count", i);
producer.send(message);
}
session.close();
connection.close();
if (restartBefore) {
server.stop();
server.start();
}
Queue queueExpiry = server.locateQueue(EXPIRY);
Queue myQueue = server.locateQueue(MY_QUEUE);
Wait.assertEquals(numberOfMessages, () -> {
myQueue.expireReferences();
return getMessageCount(queueExpiry);
});
if (!restartBefore) {
server.stop();
server.start();
}
// validateNoFilesOnLargeDir(getLargeMessagesDir(), numberOfMessages);
connection = connectionFactory.createConnection();
session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer cons = session.createConsumer(session.createQueue(EXPIRY.toString()));
connection.start();
// Consume half of the messages to make sure all the messages are paging (on the second try)
for (int i = 0; i < numberOfMessages; i++) {
javax.jms.Message msg = cons.receive(5000);
assertNotNull(msg);
msg.acknowledge();
}
session.commit();
connection.close();
}
/**
* Tests if the system would still couple with old data where the LargeMessage was linked to its previous copy
*