diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java index 99e593f096..b0998b8a6c 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java @@ -371,10 +371,10 @@ public class ClientProducerImpl implements ClientProducerInternal { context.open(); try { - for (int pos = 0; pos < bodySize; ) { + for (long pos = 0; pos < bodySize; ) { final boolean lastChunk; - final int chunkLength = Math.min((int) (bodySize - pos), minLargeMessageSize); + final int chunkLength = (int) Math.min((bodySize - pos), (long) minLargeMessageSize); final ActiveMQBuffer bodyBuffer = ActiveMQBuffers.fixedBuffer(chunkLength); @@ -385,7 +385,7 @@ public class ClientProducerImpl implements ClientProducerInternal { lastChunk = pos >= bodySize; SendAcknowledgementHandler messageHandler = lastChunk ? handler : null; - int creditsUsed = sessionContext.sendLargeMessageChunk(msgI, -1, sendBlocking, lastChunk, bodyBuffer.toByteBuffer().array(), reconnectID, messageHandler); + int creditsUsed = sessionContext.sendServerLargeMessageChunk(msgI, -1, sendBlocking, lastChunk, bodyBuffer.toByteBuffer().array(), messageHandler); credits.acquireCredits(creditsUsed); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java index 9f0edceca5..f49a22a9d9 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java @@ -437,6 +437,22 @@ public class ActiveMQSessionContext extends SessionContext { return chunkPacket.getPacketSize(); } + @Override + public int sendServerLargeMessageChunk(MessageInternal msgI, long messageBodySize, boolean sendBlocking, boolean lastChunk, byte[] chunk, SendAcknowledgementHandler messageHandler) throws ActiveMQException { + final boolean requiresResponse = lastChunk && sendBlocking; + final SessionSendContinuationMessage chunkPacket = new SessionSendContinuationMessage(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler); + + if (requiresResponse) { + // When sending it blocking, only the last chunk will be blocking. + sessionChannel.sendBlocking(chunkPacket, PacketImpl.NULL_RESPONSE); + } + else { + sessionChannel.send(chunkPacket); + } + + return chunkPacket.getPacketSize(); + } + @Override public void sendACK(boolean individual, boolean block, diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java index 774dbfe2df..175360cd0d 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java @@ -150,6 +150,13 @@ public abstract class SessionContext { int reconnectID, SendAcknowledgementHandler messageHandler) throws ActiveMQException; + public abstract int sendServerLargeMessageChunk(MessageInternal msgI, + long messageBodySize, + boolean sendBlocking, + boolean lastChunk, + byte[] chunk, + SendAcknowledgementHandler messageHandler) throws ActiveMQException; + public abstract void setSendAcknowledgementHandler(final SendAcknowledgementHandler handler); public abstract void createSharedQueue(SimpleString address,