ARTEMIS-1796 AMQP flush during handshake can break SASL

This commit is contained in:
Clebert Suconic 2018-04-09 10:43:29 -04:00
parent 6ca5f9eda2
commit 396056ce04
9 changed files with 48 additions and 1 deletions

View File

@ -78,6 +78,12 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
private String clientID; private String clientID;
@Override
public void scheduledFlush() {
flush();
}
// Constructors // Constructors
// --------------------------------------------------------------------------------- // ---------------------------------------------------------------------------------

View File

@ -52,6 +52,11 @@ public abstract class AbstractRemotingConnection implements RemotingConnection {
this.creationTime = System.currentTimeMillis(); this.creationTime = System.currentTimeMillis();
} }
@Override
public void scheduledFlush() {
flush();
}
@Override @Override
public List<FailureListener> getFailureListeners() { public List<FailureListener> getFailureListeners() {
return new ArrayList<>(failureListeners); return new ArrayList<>(failureListeners);

View File

@ -57,6 +57,8 @@ public interface RemotingConnection extends BufferHandler {
*/ */
String getRemoteAddress(); String getRemoteAddress();
void scheduledFlush();
/** /**
* add a failure listener. * add a failure listener.
* <p> * <p>

View File

@ -60,6 +60,12 @@ public class ActiveMQProtonRemotingConnection extends AbstractRemotingConnection
return manager; return manager;
} }
@Override
public void scheduledFlush() {
amqpConnection.scheduledFlush();
}
/* /*
* This can be called concurrently by more than one thread so needs to be locked * This can be called concurrently by more than one thread so needs to be locked
*/ */

View File

@ -123,6 +123,11 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
} }
} }
public void scheduledFlush() {
handler.scheduledFlush();
}
public boolean isIncomingConnection() { public boolean isIncomingConnection() {
return isIncomingConnection; return isIncomingConnection;
} }

View File

@ -131,6 +131,17 @@ public class ProtonHandler extends ProtonInitializable implements SaslListener {
} }
} }
/**
* We cannot flush until the initial handshake was finished.
* If this happens before the handshake, the connection response will happen without SASL
* and the client will respond and fail with an invalid code.
* */
public void scheduledFlush() {
if (receivedFirstPacket) {
flush();
}
}
public int capacity() { public int capacity() {
lock.lock(); lock.lock();
try { try {

View File

@ -58,6 +58,12 @@ public class MQTTConnection implements RemotingConnection {
this.destroyed = false; this.destroyed = false;
} }
@Override
public void scheduledFlush() {
flush();
}
@Override @Override
public boolean isWritable(ReadyListener callback) { public boolean isWritable(ReadyListener callback) {
return transportConnection.isWritable(callback); return transportConnection.isWritable(callback);

View File

@ -116,6 +116,12 @@ public final class StompConnection implements RemotingConnection {
return frameHandler; return frameHandler;
} }
@Override
public void scheduledFlush() {
flush();
}
public StompFrame decode(ActiveMQBuffer buffer) throws ActiveMQStompException { public StompFrame decode(ActiveMQBuffer buffer) throws ActiveMQStompException {
StompFrame frame = null; StompFrame frame = null;
try { try {

View File

@ -716,7 +716,7 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
// this is using a different thread // this is using a different thread
// as if anything wrong happens on flush // as if anything wrong happens on flush
// failure detection could be affected // failure detection could be affected
conn.flush(); conn.scheduledFlush();
} catch (Throwable e) { } catch (Throwable e) {
ActiveMQServerLogger.LOGGER.failedToFlushOutstandingDataFromTheConnection(e); ActiveMQServerLogger.LOGGER.failedToFlushOutstandingDataFromTheConnection(e);
} }