From 807e4e5d9cef75985af09286f13664282ee0a74c Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Mon, 17 Apr 2017 15:02:33 -0400 Subject: [PATCH] ARTEMIS-1119 flow controlling connection https://issues.apache.org/jira/browse/ARTEMIS-1119 --- .../amqp/broker/AMQPConnectionCallback.java | 6 ++++++ .../amqp/proton/AMQPConnectionContext.java | 8 +++++++- .../amqp/proton/handler/EventHandler.java | 3 +++ .../amqp/proton/handler/ProtonHandler.java | 20 ++++++++++++++++++- 4 files changed, 35 insertions(+), 2 deletions(-) diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java index 29a4df3f55..31bec9a7d5 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java @@ -47,6 +47,7 @@ import org.apache.activemq.artemis.protocol.amqp.sasl.PlainSASL; import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult; import org.apache.activemq.artemis.protocol.amqp.sasl.ServerSASL; import org.apache.activemq.artemis.spi.core.remoting.Connection; +import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; import org.apache.activemq.artemis.utils.UUIDGenerator; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.Symbol; @@ -156,6 +157,11 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener { connection.write(new ChannelBufferWrapper(byteBuf, true)); } + public boolean isWritable(ReadyListener readyListener) { + return connection.isWritable(readyListener); + } + + public AMQPSessionCallback createSessionCallback(AMQPConnectionContext connection) { return new AMQPSessionCallback(this, manager, connection, this.connection, closeExecutor, server.newOperationContext()); } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java index 01736315d7..4a46a8a179 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java @@ -35,6 +35,7 @@ import org.apache.activemq.artemis.protocol.amqp.proton.handler.EventHandler; import org.apache.activemq.artemis.protocol.amqp.proton.handler.ExtCapability; import org.apache.activemq.artemis.protocol.amqp.proton.handler.ProtonHandler; import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult; +import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; import org.apache.activemq.artemis.utils.ByteUtil; import org.apache.activemq.artemis.utils.VersionLoader; import org.apache.qpid.proton.amqp.Symbol; @@ -90,7 +91,7 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH this.scheduledPool = scheduledPool; connectionCallback.setConnection(this); - this.handler = new ProtonHandler(); + this.handler = new ProtonHandler(protocolManager.getServer().getExecutorFactory().getExecutor()); handler.addEventHandler(this); Transport transport = handler.getTransport(); transport.setEmitFlowEventOnSend(false); @@ -332,6 +333,11 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH connectionCallback.onTransport(bytes, this); } + @Override + public boolean flowControl(ReadyListener readyListener) { + return connectionCallback.isWritable(readyListener); + } + @Override public void onRemoteOpen(Connection connection) throws Exception { lock(); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/EventHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/EventHandler.java index 0ed17233fd..c8ba1367ad 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/EventHandler.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/EventHandler.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.protocol.amqp.proton.handler; import io.netty.buffer.ByteBuf; +import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; import org.apache.qpid.proton.engine.Connection; import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Link; @@ -78,4 +79,6 @@ public interface EventHandler { void pushBytes(ByteBuf bytes); + boolean flowControl(ReadyListener readyListener); + } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java index f1be934578..e3cb730d80 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; @@ -29,6 +30,7 @@ import io.netty.buffer.PooledByteBufAllocator; import org.apache.activemq.artemis.protocol.amqp.proton.ProtonInitializable; import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult; import org.apache.activemq.artemis.protocol.amqp.sasl.ServerSASL; +import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; import org.apache.activemq.artemis.utils.ByteUtil; import org.apache.qpid.proton.Proton; import org.apache.qpid.proton.amqp.Symbol; @@ -71,14 +73,23 @@ public class ProtonHandler extends ProtonInitializable { protected boolean receivedFirstPacket = false; + private final Executor flushExecutor; + + protected final ReadyListener readyListener; + boolean inDispatch = false; - public ProtonHandler() { + public ProtonHandler(Executor flushExecutor) { + this.flushExecutor = flushExecutor; + this.readyListener = () -> flushExecutor.execute(() -> { + flush(); + }); this.creationTime = System.currentTimeMillis(); transport.bind(connection); connection.collect(collector); } + public long tick(boolean firstTick) { lock.lock(); try { @@ -161,6 +172,13 @@ public class ProtonHandler extends ProtonInitializable { } public void flushBytes() { + + for (EventHandler handler : handlers) { + if (!handler.flowControl(readyListener)) { + return; + } + } + lock.lock(); try { while (true) {