From b1b4bb8a32a9bb9c6d951f3d0fdd3626944319f9 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Tue, 8 Dec 2015 19:09:05 -0500 Subject: [PATCH] ARTEMIS-320 Refactoring TCP flow control and proper implementation of flow control on consumers https://issues.apache.org/jira/browse/ARTEMIS-320 --- .../client/impl/ClientSessionFactoryImpl.java | 20 ------- .../impl/ClientSessionFactoryInternal.java | 3 - .../core/client/impl/ClientSessionImpl.java | 14 +++-- .../client/impl/ClientSessionInternal.java | 4 +- .../core/client/impl/DelegatingSession.java | 6 +- .../core/impl/ActiveMQSessionContext.java | 6 ++ .../remoting/impl/netty/NettyConnection.java | 58 ++++++++++++------- .../remoting/impl/netty/NettyConnector.java | 4 ++ .../protocol/AbstractRemotingConnection.java | 6 ++ .../spi/core/protocol/RemotingConnection.java | 3 + .../artemis/spi/core/remoting/Connection.java | 8 +-- .../spi/core/remoting/ReadyListener.java | 2 +- .../spi/core/remoting/SessionContext.java | 2 + .../ActiveMQProtonConnectionCallback.java | 2 +- .../ProtonSessionIntegrationCallback.java | 25 ++++---- .../core/protocol/mqtt/MQTTConnection.java | 5 ++ .../core/protocol/mqtt/MQTTSession.java | 2 +- .../protocol/mqtt/MQTTSessionCallback.java | 21 +++---- .../protocol/openwire/OpenWireConnection.java | 4 ++ .../protocol/openwire/amq/AMQSession.java | 19 ++---- .../core/protocol/stomp/StompConnection.java | 5 ++ .../core/protocol/stomp/StompSession.java | 15 ++--- .../core/ServerSessionPacketHandler.java | 7 +++ .../core/impl/ActiveMQPacketHandler.java | 3 +- .../core/impl/CoreSessionCallback.java | 21 ++++--- .../remoting/impl/invm/InVMConnection.java | 16 ++--- .../remoting/impl/netty/NettyAcceptor.java | 2 + .../core/server/cluster/impl/BridgeImpl.java | 40 +++---------- .../core/server/impl/ServerConsumerImpl.java | 30 ++-------- .../spi/core/protocol/SessionCallback.java | 6 +- .../integration/client/HangConsumerTest.java | 21 ++----- 31 files changed, 176 insertions(+), 204 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java index 3b36345b9c..9df7ed45dc 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java @@ -148,8 +148,6 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C private String liveNodeID; - private Set lifeCycleListeners; - // We need to cache this value here since some listeners may be registered after connectionReadyForWrites was called. private boolean connectionReadyForWrites; @@ -222,8 +220,6 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C confirmationWindowWarning = new ConfirmationWindowWarning(serverLocator.getConfirmationWindowSize() < 0); - lifeCycleListeners = new HashSet<>(); - connectionReadyForWrites = true; } @@ -238,14 +234,6 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C return newFailoverLock; } - @Override - public void addLifeCycleListener(ConnectionLifeCycleListener lifeCycleListener) { - synchronized (connectionReadyLock) { - lifeCycleListener.connectionReadyForWrites(connection.getTransportConnection().getID(), connectionReadyForWrites); - lifeCycleListeners.add(lifeCycleListener); - } - } - @Override public void connect(final int initialConnectAttempts, final boolean failoverOnInitialConnection) throws ActiveMQException { @@ -395,14 +383,6 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C @Override public void connectionReadyForWrites(final Object connectionID, final boolean ready) { - synchronized (connectionReadyLock) { - if (connectionReadyForWrites != ready) { - connectionReadyForWrites = ready; - for (ConnectionLifeCycleListener lifeCycleListener : lifeCycleListeners) { - lifeCycleListener.connectionReadyForWrites(connectionID, ready); - } - } - } } @Override diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryInternal.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryInternal.java index 367161827f..ba2dab7694 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryInternal.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryInternal.java @@ -22,7 +22,6 @@ import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.SessionFailureListener; -import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener; import org.apache.activemq.artemis.utils.ConfirmationWindowWarning; public interface ClientSessionFactoryInternal extends ClientSessionFactory { @@ -58,6 +57,4 @@ public interface ClientSessionFactoryInternal extends ClientSessionFactory { ConfirmationWindowWarning getConfirmationWindowWarning(); Lock lockFailover(); - - void addLifeCycleListener(ConnectionLifeCycleListener lifeCycleListener); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java index ca7e7672c2..dc8968006a 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java @@ -44,8 +44,8 @@ import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle; import org.apache.activemq.artemis.core.remoting.FailureListener; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; -import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener; import org.apache.activemq.artemis.spi.core.remoting.ConsumerContext; +import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; import org.apache.activemq.artemis.spi.core.remoting.SessionContext; import org.apache.activemq.artemis.utils.ConfirmationWindowWarning; import org.apache.activemq.artemis.utils.TokenBucketLimiterImpl; @@ -408,6 +408,13 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi return createConsumer(SimpleString.toSimpleString(queueName), null, browseOnly); } + + @Override + public boolean isWritable(ReadyListener callback) { + return sessionContext.isWritable(callback); + } + + /** * Note, we DO NOT currently support direct consumers (i.e. consumers where delivery occurs on * the remoting thread). @@ -695,11 +702,6 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi return sessionFactory.getLiveNodeId(); } - @Override - public void addLifeCycleListener(ConnectionLifeCycleListener lifeCycleListener) { - sessionFactory.addLifeCycleListener(lifeCycleListener); - } - // ClientSessionInternal implementation // ------------------------------------------------------------ diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java index cd697c0793..30c8404703 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java @@ -23,8 +23,8 @@ import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; -import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener; import org.apache.activemq.artemis.spi.core.remoting.ConsumerContext; +import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; public interface ClientSessionInternal extends ClientSession { @@ -126,5 +126,5 @@ public interface ClientSessionInternal extends ClientSession { String getNodeId(); - void addLifeCycleListener(ConnectionLifeCycleListener lifeCycleListener); + boolean isWritable(ReadyListener callback); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/DelegatingSession.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/DelegatingSession.java index dde59d2d17..087a150af5 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/DelegatingSession.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/DelegatingSession.java @@ -33,8 +33,8 @@ import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler; import org.apache.activemq.artemis.api.core.client.SessionFailureListener; import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; -import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener; import org.apache.activemq.artemis.spi.core.remoting.ConsumerContext; +import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; import org.apache.activemq.artemis.utils.ConcurrentHashSet; /** @@ -101,8 +101,8 @@ public class DelegatingSession implements ClientSessionInternal { } @Override - public void addLifeCycleListener(ConnectionLifeCycleListener lifeCycleListener) { - session.addLifeCycleListener(lifeCycleListener); + public boolean isWritable(ReadyListener callback) { + return session.isWritable(callback); } @Override diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java index 05ae4741af..f72380288a 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java @@ -97,6 +97,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAS import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAStartMessage; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.remoting.Connection; +import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; import org.apache.activemq.artemis.spi.core.remoting.SessionContext; import org.apache.activemq.artemis.utils.TokenBucketLimiterImpl; import org.apache.activemq.artemis.utils.VersionLoader; @@ -238,6 +239,11 @@ public class ActiveMQSessionContext extends SessionContext { return response.toQueueQuery(); } + @Override + public boolean isWritable(ReadyListener callback) { + return remotingConnection.isWritable(callback); + } + @Override public ClientConsumerInternal createConsumer(SimpleString queueName, SimpleString filterString, diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java index 6f2f792550..7be83540fb 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java @@ -18,7 +18,7 @@ package org.apache.activemq.artemis.core.remoting.impl.netty; import java.net.SocketAddress; import java.util.Map; -import java.util.Set; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.Semaphore; import io.netty.buffer.ByteBuf; @@ -39,7 +39,6 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener; import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; -import org.apache.activemq.artemis.utils.ConcurrentHashSet; import org.apache.activemq.artemis.utils.IPV6Util; public class NettyConnection implements Connection { @@ -65,10 +64,14 @@ public class NettyConnection implements Connection { private final Semaphore writeLock = new Semaphore(1); - private final Set readyListeners = new ConcurrentHashSet<>(); - private RemotingConnection protocolConnection; + private boolean ready = true; + + /** if {@link #isWritable(ReadyListener)} returns false, we add a callback + * here for when the connection (or Netty Channel) becomes available again. */ + private final ConcurrentLinkedDeque readyListeners = new ConcurrentLinkedDeque<>(); + // Static -------------------------------------------------------- // Constructors -------------------------------------------------- @@ -96,6 +99,37 @@ public class NettyConnection implements Connection { } // Connection implementation ---------------------------- + + public boolean isWritable(ReadyListener callback) { + synchronized (readyListeners) { + readyListeners.push(callback); + + return ready; + } + } + + public void fireReady(final boolean ready) { + synchronized (readyListeners) { + this.ready = ready; + + if (ready) { + for (;;) { + ReadyListener readyListener = readyListeners.poll(); + if (readyListener == null) { + return; + } + + try { + readyListener.readyForWriting(); + } + catch (Throwable logOnly) { + ActiveMQClientLogger.LOGGER.warn(logOnly.getMessage(), logOnly); + } + } + } + } + } + @Override public void forceClose() { if (channel != null) { @@ -323,28 +357,12 @@ public class NettyConnection implements Connection { return directDeliver; } - @Override - public void addReadyListener(final ReadyListener listener) { - readyListeners.add(listener); - } - - @Override - public void removeReadyListener(final ReadyListener listener) { - readyListeners.remove(listener); - } - //never allow this @Override public ActiveMQPrincipal getDefaultActiveMQPrincipal() { return null; } - void fireReady(final boolean ready) { - for (ReadyListener listener : readyListeners) { - listener.readyForWriting(ready); - } - } - @Override public TransportConfiguration getConnectorConfig() { if (configuration != null) { diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java index 3f5624880f..97c0fef0a0 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java @@ -936,6 +936,10 @@ public class NettyConnector extends AbstractConnector { @Override public void connectionReadyForWrites(Object connectionID, boolean ready) { + NettyConnection connection = (NettyConnection)connections.get(connectionID); + if (connection != null) { + connection.fireReady(ready); + } listener.connectionReadyForWrites(connectionID, ready); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java index 32df48c77e..b759ccc497 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java @@ -29,6 +29,7 @@ import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle; import org.apache.activemq.artemis.core.remoting.CloseListener; import org.apache.activemq.artemis.core.remoting.FailureListener; import org.apache.activemq.artemis.spi.core.remoting.Connection; +import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; public abstract class AbstractRemotingConnection implements RemotingConnection { @@ -50,6 +51,11 @@ public abstract class AbstractRemotingConnection implements RemotingConnection { return new ArrayList<>(failureListeners); } + @Override + public boolean isWritable(ReadyListener callback) { + return transportConnection.isWritable(callback); + } + protected void callFailureListeners(final ActiveMQException me, String scaleDownTargetNodeID) { final List listenersClone = new ArrayList<>(failureListeners); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java index 420314b44e..078e42e35f 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java @@ -24,6 +24,7 @@ import org.apache.activemq.artemis.core.remoting.CloseListener; import org.apache.activemq.artemis.core.remoting.FailureListener; import org.apache.activemq.artemis.spi.core.remoting.BufferHandler; import org.apache.activemq.artemis.spi.core.remoting.Connection; +import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; /** * A RemotingConnection is a connection between a client and a server. @@ -181,4 +182,6 @@ public interface RemotingConnection extends BufferHandler { */ void flush(); + boolean isWritable(ReadyListener callback); + } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java index ac05267a16..ed10113da6 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/Connection.java @@ -39,6 +39,10 @@ public interface Connection { void setProtocolConnection(RemotingConnection connection); + boolean isWritable(ReadyListener listener); + + void fireReady(boolean ready); + /** * returns the unique id of this wire. * @@ -104,10 +108,6 @@ public interface Connection { */ void checkFlushBatchBuffer(); - void addReadyListener(ReadyListener listener); - - void removeReadyListener(ReadyListener listener); - /** * Generates a {@link TransportConfiguration} to be used to connect to the same target this is * connected to. diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ReadyListener.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ReadyListener.java index b846a1a1c8..9103070c72 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ReadyListener.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/ReadyListener.java @@ -18,6 +18,6 @@ package org.apache.activemq.artemis.spi.core.remoting; public interface ReadyListener { - void readyForWriting(boolean ready); + void readyForWriting(); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java index 1bdaffcd72..f766e48f00 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java @@ -269,4 +269,6 @@ public abstract class SessionContext { public abstract void cleanup(); public abstract void linkFlowControl(SimpleString address, ClientProducerCreditsImpl clientProducerCredits); + + public abstract boolean isWritable(ReadyListener callback); } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java index 64a6232b0a..d8c0f18627 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ActiveMQProtonConnectionCallback.java @@ -114,7 +114,7 @@ public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback @Override public AMQPSessionCallback createSessionCallback(AMQPConnectionContext connection) { - return new ProtonSessionIntegrationCallback(this, manager, connection); + return new ProtonSessionIntegrationCallback(this, manager, connection, this.connection); } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java index 88506b6ee1..5c3c41a414 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java @@ -20,6 +20,8 @@ import java.util.concurrent.Executor; import io.netty.buffer.ByteBuf; import org.apache.activemq.artemis.core.io.IOCallback; +import org.apache.activemq.artemis.spi.core.remoting.Connection; +import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.transport.AmqpError; import org.apache.qpid.proton.amqp.transport.ErrorCondition; @@ -36,7 +38,6 @@ import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; -import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; import org.apache.activemq.artemis.utils.ByteUtil; import org.apache.activemq.artemis.utils.IDGenerator; import org.apache.activemq.artemis.utils.SimpleIDGenerator; @@ -58,16 +59,26 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se private final AMQPConnectionContext connection; + private final Connection transportConnection; + + private ServerSession serverSession; private AMQPSessionContext protonSession; public ProtonSessionIntegrationCallback(ActiveMQProtonConnectionCallback protonSPI, ProtonProtocolManager manager, - AMQPConnectionContext connection) { + AMQPConnectionContext connection, + Connection transportConnection) { this.protonSPI = protonSPI; this.manager = manager; this.connection = connection; + this.transportConnection = transportConnection; + } + + @Override + public boolean isWritable(ReadyListener callback) { + return transportConnection.isWritable(callback); } @Override @@ -305,16 +316,6 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se public void closed() { } - @Override - public void addReadyListener(ReadyListener listener) { - - } - - @Override - public void removeReadyListener(ReadyListener listener) { - - } - @Override public void disconnect(ServerConsumer consumer, String queueName) { synchronized (connection.getLock()) { diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java index 87849abf4c..0603951dde 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java @@ -28,6 +28,7 @@ import org.apache.activemq.artemis.core.remoting.CloseListener; import org.apache.activemq.artemis.core.remoting.FailureListener; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.remoting.Connection; +import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; public class MQTTConnection implements RemotingConnection { @@ -52,6 +53,10 @@ public class MQTTConnection implements RemotingConnection { this.destroyed = false; } + public boolean isWritable(ReadyListener callback) { + return transportConnection.isWritable(callback); + } + @Override public Object getID() { return transportConnection.getID(); diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java index 64023a4dd3..d9c819c323 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java @@ -59,7 +59,7 @@ public class MQTTSession { mqttConnectionManager = new MQTTConnectionManager(this); mqttPublishManager = new MQTTPublishManager(this); - sessionCallback = new MQTTSessionCallback(this); + sessionCallback = new MQTTSessionCallback(this, connection); subscriptionManager = new MQTTSubscriptionManager(this); retainMessageManager = new MQTTRetainMessageManager(this); diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java index 45a9192e0d..cf323d4781 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java @@ -25,12 +25,19 @@ import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; public class MQTTSessionCallback implements SessionCallback { - private MQTTSession session; + private final MQTTSession session; + private final MQTTConnection connection; private MQTTLogger log = MQTTLogger.LOGGER; - public MQTTSessionCallback(MQTTSession session) throws Exception { + public MQTTSessionCallback(MQTTSession session, MQTTConnection connection) throws Exception { this.session = session; + this.connection = connection; + } + + @Override + public boolean isWritable(ReadyListener callback) { + return connection.isWritable(callback); } @Override @@ -54,16 +61,6 @@ public class MQTTSessionCallback implements SessionCallback { return 1; } - @Override - public void addReadyListener(ReadyListener listener) { - session.getConnection().getTransportConnection().addReadyListener(listener); - } - - @Override - public void removeReadyListener(ReadyListener listener) { - session.getConnection().getTransportConnection().removeReadyListener(listener); - } - @Override public int sendLargeMessage(ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount) { return sendMessage(message, consumer, deliveryCount); diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index 0bc87ee54c..43cc92db15 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -52,6 +52,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.remoting.Acceptor; import org.apache.activemq.artemis.spi.core.remoting.Connection; +import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; import org.apache.activemq.artemis.utils.ConcurrentHashSet; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; @@ -160,6 +161,9 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S this.creationTime = System.currentTimeMillis(); } + public boolean isWritable(ReadyListener callback) { + return transportConnection.isWritable(callback); + } // SecurityAuth implementation @Override diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java index 076567663d..b4d04b76c9 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java @@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ConnectionInfo; import org.apache.activemq.command.ConsumerInfo; @@ -58,7 +59,6 @@ import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; import org.apache.activemq.artemis.core.transaction.impl.XidImpl; import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; -import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; import org.apache.activemq.wireformat.WireFormat; public class AMQSession implements SessionCallback { @@ -147,6 +147,11 @@ public class AMQSession implements SessionCallback { started.set(true); } + @Override + public boolean isWritable(ReadyListener callback) { + return connection.isWritable(callback); + } + @Override public void sendProducerCreditsMessage(int credits, SimpleString address) { // TODO Auto-generated method stub @@ -186,18 +191,6 @@ public class AMQSession implements SessionCallback { } - @Override - public void addReadyListener(ReadyListener listener) { - // TODO Auto-generated method stub - - } - - @Override - public void removeReadyListener(ReadyListener listener) { - // TODO Auto-generated method stub - - } - @Override public boolean hasCredits(ServerConsumer consumerID) { AMQConsumer amqConsumer = consumers.get(consumerID.getID()); diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java index 2701b5f57d..31d73776dd 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java @@ -41,6 +41,7 @@ import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.remoting.Acceptor; import org.apache.activemq.artemis.spi.core.remoting.Connection; +import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; import org.apache.activemq.artemis.utils.ConfigurationHelper; import org.apache.activemq.artemis.utils.VersionLoader; @@ -118,6 +119,10 @@ public final class StompConnection implements RemotingConnection { return frame; } + public boolean isWritable(ReadyListener callback) { + return transportConnection.isWritable(callback); + } + public boolean hasBytes() { return frameHandler.hasBytes(); } diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java index 85eebe035c..dd338f6e34 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java @@ -73,6 +73,11 @@ public class StompSession implements SessionCallback { this.consumerCredits = ConfigurationHelper.getIntProperty(TransportConstants.STOMP_CONSUMERS_CREDIT, TransportConstants.STOMP_DEFAULT_CONSUMERS_CREDIT, connection.getAcceptorUsed().getConfiguration()); } + @Override + public boolean isWritable(ReadyListener callback) { + return connection.isWritable(callback); + } + void setServerSession(ServerSession session) { this.session = session; } @@ -181,16 +186,6 @@ public class StompSession implements SessionCallback { public void closed() { } - @Override - public void addReadyListener(final ReadyListener listener) { - connection.getTransportConnection().addReadyListener(listener); - } - - @Override - public void removeReadyListener(final ReadyListener listener) { - connection.getTransportConnection().removeReadyListener(listener); - } - @Override public void disconnect(ServerConsumer consumerId, String queueName) { StompSubscription stompSubscription = subscriptions.remove(consumerId.getID()); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java index c8d783805a..78fd83f6f8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java @@ -611,6 +611,9 @@ public class ServerSessionPacketHandler implements ChannelHandler { newConnection.syncIDGeneratorSequence(remotingConnection.getIDGeneratorSequence()); + + Connection oldTransportConnection = remotingConnection.getTransportConnection(); + remotingConnection = newConnection; remotingConnection.setCloseListeners(closeListeners); @@ -624,6 +627,10 @@ public class ServerSessionPacketHandler implements ChannelHandler { session.setTransferring(false); + // We do this because the old connection could be out of credits on netty + // this will force anything to resume after the reattach through the ReadyListener callbacks + oldTransportConnection.fireReady(true); + return serverLastReceivedCommandID; } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java index b60804eb6f..2de5adb9c1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java @@ -148,7 +148,8 @@ public class ActiveMQPacketHandler implements ChannelHandler { activeMQPrincipal = connection.getDefaultActiveMQPrincipal(); } - ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(), new CoreSessionCallback(request.getName(), protocolManager, channel), null, true); + ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(), + new CoreSessionCallback(request.getName(), protocolManager, channel, connection), null, true); ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session, server.getStorageManager(), channel); channel.setHandler(handler); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java index 2fe808f19c..4fc7879055 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java @@ -29,6 +29,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager; +import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; @@ -38,12 +39,20 @@ public final class CoreSessionCallback implements SessionCallback { private ProtocolManager protocolManager; + private final RemotingConnection connection; + private String name; - public CoreSessionCallback(String name, ProtocolManager protocolManager, Channel channel) { + public CoreSessionCallback(String name, ProtocolManager protocolManager, Channel channel, RemotingConnection connection) { this.name = name; this.protocolManager = protocolManager; this.channel = channel; + this.connection = connection; + } + + @Override + public boolean isWritable(ReadyListener callback) { + return connection.isWritable(callback); } @Override @@ -101,16 +110,6 @@ public final class CoreSessionCallback implements SessionCallback { protocolManager.removeHandler(name); } - @Override - public void addReadyListener(final ReadyListener listener) { - channel.getConnection().getTransportConnection().addReadyListener(listener); - } - - @Override - public void removeReadyListener(final ReadyListener listener) { - channel.getConnection().getTransportConnection().removeReadyListener(listener); - } - @Override public void disconnect(ServerConsumer consumerId, String queueName) { if (channel.supports(PacketImpl.DISCONNECT_CONSUMER)) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java index 7931f82ae0..0cbb5756b4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/invm/InVMConnection.java @@ -101,6 +101,14 @@ public class InVMConnection implements Connection { // no op } + public boolean isWritable(ReadyListener listener) { + return true; + } + + @Override + public void fireReady(boolean ready) { + } + @Override public RemotingConnection getProtocolConnection() { return this.protocolConnection; @@ -230,14 +238,6 @@ public class InVMConnection implements Connection { return -1; } - @Override - public void addReadyListener(ReadyListener listener) { - } - - @Override - public void removeReadyListener(ReadyListener listener) { - } - @Override public boolean isUsingProtocolHandling() { return false; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java index ef563225f1..345981e852 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java @@ -687,6 +687,8 @@ public class NettyAcceptor implements Acceptor { if (conn != null) { conn.fireReady(ready); } + + listener.connectionReadyForWrites(connectionID, ready); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java index ac75a1c9d3..b8b30bcd74 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java @@ -45,7 +45,6 @@ import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.message.impl.MessageImpl; import org.apache.activemq.artemis.core.persistence.StorageManager; -import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.HandleStatus; import org.apache.activemq.artemis.core.server.LargeServerMessage; @@ -58,8 +57,7 @@ import org.apache.activemq.artemis.core.server.impl.QueueImpl; import org.apache.activemq.artemis.core.server.management.Notification; import org.apache.activemq.artemis.core.server.management.NotificationService; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; -import org.apache.activemq.artemis.spi.core.remoting.Connection; -import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener; +import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; import org.apache.activemq.artemis.utils.FutureLatch; import org.apache.activemq.artemis.utils.ReusableLatch; import org.apache.activemq.artemis.utils.TypedProperties; @@ -69,7 +67,7 @@ import org.apache.activemq.artemis.utils.UUID; * A Core BridgeImpl */ -public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowledgementHandler, ConnectionLifeCycleListener { +public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowledgementHandler, ReadyListener { // Constants ----------------------------------------------------- private static final boolean isTrace = ActiveMQServerLogger.LOGGER.isTraceEnabled(); @@ -135,8 +133,6 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled private volatile ClientProducer producer; - private volatile boolean connectionWritable = false; - private volatile boolean started; private volatile boolean stopping = false; @@ -497,6 +493,11 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled } } + @Override + public void readyForWriting() { + queue.deliverAsync(); + } + @Override public HandleStatus handle(final MessageReference ref) throws Exception { if (filter != null && !filter.match(ref.getMessage())) { @@ -504,7 +505,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled } synchronized (this) { - if (!active || !connectionWritable) { + if (!active || !session.isWritable(this)) { if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) { ActiveMQServerLogger.LOGGER.debug(this + "::Ignoring reference on bridge as it is set to inactive ref=" + ref); } @@ -555,29 +556,6 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled } } - @Override - public void connectionCreated(ActiveMQComponent component, Connection connection, String protocol) { - - } - - @Override - public void connectionDestroyed(Object connectionID) { - - } - - @Override - public void connectionException(Object connectionID, ActiveMQException me) { - - } - - @Override - public void connectionReadyForWrites(Object connectionID, boolean ready) { - connectionWritable = ready; - if (connectionWritable) { - queue.deliverAsync(); - } - } - // FailureListener implementation -------------------------------- @Override @@ -891,8 +869,6 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled session.setSendAcknowledgementHandler(BridgeImpl.this); - session.addLifeCycleListener(BridgeImpl.this); - afterConnect(); active = true; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index 243d139bed..e04c35c5a1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -21,7 +21,6 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReadWriteLock; @@ -129,12 +128,6 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { private boolean transferring = false; - /* As well as consumer credit based flow control, we also tap into TCP flow control (assuming transport is using TCP) - * This is useful in the case where consumer-window-size = -1, but we don't want to OOM by sending messages ad infinitum to the Netty - * write queue when the TCP buffer is full, e.g. the client is slow or has died. - */ - private final AtomicBoolean writeReady = new AtomicBoolean(true); - private final long creationTime; private AtomicLong consumerRateCheckTime = new AtomicLong(System.currentTimeMillis()); @@ -198,8 +191,6 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { this.strictUpdateDeliveryCount = strictUpdateDeliveryCount; - this.callback.addReadyListener(this); - this.creationTime = System.currentTimeMillis(); if (browseOnly) { @@ -220,6 +211,11 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { } } + @Override + public void readyForWriting() { + promptDelivery(); + } + // ServerConsumer implementation // ---------------------------------------------------------------------- @@ -289,7 +285,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { // If the consumer is stopped then we don't accept the message, it // should go back into the // queue for delivery later. - if (!started || transferring) { + if (!started || transferring || !callback.isWritable(this)) { return HandleStatus.BUSY; } @@ -395,8 +391,6 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { ActiveMQServerLogger.LOGGER.trace("ServerConsumerImpl::" + this + " being closed with failed=" + failed, new Exception("trace")); } - callback.removeReadyListener(this); - setStarted(false); LargeMessageDeliverer del = largeMessageDeliverer; @@ -811,18 +805,6 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { } } - @Override - public void readyForWriting(final boolean ready) { - if (ready) { - writeReady.set(true); - - promptDelivery(); - } - else { - writeReady.set(false); - } - } - /** * To be used on tests only */ diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java index 3c9670d03c..83c4e93cee 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java @@ -43,9 +43,7 @@ public interface SessionCallback { void closed(); - void addReadyListener(ReadyListener listener); - - void removeReadyListener(ReadyListener listener); - void disconnect(ServerConsumer consumerId, String queueName); + + boolean isWritable(ReadyListener callback); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java index 8199b4c61c..5c0e2724a6 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java @@ -483,6 +483,11 @@ public class HangConsumerTest extends ActiveMQTestBase { targetCallback.sendProducerCreditsMessage(credits, address); } + @Override + public boolean isWritable(ReadyListener callback) { + return true; + } + @Override public void sendProducerCreditsFailMessage(int credits, SimpleString address) { targetCallback.sendProducerCreditsFailMessage(credits, address); @@ -538,22 +543,6 @@ public class HangConsumerTest extends ActiveMQTestBase { targetCallback.closed(); } - /* (non-Javadoc) - * @see SessionCallback#addReadyListener(ReadyListener) - */ - @Override - public void addReadyListener(ReadyListener listener) { - targetCallback.addReadyListener(listener); - } - - /* (non-Javadoc) - * @see SessionCallback#removeReadyListener(ReadyListener) - */ - @Override - public void removeReadyListener(ReadyListener listener) { - targetCallback.removeReadyListener(listener); - } - @Override public void disconnect(ServerConsumer consumerId, String queueName) { //To change body of implemented methods use File | Settings | File Templates.