diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index c8f081b30b..c67cac0a1d 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -172,6 +172,10 @@ public class AMQPSessionCallback implements SessionCallback { } } + public void execute(Runnable run) { + sessionExecutor.execute(run); + } + public void afterIO(IOCallback ioCallback) { OperationContext context = recoverContext(); try { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java index b84823ee2d..e1f6fe192c 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java @@ -94,6 +94,19 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + public void disableAutoRead() { + handler.requireHandler(); + connectionCallback.getTransportConnection().setAutoRead(false); + handler.setReadable(false); + } + + public void enableAutoRead() { + handler.requireHandler(); + connectionCallback.getTransportConnection().setAutoRead(true); + getHandler().setReadable(true); + flush(); + } + public static final Symbol CONNECTION_OPEN_FAILED = Symbol.valueOf("amqp:connection-establishment-failed"); public static final String AMQP_CONTAINER_ID = "amqp-container-id"; private static final FutureTask VOID_FUTURE = new FutureTask<>(() -> { }, null); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java index 6a44c6339f..63af7b1418 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java @@ -34,7 +34,7 @@ public class AMQPLargeMessageReader implements MessageReader { private final ProtonAbstractReceiver serverReceiver; - private AMQPLargeMessage currentMessage; + private volatile AMQPLargeMessage currentMessage; private DeliveryAnnotations deliveryAnnotations; private boolean closed = true; @@ -50,14 +50,15 @@ public class AMQPLargeMessageReader implements MessageReader { @Override public void close() { if (!closed) { - if (currentMessage != null) { - try { - currentMessage.deleteFile(); - } catch (Throwable error) { - ActiveMQServerLogger.LOGGER.errorDeletingLargeMessageFile(error); - } finally { - currentMessage = null; + try { + AMQPLargeMessage localCurrentMessage = currentMessage; + if (localCurrentMessage != null) { + localCurrentMessage.deleteFile(); } + } catch (Throwable error) { + ActiveMQServerLogger.LOGGER.errorDeletingLargeMessageFile(error); + } finally { + currentMessage = null; } deliveryAnnotations = null; @@ -82,34 +83,53 @@ public class AMQPLargeMessageReader implements MessageReader { throw new IllegalStateException("AMQP Large Message Reader is closed and read cannot proceed"); } - final Receiver receiver = ((Receiver) delivery.getLink()); - final ReadableBuffer dataBuffer = receiver.recv(); + try { + serverReceiver.connection.requireInHandler(); + + final Receiver receiver = ((Receiver) delivery.getLink()); + final ReadableBuffer dataBuffer = receiver.recv(); - if (currentMessage == null) { final AMQPSessionCallback sessionSPI = serverReceiver.getSessionContext().getSessionSPI(); - final long id = sessionSPI.getStorageManager().generateID(); - currentMessage = new AMQPLargeMessage(id, delivery.getMessageFormat(), null, - sessionSPI.getCoreMessageObjectPools(), - sessionSPI.getStorageManager()); - currentMessage.parseHeader(dataBuffer); - sessionSPI.getStorageManager().onLargeMessageCreate(id, currentMessage); + if (currentMessage == null) { + final long id = sessionSPI.getStorageManager().generateID(); + AMQPLargeMessage localCurrentMessage = new AMQPLargeMessage(id, delivery.getMessageFormat(), null, sessionSPI.getCoreMessageObjectPools(), sessionSPI.getStorageManager()); + localCurrentMessage.parseHeader(dataBuffer); + + sessionSPI.getStorageManager().onLargeMessageCreate(id, localCurrentMessage); + currentMessage = localCurrentMessage; + } + + serverReceiver.getConnection().disableAutoRead(); + + boolean partial = delivery.isPartial(); + + sessionSPI.execute(() -> addBytes(delivery, dataBuffer, partial)); + + return null; + } catch (Exception e) { + // if an exception happened we must enable it back + serverReceiver.getConnection().enableAutoRead(); + throw e; } + } - currentMessage.addBytes(dataBuffer); + private void addBytes(Delivery delivery, ReadableBuffer dataBuffer, boolean isPartial) { + final AMQPLargeMessage localCurrentMessage = currentMessage; - final AMQPLargeMessage result; + try { + localCurrentMessage.addBytes(dataBuffer); - if (!delivery.isPartial()) { - currentMessage.releaseResources(serverReceiver.getConnection().isLargeMessageSync(), true); - result = currentMessage; - // We don't want a close to delete the file now, we've released the resources. - currentMessage = null; - deliveryAnnotations = result.getDeliveryAnnotations(); - } else { - result = null; + if (!isPartial) { + localCurrentMessage.releaseResources(serverReceiver.getConnection().isLargeMessageSync(), true); + // We don't want a close to delete the file now, we've released the resources. + currentMessage = null; + serverReceiver.connection.runNow(() -> serverReceiver.onMessageComplete(delivery, localCurrentMessage, localCurrentMessage.getDeliveryAnnotations())); + } + } catch (Throwable e) { + serverReceiver.onExceptionWhileReading(e); + } finally { + serverReceiver.connection.runNow(serverReceiver.getConnection()::enableAutoRead); } - - return result; } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageWriter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageWriter.java index f6ef4ad20b..d6fccce451 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageWriter.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageWriter.java @@ -59,6 +59,9 @@ public class AMQPLargeMessageWriter implements MessageWriter { private MessageReference reference; private AMQPLargeMessage message; + + private LargeBodyReader largeBodyReader; + private Delivery delivery; private long position; private boolean initialPacketHandled; @@ -81,33 +84,59 @@ public class AMQPLargeMessageWriter implements MessageWriter { public void close() { if (!closed) { try { + try { + if (largeBodyReader != null) { + largeBodyReader.close(); + } + } catch (Exception e) { + // if we get an error only at this point, there's nothing else we could do other than log.warn + logger.warn("{}", e.getMessage(), e); + } if (message != null) { message.usageDown(); } } finally { - reset(true); + resetClosed(); } } } @Override - public AMQPLargeMessageWriter open() { + public AMQPLargeMessageWriter open(MessageReference reference) { if (!closed) { throw new IllegalStateException("Trying to open an AMQP Large Message writer that was not closed"); } - reset(false); + this.reference = reference; + this.message = (AMQPLargeMessage) reference.getMessage(); + this.message.usageUp(); + + try { + largeBodyReader = message.getLargeBodyReader(); + largeBodyReader.open(); + } catch (Exception e) { + serverSender.reportDeliveryError(this, reference, e); + } + + resetOpen(); return this; } - private void reset(boolean closedState) { + private void resetClosed() { message = null; reference = null; delivery = null; + largeBodyReader = null; position = 0; initialPacketHandled = false; - closed = closedState; + closed = true; + } + + private void resetOpen() { + position = 0; + initialPacketHandled = false; + closed = false; } @Override @@ -121,17 +150,15 @@ public class AMQPLargeMessageWriter implements MessageWriter { throw new IllegalStateException("Cannot write to an AMQP Large Message Writer that has been closed"); } - this.reference = messageReference; - this.message = (AMQPLargeMessage) messageReference.getMessage(); - if (sessionSPI.invokeOutgoing(message, (ActiveMQProtonRemotingConnection) sessionSPI.getTransportConnection().getProtocolConnection()) != null) { + // an interceptor rejected the delivery + // since we opened the message as part of the queue executor we must close it now + close(); return; } this.delivery = serverSender.createDelivery(messageReference, (int) this.message.getMessageFormat()); - message.usageUp(); - tryDelivering(); } @@ -150,15 +177,14 @@ public class AMQPLargeMessageWriter implements MessageWriter { final ByteBuf frameBuffer = PooledByteBufAllocator.DEFAULT.directBuffer(frameSize, frameSize); final NettyReadable frameView = new NettyReadable(frameBuffer); - try (LargeBodyReader context = message.getLargeBodyReader()) { - context.open(); - context.position(position); - long bodySize = context.getSize(); + try { + largeBodyReader.position(position); + long bodySize = largeBodyReader.getSize(); // materialize it so we can use its internal NIO buffer frameBuffer.ensureWritable(frameSize); if (!initialPacketHandled && protonSender.getLocalState() != EndpointState.CLOSED) { - if (!deliverInitialPacket(context, frameBuffer)) { + if (!deliverInitialPacket(largeBodyReader, frameBuffer)) { return; } @@ -171,7 +197,7 @@ public class AMQPLargeMessageWriter implements MessageWriter { } frameBuffer.clear(); - final int readSize = context.readInto(frameBuffer.internalNioBuffer(0, frameSize)); + final int readSize = largeBodyReader.readInto(frameBuffer.internalNioBuffer(0, frameSize)); frameBuffer.writerIndex(readSize); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageWriter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageWriter.java index 94b118db12..04a3ad2f1b 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageWriter.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageWriter.java @@ -129,7 +129,7 @@ public class AMQPTunneledCoreLargeMessageWriter implements MessageWriter { } @Override - public AMQPTunneledCoreLargeMessageWriter open() { + public AMQPTunneledCoreLargeMessageWriter open(MessageReference reference) { if (state != State.CLOSED) { throw new IllegalStateException("Trying to open an AMQP Large Message writer that was not closed"); } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/MessageReader.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/MessageReader.java index 8b4db7a04e..020dc6d142 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/MessageReader.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/MessageReader.java @@ -49,6 +49,9 @@ public interface MessageReader { * and is no longer partial the readBytes method will return the decoded message * for dispatch. * + * Notice that asynchronous Readers will never return the Message but will rather call a complete operation on the + * Server Receiver. + * * @param delivery * The delivery that has pending incoming bytes. */ diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/MessageWriter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/MessageWriter.java index 8afac2c8b3..911ffcc590 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/MessageWriter.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/MessageWriter.java @@ -90,7 +90,7 @@ public interface MessageWriter extends Consumer { * be called on every handler by the sender context as it doesn't know which instances need * opened. */ - default MessageWriter open() { + default MessageWriter open(MessageReference reference) { // Default for stateless handlers is to do nothing here. return this; } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java index 62dc963634..9dbfe5406d 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.artemis.protocol.amqp.proton; +import java.lang.invoke.MethodHandles; + import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager; import org.apache.activemq.artemis.core.server.RoutingContext; @@ -26,12 +28,17 @@ import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPExceptio import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException; import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations; import org.apache.qpid.proton.amqp.transaction.TransactionalState; +import org.apache.qpid.proton.amqp.transport.AmqpError; import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Receiver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public abstract class ProtonAbstractReceiver extends ProtonInitializable implements ProtonDeliveryHandler { + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + protected final AMQPConnectionContext connection; protected final AMQPSessionContext protonSession; @@ -302,8 +309,6 @@ public abstract class ProtonAbstractReceiver extends ProtonInitializable impleme public final void onMessage(Delivery delivery) throws ActiveMQAMQPException { connection.requireInHandler(); - final Receiver receiver = ((Receiver) delivery.getLink()); - if (receiver.current() != delivery) { return; } @@ -320,37 +325,61 @@ public abstract class ProtonAbstractReceiver extends ProtonInitializable impleme return; } - final Message message = messageReader.readBytes(delivery); - - if (message != null) { - // Fetch this before the close of the reader as that will clear any read message - // delivery annotations. - final DeliveryAnnotations deliveryAnnotations = messageReader.getDeliveryAnnotations(); - - this.messageReader.close(); - this.messageReader = null; - - receiver.advance(); - - Transaction tx = null; - if (delivery.getRemoteState() instanceof TransactionalState) { - TransactionalState txState = (TransactionalState) delivery.getRemoteState(); - tx = this.sessionSPI.getTransaction(txState.getTxnId(), false); - } - - actualDelivery(message, delivery, deliveryAnnotations, receiver, tx); + Message completeMessage; + if ((completeMessage = messageReader.readBytes(delivery)) != null) { + // notice the AMQP Large Message Reader will always return null + // and call the onMessageComplete directly + // since that happens asynchronously + onMessageComplete(delivery, completeMessage, messageReader.getDeliveryAnnotations()); } } catch (Exception e) { + logger.warn(e.getMessage(), e); throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e); } } + public void onMessageComplete(Delivery delivery, + Message message, DeliveryAnnotations deliveryAnnotations) { + connection.requireInHandler(); + + try { + receiver.advance(); + + Transaction tx = null; + if (delivery.getRemoteState() instanceof TransactionalState) { + TransactionalState txState = (TransactionalState) delivery.getRemoteState(); + try { + tx = this.sessionSPI.getTransaction(txState.getTxnId(), false); + } catch (Exception e) { + this.onExceptionWhileReading(e); + } + } + + actualDelivery(message, delivery, deliveryAnnotations, receiver, tx); + } finally { + // reader is complete, we give it up now + this.messageReader.close(); + this.messageReader = null; + } + } + @Override public void close(boolean remoteLinkClose) throws ActiveMQAMQPException { protonSession.removeReceiver(receiver); closeCurrentReader(); } + public void onExceptionWhileReading(Throwable e) { + logger.warn(e.getMessage(), e); + connection.runNow(() -> { + // setting it enabled just in case a large message reader disabled it + connection.enableAutoRead(); + ErrorCondition ec = new ErrorCondition(AmqpError.INTERNAL_ERROR, e.getMessage()); + connection.close(ec); + connection.flush(); + }); + } + @Override public void close(ErrorCondition condition) throws ActiveMQAMQPException { receiver.setCondition(condition); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index 9359f7fe94..ad0d42c20e 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -483,7 +483,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr credits--; } - final MessageWriter messageWriter = controller.selectOutgoingMessageWriter(this, messageReference).open(); + final MessageWriter messageWriter = controller.selectOutgoingMessageWriter(this, messageReference).open(messageReference); // Preserve for hasCredits to check for busy state and possible abort on close this.messageWriter = messageWriter; diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java index 5bd3537ef5..17f17b0c70 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java @@ -95,6 +95,8 @@ public class ProtonHandler extends ProtonInitializable implements SaslListener { boolean flushInstantly = false; + volatile boolean readable = true; + /** afterFlush and afterFlushSet properties * are set by afterFlush methods. * This is to be called after the flush loop. @@ -108,6 +110,15 @@ public class ProtonHandler extends ProtonInitializable implements SaslListener { private Runnable afterFlush; protected Set afterFlushSet; + public boolean isReadable() { + return readable; + } + + public ProtonHandler setReadable(boolean readable) { + this.readable = readable; + return this; + } + @Override public void initialize() throws Exception { initialized = true; @@ -381,11 +392,6 @@ public class ProtonHandler extends ProtonInitializable implements SaslListener { flush(); }); - /*try { - Thread.sleep(1000); - } catch (Exception e) { - e.printStackTrace(); - } */ // this needs to be done in two steps // we first flush what we have to the client // after flushed, we close the local connection @@ -562,7 +568,7 @@ public class ProtonHandler extends ProtonInitializable implements SaslListener { AuditLogger.setRemoteAddress(h.getRemoteAddress()); } } - while ((ev = collector.peek()) != null) { + while (isReadable() && (ev = collector.peek()) != null) { for (EventHandler h : handlers) { logger.trace("Handling {} towards {}", ev, h); diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageWriterTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageWriterTest.java index 44e85d358d..c686a59384 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageWriterTest.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageWriterTest.java @@ -55,6 +55,7 @@ import org.junit.Test; import org.mockito.ArgumentCaptor; import org.mockito.Captor; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.MockitoAnnotations; import org.mockito.Spy; @@ -150,7 +151,7 @@ public class AMQPTunneledCoreLargeMessageWriterTest { when(protonSender.getLocalState()).thenReturn(EndpointState.CLOSED); - writer.open(); + writer.open(Mockito.mock(MessageReference.class)); try { writer.writeBytes(reference); @@ -177,7 +178,7 @@ public class AMQPTunneledCoreLargeMessageWriterTest { private void doTestMessageEncodingWrittenToDeliveryWithAnnotations(boolean deliveryAnnotations) throws Exception { AMQPTunneledCoreLargeMessageWriter writer = new AMQPTunneledCoreLargeMessageWriter(serverSender); - writer.open(); + writer.open(Mockito.mock(MessageReference.class)); final ByteBuf expectedEncoding = Unpooled.buffer(); @@ -276,7 +277,7 @@ public class AMQPTunneledCoreLargeMessageWriterTest { public void testLargeMessageUsageLoweredOnCloseWhenWriteNotCompleted() throws Exception { AMQPTunneledCoreLargeMessageWriter writer = new AMQPTunneledCoreLargeMessageWriter(serverSender); - writer.open(); + writer.open(Mockito.mock(MessageReference.class)); when(protonSender.getLocalState()).thenReturn(EndpointState.ACTIVE); when(protonDelivery.isPartial()).thenReturn(true); diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreMessageWriterTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreMessageWriterTest.java index 9245fa095b..7a09c0911b 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreMessageWriterTest.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreMessageWriterTest.java @@ -52,6 +52,7 @@ import org.junit.Test; import org.mockito.ArgumentCaptor; import org.mockito.Captor; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.MockitoAnnotations; import org.mockito.Spy; @@ -110,7 +111,7 @@ public class AMQPTunneledCoreMessageWriterTest { when(protonSender.getLocalState()).thenReturn(EndpointState.CLOSED); - writer.open(); + writer.open(Mockito.mock(MessageReference.class)); try { writer.writeBytes(reference); @@ -171,7 +172,7 @@ public class AMQPTunneledCoreMessageWriterTest { return null; }).when(message).persist(any(ActiveMQBuffer.class)); - writer.open(); + writer.open(Mockito.mock(MessageReference.class)); try { writer.writeBytes(reference);