ARTEMIS-2712 Dealing with Aborts AMQP Large Message

This commit is contained in:
Clebert Suconic 2020-04-15 10:32:39 -04:00
parent ceceb6691e
commit 87eebc3d27
3 changed files with 39 additions and 1 deletions

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.protocol.amqp.broker;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import org.apache.activemq.artemis.Closeable;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException; import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
@ -153,6 +154,10 @@ public class AMQPSessionCallback implements SessionCallback {
} }
public void addCloseable(Closeable closeable) {
serverSession.addCloseable(closeable);
}
public void withinContext(Runnable run) throws Exception { public void withinContext(Runnable run) throws Exception {
OperationContext context = recoverContext(); OperationContext context = recoverContext();
try { try {

View File

@ -28,6 +28,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager; import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
import org.apache.activemq.artemis.core.security.CheckType; import org.apache.activemq.artemis.core.security.CheckType;
import org.apache.activemq.artemis.core.security.SecurityAuth; import org.apache.activemq.artemis.core.security.SecurityAuth;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.RoutingContext; import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl; import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
@ -140,6 +141,24 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
this.creditRunnable = createCreditRunnable(amqpCredits, minCreditRefresh, receiver, connection).setRan(); this.creditRunnable = createCreditRunnable(amqpCredits, minCreditRefresh, receiver, connection).setRan();
useModified = this.connection.getProtocolManager().isUseModifiedForTransientDeliveryErrors(); useModified = this.connection.getProtocolManager().isUseModifiedForTransientDeliveryErrors();
this.minLargeMessageSize = connection.getProtocolManager().getAmqpMinLargeMessageSize(); this.minLargeMessageSize = connection.getProtocolManager().getAmqpMinLargeMessageSize();
if (sessionSPI != null) {
sessionSPI.addCloseable((boolean failed) -> clearLargeMessage());
}
}
protected void clearLargeMessage() {
connection.runNow(() -> {
if (currentLargeMessage != null) {
try {
currentLargeMessage.deleteFile();
} catch (Throwable error) {
ActiveMQServerLogger.LOGGER.errorDeletingLargeMessageFile(error);
} finally {
currentLargeMessage = null;
}
}
});
} }
@Override @Override
@ -288,6 +307,8 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
try { try {
if (delivery.isAborted()) { if (delivery.isAborted()) {
clearLargeMessage();
// Aborting implicitly remotely settles, so advance // Aborting implicitly remotely settles, so advance
// receiver to the next delivery and settle locally. // receiver to the next delivery and settle locally.
receiver.advance(); receiver.advance();
@ -439,6 +460,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
public void close(ErrorCondition condition) throws ActiveMQAMQPException { public void close(ErrorCondition condition) throws ActiveMQAMQPException {
receiver.setCondition(condition); receiver.setCondition(condition);
close(false); close(false);
clearLargeMessage();
} }
public void flow() { public void flow() {

View File

@ -30,6 +30,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException; import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
@ -50,6 +51,7 @@ import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.transport.DeliveryState; import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver; import org.apache.qpid.proton.engine.Receiver;
import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
@ -96,7 +98,14 @@ public class ProtonServerReceiverContextTest {
when(mockConnContext.getProtocolManager()).thenReturn(mock(ProtonProtocolManager.class)); when(mockConnContext.getProtocolManager()).thenReturn(mock(ProtonProtocolManager.class));
ProtonServerReceiverContext rc = new ProtonServerReceiverContext(null, mockConnContext, null, mockReceiver); AtomicInteger clearLargeMessage = new AtomicInteger(0);
ProtonServerReceiverContext rc = new ProtonServerReceiverContext(null, mockConnContext, null, mockReceiver) {
@Override
protected void clearLargeMessage() {
super.clearLargeMessage();
clearLargeMessage.incrementAndGet();
}
};
Delivery mockDelivery = mock(Delivery.class); Delivery mockDelivery = mock(Delivery.class);
when(mockDelivery.isAborted()).thenReturn(true); when(mockDelivery.isAborted()).thenReturn(true);
@ -120,6 +129,8 @@ public class ProtonServerReceiverContextTest {
verify(mockReceiver, times(1)).flow(1); verify(mockReceiver, times(1)).flow(1);
} }
verifyNoMoreInteractions(mockReceiver); verifyNoMoreInteractions(mockReceiver);
Assert.assertTrue(clearLargeMessage.get() > 0);
} }
private void doOnMessageWithDeliveryException(List<Symbol> sourceSymbols, private void doOnMessageWithDeliveryException(List<Symbol> sourceSymbols,