ARTEMIS-2757 improving flow control in AMQP

This commit is contained in:
Clebert Suconic 2020-05-12 18:09:26 -04:00
parent 75f72b63a5
commit 9ff3c17525
8 changed files with 510 additions and 47 deletions

View File

@ -461,6 +461,9 @@ public class AMQPSessionCallback implements SessionCallback {
SimpleString address,
RoutingContext routingContext,
AMQPMessage message) throws Exception {
context.incrementSettle();
if (address != null) {
message.setAddress(address);
} else {
@ -468,7 +471,7 @@ public class AMQPSessionCallback implements SessionCallback {
address = message.getAddressSimpleString();
if (address == null) {
// Errors are not currently handled as required by AMQP 1.0 anonterm-v1.0
rejectMessage(delivery, Symbol.valueOf("failed"), "Missing 'to' field for message sent to an anonymous producer");
rejectMessage(context, delivery, Symbol.valueOf("failed"), "Missing 'to' field for message sent to an anonymous producer");
return;
}
}
@ -504,7 +507,7 @@ public class AMQPSessionCallback implements SessionCallback {
}
}
private void rejectMessage(Delivery delivery, Symbol errorCondition, String errorMessage) {
private void rejectMessage(final ProtonServerReceiverContext context, Delivery delivery, Symbol errorCondition, String errorMessage) {
ErrorCondition condition = new ErrorCondition();
condition.setCondition(errorCondition);
condition.setDescription(errorMessage);
@ -514,9 +517,9 @@ public class AMQPSessionCallback implements SessionCallback {
afterIO(new IOCallback() {
@Override
public void done() {
connection.runLater(() -> {
connection.runNow(() -> {
delivery.disposition(rejected);
delivery.settle();
context.settle(delivery);
connection.flush();
});
}
@ -543,7 +546,7 @@ public class AMQPSessionCallback implements SessionCallback {
afterIO(new IOCallback() {
@Override
public void done() {
connection.runLater(() -> {
connection.runNow(() -> {
if (delivery.getRemoteState() instanceof TransactionalState) {
TransactionalState txAccepted = new TransactionalState();
txAccepted.setOutcome(Accepted.getInstance());
@ -553,9 +556,8 @@ public class AMQPSessionCallback implements SessionCallback {
} else {
delivery.disposition(Accepted.getInstance());
}
delivery.settle();
context.flow();
connection.instantFlush();
context.settle(delivery);
connection.flush();
});
}
@ -565,7 +567,7 @@ public class AMQPSessionCallback implements SessionCallback {
}
});
} else {
rejectMessage(delivery, Symbol.valueOf("failed"), "Interceptor rejected message");
rejectMessage(context, delivery, Symbol.valueOf("failed"), "Interceptor rejected message");
}
} catch (Exception e) {
logger.warn(e.getMessage(), e);

View File

@ -149,6 +149,10 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
handler.requireHandler();
}
public boolean isHandler() {
return handler.isHandler();
}
public void scheduledFlush() {
handler.scheduledFlush();
}
@ -195,6 +199,10 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
handler.flush();
}
public void afterFlush(Runnable runnable) {
handler.afterFlush(runnable);
}
public void close(ErrorCondition errorCondition) {
Future<?> scheduledFuture = scheduledFutureRef.getAndSet(null);

View File

@ -44,7 +44,6 @@ import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMess
import org.apache.activemq.artemis.protocol.amqp.sasl.PlainSASLResult;
import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.utils.runnables.AtomicRunnable;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Outcome;
@ -86,34 +85,87 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
* In case the creditRunnable was run, we reset and send it over.
* We set it as ran as the first one should always go through
*/
protected final AtomicRunnable creditRunnable;
protected final Runnable creditRunnable;
protected final Runnable spiFlow = this::sessionSPIFlow;
private final boolean useModified;
/** no need to synchronize this as we only update it while in handler
* @see #incrementSettle()
*/
private int pendingSettles = 0;
/**
* This Credit Runnable may be used in Mock tests to simulate the credit semantic here
*/
public static AtomicRunnable createCreditRunnable(int refill,
int threshold,
Receiver receiver,
AMQPConnectionContext connection) {
Runnable creditRunnable = () -> {
public static Runnable createCreditRunnable(int refill,
int threshold,
Receiver receiver,
AMQPConnectionContext connection,
ProtonServerReceiverContext context) {
return new FlowControlRunner(refill, threshold, receiver, connection, context);
}
/**
* This Credit Runnable may be used in Mock tests to simulate the credit semantic here
*/
public static Runnable createCreditRunnable(int refill,
int threshold,
Receiver receiver,
AMQPConnectionContext connection) {
return new FlowControlRunner(refill, threshold, receiver, connection, null);
}
/**
* The reason why we use the AtomicRunnable here
* is because PagingManager will call Runnables in case it was blocked.
* however it could call many Runnables
* and this serves as a control to avoid duplicated calls
* */
static class FlowControlRunner implements Runnable {
final int refill;
final int threshold;
final Receiver receiver;
final AMQPConnectionContext connection;
final ProtonServerReceiverContext context;
FlowControlRunner(int refill, int threshold, Receiver receiver, AMQPConnectionContext connection, ProtonServerReceiverContext context) {
this.refill = refill;
this.threshold = threshold;
this.receiver = receiver;
this.connection = connection;
this.context = context;
}
@Override
public void run() {
if (!connection.isHandler()) {
// for the case where the paging manager is resuming flow due to blockage
// this should then move back to the connection thread.
connection.runLater(this);
return;
}
connection.requireInHandler();
if (receiver.getCredit() <= threshold) {
int topUp = refill - receiver.getCredit();
int pending = context != null ? context.pendingSettles : 0;
if (isBellowThreshold(receiver.getCredit(), pending, threshold)) {
int topUp = calculatedUpdateRefill(refill, receiver.getCredit(), pending);
if (topUp > 0) {
// System.out.println("Sending " + topUp + " towards client");
receiver.flow(topUp);
connection.flush();
connection.instantFlush();
}
}
};
return new AtomicRunnable() {
@Override
public void atomicRun() {
connection.runNow(creditRunnable);
}
};
}
}
public static boolean isBellowThreshold(int credit, int pending, int threshold) {
return credit <= threshold - pending;
}
public static int calculatedUpdateRefill(int refill, int credits, int pending) {
return refill - credits - pending;
}
/*
@ -138,7 +190,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
this.sessionSPI = sessionSPI;
this.amqpCredits = connection.getAmqpCredits();
this.minCreditRefresh = connection.getAmqpLowCredits();
this.creditRunnable = createCreditRunnable(amqpCredits, minCreditRefresh, receiver, connection).setRan();
this.creditRunnable = createCreditRunnable(amqpCredits, minCreditRefresh, receiver, connection, this);
useModified = this.connection.getProtocolManager().isUseModifiedForTransientDeliveryErrors();
this.minLargeMessageSize = connection.getProtocolManager().getAmqpMinLargeMessageSize();
@ -403,8 +455,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
useModified,
e);
delivery.disposition(deliveryState);
delivery.settle();
flow();
settle(delivery);
connection.flush();
});
}
@ -473,14 +524,27 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
clearLargeMessage();
}
public void flow() {
public int incrementSettle() {
assert pendingSettles >= 0;
connection.requireInHandler();
if (!creditRunnable.isRun()) {
return; // nothing to be done as the previous one did not run yet
}
return pendingSettles++;
}
creditRunnable.reset();
public void settle(Delivery settlement) {
connection.requireInHandler();
pendingSettles--;
assert pendingSettles >= 0;
settlement.settle();
flow();
}
public void flow() {
// this will mark flow control to happen once after the event loop
connection.afterFlush(spiFlow);
}
private void sessionSPIFlow() {
connection.requireInHandler();
// Use the SessionSPI to allocate producer credits, or default, always allocate credit.
if (sessionSPI != null) {
sessionSPI.flow(address, creditRunnable);

View File

@ -20,8 +20,10 @@ import javax.security.auth.Subject;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import io.netty.buffer.ByteBuf;
@ -91,6 +93,57 @@ public class ProtonHandler extends ProtonInitializable implements SaslListener {
boolean flushInstantly = false;
/** afterFlush and afterFlushSet properties
* are set by afterFlush methods.
* This is to be called after the flush loop.
* this is usually to be used by flow control events that
* have to take place after the incoming bytes are settled.
*
* There is only one afterFlush most of the time, and for that reason
* as an optimization we will try to use a single place most of the time
* however if more are needed we will use the list.
* */
private Runnable afterFlush;
protected Set<Runnable> afterFlushSet;
public void afterFlush(Runnable runnable) {
requireHandler();
if (afterFlush == null) {
afterFlush = runnable;
return;
} else {
if (afterFlush != runnable) {
if (afterFlushSet == null) {
afterFlushSet = new HashSet<>();
}
afterFlushSet.add(runnable);
}
}
}
public void runAfterFlush() {
requireHandler();
if (afterFlush != null) {
Runnable toRun = afterFlush;
afterFlush = null;
// setting it to null to avoid recursive flushes
toRun.run();
}
if (afterFlushSet != null) {
// This is not really expected to happen.
// most of the time we will only have a single Runnable needing after flush
// as this was written for flow control
// however in extreme of caution, I'm dealing with a case where more than one is used.
Set<Runnable> toRun = afterFlushSet;
// setting it to null to avoid recursive flushes
afterFlushSet = null;
for (Runnable runnable : toRun) {
runnable.run();
}
}
}
public ProtonHandler(EventLoop workerExecutor, ArtemisExecutor poolExecutor, boolean isServer) {
this.workerExecutor = workerExecutor;
this.poolExecutor = poolExecutor;
@ -148,6 +201,10 @@ public class ProtonHandler extends ProtonInitializable implements SaslListener {
return transport.capacity();
}
public boolean isHandler() {
return workerExecutor.inEventLoop();
}
public void requireHandler() {
if (!workerExecutor.inEventLoop()) {
throw new IllegalStateException("this method requires to be called within the handler, use the executor");
@ -530,6 +587,8 @@ public class ProtonHandler extends ProtonInitializable implements SaslListener {
}
flushBytes();
runAfterFlush();
}

View File

@ -33,11 +33,9 @@ import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
import org.mockito.quality.Strictness;
import org.mockito.stubbing.Answer;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.AMQP_CREDITS_DEFAULT;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.AMQP_LOW_CREDITS_DEFAULT;
@ -75,16 +73,7 @@ public class AMQPSessionCallbackTest {
@Before
public void setRule() {
// The connection will call the runnable now on this mock, as these would happen on a different thread.
Mockito.doAnswer(new Answer() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
((Runnable) invocation.getArguments()[0]).run();
return null;
}
}).when(connection).runNow(Mockito.isA(Runnable.class));
Mockito.when(connection.isHandler()).thenReturn(true);
}
/**

View File

@ -0,0 +1,283 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.protocol.amqp.proton;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.ProgressivePromise;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.ScheduledFuture;
import org.apache.activemq.artemis.protocol.amqp.proton.handler.ProtonHandler;
import org.junit.Assert;
import org.junit.Test;
public class ProtonHandlerAfterRunTest {
private static class FakeEventLoop implements EventLoop {
@Override
public EventLoopGroup parent() {
return null;
}
@Override
public EventLoop next() {
return null;
}
@Override
public ChannelFuture register(Channel channel) {
return null;
}
@Override
public ChannelFuture register(ChannelPromise promise) {
return null;
}
@Override
public ChannelFuture register(Channel channel, ChannelPromise promise) {
return null;
}
@Override
public boolean inEventLoop() {
return true;
}
@Override
public boolean inEventLoop(Thread thread) {
return true;
}
@Override
public <V> Promise<V> newPromise() {
return null;
}
@Override
public <V> ProgressivePromise<V> newProgressivePromise() {
return null;
}
@Override
public <V> Future<V> newSucceededFuture(V result) {
return null;
}
@Override
public <V> Future<V> newFailedFuture(Throwable cause) {
return null;
}
@Override
public boolean isShuttingDown() {
return false;
}
@Override
public Future<?> shutdownGracefully() {
return null;
}
@Override
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
return null;
}
@Override
public Future<?> terminationFuture() {
return null;
}
@Override
public void shutdown() {
}
@Override
public List<Runnable> shutdownNow() {
return null;
}
@Override
public Iterator<EventExecutor> iterator() {
return null;
}
@Override
public Future<?> submit(Runnable task) {
return null;
}
@Override
public <T> Future<T> submit(Runnable task, T result) {
return null;
}
@Override
public <T> Future<T> submit(Callable<T> task) {
return null;
}
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
return null;
}
@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
return null;
}
@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
return null;
}
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
return null;
}
@Override
public boolean isShutdown() {
return false;
}
@Override
public boolean isTerminated() {
return false;
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
return null;
}
@Override
public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
return null;
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
return null;
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return null;
}
@Override
public void execute(Runnable command) {
}
}
@Test
public void testAfterRun() {
ProtonHandler handler = new ProtonHandler(new FakeEventLoop(), null, true);
AtomicInteger value = new AtomicInteger(0);
AtomicInteger value2 = new AtomicInteger(0);
Runnable run = value::incrementAndGet;
Runnable run2 = value2::incrementAndGet;
handler.afterFlush(run);
handler.afterFlush(run);
handler.afterFlush(run);
handler.runAfterFlush();
Assert.assertEquals(1, value.get());
Assert.assertEquals(0, value2.get());
handler.runAfterFlush();
Assert.assertEquals(1, value.get());
Assert.assertEquals(0, value2.get());
handler.afterFlush(run);
handler.runAfterFlush();
Assert.assertEquals(2, value.get());
Assert.assertEquals(0, value2.get());
handler.afterFlush(run);
handler.afterFlush(run);
handler.afterFlush(run);
handler.afterFlush(run2);
handler.afterFlush(run2);
handler.afterFlush(run2);
handler.afterFlush(run2);
handler.afterFlush(run);
handler.afterFlush(run);
handler.afterFlush(run);
handler.runAfterFlush();
Assert.assertEquals(3, value.get());
Assert.assertEquals(1, value2.get());
handler.runAfterFlush();
Assert.assertEquals(3, value.get());
Assert.assertEquals(1, value2.get());
handler.afterFlush(run2);
handler.runAfterFlush();
Assert.assertEquals(3, value.get());
Assert.assertEquals(2, value2.get());
handler.runAfterFlush();
Assert.assertEquals(3, value.get());
Assert.assertEquals(2, value2.get());
}
@Test
public void testRecursiveLoop() {
ProtonHandler handler = new ProtonHandler(new FakeEventLoop(), null, true);
Runnable run = handler::runAfterFlush;
Runnable run2 = handler::runAfterFlush;
handler.afterFlush(run);
// make sure the code will not execute it recursevly if this is called in recursion
handler.runAfterFlush();
handler.afterFlush(run);
handler.afterFlush(run2);
handler.runAfterFlush();
}
}

View File

@ -157,6 +157,7 @@ public class ProtonServerReceiverContextTest {
Receiver mockReceiver = mock(Receiver.class);
ProtonServerReceiverContext rc = new ProtonServerReceiverContext(mockSession, mockConnContext, null, mockReceiver);
rc.incrementSettle();
Delivery mockDelivery = mock(Delivery.class);
when(mockDelivery.getLink()).thenReturn(mockReceiver);
@ -175,4 +176,14 @@ public class ProtonServerReceiverContextTest {
verify(mockDelivery, times(1)).disposition(any(expectedDeliveryState));
}
@Test
public void calculateFlowControl() {
Assert.assertFalse(ProtonServerReceiverContext.isBellowThreshold(1000, 100, 1000));
Assert.assertTrue(ProtonServerReceiverContext.isBellowThreshold(1000, 0, 1000));
Assert.assertEquals(1000, ProtonServerReceiverContext.calculatedUpdateRefill(2000, 1000, 0));
Assert.assertEquals(900, ProtonServerReceiverContext.calculatedUpdateRefill(2000, 1000, 100));
}
}

View File

@ -16,13 +16,21 @@
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
@ -43,6 +51,10 @@ import org.junit.Test;
*/
public class AmqpSenderTest extends AmqpClientTestSupport {
@Override
protected void addAdditionalAcceptors(ActiveMQServer server) throws Exception {
}
@Test(timeout = 60000)
public void testSenderSettlementModeSettledIsHonored() throws Exception {
doTestSenderSettlementModeIsHonored(SenderSettleMode.SETTLED);
@ -153,6 +165,41 @@ public class AmqpSenderTest extends AmqpClientTestSupport {
connection.close();
}
@Test(timeout = 60000)
public void testMixDurableAndNonDurable() throws Exception {
final int MSG_COUNT = 2000;
ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", getBrokerAmqpConnectionURI().toString() + "?jms.forceAsyncSend=true");
Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue queue = session.createQueue(getQueueName());
MessageProducer sender = session.createProducer(queue);
boolean durable = false;
for (int i = 1; i <= MSG_COUNT; ++i) {
javax.jms.Message message = session.createMessage();
message.setIntProperty("i", i);
sender.setDeliveryMode(durable ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
durable = !durable; // flipping the switch
sender.send(message);
}
connection.start();
MessageConsumer receiver = session.createConsumer(queue);
for (int i = 1; i <= MSG_COUNT; ++i) {
javax.jms.Message message = receiver.receive(10000);
Assert.assertNotNull(message);
Assert.assertEquals(i, message.getIntProperty("i"));
}
Assert.assertNull(receiver.receiveNoWait());
connection.close();
}
@Test(timeout = 60000)
public void testPresettledSender() throws Exception {
final int MSG_COUNT = 1000;