This commit is contained in:
Clebert Suconic 2018-04-09 13:11:41 -04:00
commit e0334dff0e
9 changed files with 48 additions and 1 deletions

View File

@ -78,6 +78,12 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
private String clientID;
@Override
public void scheduledFlush() {
flush();
}
// Constructors
// ---------------------------------------------------------------------------------

View File

@ -52,6 +52,11 @@ public abstract class AbstractRemotingConnection implements RemotingConnection {
this.creationTime = System.currentTimeMillis();
}
@Override
public void scheduledFlush() {
flush();
}
@Override
public List<FailureListener> getFailureListeners() {
return new ArrayList<>(failureListeners);

View File

@ -57,6 +57,8 @@ public interface RemotingConnection extends BufferHandler {
*/
String getRemoteAddress();
void scheduledFlush();
/**
* add a failure listener.
* <p>

View File

@ -60,6 +60,12 @@ public class ActiveMQProtonRemotingConnection extends AbstractRemotingConnection
return manager;
}
@Override
public void scheduledFlush() {
amqpConnection.scheduledFlush();
}
/*
* This can be called concurrently by more than one thread so needs to be locked
*/

View File

@ -123,6 +123,11 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
}
}
public void scheduledFlush() {
handler.scheduledFlush();
}
public boolean isIncomingConnection() {
return isIncomingConnection;
}

View File

@ -131,6 +131,17 @@ public class ProtonHandler extends ProtonInitializable implements SaslListener {
}
}
/**
* We cannot flush until the initial handshake was finished.
* If this happens before the handshake, the connection response will happen without SASL
* and the client will respond and fail with an invalid code.
* */
public void scheduledFlush() {
if (receivedFirstPacket) {
flush();
}
}
public int capacity() {
lock.lock();
try {

View File

@ -58,6 +58,12 @@ public class MQTTConnection implements RemotingConnection {
this.destroyed = false;
}
@Override
public void scheduledFlush() {
flush();
}
@Override
public boolean isWritable(ReadyListener callback) {
return transportConnection.isWritable(callback);

View File

@ -116,6 +116,12 @@ public final class StompConnection implements RemotingConnection {
return frameHandler;
}
@Override
public void scheduledFlush() {
flush();
}
public StompFrame decode(ActiveMQBuffer buffer) throws ActiveMQStompException {
StompFrame frame = null;
try {

View File

@ -716,7 +716,7 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif
// this is using a different thread
// as if anything wrong happens on flush
// failure detection could be affected
conn.flush();
conn.scheduledFlush();
} catch (Throwable e) {
ActiveMQServerLogger.LOGGER.failedToFlushOutstandingDataFromTheConnection(e);
}