diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java index ac73b57f25..f24fe87ecb 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/RemotingConnectionImpl.java @@ -78,6 +78,12 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement private String clientID; + + @Override + public void scheduledFlush() { + flush(); + } + // Constructors // --------------------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java index a92a695af7..960cadde65 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/AbstractRemotingConnection.java @@ -52,6 +52,11 @@ public abstract class AbstractRemotingConnection implements RemotingConnection { this.creationTime = System.currentTimeMillis(); } + @Override + public void scheduledFlush() { + flush(); + } + @Override public List getFailureListeners() { return new ArrayList<>(failureListeners); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java index f2ff5d7de3..a906a3200d 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/protocol/RemotingConnection.java @@ -57,6 +57,8 @@ public interface RemotingConnection extends BufferHandler { */ String getRemoteAddress(); + void scheduledFlush(); + /** * add a failure listener. *

diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java index 02f035a6ad..a37b7b7418 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java @@ -60,6 +60,12 @@ public class ActiveMQProtonRemotingConnection extends AbstractRemotingConnection return manager; } + + @Override + public void scheduledFlush() { + amqpConnection.scheduledFlush(); + } + /* * This can be called concurrently by more than one thread so needs to be locked */ diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java index efb438dc3d..39f960994f 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java @@ -123,6 +123,11 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH } } + + public void scheduledFlush() { + handler.scheduledFlush(); + } + public boolean isIncomingConnection() { return isIncomingConnection; } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java index 5bee66ee9c..fb69208cba 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java @@ -131,6 +131,17 @@ public class ProtonHandler extends ProtonInitializable implements SaslListener { } } + /** + * We cannot flush until the initial handshake was finished. + * If this happens before the handshake, the connection response will happen without SASL + * and the client will respond and fail with an invalid code. + * */ + public void scheduledFlush() { + if (receivedFirstPacket) { + flush(); + } + } + public int capacity() { lock.lock(); try { diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java index 012356b553..26c136947d 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java @@ -58,6 +58,12 @@ public class MQTTConnection implements RemotingConnection { this.destroyed = false; } + + @Override + public void scheduledFlush() { + flush(); + } + @Override public boolean isWritable(ReadyListener callback) { return transportConnection.isWritable(callback); diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java index 4415c952b4..baaa36e03b 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java @@ -116,6 +116,12 @@ public final class StompConnection implements RemotingConnection { return frameHandler; } + + @Override + public void scheduledFlush() { + flush(); + } + public StompFrame decode(ActiveMQBuffer buffer) throws ActiveMQStompException { StompFrame frame = null; try { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java index 5af9657c3e..17351d9af8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java @@ -716,7 +716,7 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif // this is using a different thread // as if anything wrong happens on flush // failure detection could be affected - conn.flush(); + conn.scheduledFlush(); } catch (Throwable e) { ActiveMQServerLogger.LOGGER.failedToFlushOutstandingDataFromTheConnection(e); }