diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java index fe51f15cbb..5db786db55 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java @@ -174,6 +174,11 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage } } + @Override + public void validateFile() throws ActiveMQException { + largeBody.validateFile(); + } + public void setFileDurable(boolean value) { this.fileDurable = value; } @@ -207,7 +212,7 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage return parsingData; } - protected void parseHeader(ReadableBuffer buffer) { + public void parseHeader(ReadableBuffer buffer) { DecoderImpl decoder = TLSEncode.getDecoder(); decoder.setBuffer(buffer); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index f9d689ecb0..33cf22e121 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.protocol.amqp.broker; import java.util.Map; import java.util.concurrent.Executor; +import org.apache.activemq.artemis.Closeable; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException; import org.apache.activemq.artemis.api.core.ActiveMQException; @@ -153,6 +154,10 @@ public class AMQPSessionCallback implements SessionCallback { } + public void addCloseable(Closeable closeable) { + serverSession.addCloseable(closeable); + } + public void withinContext(Runnable run) throws Exception { OperationContext context = recoverContext(); try { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java index 38aaeded9e..62f4e39836 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java @@ -28,6 +28,7 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager; import org.apache.activemq.artemis.core.security.CheckType; import org.apache.activemq.artemis.core.security.SecurityAuth; +import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.RoutingContext; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl; @@ -140,6 +141,24 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements this.creditRunnable = createCreditRunnable(amqpCredits, minCreditRefresh, receiver, connection).setRan(); useModified = this.connection.getProtocolManager().isUseModifiedForTransientDeliveryErrors(); this.minLargeMessageSize = connection.getProtocolManager().getAmqpMinLargeMessageSize(); + + if (sessionSPI != null) { + sessionSPI.addCloseable((boolean failed) -> clearLargeMessage()); + } + } + + protected void clearLargeMessage() { + connection.runNow(() -> { + if (currentLargeMessage != null) { + try { + currentLargeMessage.deleteFile(); + } catch (Throwable error) { + ActiveMQServerLogger.LOGGER.errorDeletingLargeMessageFile(error); + } finally { + currentLargeMessage = null; + } + } + }); } @Override @@ -288,6 +307,8 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements try { if (delivery.isAborted()) { + clearLargeMessage(); + // Aborting implicitly remotely settles, so advance // receiver to the next delivery and settle locally. receiver.advance(); @@ -352,7 +373,12 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements private void initializeCurrentLargeMessage(Delivery delivery, Receiver receiver) throws Exception { long id = sessionSPI.getStorageManager().generateID(); currentLargeMessage = new AMQPLargeMessage(id, delivery.getMessageFormat(), null, sessionSPI.getCoreMessageObjectPools(), sessionSPI.getStorageManager()); - currentLargeMessage.addBytes(receiver.recv()); + + ReadableBuffer dataBuffer = receiver.recv(); + currentLargeMessage.parseHeader(dataBuffer); + + sessionSPI.getStorageManager().largeMessageCreated(id, currentLargeMessage); + currentLargeMessage.addBytes(dataBuffer); } private void actualDelivery(AMQPMessage message, Delivery delivery, Receiver receiver, Transaction tx) { @@ -439,6 +465,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements public void close(ErrorCondition condition) throws ActiveMQAMQPException { receiver.setCondition(condition); close(false); + clearLargeMessage(); } public void flow() { diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java index ad6bddd485..1071004aea 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java @@ -30,6 +30,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException; import org.apache.activemq.artemis.api.core.ActiveMQException; @@ -50,6 +51,7 @@ import org.apache.qpid.proton.amqp.messaging.Source; import org.apache.qpid.proton.amqp.transport.DeliveryState; import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Receiver; +import org.junit.Assert; import org.junit.Test; import org.mockito.stubbing.Answer; @@ -96,7 +98,14 @@ public class ProtonServerReceiverContextTest { when(mockConnContext.getProtocolManager()).thenReturn(mock(ProtonProtocolManager.class)); - ProtonServerReceiverContext rc = new ProtonServerReceiverContext(null, mockConnContext, null, mockReceiver); + AtomicInteger clearLargeMessage = new AtomicInteger(0); + ProtonServerReceiverContext rc = new ProtonServerReceiverContext(null, mockConnContext, null, mockReceiver) { + @Override + protected void clearLargeMessage() { + super.clearLargeMessage(); + clearLargeMessage.incrementAndGet(); + } + }; Delivery mockDelivery = mock(Delivery.class); when(mockDelivery.isAborted()).thenReturn(true); @@ -120,6 +129,8 @@ public class ProtonServerReceiverContextTest { verify(mockReceiver, times(1)).flow(1); } verifyNoMoreInteractions(mockReceiver); + + Assert.assertTrue(clearLargeMessage.get() > 0); } private void doOnMessageWithDeliveryException(List sourceSymbols, diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java index 5bb91c0115..f131c4146d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java @@ -245,6 +245,8 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent { */ LargeServerMessage createLargeMessage(long id, Message message) throws Exception; + LargeServerMessage largeMessageCreated(long id, LargeServerMessage largeMessage) throws Exception; + enum LargeMessageExtension { DURABLE(".msg"), TEMPORARY(".tmp"), SYNC(".sync"); final String extension; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java index 71a7f452f7..94a2dc0cce 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java @@ -536,13 +536,23 @@ public class JournalStorageManager extends AbstractJournalStorageManager { largeMessage.moveHeadersAndProperties(message); - largeMessage.setMessageID(id); + return largeMessageCreated(id, largeMessage); + } finally { + readUnLock(); + } + } + @Override + public LargeServerMessage largeMessageCreated(long id, LargeServerMessage largeMessage) throws Exception { + largeMessage.setMessageID(id); - // Check durable large massage size before to allocate resources if it can't be stored - if (largeMessage.isDurable()) { - final long maxRecordSize = getMaxRecordSize(); - final int messageEncodeSize = largeMessage.getEncodeSize(); + // Check durable large massage size before to allocate resources if it can't be stored + if (largeMessage.toMessage().isDurable()) { + final long maxRecordSize = getMaxRecordSize(); + if (largeMessage instanceof LargeServerMessageImpl) { + // the following check only applies to Core + LargeServerMessageImpl coreLarge = (LargeServerMessageImpl)largeMessage; + final int messageEncodeSize = coreLarge.getEncodeSize(); if (messageEncodeSize > maxRecordSize) { ActiveMQServerLogger.LOGGER.messageWithHeaderTooLarge(largeMessage.getMessageID(), logger.getName()); @@ -554,22 +564,20 @@ public class JournalStorageManager extends AbstractJournalStorageManager { throw ActiveMQJournalBundle.BUNDLE.recordLargerThanStoreMax(messageEncodeSize, maxRecordSize); } } - - // We do this here to avoid a case where the replication gets a list without this file - // to avoid a race - largeMessage.validateFile(); - - if (largeMessage.isDurable()) { - // We store a marker on the journal that the large file is pending - long pendingRecordID = storePendingLargeMessage(id); - - largeMessage.setPendingRecordID(pendingRecordID); - } - - return largeMessage; - } finally { - readUnLock(); } + + // We do this here to avoid a case where the replication gets a list without this file + // to avoid a race + largeMessage.validateFile(); + + if (largeMessage.toMessage().isDurable()) { + // We store a marker on the journal that the large file is pending + long pendingRecordID = storePendingLargeMessage(id); + + largeMessage.setPendingRecordID(pendingRecordID); + } + + return largeMessage; } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java index c8a82be6ba..df5cd678bf 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java @@ -309,6 +309,7 @@ public final class LargeServerMessageImpl extends CoreMessage implements CoreLar } } + @Override public synchronized void validateFile() throws ActiveMQException { this.ensureFileExists(true); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java index 8252f34bbc..f7684a82b9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.persistence.impl.nullpm; import io.netty.buffer.Unpooled; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; import org.apache.activemq.artemis.core.io.SequentialFile; @@ -70,6 +71,11 @@ class NullStorageLargeServerMessage extends CoreMessage implements CoreLargeServ } + @Override + public void validateFile() throws ActiveMQException { + + } + @Override public void setStorageManager(StorageManager storageManager) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java index a4a3f8ab98..bc6488f296 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java @@ -303,6 +303,11 @@ public class NullStorageManager implements StorageManager { return largeMessage; } + @Override + public LargeServerMessage largeMessageCreated(long id, LargeServerMessage largeMessage) throws Exception { + return null; + } + @Override public long generateID() { long id = idSequence.getAndIncrement(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java index 603adaf6f7..795fb92c5f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java @@ -609,6 +609,10 @@ public interface ActiveMQServer extends ServiceComponent { Queue locateQueue(SimpleString queueName); + default Queue locateQueue(String queueName) { + return locateQueue(SimpleString.toSimpleString(queueName)); + } + default BindingQueryResult bindingQuery(SimpleString address) throws Exception { return bindingQuery(address, true); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java index 2dcf4042c3..d9eb996f0d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/LargeServerMessage.java @@ -66,5 +66,7 @@ public interface LargeServerMessage extends ReplicatedLargeMessage { void setStorageManager(StorageManager storageManager); + void validateFile() throws ActiveMQException; + void finishParse() throws Exception; } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java index f740ba855a..defda9daf8 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java @@ -464,6 +464,11 @@ public class TransactionImplTest extends ActiveMQTestBase { return null; } + @Override + public LargeServerMessage largeMessageCreated(long id, LargeServerMessage largeMessage) throws Exception { + return null; + } + @Override public SequentialFile createFileForLargeMessage(long messageID, LargeMessageExtension extension) { return null; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/InterruptedAMQPLargeMessage.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/InterruptedAMQPLargeMessage.java new file mode 100644 index 0000000000..af7c5bb57f --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/InterruptedAMQPLargeMessage.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.amqp.largemessages; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.LargeServerMessage; +import org.apache.activemq.artemis.core.server.MessageReference; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport; +import org.apache.activemq.artemis.utils.SpawnedVMSupport; +import org.apache.activemq.artemis.utils.collections.LinkedListIterator; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSender; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.qpid.proton.amqp.messaging.Data; +import org.junit.Assert; +import org.junit.Test; + +public class InterruptedAMQPLargeMessage extends AmqpClientTestSupport { + + private static final int NUMBER_OF_THREADS = 10; + private static final int MINIMAL_SEND = 2; + + private static final int MESSAGE_SIZE = 1024 * 300; + + private static final String smallFrameAcceptor = new String("tcp://localhost:" + (AMQP_PORT + 8)); + + @Override + protected void addAdditionalAcceptors(ActiveMQServer server) throws Exception { + server.getConfiguration().addAcceptorConfiguration("flow", smallFrameAcceptor + "?protocols=AMQP;useEpoll=false;maxFrameSize=" + 512 + ";amqpMinLargeMessageSize=" + 10000); + } + + public static void main(String[] arg) { + // have everybody aligned on sending before we start + CyclicBarrier startFlag = new CyclicBarrier(NUMBER_OF_THREADS); + + CountDownLatch minimalKill = new CountDownLatch(MINIMAL_SEND * NUMBER_OF_THREADS); + Runnable runnable = () -> { + + try { + AmqpClient client = createLocalClient(); + AmqpConnection connection = client.createConnection(); + connection.setMaxFrameSize(2 * 1024); + connection.connect(); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(arg[0]); + startFlag.await(); + for (int m = 0; m < 1000; m++) { + AmqpMessage message = new AmqpMessage(); + message.setDurable(true); + byte[] bytes = new byte[MESSAGE_SIZE]; + for (int i = 0; i < bytes.length; i++) { + bytes[i] = (byte) 'z'; + } + + message.setBytes(bytes); + sender.send(message); + minimalKill.countDown(); + } + connection.close(); + } catch (Exception e) { + e.printStackTrace(); + } + }; + + + for (int t = 0; t < NUMBER_OF_THREADS; t++) { + Thread thread = new Thread(runnable); + thread.start(); + } + + try { + minimalKill.await(); + } catch (Exception e) { + e.printStackTrace(); + } + System.exit(-1); + } + + private static AmqpClient createLocalClient() throws URISyntaxException { + return new AmqpClient(new URI(smallFrameAcceptor), null, null); + } + + @Test + public void testInterruptedLargeMessage() throws Exception { + Process p = SpawnedVMSupport.spawnVM(InterruptedAMQPLargeMessage.class.getName(), getQueueName()); + p.waitFor(); + + Queue serverQueue = server.locateQueue(getQueueName()); + + Assert.assertTrue(serverQueue.getMessageCount() >= MINIMAL_SEND * NUMBER_OF_THREADS); + + LinkedListIterator browserIterator = serverQueue.browserIterator(); + + while (browserIterator.hasNext()) { + MessageReference ref = browserIterator.next(); + Message message = ref.getMessage(); + + Assert.assertNotNull(message); + Assert.assertTrue(message instanceof LargeServerMessage); + + Assert.assertFalse(((LargeServerMessage)message).hasPendingRecord()); + } + browserIterator.close(); + + System.out.println("There are " + serverQueue.getMessageCount() + " on the queue"); + int messageCount = (int)serverQueue.getMessageCount(); + + AmqpClient client = createLocalClient(); + AmqpConnection connection = addConnection(client.createConnection()); + connection.setMaxFrameSize(2 * 1024); + connection.connect(); + AmqpSession session = connection.createSession(); + AmqpReceiver receiver = session.createReceiver(getQueueName()); + + int received = 0; + receiver.flow((int) (messageCount + 10)); + for (int m = 0; m < messageCount; m++) { + receiver.flow(1); + AmqpMessage message = receiver.receive(10, TimeUnit.SECONDS); + Assert.assertNotNull(message); + message.accept(true); + received++; + + System.out.println("Received " + received); + Data data = (Data)message.getWrappedMessage().getBody(); + byte[] byteArray = data.getValue().getArray(); + + Assert.assertEquals(MESSAGE_SIZE, byteArray.length); + for (int i = 0; i < byteArray.length; i++) { + Assert.assertEquals((byte)'z', byteArray[i]); + } + } + + + Assert.assertNull(receiver.receiveNoWait()); + + validateNoFilesOnLargeDir(); + } + +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/SimpleStreamingLargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/SimpleStreamingLargeMessageTest.java index 81a8a5843b..58dae39cba 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/SimpleStreamingLargeMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/SimpleStreamingLargeMessageTest.java @@ -30,10 +30,14 @@ import java.util.Arrays; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.LargeServerMessage; +import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport; import org.apache.activemq.artemis.tests.util.Wait; +import org.apache.activemq.artemis.utils.collections.LinkedListIterator; import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpConnection; import org.apache.activemq.transport.amqp.client.AmqpMessage; @@ -322,6 +326,35 @@ public class SimpleStreamingLargeMessageTest extends AmqpClientTestSupport { session.commit(); + Queue queue = server.locateQueue(SimpleString.toSimpleString(getQueueName())); + + Wait.assertEquals(1, queue::getMessageCount); + + LinkedListIterator browserIterator = queue.browserIterator(); + + while (browserIterator.hasNext()) { + MessageReference ref = browserIterator.next(); + org.apache.activemq.artemis.api.core.Message message = ref.getMessage(); + + Assert.assertNotNull(message); + Assert.assertTrue(message instanceof LargeServerMessage); + + Assert.assertFalse(((LargeServerMessage)message).hasPendingRecord()); + } + browserIterator.close(); + + connection.close(); + + server.stop(); + + server.start(); + + connection = client.createConnection(); + addConnection(connection); + connection.setMaxFrameSize(2 * 1024); + connection.connect(); + session = connection.createSession(); + AmqpReceiver receiver = session.createReceiver(getQueueName()); receiver.flow(1); for (int i = 0; i < 1; i++) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageAvoidLargeMessagesTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageAvoidLargeMessagesTest.java index 4b46cd1e6c..7c3bc3361f 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageAvoidLargeMessagesTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageAvoidLargeMessagesTest.java @@ -44,6 +44,10 @@ public class LargeMessageAvoidLargeMessagesTest extends LargeMessageTest { isCompressedTest = true; } + @Override + protected void validateLargeMessageComplete(ActiveMQServer server) throws Exception { + } + @Override protected boolean isNetty() { return false; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java index bce8375276..a03ca5d246 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageCompressTest.java @@ -55,6 +55,10 @@ public class LargeMessageCompressTest extends LargeMessageTest { isCompressedTest = true; } + @Override + protected void validateLargeMessageComplete(ActiveMQServer server) throws Exception { + } + @Override protected boolean isNetty() { return false; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java index e5cc609920..79c767434f 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java @@ -45,6 +45,8 @@ import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager; import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl; import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.LargeServerMessage; +import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; @@ -53,6 +55,7 @@ import org.apache.activemq.artemis.tests.integration.largemessage.LargeMessageTe import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.RandomUtil; import org.apache.activemq.artemis.tests.util.Wait; +import org.apache.activemq.artemis.utils.collections.LinkedListIterator; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -247,6 +250,76 @@ public class LargeMessageTest extends LargeMessageTestBase { validateNoFilesOnLargeDir(); } + + @Test + public void testPendingRecord() throws Exception { + + ActiveMQServer server = createServer(true, isNetty(), storeType); + + server.start(); + + final int messageSize = (int) (3.5 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE); + + ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator)); + + ClientSession session = addClientSession(sf.createSession(false, true, false)); + + session.createQueue(new QueueConfiguration(ADDRESS)); + + ClientProducer producer = session.createProducer(ADDRESS); + + Message clientFile = createLargeClientMessageStreaming(session, messageSize, true); + + // Send large message which should be dropped and deleted from the filesystem + + producer.send(clientFile); + + validateLargeMessageComplete(server); + + sf.close(); + + server.stop(); + + server = createServer(true, isNetty(), storeType); + + server.start(); + + sf = addSessionFactory(createSessionFactory(locator)); + + session = addClientSession(sf.createSession(false, true, false)); + + ClientConsumer consumer = session.createConsumer(ADDRESS); + session.start(); + + ClientMessage message = consumer.receiveImmediate(); + Assert.assertNotNull(message); + for (int i = 0; i < messageSize; i++) { + assertEquals("position = " + i, getSamplebyte(i), message.getBodyBuffer().readByte()); + } + message.acknowledge(); + + validateNoFilesOnLargeDir(); + } + + protected void validateLargeMessageComplete(ActiveMQServer server) throws Exception { + Queue queue = server.locateQueue(ADDRESS); + + Wait.assertEquals(1, queue::getMessageCount); + + LinkedListIterator browserIterator = queue.browserIterator(); + + while (browserIterator.hasNext()) { + MessageReference ref = browserIterator.next(); + Message message = ref.getMessage(); + + Assert.assertNotNull(message); + Assert.assertTrue(message instanceof LargeServerMessage); + + Assert.assertFalse(((LargeServerMessage)message).hasPendingRecord()); + } + browserIterator.close(); + } + @Test public void testDeleteOnDrop() throws Exception { fillAddress(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java index 3d77d1a1dc..3e99da8b49 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java @@ -286,6 +286,11 @@ public class SendAckFailTest extends SpawnedTestBase { manager.start(); } + @Override + public LargeServerMessage largeMessageCreated(long id, LargeServerMessage largeMessage) throws Exception { + return manager.largeMessageCreated(id, largeMessage); + } + @Override public void stop() throws Exception { manager.stop();