ARTEMIS-1975 Fixing LargeMessage encoding for replication

This commit is contained in:
Clebert Suconic 2020-03-24 22:57:24 -04:00
parent b9fa8b96ec
commit 03fb630f73
7 changed files with 202 additions and 22 deletions

View File

@ -1029,6 +1029,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
tx.checkErrorCondition(); tx.checkErrorCondition();
} }
JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, persister, record); JournalInternalRecord addRecord = new JournalAddRecordTX(true, txID, id, recordType, persister, record);
// we need to calculate the encodeSize here, as it may use caches that are eliminated once the record is written
int encodeSize = addRecord.getEncodeSize();
JournalFile usedFile = appendRecord(addRecord, false, false, tx, null); JournalFile usedFile = appendRecord(addRecord, false, false, tx, null);
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
@ -1042,7 +1044,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
usedFile); usedFile);
} }
tx.addPositive(usedFile, id, addRecord.getEncodeSize()); tx.addPositive(usedFile, id, encodeSize);
} catch (Throwable e) { } catch (Throwable e) {
logger.error("appendAddRecordTransactional:" + e, e); logger.error("appendAddRecordTransactional:" + e, e);
setErrorCondition(null, tx, e); setErrorCondition(null, tx, e);

View File

@ -20,6 +20,7 @@ package org.apache.activemq.artemis.protocol.amqp.broker;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.ICoreMessage;
@ -79,7 +80,7 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
/** /**
* AMQPLargeMessagePersister will save the buffer here. * AMQPLargeMessagePersister will save the buffer here.
* */ * */
volatile ByteBuf temporaryBuffer; private ByteBuf temporaryBuffer;
private final LargeBody largeBody; private final LargeBody largeBody;
/** /**
@ -126,6 +127,41 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
parsingData = null; parsingData = null;
} }
public void releaseEncodedBuffer() {
internalReleaseBuffer(1);
}
/** {@link #getSavedEncodeBuffer()} will retain two counters from the buffer, one meant for the call,
* and one that must be released only after encoding.
*
* This method is meant to be called when the buffer is actually encoded on the journal, meaning both refs are gone.
* and the actual buffer can be released.
*/
public void releaseEncodedBufferAfterWrite() {
internalReleaseBuffer(2);
}
private synchronized void internalReleaseBuffer(int releases) {
for (int i = 0; i < releases; i++) {
if (temporaryBuffer != null && temporaryBuffer.release()) {
temporaryBuffer = null;
}
}
}
/** This is used on test assertions to make sure the buffers are released corrected */
public ByteBuf inspectTemporaryBuffer() {
return temporaryBuffer;
}
public synchronized ByteBuf getSavedEncodeBuffer() {
if (temporaryBuffer == null) {
temporaryBuffer = PooledByteBufAllocator.DEFAULT.buffer(getEstimateSavedEncode());
saveEncoding(temporaryBuffer);
}
return temporaryBuffer.retain(1);
}
@Override @Override
public void finishParse() throws Exception { public void finishParse() throws Exception {
openLargeMessage(); openLargeMessage();

View File

@ -18,7 +18,6 @@
package org.apache.activemq.artemis.protocol.amqp.broker; package org.apache.activemq.artemis.protocol.amqp.broker;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
@ -57,22 +56,18 @@ public class AMQPLargeMessagePersister extends MessagePersister {
@Override @Override
public int getEncodeSize(Message record) { public int getEncodeSize(Message record) {
ByteBuf buf = getSavedEncodeBuffer(record); AMQPLargeMessage msgEncode = (AMQPLargeMessage) record;
ByteBuf buf = msgEncode.getSavedEncodeBuffer();
try {
int encodeSize = DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + DataConstants.SIZE_LONG + DataConstants.SIZE_LONG + SimpleString.sizeofNullableString(record.getAddressSimpleString()) + DataConstants.SIZE_BOOLEAN + buf.writerIndex(); int encodeSize = DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + DataConstants.SIZE_LONG + DataConstants.SIZE_LONG + SimpleString.sizeofNullableString(record.getAddressSimpleString()) + DataConstants.SIZE_BOOLEAN + buf.writerIndex();
TypedProperties properties = ((AMQPMessage) record).getExtraProperties(); TypedProperties properties = ((AMQPMessage) record).getExtraProperties();
return encodeSize + (properties != null ? properties.getEncodeSize() : 0); return encodeSize + (properties != null ? properties.getEncodeSize() : 0);
} finally {
msgEncode.releaseEncodedBuffer();
} }
private ByteBuf getSavedEncodeBuffer(Message record) {
AMQPLargeMessage largeMessage = (AMQPLargeMessage)record;
if (largeMessage.temporaryBuffer == null) {
largeMessage.temporaryBuffer = PooledByteBufAllocator.DEFAULT.buffer(largeMessage.getEstimateSavedEncode());
largeMessage.saveEncoding(largeMessage.temporaryBuffer);
}
return largeMessage.temporaryBuffer;
} }
/** /**
@ -96,10 +91,10 @@ public class AMQPLargeMessagePersister extends MessagePersister {
properties.encode(buffer.byteBuf()); properties.encode(buffer.byteBuf());
} }
ByteBuf savedEncodeBuffer = getSavedEncodeBuffer(record); ByteBuf savedEncodeBuffer = msgEncode.getSavedEncodeBuffer();
buffer.writeBytes(savedEncodeBuffer, 0, savedEncodeBuffer.writerIndex()); buffer.writeBytes(savedEncodeBuffer, 0, savedEncodeBuffer.writerIndex());
savedEncodeBuffer.release(); msgEncode.releaseEncodedBufferAfterWrite(); // we need two releases, as getSavedEncodedBuffer will keep 1 for himself until encoding has happened
msgEncode.temporaryBuffer = null; // which this is the expected event where we need to release the extra refCounter
} }
@Override @Override

View File

@ -36,6 +36,7 @@ import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.amqp.largemessages.AMQPLargeMessagesTestUtil;
import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase; import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
@ -180,12 +181,18 @@ public class AmqpBridgeClusterRedistributionTest extends AmqpClientTestSupport {
sendMessages("uswest.Provider.AMC.Agent.DIVERTED.CustomNotification", 1, RoutingType.ANYCAST, true); sendMessages("uswest.Provider.AMC.Agent.DIVERTED.CustomNotification", 1, RoutingType.ANYCAST, true);
AMQPLargeMessagesTestUtil.validateAllTemporaryBuffers(server0);
AMQPLargeMessagesTestUtil.validateAllTemporaryBuffers(server1);
ClientMessage message = consumer.receive(5000); ClientMessage message = consumer.receive(5000);
assertNotNull(message); assertNotNull(message);
message = consumer.receiveImmediate(); message = consumer.receiveImmediate();
assertNull(message); assertNull(message);
} }
AMQPLargeMessagesTestUtil.validateAllTemporaryBuffers(server0);
AMQPLargeMessagesTestUtil.validateAllTemporaryBuffers(server1);
} }
@Test @Test
@ -196,12 +203,18 @@ public class AmqpBridgeClusterRedistributionTest extends AmqpClientTestSupport {
sendMessages("uswest.Provider.AMC.Agent.DIVERTED.CustomNotification", 1, RoutingType.ANYCAST, true); sendMessages("uswest.Provider.AMC.Agent.DIVERTED.CustomNotification", 1, RoutingType.ANYCAST, true);
AMQPLargeMessagesTestUtil.validateAllTemporaryBuffers(server0);
AMQPLargeMessagesTestUtil.validateAllTemporaryBuffers(server1);
ClientMessage message = consumer.receive(5000); ClientMessage message = consumer.receive(5000);
assertNotNull(message); assertNotNull(message);
message = consumer.receiveImmediate(); message = consumer.receiveImmediate();
assertNull(message); assertNull(message);
} }
AMQPLargeMessagesTestUtil.validateAllTemporaryBuffers(server0);
AMQPLargeMessagesTestUtil.validateAllTemporaryBuffers(server1);
} }
protected void setupClusterConnection(final String name, protected void setupClusterConnection(final String name,

View File

@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp.largemessages;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPLargeMessage;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.junit.Assert;
public class AMQPLargeMessagesTestUtil {
public static void validateAllTemporaryBuffers(ActiveMQServer server) {
for (Binding binding : server.getPostOffice().getAllBindings().values()) {
if (binding instanceof QueueBinding) {
validateTemporaryBuffers(((QueueBinding)binding).getQueue());
}
}
}
public static void validateTemporaryBuffers(Queue serverQueue) {
LinkedListIterator<MessageReference> totalIterator = serverQueue.browserIterator();
while (totalIterator.hasNext()) {
MessageReference ref = totalIterator.next();
if (ref.getMessage() instanceof AMQPLargeMessage) {
AMQPLargeMessage amqpLargeMessage = (AMQPLargeMessage) ref.getMessage();
// Using a Wait.waitFor here as we may have something working with the buffer in parallel
Wait.waitFor(() -> amqpLargeMessage.inspectTemporaryBuffer() == null, 1000, 10);
Assert.assertNull("Temporary buffers are being retained", amqpLargeMessage.inspectTemporaryBuffer());
}
}
totalIterator.close();
}
}

View File

@ -98,7 +98,7 @@ public class AmqpReplicatedLargeMessageTest extends AmqpReplicatedTestSupport {
assertEquals(0, queueView.getMessageCount()); assertEquals(0, queueView.getMessageCount());
session.begin(); session.begin();
for (int m = 0; m < 10; m++) { for (int m = 0; m < 100; m++) {
AmqpMessage message = new AmqpMessage(); AmqpMessage message = new AmqpMessage();
message.setDurable(true); message.setDurable(true);
message.setApplicationProperty("i", "m " + m); message.setApplicationProperty("i", "m " + m);
@ -112,6 +112,8 @@ public class AmqpReplicatedLargeMessageTest extends AmqpReplicatedTestSupport {
} }
session.commit(); session.commit();
AMQPLargeMessagesTestUtil.validateAllTemporaryBuffers(server);
if (crashServer) { if (crashServer) {
connection.close(); connection.close();
liveServer.crash(); liveServer.crash();
@ -129,11 +131,11 @@ public class AmqpReplicatedLargeMessageTest extends AmqpReplicatedTestSupport {
} }
queueView = server.locateQueue(getQueueName()); queueView = server.locateQueue(getQueueName());
Wait.assertEquals(10, queueView::getMessageCount); Wait.assertEquals(100, queueView::getMessageCount);
AmqpReceiver receiver = session.createReceiver(getQueueName().toString()); AmqpReceiver receiver = session.createReceiver(getQueueName().toString());
receiver.flow(10); receiver.flow(100);
for (int i = 0; i < 10; i++) { for (int i = 0; i < 100; i++) {
AmqpMessage msgReceived = receiver.receive(10, TimeUnit.SECONDS); AmqpMessage msgReceived = receiver.receive(10, TimeUnit.SECONDS);
Assert.assertNotNull(msgReceived); Assert.assertNotNull(msgReceived);
Data body = (Data)msgReceived.getWrappedMessage().getBody(); Data body = (Data)msgReceived.getWrappedMessage().getBody();

View File

@ -125,6 +125,8 @@ public class SimpleStreamingLargeMessageTest extends AmqpClientTestSupport {
} }
session.commit(); session.commit();
AMQPLargeMessagesTestUtil.validateAllTemporaryBuffers(server);
if (restartServer) { if (restartServer) {
connection.close(); connection.close();
server.stop(); server.stop();
@ -230,6 +232,8 @@ public class SimpleStreamingLargeMessageTest extends AmqpClientTestSupport {
} }
} }
AMQPLargeMessagesTestUtil.validateAllTemporaryBuffers(server);
if (restartServer) { if (restartServer) {
connection.close(); connection.close();
server.stop(); server.stop();
@ -274,6 +278,79 @@ public class SimpleStreamingLargeMessageTest extends AmqpClientTestSupport {
} }
@Test
public void testSingleMessage() throws Exception {
try {
int size = 100 * 1024;
AmqpClient client = createAmqpClient(new URI(smallFrameAcceptor));
AmqpConnection connection = client.createConnection();
addConnection(connection);
connection.setMaxFrameSize(2 * 1024);
connection.connect();
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getQueueName());
Queue queueView = getProxyToQueue(getQueueName());
assertNotNull(queueView);
assertEquals(0, queueView.getMessageCount());
session.begin();
int oddID = 0;
for (int m = 0; m < 1; m++) {
AmqpMessage message = new AmqpMessage();
message.setDurable(true);
boolean odd = (m % 2 == 0);
message.setApplicationProperty("i", m);
message.setApplicationProperty("oddString", odd ? "odd" : "even");
message.setApplicationProperty("odd", odd);
if (odd) {
message.setApplicationProperty("oddID", oddID++);
}
byte[] bytes = new byte[size];
for (int i = 0; i < bytes.length; i++) {
bytes[i] = (byte) 'z';
}
message.setBytes(bytes);
sender.send(message);
}
session.commit();
AmqpReceiver receiver = session.createReceiver(getQueueName());
receiver.flow(1);
for (int i = 0; i < 1; i++) {
AmqpMessage msgReceived = receiver.receive(10, TimeUnit.SECONDS);
Assert.assertNotNull(msgReceived);
Assert.assertTrue((boolean)msgReceived.getApplicationProperty("odd"));
Assert.assertEquals(i, (int)msgReceived.getApplicationProperty("oddID"));
Data body = (Data) msgReceived.getWrappedMessage().getBody();
byte[] bodyArray = body.getValue().getArray();
for (int bI = 0; bI < size; bI++) {
Assert.assertEquals((byte) 'z', bodyArray[bI]);
}
msgReceived.accept(true);
}
receiver.flow(1);
Assert.assertNull(receiver.receiveNoWait());
receiver.close();
connection.close();
validateNoFilesOnLargeDir(getLargeMessagesDir(), 0);
} catch (Exception e) {
e.printStackTrace();
throw e;
}
}
@Test @Test
public void testJMSPersistentTX() throws Exception { public void testJMSPersistentTX() throws Exception {