ARTEMIS-320 Refactoring TCP flow control and proper implementation of flow control on consumers

https://issues.apache.org/jira/browse/ARTEMIS-320
This commit is contained in:
Clebert Suconic 2015-12-08 19:09:05 -05:00
parent 351bcfc9f9
commit b1b4bb8a32
31 changed files with 176 additions and 204 deletions

View File

@ -148,8 +148,6 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
private String liveNodeID; private String liveNodeID;
private Set<ConnectionLifeCycleListener> lifeCycleListeners;
// We need to cache this value here since some listeners may be registered after connectionReadyForWrites was called. // We need to cache this value here since some listeners may be registered after connectionReadyForWrites was called.
private boolean connectionReadyForWrites; private boolean connectionReadyForWrites;
@ -222,8 +220,6 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
confirmationWindowWarning = new ConfirmationWindowWarning(serverLocator.getConfirmationWindowSize() < 0); confirmationWindowWarning = new ConfirmationWindowWarning(serverLocator.getConfirmationWindowSize() < 0);
lifeCycleListeners = new HashSet<>();
connectionReadyForWrites = true; connectionReadyForWrites = true;
} }
@ -238,14 +234,6 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
return newFailoverLock; return newFailoverLock;
} }
@Override
public void addLifeCycleListener(ConnectionLifeCycleListener lifeCycleListener) {
synchronized (connectionReadyLock) {
lifeCycleListener.connectionReadyForWrites(connection.getTransportConnection().getID(), connectionReadyForWrites);
lifeCycleListeners.add(lifeCycleListener);
}
}
@Override @Override
public void connect(final int initialConnectAttempts, public void connect(final int initialConnectAttempts,
final boolean failoverOnInitialConnection) throws ActiveMQException { final boolean failoverOnInitialConnection) throws ActiveMQException {
@ -395,14 +383,6 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
@Override @Override
public void connectionReadyForWrites(final Object connectionID, final boolean ready) { public void connectionReadyForWrites(final Object connectionID, final boolean ready) {
synchronized (connectionReadyLock) {
if (connectionReadyForWrites != ready) {
connectionReadyForWrites = ready;
for (ConnectionLifeCycleListener lifeCycleListener : lifeCycleListeners) {
lifeCycleListener.connectionReadyForWrites(connectionID, ready);
}
}
}
} }
@Override @Override

View File

@ -22,7 +22,6 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.SessionFailureListener; import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener;
import org.apache.activemq.artemis.utils.ConfirmationWindowWarning; import org.apache.activemq.artemis.utils.ConfirmationWindowWarning;
public interface ClientSessionFactoryInternal extends ClientSessionFactory { public interface ClientSessionFactoryInternal extends ClientSessionFactory {
@ -58,6 +57,4 @@ public interface ClientSessionFactoryInternal extends ClientSessionFactory {
ConfirmationWindowWarning getConfirmationWindowWarning(); ConfirmationWindowWarning getConfirmationWindowWarning();
Lock lockFailover(); Lock lockFailover();
void addLifeCycleListener(ConnectionLifeCycleListener lifeCycleListener);
} }

View File

@ -44,8 +44,8 @@ import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle; import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
import org.apache.activemq.artemis.core.remoting.FailureListener; import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener;
import org.apache.activemq.artemis.spi.core.remoting.ConsumerContext; import org.apache.activemq.artemis.spi.core.remoting.ConsumerContext;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.spi.core.remoting.SessionContext; import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
import org.apache.activemq.artemis.utils.ConfirmationWindowWarning; import org.apache.activemq.artemis.utils.ConfirmationWindowWarning;
import org.apache.activemq.artemis.utils.TokenBucketLimiterImpl; import org.apache.activemq.artemis.utils.TokenBucketLimiterImpl;
@ -408,6 +408,13 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
return createConsumer(SimpleString.toSimpleString(queueName), null, browseOnly); return createConsumer(SimpleString.toSimpleString(queueName), null, browseOnly);
} }
@Override
public boolean isWritable(ReadyListener callback) {
return sessionContext.isWritable(callback);
}
/** /**
* Note, we DO NOT currently support direct consumers (i.e. consumers where delivery occurs on * Note, we DO NOT currently support direct consumers (i.e. consumers where delivery occurs on
* the remoting thread). * the remoting thread).
@ -695,11 +702,6 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
return sessionFactory.getLiveNodeId(); return sessionFactory.getLiveNodeId();
} }
@Override
public void addLifeCycleListener(ConnectionLifeCycleListener lifeCycleListener) {
sessionFactory.addLifeCycleListener(lifeCycleListener);
}
// ClientSessionInternal implementation // ClientSessionInternal implementation
// ------------------------------------------------------------ // ------------------------------------------------------------

View File

@ -23,8 +23,8 @@ import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler; import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener;
import org.apache.activemq.artemis.spi.core.remoting.ConsumerContext; import org.apache.activemq.artemis.spi.core.remoting.ConsumerContext;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
public interface ClientSessionInternal extends ClientSession { public interface ClientSessionInternal extends ClientSession {
@ -126,5 +126,5 @@ public interface ClientSessionInternal extends ClientSession {
String getNodeId(); String getNodeId();
void addLifeCycleListener(ConnectionLifeCycleListener lifeCycleListener); boolean isWritable(ReadyListener callback);
} }

View File

@ -33,8 +33,8 @@ import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
import org.apache.activemq.artemis.api.core.client.SessionFailureListener; import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener;
import org.apache.activemq.artemis.spi.core.remoting.ConsumerContext; import org.apache.activemq.artemis.spi.core.remoting.ConsumerContext;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.ConcurrentHashSet; import org.apache.activemq.artemis.utils.ConcurrentHashSet;
/** /**
@ -101,8 +101,8 @@ public class DelegatingSession implements ClientSessionInternal {
} }
@Override @Override
public void addLifeCycleListener(ConnectionLifeCycleListener lifeCycleListener) { public boolean isWritable(ReadyListener callback) {
session.addLifeCycleListener(lifeCycleListener); return session.isWritable(callback);
} }
@Override @Override

View File

@ -97,6 +97,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAS
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAStartMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAStartMessage;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.spi.core.remoting.SessionContext; import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
import org.apache.activemq.artemis.utils.TokenBucketLimiterImpl; import org.apache.activemq.artemis.utils.TokenBucketLimiterImpl;
import org.apache.activemq.artemis.utils.VersionLoader; import org.apache.activemq.artemis.utils.VersionLoader;
@ -238,6 +239,11 @@ public class ActiveMQSessionContext extends SessionContext {
return response.toQueueQuery(); return response.toQueueQuery();
} }
@Override
public boolean isWritable(ReadyListener callback) {
return remotingConnection.isWritable(callback);
}
@Override @Override
public ClientConsumerInternal createConsumer(SimpleString queueName, public ClientConsumerInternal createConsumer(SimpleString queueName,
SimpleString filterString, SimpleString filterString,

View File

@ -18,7 +18,7 @@ package org.apache.activemq.artemis.core.remoting.impl.netty;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
@ -39,7 +39,6 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener; import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.IPV6Util; import org.apache.activemq.artemis.utils.IPV6Util;
public class NettyConnection implements Connection { public class NettyConnection implements Connection {
@ -65,10 +64,14 @@ public class NettyConnection implements Connection {
private final Semaphore writeLock = new Semaphore(1); private final Semaphore writeLock = new Semaphore(1);
private final Set<ReadyListener> readyListeners = new ConcurrentHashSet<>();
private RemotingConnection protocolConnection; private RemotingConnection protocolConnection;
private boolean ready = true;
/** if {@link #isWritable(ReadyListener)} returns false, we add a callback
* here for when the connection (or Netty Channel) becomes available again. */
private final ConcurrentLinkedDeque<ReadyListener> readyListeners = new ConcurrentLinkedDeque<>();
// Static -------------------------------------------------------- // Static --------------------------------------------------------
// Constructors -------------------------------------------------- // Constructors --------------------------------------------------
@ -96,6 +99,37 @@ public class NettyConnection implements Connection {
} }
// Connection implementation ---------------------------- // Connection implementation ----------------------------
public boolean isWritable(ReadyListener callback) {
synchronized (readyListeners) {
readyListeners.push(callback);
return ready;
}
}
public void fireReady(final boolean ready) {
synchronized (readyListeners) {
this.ready = ready;
if (ready) {
for (;;) {
ReadyListener readyListener = readyListeners.poll();
if (readyListener == null) {
return;
}
try {
readyListener.readyForWriting();
}
catch (Throwable logOnly) {
ActiveMQClientLogger.LOGGER.warn(logOnly.getMessage(), logOnly);
}
}
}
}
}
@Override @Override
public void forceClose() { public void forceClose() {
if (channel != null) { if (channel != null) {
@ -323,28 +357,12 @@ public class NettyConnection implements Connection {
return directDeliver; return directDeliver;
} }
@Override
public void addReadyListener(final ReadyListener listener) {
readyListeners.add(listener);
}
@Override
public void removeReadyListener(final ReadyListener listener) {
readyListeners.remove(listener);
}
//never allow this //never allow this
@Override @Override
public ActiveMQPrincipal getDefaultActiveMQPrincipal() { public ActiveMQPrincipal getDefaultActiveMQPrincipal() {
return null; return null;
} }
void fireReady(final boolean ready) {
for (ReadyListener listener : readyListeners) {
listener.readyForWriting(ready);
}
}
@Override @Override
public TransportConfiguration getConnectorConfig() { public TransportConfiguration getConnectorConfig() {
if (configuration != null) { if (configuration != null) {

View File

@ -936,6 +936,10 @@ public class NettyConnector extends AbstractConnector {
@Override @Override
public void connectionReadyForWrites(Object connectionID, boolean ready) { public void connectionReadyForWrites(Object connectionID, boolean ready) {
NettyConnection connection = (NettyConnection)connections.get(connectionID);
if (connection != null) {
connection.fireReady(ready);
}
listener.connectionReadyForWrites(connectionID, ready); listener.connectionReadyForWrites(connectionID, ready);
} }

View File

@ -29,6 +29,7 @@ import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
import org.apache.activemq.artemis.core.remoting.CloseListener; import org.apache.activemq.artemis.core.remoting.CloseListener;
import org.apache.activemq.artemis.core.remoting.FailureListener; import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
public abstract class AbstractRemotingConnection implements RemotingConnection { public abstract class AbstractRemotingConnection implements RemotingConnection {
@ -50,6 +51,11 @@ public abstract class AbstractRemotingConnection implements RemotingConnection {
return new ArrayList<>(failureListeners); return new ArrayList<>(failureListeners);
} }
@Override
public boolean isWritable(ReadyListener callback) {
return transportConnection.isWritable(callback);
}
protected void callFailureListeners(final ActiveMQException me, String scaleDownTargetNodeID) { protected void callFailureListeners(final ActiveMQException me, String scaleDownTargetNodeID) {
final List<FailureListener> listenersClone = new ArrayList<>(failureListeners); final List<FailureListener> listenersClone = new ArrayList<>(failureListeners);

View File

@ -24,6 +24,7 @@ import org.apache.activemq.artemis.core.remoting.CloseListener;
import org.apache.activemq.artemis.core.remoting.FailureListener; import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.spi.core.remoting.BufferHandler; import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
/** /**
* A RemotingConnection is a connection between a client and a server. * A RemotingConnection is a connection between a client and a server.
@ -181,4 +182,6 @@ public interface RemotingConnection extends BufferHandler {
*/ */
void flush(); void flush();
boolean isWritable(ReadyListener callback);
} }

View File

@ -39,6 +39,10 @@ public interface Connection {
void setProtocolConnection(RemotingConnection connection); void setProtocolConnection(RemotingConnection connection);
boolean isWritable(ReadyListener listener);
void fireReady(boolean ready);
/** /**
* returns the unique id of this wire. * returns the unique id of this wire.
* *
@ -104,10 +108,6 @@ public interface Connection {
*/ */
void checkFlushBatchBuffer(); void checkFlushBatchBuffer();
void addReadyListener(ReadyListener listener);
void removeReadyListener(ReadyListener listener);
/** /**
* Generates a {@link TransportConfiguration} to be used to connect to the same target this is * Generates a {@link TransportConfiguration} to be used to connect to the same target this is
* connected to. * connected to.

View File

@ -18,6 +18,6 @@ package org.apache.activemq.artemis.spi.core.remoting;
public interface ReadyListener { public interface ReadyListener {
void readyForWriting(boolean ready); void readyForWriting();
} }

View File

@ -269,4 +269,6 @@ public abstract class SessionContext {
public abstract void cleanup(); public abstract void cleanup();
public abstract void linkFlowControl(SimpleString address, ClientProducerCreditsImpl clientProducerCredits); public abstract void linkFlowControl(SimpleString address, ClientProducerCreditsImpl clientProducerCredits);
public abstract boolean isWritable(ReadyListener callback);
} }

View File

@ -114,7 +114,7 @@ public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback
@Override @Override
public AMQPSessionCallback createSessionCallback(AMQPConnectionContext connection) { public AMQPSessionCallback createSessionCallback(AMQPConnectionContext connection) {
return new ProtonSessionIntegrationCallback(this, manager, connection); return new ProtonSessionIntegrationCallback(this, manager, connection, this.connection);
} }
} }

View File

@ -20,6 +20,8 @@ import java.util.concurrent.Executor;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.transport.AmqpError; import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.amqp.transport.ErrorCondition;
@ -36,7 +38,6 @@ import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.ByteUtil; import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.IDGenerator; import org.apache.activemq.artemis.utils.IDGenerator;
import org.apache.activemq.artemis.utils.SimpleIDGenerator; import org.apache.activemq.artemis.utils.SimpleIDGenerator;
@ -58,16 +59,26 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
private final AMQPConnectionContext connection; private final AMQPConnectionContext connection;
private final Connection transportConnection;
private ServerSession serverSession; private ServerSession serverSession;
private AMQPSessionContext protonSession; private AMQPSessionContext protonSession;
public ProtonSessionIntegrationCallback(ActiveMQProtonConnectionCallback protonSPI, public ProtonSessionIntegrationCallback(ActiveMQProtonConnectionCallback protonSPI,
ProtonProtocolManager manager, ProtonProtocolManager manager,
AMQPConnectionContext connection) { AMQPConnectionContext connection,
Connection transportConnection) {
this.protonSPI = protonSPI; this.protonSPI = protonSPI;
this.manager = manager; this.manager = manager;
this.connection = connection; this.connection = connection;
this.transportConnection = transportConnection;
}
@Override
public boolean isWritable(ReadyListener callback) {
return transportConnection.isWritable(callback);
} }
@Override @Override
@ -305,16 +316,6 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
public void closed() { public void closed() {
} }
@Override
public void addReadyListener(ReadyListener listener) {
}
@Override
public void removeReadyListener(ReadyListener listener) {
}
@Override @Override
public void disconnect(ServerConsumer consumer, String queueName) { public void disconnect(ServerConsumer consumer, String queueName) {
synchronized (connection.getLock()) { synchronized (connection.getLock()) {

View File

@ -28,6 +28,7 @@ import org.apache.activemq.artemis.core.remoting.CloseListener;
import org.apache.activemq.artemis.core.remoting.FailureListener; import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
public class MQTTConnection implements RemotingConnection { public class MQTTConnection implements RemotingConnection {
@ -52,6 +53,10 @@ public class MQTTConnection implements RemotingConnection {
this.destroyed = false; this.destroyed = false;
} }
public boolean isWritable(ReadyListener callback) {
return transportConnection.isWritable(callback);
}
@Override @Override
public Object getID() { public Object getID() {
return transportConnection.getID(); return transportConnection.getID();

View File

@ -59,7 +59,7 @@ public class MQTTSession {
mqttConnectionManager = new MQTTConnectionManager(this); mqttConnectionManager = new MQTTConnectionManager(this);
mqttPublishManager = new MQTTPublishManager(this); mqttPublishManager = new MQTTPublishManager(this);
sessionCallback = new MQTTSessionCallback(this); sessionCallback = new MQTTSessionCallback(this, connection);
subscriptionManager = new MQTTSubscriptionManager(this); subscriptionManager = new MQTTSubscriptionManager(this);
retainMessageManager = new MQTTRetainMessageManager(this); retainMessageManager = new MQTTRetainMessageManager(this);

View File

@ -25,12 +25,19 @@ import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
public class MQTTSessionCallback implements SessionCallback { public class MQTTSessionCallback implements SessionCallback {
private MQTTSession session; private final MQTTSession session;
private final MQTTConnection connection;
private MQTTLogger log = MQTTLogger.LOGGER; private MQTTLogger log = MQTTLogger.LOGGER;
public MQTTSessionCallback(MQTTSession session) throws Exception { public MQTTSessionCallback(MQTTSession session, MQTTConnection connection) throws Exception {
this.session = session; this.session = session;
this.connection = connection;
}
@Override
public boolean isWritable(ReadyListener callback) {
return connection.isWritable(callback);
} }
@Override @Override
@ -54,16 +61,6 @@ public class MQTTSessionCallback implements SessionCallback {
return 1; return 1;
} }
@Override
public void addReadyListener(ReadyListener listener) {
session.getConnection().getTransportConnection().addReadyListener(listener);
}
@Override
public void removeReadyListener(ReadyListener listener) {
session.getConnection().getTransportConnection().removeReadyListener(listener);
}
@Override @Override
public int sendLargeMessage(ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount) { public int sendLargeMessage(ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount) {
return sendMessage(message, consumer, deliveryCount); return sendMessage(message, consumer, deliveryCount);

View File

@ -52,6 +52,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor; import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.ConcurrentHashSet; import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQMessage;
@ -160,6 +161,9 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
this.creationTime = System.currentTimeMillis(); this.creationTime = System.currentTimeMillis();
} }
public boolean isWritable(ReadyListener callback) {
return transportConnection.isWritable(callback);
}
// SecurityAuth implementation // SecurityAuth implementation
@Override @Override

View File

@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConnectionInfo; import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.ConsumerInfo;
@ -58,7 +59,6 @@ import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.apache.activemq.artemis.core.transaction.impl.XidImpl; import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.wireformat.WireFormat; import org.apache.activemq.wireformat.WireFormat;
public class AMQSession implements SessionCallback { public class AMQSession implements SessionCallback {
@ -147,6 +147,11 @@ public class AMQSession implements SessionCallback {
started.set(true); started.set(true);
} }
@Override
public boolean isWritable(ReadyListener callback) {
return connection.isWritable(callback);
}
@Override @Override
public void sendProducerCreditsMessage(int credits, SimpleString address) { public void sendProducerCreditsMessage(int credits, SimpleString address) {
// TODO Auto-generated method stub // TODO Auto-generated method stub
@ -186,18 +191,6 @@ public class AMQSession implements SessionCallback {
} }
@Override
public void addReadyListener(ReadyListener listener) {
// TODO Auto-generated method stub
}
@Override
public void removeReadyListener(ReadyListener listener) {
// TODO Auto-generated method stub
}
@Override @Override
public boolean hasCredits(ServerConsumer consumerID) { public boolean hasCredits(ServerConsumer consumerID) {
AMQConsumer amqConsumer = consumers.get(consumerID.getID()); AMQConsumer amqConsumer = consumers.get(consumerID.getID());

View File

@ -41,6 +41,7 @@ import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor; import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.ConfigurationHelper; import org.apache.activemq.artemis.utils.ConfigurationHelper;
import org.apache.activemq.artemis.utils.VersionLoader; import org.apache.activemq.artemis.utils.VersionLoader;
@ -118,6 +119,10 @@ public final class StompConnection implements RemotingConnection {
return frame; return frame;
} }
public boolean isWritable(ReadyListener callback) {
return transportConnection.isWritable(callback);
}
public boolean hasBytes() { public boolean hasBytes() {
return frameHandler.hasBytes(); return frameHandler.hasBytes();
} }

View File

@ -73,6 +73,11 @@ public class StompSession implements SessionCallback {
this.consumerCredits = ConfigurationHelper.getIntProperty(TransportConstants.STOMP_CONSUMERS_CREDIT, TransportConstants.STOMP_DEFAULT_CONSUMERS_CREDIT, connection.getAcceptorUsed().getConfiguration()); this.consumerCredits = ConfigurationHelper.getIntProperty(TransportConstants.STOMP_CONSUMERS_CREDIT, TransportConstants.STOMP_DEFAULT_CONSUMERS_CREDIT, connection.getAcceptorUsed().getConfiguration());
} }
@Override
public boolean isWritable(ReadyListener callback) {
return connection.isWritable(callback);
}
void setServerSession(ServerSession session) { void setServerSession(ServerSession session) {
this.session = session; this.session = session;
} }
@ -181,16 +186,6 @@ public class StompSession implements SessionCallback {
public void closed() { public void closed() {
} }
@Override
public void addReadyListener(final ReadyListener listener) {
connection.getTransportConnection().addReadyListener(listener);
}
@Override
public void removeReadyListener(final ReadyListener listener) {
connection.getTransportConnection().removeReadyListener(listener);
}
@Override @Override
public void disconnect(ServerConsumer consumerId, String queueName) { public void disconnect(ServerConsumer consumerId, String queueName) {
StompSubscription stompSubscription = subscriptions.remove(consumerId.getID()); StompSubscription stompSubscription = subscriptions.remove(consumerId.getID());

View File

@ -611,6 +611,9 @@ public class ServerSessionPacketHandler implements ChannelHandler {
newConnection.syncIDGeneratorSequence(remotingConnection.getIDGeneratorSequence()); newConnection.syncIDGeneratorSequence(remotingConnection.getIDGeneratorSequence());
Connection oldTransportConnection = remotingConnection.getTransportConnection();
remotingConnection = newConnection; remotingConnection = newConnection;
remotingConnection.setCloseListeners(closeListeners); remotingConnection.setCloseListeners(closeListeners);
@ -624,6 +627,10 @@ public class ServerSessionPacketHandler implements ChannelHandler {
session.setTransferring(false); session.setTransferring(false);
// We do this because the old connection could be out of credits on netty
// this will force anything to resume after the reattach through the ReadyListener callbacks
oldTransportConnection.fireReady(true);
return serverLastReceivedCommandID; return serverLastReceivedCommandID;
} }
} }

View File

@ -148,7 +148,8 @@ public class ActiveMQPacketHandler implements ChannelHandler {
activeMQPrincipal = connection.getDefaultActiveMQPrincipal(); activeMQPrincipal = connection.getDefaultActiveMQPrincipal();
} }
ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(), new CoreSessionCallback(request.getName(), protocolManager, channel), null, true); ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(),
new CoreSessionCallback(request.getName(), protocolManager, channel, connection), null, true);
ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session, server.getStorageManager(), channel); ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session, server.getStorageManager(), channel);
channel.setHandler(handler); channel.setHandler(handler);

View File

@ -29,6 +29,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
@ -38,12 +39,20 @@ public final class CoreSessionCallback implements SessionCallback {
private ProtocolManager protocolManager; private ProtocolManager protocolManager;
private final RemotingConnection connection;
private String name; private String name;
public CoreSessionCallback(String name, ProtocolManager protocolManager, Channel channel) { public CoreSessionCallback(String name, ProtocolManager protocolManager, Channel channel, RemotingConnection connection) {
this.name = name; this.name = name;
this.protocolManager = protocolManager; this.protocolManager = protocolManager;
this.channel = channel; this.channel = channel;
this.connection = connection;
}
@Override
public boolean isWritable(ReadyListener callback) {
return connection.isWritable(callback);
} }
@Override @Override
@ -101,16 +110,6 @@ public final class CoreSessionCallback implements SessionCallback {
protocolManager.removeHandler(name); protocolManager.removeHandler(name);
} }
@Override
public void addReadyListener(final ReadyListener listener) {
channel.getConnection().getTransportConnection().addReadyListener(listener);
}
@Override
public void removeReadyListener(final ReadyListener listener) {
channel.getConnection().getTransportConnection().removeReadyListener(listener);
}
@Override @Override
public void disconnect(ServerConsumer consumerId, String queueName) { public void disconnect(ServerConsumer consumerId, String queueName) {
if (channel.supports(PacketImpl.DISCONNECT_CONSUMER)) { if (channel.supports(PacketImpl.DISCONNECT_CONSUMER)) {

View File

@ -101,6 +101,14 @@ public class InVMConnection implements Connection {
// no op // no op
} }
public boolean isWritable(ReadyListener listener) {
return true;
}
@Override
public void fireReady(boolean ready) {
}
@Override @Override
public RemotingConnection getProtocolConnection() { public RemotingConnection getProtocolConnection() {
return this.protocolConnection; return this.protocolConnection;
@ -230,14 +238,6 @@ public class InVMConnection implements Connection {
return -1; return -1;
} }
@Override
public void addReadyListener(ReadyListener listener) {
}
@Override
public void removeReadyListener(ReadyListener listener) {
}
@Override @Override
public boolean isUsingProtocolHandling() { public boolean isUsingProtocolHandling() {
return false; return false;

View File

@ -687,6 +687,8 @@ public class NettyAcceptor implements Acceptor {
if (conn != null) { if (conn != null) {
conn.fireReady(ready); conn.fireReady(ready);
} }
listener.connectionReadyForWrites(connectionID, ready);
} }
} }

View File

@ -45,7 +45,6 @@ import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.message.impl.MessageImpl; import org.apache.activemq.artemis.core.message.impl.MessageImpl;
import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.HandleStatus; import org.apache.activemq.artemis.core.server.HandleStatus;
import org.apache.activemq.artemis.core.server.LargeServerMessage; import org.apache.activemq.artemis.core.server.LargeServerMessage;
@ -58,8 +57,7 @@ import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.server.management.Notification; import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.core.server.management.NotificationService; import org.apache.activemq.artemis.core.server.management.NotificationService;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener;
import org.apache.activemq.artemis.utils.FutureLatch; import org.apache.activemq.artemis.utils.FutureLatch;
import org.apache.activemq.artemis.utils.ReusableLatch; import org.apache.activemq.artemis.utils.ReusableLatch;
import org.apache.activemq.artemis.utils.TypedProperties; import org.apache.activemq.artemis.utils.TypedProperties;
@ -69,7 +67,7 @@ import org.apache.activemq.artemis.utils.UUID;
* A Core BridgeImpl * A Core BridgeImpl
*/ */
public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowledgementHandler, ConnectionLifeCycleListener { public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowledgementHandler, ReadyListener {
// Constants ----------------------------------------------------- // Constants -----------------------------------------------------
private static final boolean isTrace = ActiveMQServerLogger.LOGGER.isTraceEnabled(); private static final boolean isTrace = ActiveMQServerLogger.LOGGER.isTraceEnabled();
@ -135,8 +133,6 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
private volatile ClientProducer producer; private volatile ClientProducer producer;
private volatile boolean connectionWritable = false;
private volatile boolean started; private volatile boolean started;
private volatile boolean stopping = false; private volatile boolean stopping = false;
@ -497,6 +493,11 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
} }
} }
@Override
public void readyForWriting() {
queue.deliverAsync();
}
@Override @Override
public HandleStatus handle(final MessageReference ref) throws Exception { public HandleStatus handle(final MessageReference ref) throws Exception {
if (filter != null && !filter.match(ref.getMessage())) { if (filter != null && !filter.match(ref.getMessage())) {
@ -504,7 +505,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
} }
synchronized (this) { synchronized (this) {
if (!active || !connectionWritable) { if (!active || !session.isWritable(this)) {
if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) { if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
ActiveMQServerLogger.LOGGER.debug(this + "::Ignoring reference on bridge as it is set to inactive ref=" + ref); ActiveMQServerLogger.LOGGER.debug(this + "::Ignoring reference on bridge as it is set to inactive ref=" + ref);
} }
@ -555,29 +556,6 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
} }
} }
@Override
public void connectionCreated(ActiveMQComponent component, Connection connection, String protocol) {
}
@Override
public void connectionDestroyed(Object connectionID) {
}
@Override
public void connectionException(Object connectionID, ActiveMQException me) {
}
@Override
public void connectionReadyForWrites(Object connectionID, boolean ready) {
connectionWritable = ready;
if (connectionWritable) {
queue.deliverAsync();
}
}
// FailureListener implementation -------------------------------- // FailureListener implementation --------------------------------
@Override @Override
@ -891,8 +869,6 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
session.setSendAcknowledgementHandler(BridgeImpl.this); session.setSendAcknowledgementHandler(BridgeImpl.this);
session.addLifeCycleListener(BridgeImpl.this);
afterConnect(); afterConnect();
active = true; active = true;

View File

@ -21,7 +21,6 @@ import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReadWriteLock;
@ -129,12 +128,6 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
private boolean transferring = false; private boolean transferring = false;
/* As well as consumer credit based flow control, we also tap into TCP flow control (assuming transport is using TCP)
* This is useful in the case where consumer-window-size = -1, but we don't want to OOM by sending messages ad infinitum to the Netty
* write queue when the TCP buffer is full, e.g. the client is slow or has died.
*/
private final AtomicBoolean writeReady = new AtomicBoolean(true);
private final long creationTime; private final long creationTime;
private AtomicLong consumerRateCheckTime = new AtomicLong(System.currentTimeMillis()); private AtomicLong consumerRateCheckTime = new AtomicLong(System.currentTimeMillis());
@ -198,8 +191,6 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
this.strictUpdateDeliveryCount = strictUpdateDeliveryCount; this.strictUpdateDeliveryCount = strictUpdateDeliveryCount;
this.callback.addReadyListener(this);
this.creationTime = System.currentTimeMillis(); this.creationTime = System.currentTimeMillis();
if (browseOnly) { if (browseOnly) {
@ -220,6 +211,11 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
} }
} }
@Override
public void readyForWriting() {
promptDelivery();
}
// ServerConsumer implementation // ServerConsumer implementation
// ---------------------------------------------------------------------- // ----------------------------------------------------------------------
@ -289,7 +285,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
// If the consumer is stopped then we don't accept the message, it // If the consumer is stopped then we don't accept the message, it
// should go back into the // should go back into the
// queue for delivery later. // queue for delivery later.
if (!started || transferring) { if (!started || transferring || !callback.isWritable(this)) {
return HandleStatus.BUSY; return HandleStatus.BUSY;
} }
@ -395,8 +391,6 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
ActiveMQServerLogger.LOGGER.trace("ServerConsumerImpl::" + this + " being closed with failed=" + failed, new Exception("trace")); ActiveMQServerLogger.LOGGER.trace("ServerConsumerImpl::" + this + " being closed with failed=" + failed, new Exception("trace"));
} }
callback.removeReadyListener(this);
setStarted(false); setStarted(false);
LargeMessageDeliverer del = largeMessageDeliverer; LargeMessageDeliverer del = largeMessageDeliverer;
@ -811,18 +805,6 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
} }
} }
@Override
public void readyForWriting(final boolean ready) {
if (ready) {
writeReady.set(true);
promptDelivery();
}
else {
writeReady.set(false);
}
}
/** /**
* To be used on tests only * To be used on tests only
*/ */

