This closes #3154
This commit is contained in:
commit
e000bd44e8
|
@ -799,19 +799,6 @@ public interface ActiveMQBuffer extends DataInput {
|
||||||
@Override
|
@Override
|
||||||
String readUTF();
|
String readUTF();
|
||||||
|
|
||||||
/**
|
|
||||||
* Transfers this buffer's data to a newly created buffer starting at
|
|
||||||
* the current {@code readerIndex} and increases the {@code readerIndex}
|
|
||||||
* by the number of the transferred bytes (= {@code length}).
|
|
||||||
* The returned buffer's {@code readerIndex} and {@code writerIndex} are
|
|
||||||
* {@code 0} and {@code length} respectively.
|
|
||||||
*
|
|
||||||
* @param length the number of bytes to transfer
|
|
||||||
* @return the newly created buffer which contains the transferred bytes
|
|
||||||
* @throws IndexOutOfBoundsException if {@code length} is greater than {@code this.readableBytes}
|
|
||||||
*/
|
|
||||||
ActiveMQBuffer readBytes(int length);
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns a new slice of this buffer's sub-region starting at the current
|
* Returns a new slice of this buffer's sub-region starting at the current
|
||||||
* {@code readerIndex} and increases the {@code readerIndex} by the size
|
* {@code readerIndex} and increases the {@code readerIndex} by the size
|
||||||
|
|
|
@ -330,11 +330,6 @@ public class ChannelBufferWrapper implements ActiveMQBuffer {
|
||||||
buffer.readBytes(dst.byteBuf());
|
buffer.readBytes(dst.byteBuf());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public ActiveMQBuffer readBytes(final int length) {
|
|
||||||
return new ChannelBufferWrapper(buffer.readBytes(length), releasable);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public char readChar() {
|
public char readChar() {
|
||||||
return (char) buffer.readShort();
|
return (char) buffer.readShort();
|
||||||
|
|
|
@ -239,17 +239,6 @@ public class ByteUtil {
|
||||||
return sb.toString();
|
return sb.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
public static byte[] getActiveArray(ByteBuffer buffer) {
|
|
||||||
byte[] ret = new byte[buffer.remaining()];
|
|
||||||
if (buffer.hasArray()) {
|
|
||||||
byte[] array = buffer.array();
|
|
||||||
System.arraycopy(array, buffer.arrayOffset() + buffer.position(), ret, 0, ret.length);
|
|
||||||
} else {
|
|
||||||
buffer.slice().get(ret);
|
|
||||||
}
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
public static long convertTextBytes(final String text) {
|
public static long convertTextBytes(final String text) {
|
||||||
try {
|
try {
|
||||||
Matcher m = ONE.matcher(text);
|
Matcher m = ONE.matcher(text);
|
||||||
|
|
|
@ -39,7 +39,6 @@ import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
|
||||||
import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
|
import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
|
||||||
import org.apache.activemq.artemis.spi.core.remoting.ConsumerContext;
|
import org.apache.activemq.artemis.spi.core.remoting.ConsumerContext;
|
||||||
import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
|
import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
|
||||||
import org.apache.activemq.artemis.utils.ByteUtil;
|
|
||||||
import org.apache.activemq.artemis.utils.FutureLatch;
|
import org.apache.activemq.artemis.utils.FutureLatch;
|
||||||
import org.apache.activemq.artemis.utils.ReusableLatch;
|
import org.apache.activemq.artemis.utils.ReusableLatch;
|
||||||
import org.apache.activemq.artemis.utils.TokenBucketLimiter;
|
import org.apache.activemq.artemis.utils.TokenBucketLimiter;
|
||||||
|
@ -655,9 +654,9 @@ public final class ClientConsumerImpl implements ClientConsumerInternal {
|
||||||
|
|
||||||
//sets the packet
|
//sets the packet
|
||||||
ActiveMQBuffer qbuff = clMessage.toCore().getBodyBuffer();
|
ActiveMQBuffer qbuff = clMessage.toCore().getBodyBuffer();
|
||||||
int bytesToRead = qbuff.writerIndex() - qbuff.readerIndex();
|
final int bytesToRead = qbuff.writerIndex() - qbuff.readerIndex();
|
||||||
final byte[] body = ByteUtil.getActiveArray(qbuff.readBytes(bytesToRead).toByteBuffer());
|
final byte[] body = new byte[bytesToRead];
|
||||||
|
qbuff.readBytes(body);
|
||||||
largeMessage.setLargeMessageController(new CompressedLargeMessageControllerImpl(currentLargeMessageController));
|
largeMessage.setLargeMessageController(new CompressedLargeMessageControllerImpl(currentLargeMessageController));
|
||||||
currentLargeMessageController.addPacket(body, body.length, false);
|
currentLargeMessageController.addPacket(body, body.length, false);
|
||||||
|
|
||||||
|
|
|
@ -24,7 +24,6 @@ import java.nio.ByteBuffer;
|
||||||
|
|
||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
|
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
|
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
|
||||||
|
@ -592,13 +591,6 @@ final class CompressedLargeMessageControllerImpl implements LargeMessageControll
|
||||||
return Float.intBitsToFloat(getInt(index));
|
return Float.intBitsToFloat(getInt(index));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public ActiveMQBuffer readBytes(final int length) {
|
|
||||||
byte[] bytesToGet = new byte[length];
|
|
||||||
readBytes(bytesToGet);
|
|
||||||
return ActiveMQBuffers.wrappedBuffer(bytesToGet);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public double readDouble() {
|
public double readDouble() {
|
||||||
return Double.longBitsToDouble(readLong());
|
return Double.longBitsToDouble(readLong());
|
||||||
|
|
|
@ -31,7 +31,6 @@ import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
|
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
|
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
|
||||||
|
@ -975,14 +974,6 @@ public class LargeMessageControllerImpl implements LargeMessageController {
|
||||||
return Float.intBitsToFloat(getInt(index));
|
return Float.intBitsToFloat(getInt(index));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public ActiveMQBuffer readBytes(final int length) {
|
|
||||||
byte[] bytesToGet = new byte[length];
|
|
||||||
getBytes(readerIndex, bytesToGet);
|
|
||||||
readerIndex += length;
|
|
||||||
return ActiveMQBuffers.wrappedBuffer(bytesToGet);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public double readDouble() {
|
public double readDouble() {
|
||||||
return Double.longBitsToDouble(readLong());
|
return Double.longBitsToDouble(readLong());
|
||||||
|
|
|
@ -44,7 +44,6 @@ import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
|
||||||
import org.apache.activemq.artemis.core.persistence.Persister;
|
import org.apache.activemq.artemis.core.persistence.Persister;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
|
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
|
||||||
import org.apache.activemq.artemis.reader.MessageUtil;
|
import org.apache.activemq.artemis.reader.MessageUtil;
|
||||||
import org.apache.activemq.artemis.utils.ByteUtil;
|
|
||||||
import org.apache.activemq.artemis.utils.DataConstants;
|
import org.apache.activemq.artemis.utils.DataConstants;
|
||||||
import org.apache.activemq.artemis.utils.UUID;
|
import org.apache.activemq.artemis.utils.UUID;
|
||||||
import org.apache.activemq.artemis.utils.collections.TypedProperties;
|
import org.apache.activemq.artemis.utils.collections.TypedProperties;
|
||||||
|
@ -251,9 +250,11 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
|
||||||
}
|
}
|
||||||
|
|
||||||
private ActiveMQBuffer inflate(ActiveMQBuffer buffer) throws DataFormatException {
|
private ActiveMQBuffer inflate(ActiveMQBuffer buffer) throws DataFormatException {
|
||||||
int bytesToRead = buffer.readableBytes();
|
final int bytesToRead = buffer.readableBytes();
|
||||||
Inflater inflater = new Inflater();
|
Inflater inflater = new Inflater();
|
||||||
inflater.setInput(ByteUtil.getActiveArray(buffer.readBytes(bytesToRead).toByteBuffer()));
|
final byte[] input = new byte[bytesToRead];
|
||||||
|
buffer.readBytes(input);
|
||||||
|
inflater.setInput(input);
|
||||||
|
|
||||||
//get the real size of large message
|
//get the real size of large message
|
||||||
long sizeBody = getLongProperty(Message.HDR_LARGE_BODY_SIZE);
|
long sizeBody = getLongProperty(Message.HDR_LARGE_BODY_SIZE);
|
||||||
|
|
Loading…
Reference in New Issue