diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java index 0fe4a5a03d..748e508bc2 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java @@ -310,6 +310,11 @@ public interface ActiveMQClientLogger extends BasicLogger { format = Message.Format.MESSAGE_FORMAT) void broadcastGroupBindError(String hostAndPort); + @LogMessage(level = Logger.Level.WARN) + @Message(id = 212057, value = "Large Message Streaming is taking too long to flush on back pressure.", + format = Message.Format.MESSAGE_FORMAT) + void timeoutStreamingLargeMessage(); + @LogMessage(level = Logger.Level.ERROR) @Message(id = 214000, value = "Failed to call onMessage", format = Message.Format.MESSAGE_FORMAT) void onMessageError(@Cause Throwable e); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java index 6f92330285..7799395f62 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; @@ -458,17 +459,7 @@ public class ActiveMQSessionContext extends SessionContext { byte[] chunk, int reconnectID, SendAcknowledgementHandler messageHandler) throws ActiveMQException { - final boolean requiresResponse = lastChunk && sendBlocking; - final SessionSendContinuationMessage chunkPacket = new SessionSendContinuationMessage(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler); - - if (requiresResponse) { - // When sending it blocking, only the last chunk will be blocking. - sessionChannel.sendBlocking(chunkPacket, reconnectID, PacketImpl.NULL_RESPONSE); - } else { - sessionChannel.send(chunkPacket, reconnectID); - } - - return chunkPacket.getPacketSize(); + return sendSessionSendContinuationMessage(this.sessionChannel, msgI, messageBodySize, sendBlocking, lastChunk, chunk, messageHandler); } @Override @@ -478,17 +469,7 @@ public class ActiveMQSessionContext extends SessionContext { boolean lastChunk, byte[] chunk, SendAcknowledgementHandler messageHandler) throws ActiveMQException { - final boolean requiresResponse = lastChunk && sendBlocking; - final SessionSendContinuationMessage chunkPacket = new SessionSendContinuationMessage(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler); - - if (requiresResponse) { - // When sending it blocking, only the last chunk will be blocking. - sessionChannel.sendBlocking(chunkPacket, PacketImpl.NULL_RESPONSE); - } else { - sessionChannel.send(chunkPacket); - } - - return chunkPacket.getPacketSize(); + return sendSessionSendContinuationMessage(this.sessionChannel, msgI, messageBodySize, sendBlocking, lastChunk, chunk, messageHandler); } @Override @@ -813,6 +794,38 @@ public class ActiveMQSessionContext extends SessionContext { } } + private static int sendSessionSendContinuationMessage(Channel channel, + Message msgI, + long messageBodySize, + boolean sendBlocking, + boolean lastChunk, + byte[] chunk, + SendAcknowledgementHandler messageHandler) throws ActiveMQException { + final boolean requiresResponse = lastChunk && sendBlocking; + final SessionSendContinuationMessage chunkPacket = new SessionSendContinuationMessage(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler); + final int expectedEncodeSize = chunkPacket.expectedEncodeSize(); + //perform a weak form of flow control to avoid OOM on tight loops + final CoreRemotingConnection connection = channel.getConnection(); + final long blockingCallTimeoutMillis = Math.max(0, connection.getBlockingCallTimeout()); + final long startFlowControl = System.nanoTime(); + final boolean isWritable = connection.blockUntilWritable(expectedEncodeSize, blockingCallTimeoutMillis); + if (!isWritable) { + final long endFlowControl = System.nanoTime(); + final long elapsedFlowControl = endFlowControl - startFlowControl; + final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(elapsedFlowControl); + ActiveMQClientLogger.LOGGER.timeoutStreamingLargeMessage(); + logger.debug("try to write " + expectedEncodeSize + " bytes after blocked " + elapsedMillis + " ms on a not writable connection: [" + connection.getID() + "]"); + } + if (requiresResponse) { + // When sending it blocking, only the last chunk will be blocking. + channel.sendBlocking(chunkPacket, PacketImpl.NULL_RESPONSE); + } else { + channel.send(chunkPacket); + } + return chunkPacket.getPacketSize(); + } + + class ClientSessionPacketHandler implements ChannelHandler { @Override