View File

@ -43,9 +43,7 @@ public interface SessionCallback {
void closed(); void closed();
void addReadyListener(ReadyListener listener);
void removeReadyListener(ReadyListener listener);
void disconnect(ServerConsumer consumerId, String queueName); void disconnect(ServerConsumer consumerId, String queueName);
boolean isWritable(ReadyListener callback);
} }

View File

@ -483,6 +483,11 @@ public class HangConsumerTest extends ActiveMQTestBase {
targetCallback.sendProducerCreditsMessage(credits, address); targetCallback.sendProducerCreditsMessage(credits, address);
} }
@Override
public boolean isWritable(ReadyListener callback) {
return true;
}
@Override @Override
public void sendProducerCreditsFailMessage(int credits, SimpleString address) { public void sendProducerCreditsFailMessage(int credits, SimpleString address) {
targetCallback.sendProducerCreditsFailMessage(credits, address); targetCallback.sendProducerCreditsFailMessage(credits, address);
@ -538,22 +543,6 @@ public class HangConsumerTest extends ActiveMQTestBase {
targetCallback.closed(); targetCallback.closed();
} }
/* (non-Javadoc)
* @see SessionCallback#addReadyListener(ReadyListener)
*/
@Override
public void addReadyListener(ReadyListener listener) {
targetCallback.addReadyListener(listener);
}
/* (non-Javadoc)
* @see SessionCallback#removeReadyListener(ReadyListener)
*/
@Override
public void removeReadyListener(ReadyListener listener) {
targetCallback.removeReadyListener(listener);
}
@Override @Override
public void disconnect(ServerConsumer consumerId, String queueName) { public void disconnect(ServerConsumer consumerId, String queueName) {
//To change body of implemented methods use File | Settings | File Templates. //To change body of implemented methods use File | Settings | File Templates.