This commit is contained in:
Clebert Suconic 2017-04-10 14:00:05 -04:00
commit 359592cf5e
2 changed files with 40 additions and 22 deletions

View File

@ -310,6 +310,11 @@ public interface ActiveMQClientLogger extends BasicLogger {
format = Message.Format.MESSAGE_FORMAT) format = Message.Format.MESSAGE_FORMAT)
void broadcastGroupBindError(String hostAndPort); void broadcastGroupBindError(String hostAndPort);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 212057, value = "Large Message Streaming is taking too long to flush on back pressure.",
format = Message.Format.MESSAGE_FORMAT)
void timeoutStreamingLargeMessage();
@LogMessage(level = Logger.Level.ERROR) @LogMessage(level = Logger.Level.ERROR)
@Message(id = 214000, value = "Failed to call onMessage", format = Message.Format.MESSAGE_FORMAT) @Message(id = 214000, value = "Failed to call onMessage", format = Message.Format.MESSAGE_FORMAT)
void onMessageError(@Cause Throwable e); void onMessageError(@Cause Throwable e);

View File

@ -26,6 +26,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
@ -458,17 +459,7 @@ public class ActiveMQSessionContext extends SessionContext {
byte[] chunk, byte[] chunk,
int reconnectID, int reconnectID,
SendAcknowledgementHandler messageHandler) throws ActiveMQException { SendAcknowledgementHandler messageHandler) throws ActiveMQException {
final boolean requiresResponse = lastChunk && sendBlocking; return sendSessionSendContinuationMessage(this.sessionChannel, msgI, messageBodySize, sendBlocking, lastChunk, chunk, messageHandler);
final SessionSendContinuationMessage chunkPacket = new SessionSendContinuationMessage(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler);
if (requiresResponse) {
// When sending it blocking, only the last chunk will be blocking.
sessionChannel.sendBlocking(chunkPacket, reconnectID, PacketImpl.NULL_RESPONSE);
} else {
sessionChannel.send(chunkPacket, reconnectID);
}
return chunkPacket.getPacketSize();
} }
@Override @Override
@ -478,17 +469,7 @@ public class ActiveMQSessionContext extends SessionContext {
boolean lastChunk, boolean lastChunk,
byte[] chunk, byte[] chunk,
SendAcknowledgementHandler messageHandler) throws ActiveMQException { SendAcknowledgementHandler messageHandler) throws ActiveMQException {
final boolean requiresResponse = lastChunk && sendBlocking; return sendSessionSendContinuationMessage(this.sessionChannel, msgI, messageBodySize, sendBlocking, lastChunk, chunk, messageHandler);
final SessionSendContinuationMessage chunkPacket = new SessionSendContinuationMessage(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler);
if (requiresResponse) {
// When sending it blocking, only the last chunk will be blocking.
sessionChannel.sendBlocking(chunkPacket, PacketImpl.NULL_RESPONSE);
} else {
sessionChannel.send(chunkPacket);
}
return chunkPacket.getPacketSize();
} }
@Override @Override
@ -813,6 +794,38 @@ public class ActiveMQSessionContext extends SessionContext {
} }
} }
private static int sendSessionSendContinuationMessage(Channel channel,
Message msgI,
long messageBodySize,
boolean sendBlocking,
boolean lastChunk,
byte[] chunk,
SendAcknowledgementHandler messageHandler) throws ActiveMQException {
final boolean requiresResponse = lastChunk && sendBlocking;
final SessionSendContinuationMessage chunkPacket = new SessionSendContinuationMessage(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler);
final int expectedEncodeSize = chunkPacket.expectedEncodeSize();
//perform a weak form of flow control to avoid OOM on tight loops
final CoreRemotingConnection connection = channel.getConnection();
final long blockingCallTimeoutMillis = Math.max(0, connection.getBlockingCallTimeout());
final long startFlowControl = System.nanoTime();
final boolean isWritable = connection.blockUntilWritable(expectedEncodeSize, blockingCallTimeoutMillis);
if (!isWritable) {
final long endFlowControl = System.nanoTime();
final long elapsedFlowControl = endFlowControl - startFlowControl;
final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(elapsedFlowControl);
ActiveMQClientLogger.LOGGER.timeoutStreamingLargeMessage();
logger.debug("try to write " + expectedEncodeSize + " bytes after blocked " + elapsedMillis + " ms on a not writable connection: [" + connection.getID() + "]");
}
if (requiresResponse) {
// When sending it blocking, only the last chunk will be blocking.
channel.sendBlocking(chunkPacket, PacketImpl.NULL_RESPONSE);
} else {
channel.send(chunkPacket);
}
return chunkPacket.getPacketSize();
}
class ClientSessionPacketHandler implements ChannelHandler { class ClientSessionPacketHandler implements ChannelHandler {
@Override @Override