mirror of
https://github.com/apache/activemq-artemis.git
synced 2025-02-10 03:55:58 +00:00
ARTEMIS-1098 Improve flow control while streaming large messages
(cherry picked from commit c6d24e9073d0793350c24234f85f8d4532d250ff)
This commit is contained in:
parent
e82ef026b3
commit
3c33632882
@ -310,6 +310,11 @@ public interface ActiveMQClientLogger extends BasicLogger {
|
||||
format = Message.Format.MESSAGE_FORMAT)
|
||||
void broadcastGroupBindError(String hostAndPort);
|
||||
|
||||
@LogMessage(level = Logger.Level.WARN)
|
||||
@Message(id = 212057, value = "Large Message Streaming is taking too long to flush on back pressure.",
|
||||
format = Message.Format.MESSAGE_FORMAT)
|
||||
void timeoutStreamingLargeMessage();
|
||||
|
||||
@LogMessage(level = Logger.Level.ERROR)
|
||||
@Message(id = 214000, value = "Failed to call onMessage", format = Message.Format.MESSAGE_FORMAT)
|
||||
void onMessageError(@Cause Throwable e);
|
||||
|
@ -25,6 +25,7 @@ import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
@ -438,17 +439,7 @@ public class ActiveMQSessionContext extends SessionContext {
|
||||
byte[] chunk,
|
||||
int reconnectID,
|
||||
SendAcknowledgementHandler messageHandler) throws ActiveMQException {
|
||||
final boolean requiresResponse = lastChunk && sendBlocking;
|
||||
final SessionSendContinuationMessage chunkPacket = new SessionSendContinuationMessage(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler);
|
||||
|
||||
if (requiresResponse) {
|
||||
// When sending it blocking, only the last chunk will be blocking.
|
||||
sessionChannel.sendBlocking(chunkPacket, reconnectID, PacketImpl.NULL_RESPONSE);
|
||||
} else {
|
||||
sessionChannel.send(chunkPacket, reconnectID);
|
||||
}
|
||||
|
||||
return chunkPacket.getPacketSize();
|
||||
return sendSessionSendContinuationMessage(this.sessionChannel, msgI, messageBodySize, sendBlocking, lastChunk, chunk, messageHandler);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -458,17 +449,7 @@ public class ActiveMQSessionContext extends SessionContext {
|
||||
boolean lastChunk,
|
||||
byte[] chunk,
|
||||
SendAcknowledgementHandler messageHandler) throws ActiveMQException {
|
||||
final boolean requiresResponse = lastChunk && sendBlocking;
|
||||
final SessionSendContinuationMessage chunkPacket = new SessionSendContinuationMessage(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler);
|
||||
|
||||
if (requiresResponse) {
|
||||
// When sending it blocking, only the last chunk will be blocking.
|
||||
sessionChannel.sendBlocking(chunkPacket, PacketImpl.NULL_RESPONSE);
|
||||
} else {
|
||||
sessionChannel.send(chunkPacket);
|
||||
}
|
||||
|
||||
return chunkPacket.getPacketSize();
|
||||
return sendSessionSendContinuationMessage(this.sessionChannel, msgI, messageBodySize, sendBlocking, lastChunk, chunk, messageHandler);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -770,6 +751,38 @@ public class ActiveMQSessionContext extends SessionContext {
|
||||
}
|
||||
}
|
||||
|
||||
private static int sendSessionSendContinuationMessage(Channel channel,
|
||||
MessageInternal msgI,
|
||||
long messageBodySize,
|
||||
boolean sendBlocking,
|
||||
boolean lastChunk,
|
||||
byte[] chunk,
|
||||
SendAcknowledgementHandler messageHandler) throws ActiveMQException {
|
||||
final boolean requiresResponse = lastChunk && sendBlocking;
|
||||
final SessionSendContinuationMessage chunkPacket = new SessionSendContinuationMessage(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler);
|
||||
final int expectedEncodeSize = chunkPacket.expectedEncodeSize();
|
||||
//perform a weak form of flow control to avoid OOM on tight loops
|
||||
final CoreRemotingConnection connection = channel.getConnection();
|
||||
final long blockingCallTimeoutMillis = Math.max(0, connection.getBlockingCallTimeout());
|
||||
final long startFlowControl = System.nanoTime();
|
||||
final boolean isWritable = connection.blockUntilWritable(expectedEncodeSize, blockingCallTimeoutMillis);
|
||||
if (!isWritable) {
|
||||
final long endFlowControl = System.nanoTime();
|
||||
final long elapsedFlowControl = endFlowControl - startFlowControl;
|
||||
final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(elapsedFlowControl);
|
||||
ActiveMQClientLogger.LOGGER.timeoutStreamingLargeMessage();
|
||||
logger.debug("try to write " + expectedEncodeSize + " bytes after blocked " + elapsedMillis + " ms on a not writable connection: [" + connection.getID() + "]");
|
||||
}
|
||||
if (requiresResponse) {
|
||||
// When sending it blocking, only the last chunk will be blocking.
|
||||
channel.sendBlocking(chunkPacket, PacketImpl.NULL_RESPONSE);
|
||||
} else {
|
||||
channel.send(chunkPacket);
|
||||
}
|
||||
return chunkPacket.getPacketSize();
|
||||
}
|
||||
|
||||
|
||||
class ClientSessionPacketHandler implements ChannelHandler {
|
||||
|
||||
@Override
|
||||
|
Loading…
x
Reference in New Issue
Block a user