diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/AtomicRunnable.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/AtomicRunnable.java index f1f53ce11f..9ec5e8f48d 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/AtomicRunnable.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/AtomicRunnable.java @@ -22,9 +22,9 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; public abstract class AtomicRunnable implements Runnable { - public static Runnable checkAtomic(Runnable run) { + public static AtomicRunnable checkAtomic(Runnable run) { if (run instanceof AtomicRunnable) { - return run; + return (AtomicRunnable)run; } else { return new AtomicRunnableWithDelegate(run); } @@ -35,6 +35,20 @@ public abstract class AtomicRunnable implements Runnable { private static final AtomicIntegerFieldUpdater RAN_UPDATE = AtomicIntegerFieldUpdater.newUpdater(AtomicRunnable.class, "ran"); + public AtomicRunnable reset() { + RAN_UPDATE.set(this, 0); + return this; + } + + public AtomicRunnable setRan() { + RAN_UPDATE.set(this, 1); + return this; + } + + public boolean isRun() { + return RAN_UPDATE.get(this) == 1; + } + @Override public void run() { if (RAN_UPDATE.compareAndSet(this, 0, 1)) { diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index 7fef3dbc68..3d8ae5a934 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -109,8 +109,6 @@ public class AMQPSessionCallback implements SessionCallback { private final AddressQueryCache addressQueryCache = new AddressQueryCache<>(); - private CreditRunnable creditRunnable; - public AMQPSessionCallback(AMQPConnectionCallback protonSPI, ProtonProtocolManager manager, AMQPConnectionContext connection, @@ -577,49 +575,17 @@ public class AMQPSessionCallback implements SessionCallback { }); } - public void offerProducerCredit(final SimpleString address, - final int credits, - final int threshold, - final Receiver receiver) { + /** Will execute a Runnable on an Address when there's space in memory*/ + public void flow(final SimpleString address, + Runnable runnable) { try { - /* - * The credit runnable will always be run in this thread unless the address or disc is full. If this is the case the - * runnable is run once the memory or disc is free, if this happens we don't want to keep adding runnables as this - * may cause a memory leak, one is enough. - * */ - if (creditRunnable != null && !creditRunnable.isRun()) - return; PagingManager pagingManager = manager.getServer().getPagingManager(); - creditRunnable = new CreditRunnable() { - boolean isRun = false; - @Override - public boolean isRun() { - return isRun; - } - - @Override - public void run() { - connection.lock(); - try { - if (receiver.getCredit() <= threshold) { - int topUp = credits - receiver.getCredit(); - if (topUp > 0) { - receiver.flow(topUp); - } - } - } finally { - isRun = true; - connection.unlock(); - } - connection.flush(); - } - }; if (address == null) { - pagingManager.checkMemory(creditRunnable); + pagingManager.checkMemory(runnable); } else { final PagingStore store = manager.getServer().getPagingManager().getPageStore(address); - store.checkMemory(creditRunnable); + store.checkMemory(runnable); } } catch (Exception e) { throw new RuntimeException(e); @@ -791,7 +757,4 @@ public class AMQPSessionCallback implements SessionCallback { } } - interface CreditRunnable extends Runnable { - boolean isRun(); - } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java index 0f0e9d5070..0758714837 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java @@ -34,6 +34,7 @@ import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMess import org.apache.activemq.artemis.protocol.amqp.sasl.PlainSASLResult; import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.utils.runnables.AtomicRunnable; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.Rejected; import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy; @@ -60,6 +61,35 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements protected final AMQPSessionCallback sessionSPI; + /** We create this AtomicRunnable with setRan. + * This is because we always reuse the same instance. + * In case the creditRunnable was run, we reset and send it over. + * We set it as ran as the first one should always go through */ + protected final AtomicRunnable creditRunnable; + + + /** This Credit Runnable may be used in Mock tests to simulate the credit semantic here */ + public static AtomicRunnable createCreditRunnable(int refill, int threshold, Receiver receiver, AMQPConnectionContext connection) { + return new AtomicRunnable() { + @Override + public void atomicRun() { + connection.lock(); + try { + if (receiver.getCredit() <= threshold) { + int topUp = refill - receiver.getCredit(); + if (topUp > 0) { + receiver.flow(topUp); + } + } + } finally { + connection.unlock(); + } + connection.flush(); + } + }; + } + + /* The maximum number of credits we will allocate to clients. This number is also used by the broker when refresh client credits. @@ -68,7 +98,6 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements // Used by the broker to decide when to refresh clients credit. This is not used when client requests credit. private final int minCreditRefresh; - private TerminusExpiryPolicy expiryPolicy; public ProtonServerReceiverContext(AMQPSessionCallback sessionSPI, AMQPConnectionContext connection, @@ -80,11 +109,12 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements this.sessionSPI = sessionSPI; this.amqpCredits = connection.getAmqpCredits(); this.minCreditRefresh = connection.getAmqpLowCredits(); + this.creditRunnable = createCreditRunnable(amqpCredits, minCreditRefresh, receiver, connection).setRan(); } @Override public void onFlow(int credits, boolean drain) { - flow(Math.min(credits, amqpCredits), amqpCredits); + flow(); } @Override @@ -116,7 +146,6 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements } catch (Exception e) { throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e); } - expiryPolicy = target.getExpiryPolicy() != null ? target.getExpiryPolicy() : TerminusExpiryPolicy.LINK_DETACH; target.setAddress(address.toString()); } else { // the target will have an address unless the remote is requesting an anonymous @@ -182,7 +211,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements } } } - flow(amqpCredits, minCreditRefresh); + flow(); } public RoutingType getRoutingType(Receiver receiver, SimpleString address) { @@ -245,7 +274,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements sessionSPI.serverSend(this, tx, receiver, delivery, address, delivery.getMessageFormat(), data); - flow(amqpCredits, minCreditRefresh); + flow(); } catch (Exception e) { log.warn(e.getMessage(), e); Rejected rejected = new Rejected(); @@ -262,7 +291,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements delivery.disposition(rejected); delivery.settle(); - flow(amqpCredits, minCreditRefresh); + flow(); } } @@ -285,20 +314,18 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements close(false); } - public void flow(int credits, int threshold) { + public void flow() { + if (!creditRunnable.isRun()) { + return; // nothing to be done as the previous one did not run yet + } + + creditRunnable.reset(); + // Use the SessionSPI to allocate producer credits, or default, always allocate credit. if (sessionSPI != null) { - if (receiver.getCredit() <= threshold) { - sessionSPI.offerProducerCredit(address, credits, threshold, receiver); - } + sessionSPI.flow(address, creditRunnable); } else { - connection.lock(); - try { - receiver.flow(credits); - } finally { - connection.unlock(); - } - connection.flush(); + creditRunnable.run(); } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallbackTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallbackTest.java index e86e960209..30814a92c4 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallbackTest.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallbackTest.java @@ -31,6 +31,7 @@ import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext; +import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext; import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.qpid.proton.engine.Receiver; import org.junit.Rule; @@ -74,7 +75,7 @@ public class AMQPSessionCallbackTest { // Credit is above threshold Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT + 1); - session.offerProducerCredit(null, AMQP_CREDITS_DEFAULT, AMQP_LOW_CREDITS_DEFAULT, receiver); + session.flow(null, ProtonServerReceiverContext.createCreditRunnable(AMQP_CREDITS_DEFAULT, AMQP_LOW_CREDITS_DEFAULT, receiver, connection)); // Run the credit refill code. Mockito.verify(pagingManager).checkMemory(argument.capture()); @@ -105,7 +106,7 @@ public class AMQPSessionCallbackTest { // Credit is at threshold Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT); - session.offerProducerCredit(null, AMQP_CREDITS_DEFAULT, AMQP_LOW_CREDITS_DEFAULT, receiver); + session.flow(null, ProtonServerReceiverContext.createCreditRunnable(AMQP_CREDITS_DEFAULT, AMQP_LOW_CREDITS_DEFAULT, receiver, connection)); // Run the credit refill code. Mockito.verify(pagingManager).checkMemory(argument.capture()); @@ -137,7 +138,7 @@ public class AMQPSessionCallbackTest { // Credit is above threshold Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT + 1); - session.offerProducerCredit(new SimpleString("test"), AMQP_CREDITS_DEFAULT, AMQP_LOW_CREDITS_DEFAULT, receiver); + session.flow(new SimpleString("test"), ProtonServerReceiverContext.createCreditRunnable(AMQP_CREDITS_DEFAULT, AMQP_LOW_CREDITS_DEFAULT, receiver, connection)); // Run the credit refill code. Mockito.verify(pagingStore).checkMemory(argument.capture()); @@ -169,7 +170,7 @@ public class AMQPSessionCallbackTest { // Credit is at threshold Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT); - session.offerProducerCredit(new SimpleString("test"), AMQP_CREDITS_DEFAULT, AMQP_LOW_CREDITS_DEFAULT, receiver); + session.flow(new SimpleString("test"), ProtonServerReceiverContext.createCreditRunnable(AMQP_CREDITS_DEFAULT, AMQP_LOW_CREDITS_DEFAULT, receiver, connection)); // Run the credit refill code. Mockito.verify(pagingStore).checkMemory(argument.capture()); @@ -200,7 +201,7 @@ public class AMQPSessionCallbackTest { // Credit is at threshold Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT); - session.offerProducerCredit(null, 1, AMQP_LOW_CREDITS_DEFAULT, receiver); + session.flow(null, ProtonServerReceiverContext.createCreditRunnable(1, AMQP_LOW_CREDITS_DEFAULT, receiver, connection)); // Run the credit refill code. Mockito.verify(pagingManager).checkMemory(argument.capture()); @@ -232,7 +233,7 @@ public class AMQPSessionCallbackTest { // Credit is at threshold Mockito.when(receiver.getCredit()).thenReturn(AMQP_LOW_CREDITS_DEFAULT); - session.offerProducerCredit(new SimpleString("test"), 1, AMQP_LOW_CREDITS_DEFAULT, receiver); + session.flow(new SimpleString("test"), ProtonServerReceiverContext.createCreditRunnable(1, AMQP_LOW_CREDITS_DEFAULT, receiver, connection)); // Run the credit refill code. Mockito.verify(pagingStore).checkMemory(argument.capture());