From c2da0fd663c286f275ff090a1f87c16f70cb5653 Mon Sep 17 00:00:00 2001 From: gtully Date: Wed, 5 May 2021 12:39:51 +0100 Subject: [PATCH] ARTEMIS-3200 - remove braces from the belt and braces fix in ARTEMIS-2712, the braces are not necessary and leak, cleaning up in close negates the need to the session closeable --- .../amqp/broker/AMQPSessionCallback.java | 11 +++-- .../amqp/proton/ProtonAbstractReceiver.java | 13 +++--- .../amqp/broker/AMQPSessionCallbackTest.java | 17 +++++++ .../ProtonServerReceiverContextTest.java | 44 +++++++++++++++++++ .../core/server/impl/ServerSessionImpl.java | 5 +++ .../integration/amqp/AmqpSessionTest.java | 28 ++++++++++++ 6 files changed, 106 insertions(+), 12 deletions(-) diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index 82dbec7348..2f5fa016f8 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -19,7 +19,6 @@ package org.apache.activemq.artemis.protocol.amqp.broker; import java.util.Map; import java.util.concurrent.Executor; -import org.apache.activemq.artemis.Closeable; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException; import org.apache.activemq.artemis.api.core.ActiveMQException; @@ -160,9 +159,6 @@ public class AMQPSessionCallback implements SessionCallback { } - public void addCloseable(Closeable closeable) { - serverSession.addCloseable(closeable); - } public void withinContext(Runnable run) throws Exception { OperationContext context = recoverContext(); @@ -434,6 +430,13 @@ public class AMQPSessionCallback implements SessionCallback { } } + @Override + public void close(boolean failed) { + if (protonSession != null) { + protonSession.close(); + } + } + public void ack(Transaction transaction, Object brokerConsumer, Message message) throws Exception { if (transaction == null) { transaction = serverSession.getCurrentTransaction(); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java index 74901b77d7..e0146c074c 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java @@ -88,9 +88,6 @@ public abstract class ProtonAbstractReceiver extends ProtonInitializable impleme this.creditRunnable = createCreditRunnable(amqpCredits, minCreditRefresh, receiver, connection, this); useModified = this.connection.getProtocolManager().isUseModifiedForTransientDeliveryErrors(); this.routingContext = new RoutingContextImpl(null).setDuplicateDetection(connection.getProtocolManager().isAmqpDuplicateDetection()); - if (sessionSPI != null) { - sessionSPI.addCloseable((boolean failed) -> clearLargeMessage()); - } } protected void recoverContext() { @@ -137,8 +134,8 @@ public abstract class ProtonAbstractReceiver extends ProtonInitializable impleme } /** * The reason why we use the AtomicRunnable here - * is because PagingManager will call Runnables in case it was blocked. - * however it could call many Runnables + * is because PagingManager will call Runnable in case it was blocked. + * however it could call many Runnable * and this serves as a control to avoid duplicated calls * */ static class FlowControlRunner implements Runnable { @@ -178,10 +175,10 @@ public abstract class ProtonAbstractReceiver extends ProtonInitializable impleme } } - public int incrementSettle() { + public void incrementSettle() { assert pendingSettles >= 0; connection.requireInHandler(); - return pendingSettles++; + pendingSettles++; } public void settle(Delivery settlement) { @@ -289,13 +286,13 @@ public abstract class ProtonAbstractReceiver extends ProtonInitializable impleme @Override public void close(boolean remoteLinkClose) throws ActiveMQAMQPException { protonSession.removeReceiver(receiver); + clearLargeMessage(); } @Override public void close(ErrorCondition condition) throws ActiveMQAMQPException { receiver.setCondition(condition); close(false); - clearLargeMessage(); } protected abstract void actualDelivery(AMQPMessage message, Delivery delivery, Receiver receiver, Transaction tx); diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallbackTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallbackTest.java index 1184c13ad5..a902a26894 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallbackTest.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallbackTest.java @@ -24,6 +24,7 @@ import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext; +import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext; import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext; import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.qpid.proton.engine.Receiver; @@ -258,4 +259,20 @@ public class AMQPSessionCallbackTest { // Credit runnable should not grant what would be negative credit here Mockito.verify(receiver, never()).flow(anyInt()); } + + @Test + public void testCloseBoolCallsProtonSessionClose() throws Exception { + Mockito.reset(connection); + Mockito.when(manager.getServer()).thenReturn(server); + + // Capture credit runnable and invoke to trigger credit top off + ArgumentCaptor argument = ArgumentCaptor.forClass(Runnable.class); + AMQPSessionCallback session = new AMQPSessionCallback(protonSPI, manager, connection, transportConnection, executor, operationContext); + + AMQPSessionContext protonSession = Mockito.mock(AMQPSessionContext.class); + session.init(protonSession, null); + session.close(false); + + Mockito.verify(protonSession).close(); + } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java index 491651bc16..696f9c5092 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java @@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager; import org.apache.activemq.artemis.core.server.RoutingContext; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; @@ -89,6 +90,49 @@ public class ProtonServerReceiverContextTest { doOnMessageWithDeliveryException(singletonList(Accepted.DESCRIPTOR_SYMBOL), null, new ActiveMQException(), Rejected.class); } + @Test + public void testClearLargeOnClose() throws Exception { + Receiver mockReceiver = mock(Receiver.class); + AMQPConnectionContext mockConnContext = mock(AMQPConnectionContext.class); + + when(mockConnContext.getAmqpCredits()).thenReturn(100); + when(mockConnContext.getAmqpLowCredits()).thenReturn(30); + + when(mockConnContext.getProtocolManager()).thenReturn(mock(ProtonProtocolManager.class)); + + AMQPSessionCallback mockSessionSpi = mock(AMQPSessionCallback.class); + when(mockSessionSpi.getStorageManager()).thenReturn(new NullStorageManager()); + + AMQPSessionContext mockProtonContext = mock(AMQPSessionContext.class); + + AtomicInteger clearLargeMessage = new AtomicInteger(0); + ProtonServerReceiverContext rc = new ProtonServerReceiverContext(mockSessionSpi, mockConnContext, mockProtonContext, mockReceiver) { + @Override + protected void clearLargeMessage() { + super.clearLargeMessage(); + clearLargeMessage.incrementAndGet(); + } + }; + + Delivery mockDelivery = mock(Delivery.class); + when(mockDelivery.isAborted()).thenReturn(false); + when(mockDelivery.isPartial()).thenReturn(false); + when(mockDelivery.getLink()).thenReturn(mockReceiver); + + when(mockReceiver.current()).thenReturn(mockDelivery); + + rc.onMessage(mockDelivery); + + rc.close(true); + + verify(mockReceiver, times(1)).current(); + verify(mockReceiver, times(1)).advance(); + + Assert.assertTrue(clearLargeMessage.get() > 0); + + } + + private void doOnMessageWithAbortedDeliveryTestImpl(boolean drain) throws ActiveMQAMQPException { Receiver mockReceiver = mock(Receiver.class); AMQPConnectionContext mockConnContext = mock(AMQPConnectionContext.class); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index 8f7ed0e26a..dfc7771da1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -305,6 +305,11 @@ public class ServerSessionImpl implements ServerSession, FailureListener { this.closeables.add(closeable); } + // for testing + public final Set getCloseables() { + return closeables; + } + public Map getTempQueueCleanUppers() { return tempQueueCleannerUppers; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSessionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSessionTest.java index b16eafa6dd..cd57ff0916 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSessionTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSessionTest.java @@ -16,13 +16,17 @@ */ package org.apache.activemq.artemis.tests.integration.amqp; +import org.apache.activemq.artemis.core.server.ServerSession; +import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl; import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpConnection; import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSender; import org.apache.activemq.transport.amqp.client.AmqpSession; import org.apache.activemq.transport.amqp.client.AmqpValidator; import org.apache.qpid.proton.engine.Receiver; import org.apache.qpid.proton.engine.Session; +import org.junit.Assert; import org.junit.Test; public class AmqpSessionTest extends AmqpClientTestSupport { @@ -71,4 +75,28 @@ public class AmqpSessionTest extends AmqpClientTestSupport { connection.getStateInspector().assertValid(); connection.close(); } + + + @Test(timeout = 60000) + public void testCreateSessionProducerConsumerDoesNotLeakClosable() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + assertNotNull(session); + + for (int i = 0; i < 10; i++) { + AmqpReceiver receiver = session.createReceiver(getQueueName()); + AmqpSender sender = session.createSender(getQueueName()); + receiver.close(); + sender.close(); + } + + assertEquals(1, server.getSessions().size()); + for (ServerSession serverSession : server.getSessions()) { + Assert.assertNull( ((ServerSessionImpl) serverSession).getCloseables()); + } + + connection.close(); + } + }