This closes #605

This commit is contained in:
Clebert Suconic 2016-06-27 18:23:45 -04:00
commit 8d728ee706
3 changed files with 26 additions and 3 deletions

View File

@ -371,10 +371,10 @@ public class ClientProducerImpl implements ClientProducerInternal {
context.open(); context.open();
try { try {
for (int pos = 0; pos < bodySize; ) { for (long pos = 0; pos < bodySize; ) {
final boolean lastChunk; final boolean lastChunk;
final int chunkLength = Math.min((int) (bodySize - pos), minLargeMessageSize); final int chunkLength = (int) Math.min((bodySize - pos), (long) minLargeMessageSize);
final ActiveMQBuffer bodyBuffer = ActiveMQBuffers.fixedBuffer(chunkLength); final ActiveMQBuffer bodyBuffer = ActiveMQBuffers.fixedBuffer(chunkLength);
@ -385,7 +385,7 @@ public class ClientProducerImpl implements ClientProducerInternal {
lastChunk = pos >= bodySize; lastChunk = pos >= bodySize;
SendAcknowledgementHandler messageHandler = lastChunk ? handler : null; SendAcknowledgementHandler messageHandler = lastChunk ? handler : null;
int creditsUsed = sessionContext.sendLargeMessageChunk(msgI, -1, sendBlocking, lastChunk, bodyBuffer.toByteBuffer().array(), reconnectID, messageHandler); int creditsUsed = sessionContext.sendServerLargeMessageChunk(msgI, -1, sendBlocking, lastChunk, bodyBuffer.toByteBuffer().array(), messageHandler);
credits.acquireCredits(creditsUsed); credits.acquireCredits(creditsUsed);
} }

View File

@ -437,6 +437,22 @@ public class ActiveMQSessionContext extends SessionContext {
return chunkPacket.getPacketSize(); return chunkPacket.getPacketSize();
} }
@Override
public int sendServerLargeMessageChunk(MessageInternal msgI, long messageBodySize, boolean sendBlocking, boolean lastChunk, byte[] chunk, SendAcknowledgementHandler messageHandler) throws ActiveMQException {
final boolean requiresResponse = lastChunk && sendBlocking;
final SessionSendContinuationMessage chunkPacket = new SessionSendContinuationMessage(msgI, chunk, !lastChunk, requiresResponse, messageBodySize, messageHandler);
if (requiresResponse) {
// When sending it blocking, only the last chunk will be blocking.
sessionChannel.sendBlocking(chunkPacket, PacketImpl.NULL_RESPONSE);
}
else {
sessionChannel.send(chunkPacket);
}
return chunkPacket.getPacketSize();
}
@Override @Override
public void sendACK(boolean individual, public void sendACK(boolean individual,
boolean block, boolean block,

View File

@ -150,6 +150,13 @@ public abstract class SessionContext {
int reconnectID, int reconnectID,
SendAcknowledgementHandler messageHandler) throws ActiveMQException; SendAcknowledgementHandler messageHandler) throws ActiveMQException;
public abstract int sendServerLargeMessageChunk(MessageInternal msgI,
long messageBodySize,
boolean sendBlocking,
boolean lastChunk,
byte[] chunk,
SendAcknowledgementHandler messageHandler) throws ActiveMQException;
public abstract void setSendAcknowledgementHandler(final SendAcknowledgementHandler handler); public abstract void setSendAcknowledgementHandler(final SendAcknowledgementHandler handler);
public abstract void createSharedQueue(SimpleString address, public abstract void createSharedQueue(SimpleString address,