diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java index c274301179..11617564e7 100644 --- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java +++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java @@ -1029,6 +1029,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal tx.checkErrorCondition(); } JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, persister, record); + // we need to calculate the encodeSize here, as it may use caches that are eliminated once the record is written + int encodeSize = addRecord.getEncodeSize(); JournalFile usedFile = appendRecord(addRecord, false, false, tx, null); if (logger.isTraceEnabled()) { @@ -1042,7 +1044,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal usedFile); } - tx.addPositive(usedFile, id, addRecord.getEncodeSize()); + tx.addPositive(usedFile, id, encodeSize); } catch (Throwable e) { logger.error("appendAddRecordTransactional:" + e, e); setErrorCondition(null, tx, e); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java index 7937f7fc36..f310fb3108 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java @@ -20,6 +20,7 @@ package org.apache.activemq.artemis.protocol.amqp.broker; import java.nio.ByteBuffer; import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ICoreMessage; @@ -79,7 +80,7 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage /** * AMQPLargeMessagePersister will save the buffer here. * */ - volatile ByteBuf temporaryBuffer; + private ByteBuf temporaryBuffer; private final LargeBody largeBody; /** @@ -126,6 +127,41 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage parsingData = null; } + public void releaseEncodedBuffer() { + internalReleaseBuffer(1); + } + + /** {@link #getSavedEncodeBuffer()} will retain two counters from the buffer, one meant for the call, + * and one that must be released only after encoding. + * + * This method is meant to be called when the buffer is actually encoded on the journal, meaning both refs are gone. + * and the actual buffer can be released. + */ + public void releaseEncodedBufferAfterWrite() { + internalReleaseBuffer(2); + } + + private synchronized void internalReleaseBuffer(int releases) { + for (int i = 0; i < releases; i++) { + if (temporaryBuffer != null && temporaryBuffer.release()) { + temporaryBuffer = null; + } + } + } + + /** This is used on test assertions to make sure the buffers are released corrected */ + public ByteBuf inspectTemporaryBuffer() { + return temporaryBuffer; + } + + public synchronized ByteBuf getSavedEncodeBuffer() { + if (temporaryBuffer == null) { + temporaryBuffer = PooledByteBufAllocator.DEFAULT.buffer(getEstimateSavedEncode()); + saveEncoding(temporaryBuffer); + } + return temporaryBuffer.retain(1); + } + @Override public void finishParse() throws Exception { openLargeMessage(); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessagePersister.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessagePersister.java index cffda5de23..f573e6b694 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessagePersister.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessagePersister.java @@ -18,7 +18,6 @@ package org.apache.activemq.artemis.protocol.amqp.broker; import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; @@ -57,22 +56,18 @@ public class AMQPLargeMessagePersister extends MessagePersister { @Override public int getEncodeSize(Message record) { - ByteBuf buf = getSavedEncodeBuffer(record); + AMQPLargeMessage msgEncode = (AMQPLargeMessage) record; + ByteBuf buf = msgEncode.getSavedEncodeBuffer(); - int encodeSize = DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + DataConstants.SIZE_LONG + DataConstants.SIZE_LONG + SimpleString.sizeofNullableString(record.getAddressSimpleString()) + DataConstants.SIZE_BOOLEAN + buf.writerIndex(); + try { + int encodeSize = DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + DataConstants.SIZE_LONG + DataConstants.SIZE_LONG + SimpleString.sizeofNullableString(record.getAddressSimpleString()) + DataConstants.SIZE_BOOLEAN + buf.writerIndex(); - TypedProperties properties = ((AMQPMessage) record).getExtraProperties(); + TypedProperties properties = ((AMQPMessage) record).getExtraProperties(); - return encodeSize + (properties != null ? properties.getEncodeSize() : 0); - } - - private ByteBuf getSavedEncodeBuffer(Message record) { - AMQPLargeMessage largeMessage = (AMQPLargeMessage)record; - if (largeMessage.temporaryBuffer == null) { - largeMessage.temporaryBuffer = PooledByteBufAllocator.DEFAULT.buffer(largeMessage.getEstimateSavedEncode()); - largeMessage.saveEncoding(largeMessage.temporaryBuffer); + return encodeSize + (properties != null ? properties.getEncodeSize() : 0); + } finally { + msgEncode.releaseEncodedBuffer(); } - return largeMessage.temporaryBuffer; } /** @@ -96,10 +91,10 @@ public class AMQPLargeMessagePersister extends MessagePersister { properties.encode(buffer.byteBuf()); } - ByteBuf savedEncodeBuffer = getSavedEncodeBuffer(record); + ByteBuf savedEncodeBuffer = msgEncode.getSavedEncodeBuffer(); buffer.writeBytes(savedEncodeBuffer, 0, savedEncodeBuffer.writerIndex()); - savedEncodeBuffer.release(); - msgEncode.temporaryBuffer = null; + msgEncode.releaseEncodedBufferAfterWrite(); // we need two releases, as getSavedEncodedBuffer will keep 1 for himself until encoding has happened + // which this is the expected event where we need to release the extra refCounter } @Override diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpBridgeClusterRedistributionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpBridgeClusterRedistributionTest.java index 1ba439ce05..328f7f9de2 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpBridgeClusterRedistributionTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpBridgeClusterRedistributionTest.java @@ -36,6 +36,7 @@ import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.tests.integration.amqp.largemessages.AMQPLargeMessagesTestUtil; import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase; import org.junit.After; import org.junit.Before; @@ -180,12 +181,18 @@ public class AmqpBridgeClusterRedistributionTest extends AmqpClientTestSupport { sendMessages("uswest.Provider.AMC.Agent.DIVERTED.CustomNotification", 1, RoutingType.ANYCAST, true); + AMQPLargeMessagesTestUtil.validateAllTemporaryBuffers(server0); + AMQPLargeMessagesTestUtil.validateAllTemporaryBuffers(server1); + ClientMessage message = consumer.receive(5000); assertNotNull(message); message = consumer.receiveImmediate(); assertNull(message); } + + AMQPLargeMessagesTestUtil.validateAllTemporaryBuffers(server0); + AMQPLargeMessagesTestUtil.validateAllTemporaryBuffers(server1); } @Test @@ -196,12 +203,18 @@ public class AmqpBridgeClusterRedistributionTest extends AmqpClientTestSupport { sendMessages("uswest.Provider.AMC.Agent.DIVERTED.CustomNotification", 1, RoutingType.ANYCAST, true); + AMQPLargeMessagesTestUtil.validateAllTemporaryBuffers(server0); + AMQPLargeMessagesTestUtil.validateAllTemporaryBuffers(server1); + ClientMessage message = consumer.receive(5000); assertNotNull(message); message = consumer.receiveImmediate(); assertNull(message); } + + AMQPLargeMessagesTestUtil.validateAllTemporaryBuffers(server0); + AMQPLargeMessagesTestUtil.validateAllTemporaryBuffers(server1); } protected void setupClusterConnection(final String name, diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/AMQPLargeMessagesTestUtil.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/AMQPLargeMessagesTestUtil.java new file mode 100644 index 0000000000..769172f399 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/AMQPLargeMessagesTestUtil.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.amqp.largemessages; + +import org.apache.activemq.artemis.core.postoffice.Binding; +import org.apache.activemq.artemis.core.postoffice.QueueBinding; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.MessageReference; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.protocol.amqp.broker.AMQPLargeMessage; +import org.apache.activemq.artemis.utils.Wait; +import org.apache.activemq.artemis.utils.collections.LinkedListIterator; +import org.junit.Assert; + +public class AMQPLargeMessagesTestUtil { + + + public static void validateAllTemporaryBuffers(ActiveMQServer server) { + for (Binding binding : server.getPostOffice().getAllBindings().values()) { + if (binding instanceof QueueBinding) { + validateTemporaryBuffers(((QueueBinding)binding).getQueue()); + } + } + } + + public static void validateTemporaryBuffers(Queue serverQueue) { + LinkedListIterator totalIterator = serverQueue.browserIterator(); + while (totalIterator.hasNext()) { + MessageReference ref = totalIterator.next(); + if (ref.getMessage() instanceof AMQPLargeMessage) { + AMQPLargeMessage amqpLargeMessage = (AMQPLargeMessage) ref.getMessage(); + // Using a Wait.waitFor here as we may have something working with the buffer in parallel + Wait.waitFor(() -> amqpLargeMessage.inspectTemporaryBuffer() == null, 1000, 10); + Assert.assertNull("Temporary buffers are being retained", amqpLargeMessage.inspectTemporaryBuffer()); + } + } + totalIterator.close(); + } + +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/AmqpReplicatedLargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/AmqpReplicatedLargeMessageTest.java index 5fc4e60c56..a87a82e35a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/AmqpReplicatedLargeMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/AmqpReplicatedLargeMessageTest.java @@ -98,7 +98,7 @@ public class AmqpReplicatedLargeMessageTest extends AmqpReplicatedTestSupport { assertEquals(0, queueView.getMessageCount()); session.begin(); - for (int m = 0; m < 10; m++) { + for (int m = 0; m < 100; m++) { AmqpMessage message = new AmqpMessage(); message.setDurable(true); message.setApplicationProperty("i", "m " + m); @@ -112,6 +112,8 @@ public class AmqpReplicatedLargeMessageTest extends AmqpReplicatedTestSupport { } session.commit(); + AMQPLargeMessagesTestUtil.validateAllTemporaryBuffers(server); + if (crashServer) { connection.close(); liveServer.crash(); @@ -129,11 +131,11 @@ public class AmqpReplicatedLargeMessageTest extends AmqpReplicatedTestSupport { } queueView = server.locateQueue(getQueueName()); - Wait.assertEquals(10, queueView::getMessageCount); + Wait.assertEquals(100, queueView::getMessageCount); AmqpReceiver receiver = session.createReceiver(getQueueName().toString()); - receiver.flow(10); - for (int i = 0; i < 10; i++) { + receiver.flow(100); + for (int i = 0; i < 100; i++) { AmqpMessage msgReceived = receiver.receive(10, TimeUnit.SECONDS); Assert.assertNotNull(msgReceived); Data body = (Data)msgReceived.getWrappedMessage().getBody(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/SimpleStreamingLargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/SimpleStreamingLargeMessageTest.java index 2539838ec2..81a8a5843b 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/SimpleStreamingLargeMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/SimpleStreamingLargeMessageTest.java @@ -125,6 +125,8 @@ public class SimpleStreamingLargeMessageTest extends AmqpClientTestSupport { } session.commit(); + AMQPLargeMessagesTestUtil.validateAllTemporaryBuffers(server); + if (restartServer) { connection.close(); server.stop(); @@ -230,6 +232,8 @@ public class SimpleStreamingLargeMessageTest extends AmqpClientTestSupport { } } + AMQPLargeMessagesTestUtil.validateAllTemporaryBuffers(server); + if (restartServer) { connection.close(); server.stop(); @@ -274,6 +278,79 @@ public class SimpleStreamingLargeMessageTest extends AmqpClientTestSupport { } + + @Test + public void testSingleMessage() throws Exception { + try { + + int size = 100 * 1024; + AmqpClient client = createAmqpClient(new URI(smallFrameAcceptor)); + AmqpConnection connection = client.createConnection(); + addConnection(connection); + connection.setMaxFrameSize(2 * 1024); + connection.connect(); + + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(getQueueName()); + + Queue queueView = getProxyToQueue(getQueueName()); + assertNotNull(queueView); + assertEquals(0, queueView.getMessageCount()); + + session.begin(); + int oddID = 0; + for (int m = 0; m < 1; m++) { + AmqpMessage message = new AmqpMessage(); + message.setDurable(true); + boolean odd = (m % 2 == 0); + message.setApplicationProperty("i", m); + message.setApplicationProperty("oddString", odd ? "odd" : "even"); + message.setApplicationProperty("odd", odd); + if (odd) { + message.setApplicationProperty("oddID", oddID++); + } + + byte[] bytes = new byte[size]; + for (int i = 0; i < bytes.length; i++) { + bytes[i] = (byte) 'z'; + } + + message.setBytes(bytes); + sender.send(message); + } + + session.commit(); + + AmqpReceiver receiver = session.createReceiver(getQueueName()); + receiver.flow(1); + for (int i = 0; i < 1; i++) { + AmqpMessage msgReceived = receiver.receive(10, TimeUnit.SECONDS); + Assert.assertNotNull(msgReceived); + Assert.assertTrue((boolean)msgReceived.getApplicationProperty("odd")); + Assert.assertEquals(i, (int)msgReceived.getApplicationProperty("oddID")); + Data body = (Data) msgReceived.getWrappedMessage().getBody(); + byte[] bodyArray = body.getValue().getArray(); + for (int bI = 0; bI < size; bI++) { + Assert.assertEquals((byte) 'z', bodyArray[bI]); + } + msgReceived.accept(true); + } + + receiver.flow(1); + Assert.assertNull(receiver.receiveNoWait()); + + receiver.close(); + connection.close(); + + validateNoFilesOnLargeDir(getLargeMessagesDir(), 0); + } catch (Exception e) { + e.printStackTrace(); + throw e; + } + + } + @Test public void testJMSPersistentTX() throws Exception {