ARTEMIS-1898 Proper fix on AtomicRunnables and avoiding leaks

GlobalDiskFullTest was broken before this fix.
Basically when using multiple addresses over a session you would miss flow credits on all your producers except to the first one
that ran out of credit.
This commit is contained in:
Clebert Suconic 2018-10-10 18:09:28 -04:00
parent 2b3819bc19
commit fceb9ea718
4 changed files with 72 additions and 67 deletions

View File

@ -22,9 +22,9 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
public abstract class AtomicRunnable implements Runnable {
public static Runnable checkAtomic(Runnable run) {
public static AtomicRunnable checkAtomic(Runnable run) {
if (run instanceof AtomicRunnable) {
return run;
return (AtomicRunnable)run;
} else {
return new AtomicRunnableWithDelegate(run);
}
@ -35,6 +35,20 @@ public abstract class AtomicRunnable implements Runnable {
private static final AtomicIntegerFieldUpdater<AtomicRunnable> RAN_UPDATE =
AtomicIntegerFieldUpdater.newUpdater(AtomicRunnable.class, "ran");
public AtomicRunnable reset() {
RAN_UPDATE.set(this, 0);
return this;
}
public AtomicRunnable setRan() {
RAN_UPDATE.set(this, 1);
return this;
}
public boolean isRun() {
return RAN_UPDATE.get(this) == 1;
}
@Override
public void run() {
if (RAN_UPDATE.compareAndSet(this, 0, 1)) {

View File

@ -109,8 +109,6 @@ public class AMQPSessionCallback implements SessionCallback {
private final AddressQueryCache<AddressQueryResult> addressQueryCache = new AddressQueryCache<>();
private CreditRunnable creditRunnable;
public AMQPSessionCallback(AMQPConnectionCallback protonSPI,
ProtonProtocolManager manager,
AMQPConnectionContext connection,
@ -577,49 +575,17 @@ public class AMQPSessionCallback implements SessionCallback {
});
}
public void offerProducerCredit(final SimpleString address,
final int credits,
final int threshold,
final Receiver receiver) {
/** Will execute a Runnable on an Address when there's space in memory*/
public void flow(final SimpleString address,
Runnable runnable) {
try {
/*
* The credit runnable will always be run in this thread unless the address or disc is full. If this is the case the
* runnable is run once the memory or disc is free, if this happens we don't want to keep adding runnables as this
* may cause a memory leak, one is enough.
* */
if (creditRunnable != null && !creditRunnable.isRun())
return;
PagingManager pagingManager = manager.getServer().getPagingManager();
creditRunnable = new CreditRunnable() {
boolean isRun = false;
@Override
public boolean isRun() {
return isRun;
}
@Override
public void run() {
connection.lock();
try {
if (receiver.getCredit() <= threshold) {
int topUp = credits - receiver.getCredit();
if (topUp > 0) {
receiver.flow(topUp);
}
}
} finally {
isRun = true;
connection.unlock();
}
connection.flush();
}
};
if (address == null) {
pagingManager.checkMemory(creditRunnable);
pagingManager.checkMemory(runnable);
} else {
final PagingStore store = manager.getServer().getPagingManager().getPageStore(address);
store.checkMemory(creditRunnable);
store.checkMemory(runnable);
}
} catch (Exception e) {
throw new RuntimeException(e);
@ -791,7 +757,4 @@ public class AMQPSessionCallback implements SessionCallback {
}
}
interface CreditRunnable extends Runnable {
boolean isRun();
}
}

View File

@ -34,6 +34,7 @@ import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMess
import org.apache.activemq.artemis.protocol.amqp.sasl.PlainSASLResult;
import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.utils.runnables.AtomicRunnable;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
@ -60,6 +61,35 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
protected final AMQPSessionCallback sessionSPI;
/** We create this AtomicRunnable with setRan.
* This is because we always reuse the same instance.
* In case the creditRunnable was run, we reset and send it over.
* We set it as ran as the first one should always go through */
protected final AtomicRunnable creditRunnable;
/** This Credit Runnable may be used in Mock tests to simulate the credit semantic here */
public static AtomicRunnable createCreditRunnable(int refill, int threshold, Receiver receiver, AMQPConnectionContext connection) {
return new AtomicRunnable() {
@Override
public void atomicRun() {
connection.lock();
try {
if (receiver.getCredit() <= threshold) {
int topUp = refill - receiver.getCredit();
if (topUp > 0) {
receiver.flow(topUp);
}
}
} finally {
connection.unlock();
}
connection.flush();
}
};
}
/*
The maximum number of credits we will allocate to clients.
This number is also used by the broker when refresh client credits.
@ -68,7 +98,6 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
// Used by the broker to decide when to refresh clients credit. This is not used when client requests credit.
private final int minCreditRefresh;
private TerminusExpiryPolicy expiryPolicy;
public ProtonServerReceiverContext(AMQPSessionCallback sessionSPI,
AMQPConnectionContext connection,
@ -80,11 +109,12 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
this.sessionSPI = sessionSPI;
this.amqpCredits = connection.getAmqpCredits();
this.minCreditRefresh = connection.getAmqpLowCredits();
this.creditRunnable = createCreditRunnable(amqpCredits, minCreditRefresh, receiver, connection).setRan();
}
@Override
public void onFlow(int credits, boolean drain) {
flow(Math.min(credits, amqpCredits), amqpCredits);
flow();
}
@Override
@ -116,7 +146,6 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
} catch (Exception e) {
throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
}
expiryPolicy = target.getExpiryPolicy() != null ? target.getExpiryPolicy() : TerminusExpiryPolicy.LINK_DETACH;
target.setAddress(address.toString());
} else {
// the target will have an address unless the remote is requesting an anonymous
@ -182,7 +211,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
}
}
}
flow(amqpCredits, minCreditRefresh);
flow();
}
public RoutingType getRoutingType(Receiver receiver, SimpleString address) {
@ -245,7 +274,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
sessionSPI.serverSend(this, tx, receiver, delivery, address, delivery.getMessageFormat(), data);
flow(amqpCredits, minCreditRefresh);
flow();
} catch (Exception e) {
log.warn(e.getMessage(), e);
Rejected rejected = new Rejected();
@ -262,7 +291,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
delivery.disposition(rejected);
delivery.settle();
flow(amqpCredits, minCreditRefresh);
flow();
}
}
@ -285,20 +314,18 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
close(false);
}
public void flow(int credits, int threshold) {
public void flow() {
if (!creditRunnable.isRun()) {
return; // nothing to be done as the previous one did not run yet
}
creditRunnable.reset();
// Use the SessionSPI to allocate producer credits, or default, always allocate credit.
if (sessionSPI != null) {
if (receiver.getCredit() <= threshold) {
sessionSPI.offerProducerCredit(address, credits, threshold, receiver);
}
sessionSPI.flow(address, creditRunnable);
} else {
connection.lock();
try {
receiver.flow(credits);
} finally {
connection.unlock();
}
connection.flush();
creditRunnable.run();
}
}

View File

@ -31,6 +31,7 @@ import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.qpid.proton.engine.Receiver;
import org.junit.Rule;
@ -74,7 +75,7 @@ public class AMQPSessionCallbackTest {
// Credit is above threshold
Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT + 1);
session.offerProducerCredit(null, AMQP_CREDITS_DEFAULT, AMQP_LOW_CREDITS_DEFAULT, receiver);
session.flow(null, ProtonServerReceiverContext.createCreditRunnable(AMQP_CREDITS_DEFAULT, AMQP_LOW_CREDITS_DEFAULT, receiver, connection));
// Run the credit refill code.
Mockito.verify(pagingManager).checkMemory(argument.capture());
@ -105,7 +106,7 @@ public class AMQPSessionCallbackTest {
// Credit is at threshold
Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT);
session.offerProducerCredit(null, AMQP_CREDITS_DEFAULT, AMQP_LOW_CREDITS_DEFAULT, receiver);
session.flow(null, ProtonServerReceiverContext.createCreditRunnable(AMQP_CREDITS_DEFAULT, AMQP_LOW_CREDITS_DEFAULT, receiver, connection));
// Run the credit refill code.
Mockito.verify(pagingManager).checkMemory(argument.capture());
@ -137,7 +138,7 @@ public class AMQPSessionCallbackTest {
// Credit is above threshold
Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT + 1);
session.offerProducerCredit(new SimpleString("test"), AMQP_CREDITS_DEFAULT, AMQP_LOW_CREDITS_DEFAULT, receiver);
session.flow(new SimpleString("test"), ProtonServerReceiverContext.createCreditRunnable(AMQP_CREDITS_DEFAULT, AMQP_LOW_CREDITS_DEFAULT, receiver, connection));
// Run the credit refill code.
Mockito.verify(pagingStore).checkMemory(argument.capture());
@ -169,7 +170,7 @@ public class AMQPSessionCallbackTest {
// Credit is at threshold
Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT);
session.offerProducerCredit(new SimpleString("test"), AMQP_CREDITS_DEFAULT, AMQP_LOW_CREDITS_DEFAULT, receiver);
session.flow(new SimpleString("test"), ProtonServerReceiverContext.createCreditRunnable(AMQP_CREDITS_DEFAULT, AMQP_LOW_CREDITS_DEFAULT, receiver, connection));
// Run the credit refill code.
Mockito.verify(pagingStore).checkMemory(argument.capture());
@ -200,7 +201,7 @@ public class AMQPSessionCallbackTest {
// Credit is at threshold
Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT);
session.offerProducerCredit(null, 1, AMQP_LOW_CREDITS_DEFAULT, receiver);
session.flow(null, ProtonServerReceiverContext.createCreditRunnable(1, AMQP_LOW_CREDITS_DEFAULT, receiver, connection));
// Run the credit refill code.
Mockito.verify(pagingManager).checkMemory(argument.capture());
@ -232,7 +233,7 @@ public class AMQPSessionCallbackTest {
// Credit is at threshold
Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT);
session.offerProducerCredit(new SimpleString("test"), 1, AMQP_LOW_CREDITS_DEFAULT, receiver);
session.flow(new SimpleString("test"), ProtonServerReceiverContext.createCreditRunnable(1, AMQP_LOW_CREDITS_DEFAULT, receiver, connection));
// Run the credit refill code.
Mockito.verify(pagingStore).checkMemory(argument.capture());