diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java index 2d3c5483e0..56a7cfcdcd 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java @@ -302,6 +302,11 @@ public interface Message { /** It will generate a new instance of the message encode, being a deep copy, new properties, new everything */ Message copy(long newID); + /** It will generate a new instance of the message encode, being a deep copy, new properties, new everything */ + default Message copy(long newID, boolean isExpiryOrDLQ) { + return copy(newID); + } + default boolean acceptsConsumer(long uniqueConsumerID) { return true; } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternalImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternalImpl.java index 92e232802a..10956099a2 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternalImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/MessageInternalImpl.java @@ -158,6 +158,11 @@ public class MessageInternalImpl implements MessageInternal { return message.copy(newID); } + @Override + public Message copy(long newID, boolean isDLQorExpiry) { + return message.copy(newID, isDLQorExpiry); + } + /** * Returns the messageID. *
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java index d2c637cd25..64ff31fb28 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java @@ -18,9 +18,11 @@ package org.apache.activemq.artemis.protocol.amqp.broker; import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicInteger; import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; +import io.netty.buffer.Unpooled; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ICoreMessage; @@ -258,6 +260,13 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage applicationProperties = (ApplicationProperties)TLSEncode.getDecoder().readObject(); + if (properties != null && properties.getAbsoluteExpiryTime() != null && properties.getAbsoluteExpiryTime().getTime() > 0) { + expiration = properties.getAbsoluteExpiryTime().getTime(); + } else if (header != null && header.getTtl() != null) { + expiration = System.currentTimeMillis() + header.getTtl().intValue(); + } + + } finally { TLSEncode.getDecoder().setBuffer(oldBuffer); } @@ -446,10 +455,22 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage @Override public Message copy(final long newID) { + return copy(newID, false); + } + + @Override + public Message copy(final long newID, boolean isDLQOrExpiry) { try { AMQPLargeMessage copy = new AMQPLargeMessage(newID, messageFormat, null, coreMessageObjectPools, storageManager); copy.setDurable(this.isDurable()); - largeBody.copyInto(copy); + + final AtomicInteger place = new AtomicInteger(0); + ByteBuf bufferNewHeader = null; + if (isDLQOrExpiry) { + bufferNewHeader = newHeaderWithoutExpiry(place); + } + + largeBody.copyInto(copy, bufferNewHeader, place.intValue()); copy.finishParse(); copy.releaseResources(true); return copy; @@ -460,7 +481,46 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage } } + protected ByteBuf newHeaderWithoutExpiry(AtomicInteger placeOutput) { + ByteBuf bufferNewHeader; + Header headerCopy = null; + if (header != null) { + headerCopy = new Header(header); + headerCopy.setTtl(null); // just in case + } + MessageAnnotations messageAnnotationsRef = this.messageAnnotations; + + Properties propertiesCopy = null; + if (properties != null) { + propertiesCopy = new Properties(properties); + propertiesCopy.setAbsoluteExpiryTime(null); // just in case + } + + if (applicationPropertiesPosition != VALUE_NOT_PRESENT) { + placeOutput.set(applicationPropertiesPosition); + } else { + placeOutput.set(remainingBodyPosition); + } + + if (placeOutput.get() < 0) { + placeOutput.set(0); + bufferNewHeader = null; + } else { + bufferNewHeader = Unpooled.buffer(placeOutput.get()); + } + + if (bufferNewHeader != null) { + TLSEncode.getEncoder().setByteBuffer(new NettyWritable(bufferNewHeader)); + if (headerCopy != null) + TLSEncode.getEncoder().writeObject(headerCopy); + if (messageAnnotationsRef != null) + TLSEncode.getEncoder().writeObject(messageAnnotationsRef); + if (propertiesCopy != null) + TLSEncode.getEncoder().writeObject(propertiesCopy); + } + return bufferNewHeader; + } @Override public void messageChanged() { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java index dec2c2a4a6..0d49819115 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeBody.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.persistence.impl.journal; import java.nio.ByteBuffer; +import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; @@ -316,6 +317,10 @@ public class LargeBody { } public void copyInto(LargeServerMessage newMessage) throws Exception { + copyInto(newMessage, null, 0); + } + + public void copyInto(LargeServerMessage newMessage, ByteBuf newHeader, int skipBytes) throws Exception { //clone a SequentialFile to avoid concurrent access SequentialFile cloneFile = getReadingFile(); @@ -328,8 +333,11 @@ public class LargeBody { cloneFile.open(); } - cloneFile.position(0); + cloneFile.position(skipBytes); + if (newHeader != null) { + newMessage.addBytes(new ChannelBufferWrapper(newHeader)); + } for (; ; ) { // The buffer is reused... // We need to make sure we clear the limits and the buffer before reusing it diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index c3247f7a0f..ef0abf2293 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -3447,7 +3447,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { long newID = storageManager.generateID(); - Message copy = message.copy(newID); + Message copy = message.copy(newID, true); if (copyOriginalHeaders) { copy.referenceOriginalMessage(message, ref.getQueue().getName().toString()); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ExpiryLargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ExpiryLargeMessageTest.java index f35c0f58c6..3d0ac77206 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ExpiryLargeMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ExpiryLargeMessageTest.java @@ -16,6 +16,12 @@ */ package org.apache.activemq.artemis.tests.integration.client; +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; import java.io.File; import org.apache.activemq.artemis.api.core.Message; @@ -33,6 +39,7 @@ import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.tests.util.CFUtil; import org.apache.activemq.artemis.tests.util.Wait; import org.junit.Test; @@ -233,6 +240,113 @@ public class ExpiryLargeMessageTest extends ActiveMQTestBase { validateNoFilesOnLargeDir(); } + + @Test + public void testExpiryMessagesAMQP() throws Exception { + testExpiryMessagesAMQP(false, 300 * 1024); + } + + @Test + public void testExpiryMessagesAMQPRestartBeforeExpiry() throws Exception { + testExpiryMessagesAMQP(true, 300 * 1024); + } + + // this is just sanity check for the test + @Test + public void testExpiryMessagesAMQPRegularMessageStandardMessage() throws Exception { + testExpiryMessagesAMQP(false, 30); + } + + // this is just sanity check for the test + @Test + public void testExpiryMessagesAMQPRestartBeforeExpiryStandardMessage() throws Exception { + testExpiryMessagesAMQP(true, 30); + } + + public void testExpiryMessagesAMQP(boolean restartBefore, int bodySize) throws Exception { + ActiveMQServer server = createServer(true, true); + + server.getConfiguration().setMessageExpiryScanPeriod(6000); + + AddressSettings setting = new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setMaxDeliveryAttempts(5).setMaxSizeBytes(50 * 1024).setPageSizeBytes(10 * 1024).setExpiryAddress(EXPIRY).setDeadLetterAddress(DLQ); + server.getAddressSettingsRepository().addMatch(MY_QUEUE.toString(), setting); + server.getAddressSettingsRepository().addMatch(EXPIRY.toString(), setting); + + server.start(); + + server.createQueue(new QueueConfiguration(EXPIRY).setRoutingType(RoutingType.ANYCAST)); + + server.createQueue(new QueueConfiguration(DLQ).setRoutingType(RoutingType.ANYCAST)); + + server.createQueue(new QueueConfiguration(MY_QUEUE).setRoutingType(RoutingType.ANYCAST)); + + ConnectionFactory connectionFactory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:61616"); + Connection connection = connectionFactory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + byte[] bufferSample = new byte[bodySize]; + + for (int i = 0; i < bufferSample.length; i++) { + bufferSample[i] = getSamplebyte(i); + } + + javax.jms.Queue jmsQueue = session.createQueue(MY_QUEUE.toString()); + + MessageProducer producer = session.createProducer(jmsQueue); + producer.setTimeToLive(300); + + for (int i = 0; i < numberOfMessages; i++) { + BytesMessage message = session.createBytesMessage(); + message.writeBytes(bufferSample); + + message.setIntProperty("count", i); + + producer.send(message); + } + + session.close(); + connection.close(); + + if (restartBefore) { + server.stop(); + server.start(); + } + + Queue queueExpiry = server.locateQueue(EXPIRY); + Queue myQueue = server.locateQueue(MY_QUEUE); + + Wait.assertEquals(numberOfMessages, () -> { + myQueue.expireReferences(); + return getMessageCount(queueExpiry); + }); + + if (!restartBefore) { + server.stop(); + server.start(); + } + + + // validateNoFilesOnLargeDir(getLargeMessagesDir(), numberOfMessages); + + connection = connectionFactory.createConnection(); + session = connection.createSession(true, Session.SESSION_TRANSACTED); + + MessageConsumer cons = session.createConsumer(session.createQueue(EXPIRY.toString())); + connection.start(); + + // Consume half of the messages to make sure all the messages are paging (on the second try) + for (int i = 0; i < numberOfMessages; i++) { + javax.jms.Message msg = cons.receive(5000); + assertNotNull(msg); + msg.acknowledge(); + } + + session.commit(); + + connection.close(); + + } + /** * Tests if the system would still couple with old data where the LargeMessage was linked to its previous copy *