Fix ByteBuffer regression from Netty upgrade
Using array() is a bit dangerous as it's an optional part of any ByteBuffer implementation. This new method will deal with various ByteBuffer implementations appropriately.
This commit is contained in:
parent
f4e37c360f
commit
577620533d
|
@ -16,6 +16,8 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.utils;
|
package org.apache.activemq.artemis.utils;
|
||||||
|
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
|
||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
import io.netty.buffer.UnpooledByteBufAllocator;
|
import io.netty.buffer.UnpooledByteBufAllocator;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||||
|
@ -90,4 +92,16 @@ public class ByteUtil {
|
||||||
return sb.toString();
|
return sb.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static byte[] getActiveArray(ByteBuffer buffer) {
|
||||||
|
byte[] ret = new byte[buffer.remaining()];
|
||||||
|
if (buffer.hasArray()) {
|
||||||
|
byte[] array = buffer.array();
|
||||||
|
System.arraycopy(array, buffer.arrayOffset() + buffer.position(), ret, 0, ret.length);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
buffer.slice().get(ret);
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -38,6 +38,7 @@ import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
|
||||||
import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
|
import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
|
||||||
import org.apache.activemq.artemis.spi.core.remoting.ConsumerContext;
|
import org.apache.activemq.artemis.spi.core.remoting.ConsumerContext;
|
||||||
import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
|
import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
|
||||||
|
import org.apache.activemq.artemis.utils.ByteUtil;
|
||||||
import org.apache.activemq.artemis.utils.FutureLatch;
|
import org.apache.activemq.artemis.utils.FutureLatch;
|
||||||
import org.apache.activemq.artemis.utils.PriorityLinkedList;
|
import org.apache.activemq.artemis.utils.PriorityLinkedList;
|
||||||
import org.apache.activemq.artemis.utils.PriorityLinkedListImpl;
|
import org.apache.activemq.artemis.utils.PriorityLinkedListImpl;
|
||||||
|
@ -640,7 +641,7 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
|
||||||
//sets the packet
|
//sets the packet
|
||||||
ActiveMQBuffer qbuff = clMessage.getBodyBuffer();
|
ActiveMQBuffer qbuff = clMessage.getBodyBuffer();
|
||||||
int bytesToRead = qbuff.writerIndex() - qbuff.readerIndex();
|
int bytesToRead = qbuff.writerIndex() - qbuff.readerIndex();
|
||||||
final byte[] body = qbuff.readBytes(bytesToRead).toByteBuffer().array();
|
final byte[] body = ByteUtil.getActiveArray(qbuff.readBytes(bytesToRead).toByteBuffer());
|
||||||
|
|
||||||
largeMessage.setLargeMessageController(new CompressedLargeMessageControllerImpl(currentLargeMessageController));
|
largeMessage.setLargeMessageController(new CompressedLargeMessageControllerImpl(currentLargeMessageController));
|
||||||
currentLargeMessageController.addPacket(body, body.length, false);
|
currentLargeMessageController.addPacket(body, body.length, false);
|
||||||
|
|
Loading…
Reference in New Issue