ARTEMIS-1098 Improve flow control while streaming large messages
This commit is contained in:
parent
73c79de8af
commit
da6b851c60
|
@ -310,6 +310,11 @@ public interface ActiveMQClientLogger extends BasicLogger {
|
|||
format = Message.Format.MESSAGE_FORMAT)
|
||||
void broadcastGroupBindError(String hostAndPort);
|
||||
|
||||
@LogMessage(level = Logger.Level.WARN)
|
||||
@Message(id = 212057, value = "Large Message Streaming is taking too long to flush on back pressure.",
|
||||
format = Message.Format.MESSAGE_FORMAT)
|
||||
void timeoutStreamingLargeMessage();
|
||||
|
||||
@LogMessage(level = Logger.Level.ERROR)
|
||||
@Message(id = 214000, value = "Failed to call onMessage", format = Message.Format.MESSAGE_FORMAT)
|
||||
void onMessageError(@Cause Throwable e);
|
||||
|
|
|
@ -26,6 +26,7 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
|
@ -458,17 +459,7 @@ public class ActiveMQSessionContext extends SessionContext {
|
|||
byte[] chunk,
|
||||
int reconnectID,
|
||||
SendAcknowledgementHandler messageHandler) throws ActiveMQException {
|
||||
final boolean requiresResponse = lastChunk && sendBlocking;
|
||||
final SessionSendContinuationMessage chunkPacket = new SessionSendContinuationMessage(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler);
|
||||
|
||||
if (requiresResponse) {
|
||||
// When sending it blocking, only the last chunk will be blocking.
|
||||
sessionChannel.sendBlocking(chunkPacket, reconnectID, PacketImpl.NULL_RESPONSE);
|
||||
} else {
|
||||
sessionChannel.send(chunkPacket, reconnectID);
|
||||
}
|
||||
|
||||
return chunkPacket.getPacketSize();
|
||||
return sendSessionSendContinuationMessage(this.sessionChannel, msgI, messageBodySize, sendBlocking, lastChunk, chunk, messageHandler);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -478,17 +469,7 @@ public class ActiveMQSessionContext extends SessionContext {
|
|||
boolean lastChunk,
|
||||
byte[] chunk,
|
||||
SendAcknowledgementHandler messageHandler) throws ActiveMQException {
|
||||
final boolean requiresResponse = lastChunk && sendBlocking;
|
||||
final SessionSendContinuationMessage chunkPacket = new SessionSendContinuationMessage(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler);
|
||||
|
||||
if (requiresResponse) {
|
||||
// When sending it blocking, only the last chunk will be blocking.
|
||||
sessionChannel.sendBlocking(chunkPacket, PacketImpl.NULL_RESPONSE);
|
||||
} else {
|
||||
sessionChannel.send(chunkPacket);
|
||||
}
|
||||
|
||||
return chunkPacket.getPacketSize();
|
||||
return sendSessionSendContinuationMessage(this.sessionChannel, msgI, messageBodySize, sendBlocking, lastChunk, chunk, messageHandler);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -813,6 +794,38 @@ public class ActiveMQSessionContext extends SessionContext {
|
|||
}
|
||||
}
|
||||
|
||||
private static int sendSessionSendContinuationMessage(Channel channel,
|
||||
Message msgI,
|
||||
long messageBodySize,
|
||||
boolean sendBlocking,
|
||||
boolean lastChunk,
|
||||
byte[] chunk,
|
||||
SendAcknowledgementHandler messageHandler) throws ActiveMQException {
|
||||
final boolean requiresResponse = lastChunk && sendBlocking;
|
||||
final SessionSendContinuationMessage chunkPacket = new SessionSendContinuationMessage(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler);
|
||||
final int expectedEncodeSize = chunkPacket.expectedEncodeSize();
|
||||
//perform a weak form of flow control to avoid OOM on tight loops
|
||||
final CoreRemotingConnection connection = channel.getConnection();
|
||||
final long blockingCallTimeoutMillis = Math.max(0, connection.getBlockingCallTimeout());
|
||||
final long startFlowControl = System.nanoTime();
|
||||
final boolean isWritable = connection.blockUntilWritable(expectedEncodeSize, blockingCallTimeoutMillis);
|
||||
if (!isWritable) {
|
||||
final long endFlowControl = System.nanoTime();
|
||||
final long elapsedFlowControl = endFlowControl - startFlowControl;
|
||||
final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(elapsedFlowControl);
|
||||
ActiveMQClientLogger.LOGGER.timeoutStreamingLargeMessage();
|
||||
logger.debug("try to write " + expectedEncodeSize + " bytes after blocked " + elapsedMillis + " ms on a not writable connection: [" + connection.getID() + "]");
|
||||
}
|
||||
if (requiresResponse) {
|
||||
// When sending it blocking, only the last chunk will be blocking.
|
||||
channel.sendBlocking(chunkPacket, PacketImpl.NULL_RESPONSE);
|
||||
} else {
|
||||
channel.send(chunkPacket);
|
||||
}
|
||||
return chunkPacket.getPacketSize();
|
||||
}
|
||||
|
||||
|
||||
class ClientSessionPacketHandler implements ChannelHandler {
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue