This commit is contained in:
Clebert Suconic 2021-08-05 09:27:12 -04:00
commit 969901a5ff
6 changed files with 106 additions and 12 deletions

View File

@ -19,7 +19,6 @@ package org.apache.activemq.artemis.protocol.amqp.broker;
import java.util.Map;
import java.util.concurrent.Executor;
import org.apache.activemq.artemis.Closeable;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException;
import org.apache.activemq.artemis.api.core.ActiveMQException;
@ -160,9 +159,6 @@ public class AMQPSessionCallback implements SessionCallback {
}
public void addCloseable(Closeable closeable) {
serverSession.addCloseable(closeable);
}
public void withinContext(Runnable run) throws Exception {
OperationContext context = recoverContext();
@ -434,6 +430,13 @@ public class AMQPSessionCallback implements SessionCallback {
}
}
@Override
public void close(boolean failed) {
if (protonSession != null) {
protonSession.close();
}
}
public void ack(Transaction transaction, Object brokerConsumer, Message message) throws Exception {
if (transaction == null) {
transaction = serverSession.getCurrentTransaction();

View File

@ -88,9 +88,6 @@ public abstract class ProtonAbstractReceiver extends ProtonInitializable impleme
this.creditRunnable = createCreditRunnable(amqpCredits, minCreditRefresh, receiver, connection, this);
useModified = this.connection.getProtocolManager().isUseModifiedForTransientDeliveryErrors();
this.routingContext = new RoutingContextImpl(null).setDuplicateDetection(connection.getProtocolManager().isAmqpDuplicateDetection());
if (sessionSPI != null) {
sessionSPI.addCloseable((boolean failed) -> clearLargeMessage());
}
}
protected void recoverContext() {
@ -137,8 +134,8 @@ public abstract class ProtonAbstractReceiver extends ProtonInitializable impleme
}
/**
* The reason why we use the AtomicRunnable here
* is because PagingManager will call Runnables in case it was blocked.
* however it could call many Runnables
* is because PagingManager will call Runnable in case it was blocked.
* however it could call many Runnable
* and this serves as a control to avoid duplicated calls
* */
static class FlowControlRunner implements Runnable {
@ -178,10 +175,10 @@ public abstract class ProtonAbstractReceiver extends ProtonInitializable impleme
}
}
public int incrementSettle() {
public void incrementSettle() {
assert pendingSettles >= 0;
connection.requireInHandler();
return pendingSettles++;
pendingSettles++;
}
public void settle(Delivery settlement) {
@ -289,13 +286,13 @@ public abstract class ProtonAbstractReceiver extends ProtonInitializable impleme
@Override
public void close(boolean remoteLinkClose) throws ActiveMQAMQPException {
protonSession.removeReceiver(receiver);
clearLargeMessage();
}
@Override
public void close(ErrorCondition condition) throws ActiveMQAMQPException {
receiver.setCondition(condition);
close(false);
clearLargeMessage();
}
protected abstract void actualDelivery(AMQPMessage message, Delivery delivery, Receiver receiver, Transaction tx);

View File

@ -24,6 +24,7 @@ import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.qpid.proton.engine.Receiver;
@ -258,4 +259,20 @@ public class AMQPSessionCallbackTest {
// Credit runnable should not grant what would be negative credit here
Mockito.verify(receiver, never()).flow(anyInt());
}
@Test
public void testCloseBoolCallsProtonSessionClose() throws Exception {
Mockito.reset(connection);
Mockito.when(manager.getServer()).thenReturn(server);
// Capture credit runnable and invoke to trigger credit top off
ArgumentCaptor<Runnable> argument = ArgumentCaptor.forClass(Runnable.class);
AMQPSessionCallback session = new AMQPSessionCallback(protonSPI, manager, connection, transportConnection, executor, operationContext);
AMQPSessionContext protonSession = Mockito.mock(AMQPSessionContext.class);
session.init(protonSession, null);
session.close(false);
Mockito.verify(protonSession).close();
}
}

View File

@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
@ -89,6 +90,49 @@ public class ProtonServerReceiverContextTest {
doOnMessageWithDeliveryException(singletonList(Accepted.DESCRIPTOR_SYMBOL), null, new ActiveMQException(), Rejected.class);
}
@Test
public void testClearLargeOnClose() throws Exception {
Receiver mockReceiver = mock(Receiver.class);
AMQPConnectionContext mockConnContext = mock(AMQPConnectionContext.class);
when(mockConnContext.getAmqpCredits()).thenReturn(100);
when(mockConnContext.getAmqpLowCredits()).thenReturn(30);
when(mockConnContext.getProtocolManager()).thenReturn(mock(ProtonProtocolManager.class));
AMQPSessionCallback mockSessionSpi = mock(AMQPSessionCallback.class);
when(mockSessionSpi.getStorageManager()).thenReturn(new NullStorageManager());
AMQPSessionContext mockProtonContext = mock(AMQPSessionContext.class);
AtomicInteger clearLargeMessage = new AtomicInteger(0);
ProtonServerReceiverContext rc = new ProtonServerReceiverContext(mockSessionSpi, mockConnContext, mockProtonContext, mockReceiver) {
@Override
protected void clearLargeMessage() {
super.clearLargeMessage();
clearLargeMessage.incrementAndGet();
}
};
Delivery mockDelivery = mock(Delivery.class);
when(mockDelivery.isAborted()).thenReturn(false);
when(mockDelivery.isPartial()).thenReturn(false);
when(mockDelivery.getLink()).thenReturn(mockReceiver);
when(mockReceiver.current()).thenReturn(mockDelivery);
rc.onMessage(mockDelivery);
rc.close(true);
verify(mockReceiver, times(1)).current();
verify(mockReceiver, times(1)).advance();
Assert.assertTrue(clearLargeMessage.get() > 0);
}
private void doOnMessageWithAbortedDeliveryTestImpl(boolean drain) throws ActiveMQAMQPException {
Receiver mockReceiver = mock(Receiver.class);
AMQPConnectionContext mockConnContext = mock(AMQPConnectionContext.class);

View File

@ -305,6 +305,11 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
this.closeables.add(closeable);
}
// for testing
public final Set<Closeable> getCloseables() {
return closeables;
}
public Map<SimpleString, TempQueueCleanerUpper> getTempQueueCleanUppers() {
return tempQueueCleannerUppers;
}

View File

@ -16,13 +16,17 @@
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.activemq.transport.amqp.client.AmqpValidator;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Session;
import org.junit.Assert;
import org.junit.Test;
public class AmqpSessionTest extends AmqpClientTestSupport {
@ -71,4 +75,28 @@ public class AmqpSessionTest extends AmqpClientTestSupport {
connection.getStateInspector().assertValid();
connection.close();
}
@Test(timeout = 60000)
public void testCreateSessionProducerConsumerDoesNotLeakClosable() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
assertNotNull(session);
for (int i = 0; i < 10; i++) {
AmqpReceiver receiver = session.createReceiver(getQueueName());
AmqpSender sender = session.createSender(getQueueName());
receiver.close();
sender.close();
}
assertEquals(1, server.getSessions().size());
for (ServerSession serverSession : server.getSessions()) {
Assert.assertNull( ((ServerSessionImpl) serverSession).getCloseables());
}
connection.close();
}
}