ARTEMIS-1119 flow controlling connection

https://issues.apache.org/jira/browse/ARTEMIS-1119
This commit is contained in:
Clebert Suconic 2017-04-17 15:02:33 -04:00
parent 0a0955d0cc
commit 807e4e5d9c
4 changed files with 35 additions and 2 deletions

View File

@ -47,6 +47,7 @@ import org.apache.activemq.artemis.protocol.amqp.sasl.PlainSASL;
import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult; import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
import org.apache.activemq.artemis.protocol.amqp.sasl.ServerSASL; import org.apache.activemq.artemis.protocol.amqp.sasl.ServerSASL;
import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.UUIDGenerator; import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.Symbol;
@ -156,6 +157,11 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener {
connection.write(new ChannelBufferWrapper(byteBuf, true)); connection.write(new ChannelBufferWrapper(byteBuf, true));
} }
public boolean isWritable(ReadyListener readyListener) {
return connection.isWritable(readyListener);
}
public AMQPSessionCallback createSessionCallback(AMQPConnectionContext connection) { public AMQPSessionCallback createSessionCallback(AMQPConnectionContext connection) {
return new AMQPSessionCallback(this, manager, connection, this.connection, closeExecutor, server.newOperationContext()); return new AMQPSessionCallback(this, manager, connection, this.connection, closeExecutor, server.newOperationContext());
} }

View File

@ -35,6 +35,7 @@ import org.apache.activemq.artemis.protocol.amqp.proton.handler.EventHandler;
import org.apache.activemq.artemis.protocol.amqp.proton.handler.ExtCapability; import org.apache.activemq.artemis.protocol.amqp.proton.handler.ExtCapability;
import org.apache.activemq.artemis.protocol.amqp.proton.handler.ProtonHandler; import org.apache.activemq.artemis.protocol.amqp.proton.handler.ProtonHandler;
import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult; import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.ByteUtil; import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.VersionLoader; import org.apache.activemq.artemis.utils.VersionLoader;
import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.Symbol;
@ -90,7 +91,7 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
this.scheduledPool = scheduledPool; this.scheduledPool = scheduledPool;
connectionCallback.setConnection(this); connectionCallback.setConnection(this);
this.handler = new ProtonHandler(); this.handler = new ProtonHandler(protocolManager.getServer().getExecutorFactory().getExecutor());
handler.addEventHandler(this); handler.addEventHandler(this);
Transport transport = handler.getTransport(); Transport transport = handler.getTransport();
transport.setEmitFlowEventOnSend(false); transport.setEmitFlowEventOnSend(false);
@ -332,6 +333,11 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
connectionCallback.onTransport(bytes, this); connectionCallback.onTransport(bytes, this);
} }
@Override
public boolean flowControl(ReadyListener readyListener) {
return connectionCallback.isWritable(readyListener);
}
@Override @Override
public void onRemoteOpen(Connection connection) throws Exception { public void onRemoteOpen(Connection connection) throws Exception {
lock(); lock();

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.protocol.amqp.proton.handler; package org.apache.activemq.artemis.protocol.amqp.proton.handler;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.qpid.proton.engine.Connection; import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Link; import org.apache.qpid.proton.engine.Link;
@ -78,4 +79,6 @@ public interface EventHandler {
void pushBytes(ByteBuf bytes); void pushBytes(ByteBuf bytes);
boolean flowControl(ReadyListener readyListener);
} }

View File

@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
@ -29,6 +30,7 @@ import io.netty.buffer.PooledByteBufAllocator;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonInitializable; import org.apache.activemq.artemis.protocol.amqp.proton.ProtonInitializable;
import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult; import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
import org.apache.activemq.artemis.protocol.amqp.sasl.ServerSASL; import org.apache.activemq.artemis.protocol.amqp.sasl.ServerSASL;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.ByteUtil; import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.qpid.proton.Proton; import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.Symbol;
@ -71,14 +73,23 @@ public class ProtonHandler extends ProtonInitializable {
protected boolean receivedFirstPacket = false; protected boolean receivedFirstPacket = false;
private final Executor flushExecutor;
protected final ReadyListener readyListener;
boolean inDispatch = false; boolean inDispatch = false;
public ProtonHandler() { public ProtonHandler(Executor flushExecutor) {
this.flushExecutor = flushExecutor;
this.readyListener = () -> flushExecutor.execute(() -> {
flush();
});
this.creationTime = System.currentTimeMillis(); this.creationTime = System.currentTimeMillis();
transport.bind(connection); transport.bind(connection);
connection.collect(collector); connection.collect(collector);
} }
public long tick(boolean firstTick) { public long tick(boolean firstTick) {
lock.lock(); lock.lock();
try { try {
@ -161,6 +172,13 @@ public class ProtonHandler extends ProtonInitializable {
} }
public void flushBytes() { public void flushBytes() {
for (EventHandler handler : handlers) {
if (!handler.flowControl(readyListener)) {
return;
}
}
lock.lock(); lock.lock();
try { try {
while (true) { while (true) {