ARTEMIS-4668 Moving AMQP Large Message file handling away from Netty thread

This commit is contained in:
Clebert Suconic 2024-03-05 16:17:50 -05:00 committed by clebertsuconic
parent 5ce70f9e37
commit c83ed8957d
12 changed files with 183 additions and 80 deletions

View File

@ -172,6 +172,10 @@ public class AMQPSessionCallback implements SessionCallback {
}
}
public void execute(Runnable run) {
sessionExecutor.execute(run);
}
public void afterIO(IOCallback ioCallback) {
OperationContext context = recoverContext();
try {

View File

@ -94,6 +94,19 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public void disableAutoRead() {
handler.requireHandler();
connectionCallback.getTransportConnection().setAutoRead(false);
handler.setReadable(false);
}
public void enableAutoRead() {
handler.requireHandler();
connectionCallback.getTransportConnection().setAutoRead(true);
getHandler().setReadable(true);
flush();
}
public static final Symbol CONNECTION_OPEN_FAILED = Symbol.valueOf("amqp:connection-establishment-failed");
public static final String AMQP_CONTAINER_ID = "amqp-container-id";
private static final FutureTask<Void> VOID_FUTURE = new FutureTask<>(() -> { }, null);

View File

@ -34,7 +34,7 @@ public class AMQPLargeMessageReader implements MessageReader {
private final ProtonAbstractReceiver serverReceiver;
private AMQPLargeMessage currentMessage;
private volatile AMQPLargeMessage currentMessage;
private DeliveryAnnotations deliveryAnnotations;
private boolean closed = true;
@ -50,14 +50,15 @@ public class AMQPLargeMessageReader implements MessageReader {
@Override
public void close() {
if (!closed) {
if (currentMessage != null) {
try {
currentMessage.deleteFile();
} catch (Throwable error) {
ActiveMQServerLogger.LOGGER.errorDeletingLargeMessageFile(error);
} finally {
currentMessage = null;
try {
AMQPLargeMessage localCurrentMessage = currentMessage;
if (localCurrentMessage != null) {
localCurrentMessage.deleteFile();
}
} catch (Throwable error) {
ActiveMQServerLogger.LOGGER.errorDeletingLargeMessageFile(error);
} finally {
currentMessage = null;
}
deliveryAnnotations = null;
@ -82,34 +83,53 @@ public class AMQPLargeMessageReader implements MessageReader {
throw new IllegalStateException("AMQP Large Message Reader is closed and read cannot proceed");
}
final Receiver receiver = ((Receiver) delivery.getLink());
final ReadableBuffer dataBuffer = receiver.recv();
try {
serverReceiver.connection.requireInHandler();
final Receiver receiver = ((Receiver) delivery.getLink());
final ReadableBuffer dataBuffer = receiver.recv();
if (currentMessage == null) {
final AMQPSessionCallback sessionSPI = serverReceiver.getSessionContext().getSessionSPI();
final long id = sessionSPI.getStorageManager().generateID();
currentMessage = new AMQPLargeMessage(id, delivery.getMessageFormat(), null,
sessionSPI.getCoreMessageObjectPools(),
sessionSPI.getStorageManager());
currentMessage.parseHeader(dataBuffer);
sessionSPI.getStorageManager().onLargeMessageCreate(id, currentMessage);
if (currentMessage == null) {
final long id = sessionSPI.getStorageManager().generateID();
AMQPLargeMessage localCurrentMessage = new AMQPLargeMessage(id, delivery.getMessageFormat(), null, sessionSPI.getCoreMessageObjectPools(), sessionSPI.getStorageManager());
localCurrentMessage.parseHeader(dataBuffer);
sessionSPI.getStorageManager().onLargeMessageCreate(id, localCurrentMessage);
currentMessage = localCurrentMessage;
}
serverReceiver.getConnection().disableAutoRead();
boolean partial = delivery.isPartial();
sessionSPI.execute(() -> addBytes(delivery, dataBuffer, partial));
return null;
} catch (Exception e) {
// if an exception happened we must enable it back
serverReceiver.getConnection().enableAutoRead();
throw e;
}
}
currentMessage.addBytes(dataBuffer);
private void addBytes(Delivery delivery, ReadableBuffer dataBuffer, boolean isPartial) {
final AMQPLargeMessage localCurrentMessage = currentMessage;
final AMQPLargeMessage result;
try {
localCurrentMessage.addBytes(dataBuffer);
if (!delivery.isPartial()) {
currentMessage.releaseResources(serverReceiver.getConnection().isLargeMessageSync(), true);
result = currentMessage;
// We don't want a close to delete the file now, we've released the resources.
currentMessage = null;
deliveryAnnotations = result.getDeliveryAnnotations();
} else {
result = null;
if (!isPartial) {
localCurrentMessage.releaseResources(serverReceiver.getConnection().isLargeMessageSync(), true);
// We don't want a close to delete the file now, we've released the resources.
currentMessage = null;
serverReceiver.connection.runNow(() -> serverReceiver.onMessageComplete(delivery, localCurrentMessage, localCurrentMessage.getDeliveryAnnotations()));
}
} catch (Throwable e) {
serverReceiver.onExceptionWhileReading(e);
} finally {
serverReceiver.connection.runNow(serverReceiver.getConnection()::enableAutoRead);
}
return result;
}
}

View File

@ -59,6 +59,9 @@ public class AMQPLargeMessageWriter implements MessageWriter {
private MessageReference reference;
private AMQPLargeMessage message;
private LargeBodyReader largeBodyReader;
private Delivery delivery;
private long position;
private boolean initialPacketHandled;
@ -81,33 +84,59 @@ public class AMQPLargeMessageWriter implements MessageWriter {
public void close() {
if (!closed) {
try {
try {
if (largeBodyReader != null) {
largeBodyReader.close();
}
} catch (Exception e) {
// if we get an error only at this point, there's nothing else we could do other than log.warn
logger.warn("{}", e.getMessage(), e);
}
if (message != null) {
message.usageDown();
}
} finally {
reset(true);
resetClosed();
}
}
}
@Override
public AMQPLargeMessageWriter open() {
public AMQPLargeMessageWriter open(MessageReference reference) {
if (!closed) {
throw new IllegalStateException("Trying to open an AMQP Large Message writer that was not closed");
}
reset(false);
this.reference = reference;
this.message = (AMQPLargeMessage) reference.getMessage();
this.message.usageUp();
try {
largeBodyReader = message.getLargeBodyReader();
largeBodyReader.open();
} catch (Exception e) {
serverSender.reportDeliveryError(this, reference, e);
}
resetOpen();
return this;
}
private void reset(boolean closedState) {
private void resetClosed() {
message = null;
reference = null;
delivery = null;
largeBodyReader = null;
position = 0;
initialPacketHandled = false;
closed = closedState;
closed = true;
}
private void resetOpen() {
position = 0;
initialPacketHandled = false;
closed = false;
}
@Override
@ -121,17 +150,15 @@ public class AMQPLargeMessageWriter implements MessageWriter {
throw new IllegalStateException("Cannot write to an AMQP Large Message Writer that has been closed");
}
this.reference = messageReference;
this.message = (AMQPLargeMessage) messageReference.getMessage();
if (sessionSPI.invokeOutgoing(message, (ActiveMQProtonRemotingConnection) sessionSPI.getTransportConnection().getProtocolConnection()) != null) {
// an interceptor rejected the delivery
// since we opened the message as part of the queue executor we must close it now
close();
return;
}
this.delivery = serverSender.createDelivery(messageReference, (int) this.message.getMessageFormat());
message.usageUp();
tryDelivering();
}
@ -150,15 +177,14 @@ public class AMQPLargeMessageWriter implements MessageWriter {
final ByteBuf frameBuffer = PooledByteBufAllocator.DEFAULT.directBuffer(frameSize, frameSize);
final NettyReadable frameView = new NettyReadable(frameBuffer);
try (LargeBodyReader context = message.getLargeBodyReader()) {
context.open();
context.position(position);
long bodySize = context.getSize();
try {
largeBodyReader.position(position);
long bodySize = largeBodyReader.getSize();
// materialize it so we can use its internal NIO buffer
frameBuffer.ensureWritable(frameSize);
if (!initialPacketHandled && protonSender.getLocalState() != EndpointState.CLOSED) {
if (!deliverInitialPacket(context, frameBuffer)) {
if (!deliverInitialPacket(largeBodyReader, frameBuffer)) {
return;
}
@ -171,7 +197,7 @@ public class AMQPLargeMessageWriter implements MessageWriter {
}
frameBuffer.clear();
final int readSize = context.readInto(frameBuffer.internalNioBuffer(0, frameSize));
final int readSize = largeBodyReader.readInto(frameBuffer.internalNioBuffer(0, frameSize));
frameBuffer.writerIndex(readSize);

View File

@ -129,7 +129,7 @@ public class AMQPTunneledCoreLargeMessageWriter implements MessageWriter {
}
@Override
public AMQPTunneledCoreLargeMessageWriter open() {
public AMQPTunneledCoreLargeMessageWriter open(MessageReference reference) {
if (state != State.CLOSED) {
throw new IllegalStateException("Trying to open an AMQP Large Message writer that was not closed");
}

View File

@ -49,6 +49,9 @@ public interface MessageReader {
* and is no longer partial the readBytes method will return the decoded message
* for dispatch.
*
* Notice that asynchronous Readers will never return the Message but will rather call a complete operation on the
* Server Receiver.
*
* @param delivery
* The delivery that has pending incoming bytes.
*/

View File

@ -90,7 +90,7 @@ public interface MessageWriter extends Consumer<MessageReference> {
* be called on every handler by the sender context as it doesn't know which instances need
* opened.
*/
default MessageWriter open() {
default MessageWriter open(MessageReference reference) {
// Default for stateless handlers is to do nothing here.
return this;
}

View File

@ -16,6 +16,8 @@
*/
package org.apache.activemq.artemis.protocol.amqp.proton;
import java.lang.invoke.MethodHandles;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
import org.apache.activemq.artemis.core.server.RoutingContext;
@ -26,12 +28,17 @@ import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPExceptio
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class ProtonAbstractReceiver extends ProtonInitializable implements ProtonDeliveryHandler {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
protected final AMQPConnectionContext connection;
protected final AMQPSessionContext protonSession;
@ -302,8 +309,6 @@ public abstract class ProtonAbstractReceiver extends ProtonInitializable impleme
public final void onMessage(Delivery delivery) throws ActiveMQAMQPException {
connection.requireInHandler();
final Receiver receiver = ((Receiver) delivery.getLink());
if (receiver.current() != delivery) {
return;
}
@ -320,37 +325,61 @@ public abstract class ProtonAbstractReceiver extends ProtonInitializable impleme
return;
}
final Message message = messageReader.readBytes(delivery);
if (message != null) {
// Fetch this before the close of the reader as that will clear any read message
// delivery annotations.
final DeliveryAnnotations deliveryAnnotations = messageReader.getDeliveryAnnotations();
this.messageReader.close();
this.messageReader = null;
receiver.advance();
Transaction tx = null;
if (delivery.getRemoteState() instanceof TransactionalState) {
TransactionalState txState = (TransactionalState) delivery.getRemoteState();
tx = this.sessionSPI.getTransaction(txState.getTxnId(), false);
}
actualDelivery(message, delivery, deliveryAnnotations, receiver, tx);
Message completeMessage;
if ((completeMessage = messageReader.readBytes(delivery)) != null) {
// notice the AMQP Large Message Reader will always return null
// and call the onMessageComplete directly
// since that happens asynchronously
onMessageComplete(delivery, completeMessage, messageReader.getDeliveryAnnotations());
}
} catch (Exception e) {
logger.warn(e.getMessage(), e);
throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
}
}
public void onMessageComplete(Delivery delivery,
Message message, DeliveryAnnotations deliveryAnnotations) {
connection.requireInHandler();
try {
receiver.advance();
Transaction tx = null;
if (delivery.getRemoteState() instanceof TransactionalState) {
TransactionalState txState = (TransactionalState) delivery.getRemoteState();
try {
tx = this.sessionSPI.getTransaction(txState.getTxnId(), false);
} catch (Exception e) {
this.onExceptionWhileReading(e);
}
}
actualDelivery(message, delivery, deliveryAnnotations, receiver, tx);
} finally {
// reader is complete, we give it up now
this.messageReader.close();
this.messageReader = null;
}
}
@Override
public void close(boolean remoteLinkClose) throws ActiveMQAMQPException {
protonSession.removeReceiver(receiver);
closeCurrentReader();
}
public void onExceptionWhileReading(Throwable e) {
logger.warn(e.getMessage(), e);
connection.runNow(() -> {
// setting it enabled just in case a large message reader disabled it
connection.enableAutoRead();
ErrorCondition ec = new ErrorCondition(AmqpError.INTERNAL_ERROR, e.getMessage());
connection.close(ec);
connection.flush();
});
}
@Override
public void close(ErrorCondition condition) throws ActiveMQAMQPException {
receiver.setCondition(condition);

View File

@ -483,7 +483,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
credits--;
}
final MessageWriter messageWriter = controller.selectOutgoingMessageWriter(this, messageReference).open();
final MessageWriter messageWriter = controller.selectOutgoingMessageWriter(this, messageReference).open(messageReference);
// Preserve for hasCredits to check for busy state and possible abort on close
this.messageWriter = messageWriter;

View File

@ -95,6 +95,8 @@ public class ProtonHandler extends ProtonInitializable implements SaslListener {
boolean flushInstantly = false;
volatile boolean readable = true;
/** afterFlush and afterFlushSet properties
* are set by afterFlush methods.
* This is to be called after the flush loop.
@ -108,6 +110,15 @@ public class ProtonHandler extends ProtonInitializable implements SaslListener {
private Runnable afterFlush;
protected Set<Runnable> afterFlushSet;
public boolean isReadable() {
return readable;
}
public ProtonHandler setReadable(boolean readable) {
this.readable = readable;
return this;
}
@Override
public void initialize() throws Exception {
initialized = true;
@ -381,11 +392,6 @@ public class ProtonHandler extends ProtonInitializable implements SaslListener {
flush();
});
/*try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
} */
// this needs to be done in two steps
// we first flush what we have to the client
// after flushed, we close the local connection
@ -562,7 +568,7 @@ public class ProtonHandler extends ProtonInitializable implements SaslListener {
AuditLogger.setRemoteAddress(h.getRemoteAddress());
}
}
while ((ev = collector.peek()) != null) {
while (isReadable() && (ev = collector.peek()) != null) {
for (EventHandler h : handlers) {
logger.trace("Handling {} towards {}", ev, h);

View File

@ -55,6 +55,7 @@ import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.Spy;
@ -150,7 +151,7 @@ public class AMQPTunneledCoreLargeMessageWriterTest {
when(protonSender.getLocalState()).thenReturn(EndpointState.CLOSED);
writer.open();
writer.open(Mockito.mock(MessageReference.class));
try {
writer.writeBytes(reference);
@ -177,7 +178,7 @@ public class AMQPTunneledCoreLargeMessageWriterTest {
private void doTestMessageEncodingWrittenToDeliveryWithAnnotations(boolean deliveryAnnotations) throws Exception {
AMQPTunneledCoreLargeMessageWriter writer = new AMQPTunneledCoreLargeMessageWriter(serverSender);
writer.open();
writer.open(Mockito.mock(MessageReference.class));
final ByteBuf expectedEncoding = Unpooled.buffer();
@ -276,7 +277,7 @@ public class AMQPTunneledCoreLargeMessageWriterTest {
public void testLargeMessageUsageLoweredOnCloseWhenWriteNotCompleted() throws Exception {
AMQPTunneledCoreLargeMessageWriter writer = new AMQPTunneledCoreLargeMessageWriter(serverSender);
writer.open();
writer.open(Mockito.mock(MessageReference.class));
when(protonSender.getLocalState()).thenReturn(EndpointState.ACTIVE);
when(protonDelivery.isPartial()).thenReturn(true);

View File

@ -52,6 +52,7 @@ import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.Spy;
@ -110,7 +111,7 @@ public class AMQPTunneledCoreMessageWriterTest {
when(protonSender.getLocalState()).thenReturn(EndpointState.CLOSED);
writer.open();
writer.open(Mockito.mock(MessageReference.class));
try {
writer.writeBytes(reference);
@ -171,7 +172,7 @@ public class AMQPTunneledCoreMessageWriterTest {
return null;
}).when(message).persist(any(ActiveMQBuffer.class));
writer.open();
writer.open(Mockito.mock(MessageReference.class));
try {
writer.writeBytes(reference);