Factor out ChannelBuffer from BytesReference (#19129)
The ChannelBuffer interface today leaks into the BytesReference abstraction which causes a hard dependency on Netty across the board. This change moves this dependency and all BytesReference -> ChannelBuffer conversion into NettyUtils and removes the abstraction leak on BytesReference. This change also removes unused methods on the BytesReference interface and simplifies access to internal pages.
This commit is contained in:
parent
6b7acc0ca2
commit
872cdffc27
|
@ -20,14 +20,10 @@
|
|||
package org.elasticsearch.common.bytes;
|
||||
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.elasticsearch.common.io.Channels;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.buffer.ChannelBuffers;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.channels.GatheringByteChannel;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Arrays;
|
||||
|
||||
|
@ -103,11 +99,6 @@ public class BytesArray implements BytesReference {
|
|||
os.write(bytes, offset, length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(GatheringByteChannel channel) throws IOException {
|
||||
Channels.writeToChannel(bytes, offset, length(), channel);
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] toBytes() {
|
||||
if (offset == 0 && bytes.length == length) {
|
||||
|
@ -126,11 +117,6 @@ public class BytesArray implements BytesReference {
|
|||
return new BytesArray(Arrays.copyOfRange(bytes, offset, offset + length));
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelBuffer toChannelBuffer() {
|
||||
return ChannelBuffers.wrappedBuffer(bytes, offset, length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasArray() {
|
||||
return true;
|
||||
|
|
|
@ -19,19 +19,18 @@
|
|||
package org.elasticsearch.common.bytes;
|
||||
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.apache.lucene.util.BytesRefIterator;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.channels.GatheringByteChannel;
|
||||
|
||||
/**
|
||||
* A reference to bytes.
|
||||
*/
|
||||
public interface BytesReference {
|
||||
|
||||
public static class Helper {
|
||||
class Helper {
|
||||
|
||||
public static boolean bytesEqual(BytesReference a, BytesReference b) {
|
||||
if (a == b) {
|
||||
|
@ -108,10 +107,6 @@ public interface BytesReference {
|
|||
*/
|
||||
void writeTo(OutputStream os) throws IOException;
|
||||
|
||||
/**
|
||||
* Writes the bytes directly to the channel.
|
||||
*/
|
||||
void writeTo(GatheringByteChannel channel) throws IOException;
|
||||
|
||||
/**
|
||||
* Returns the bytes as a single byte array.
|
||||
|
@ -128,11 +123,6 @@ public interface BytesReference {
|
|||
*/
|
||||
BytesArray copyBytesArray();
|
||||
|
||||
/**
|
||||
* Returns the bytes as a channel buffer.
|
||||
*/
|
||||
ChannelBuffer toChannelBuffer();
|
||||
|
||||
/**
|
||||
* Is there an underlying byte array for this bytes reference.
|
||||
*/
|
||||
|
@ -162,4 +152,22 @@ public interface BytesReference {
|
|||
* Converts to a copied Lucene BytesRef.
|
||||
*/
|
||||
BytesRef copyBytesRef();
|
||||
|
||||
/**
|
||||
* Returns a BytesRefIterator for this BytesReference. This method allows
|
||||
* access to the internal pages of this reference without copying them. Use with care!
|
||||
* @see BytesRefIterator
|
||||
*/
|
||||
default BytesRefIterator iterator() {
|
||||
return new BytesRefIterator() {
|
||||
BytesRef ref = toBytesRef();
|
||||
@Override
|
||||
public BytesRef next() throws IOException {
|
||||
BytesRef r = ref;
|
||||
ref = null; // only return it once...
|
||||
return r;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -20,19 +20,15 @@
|
|||
package org.elasticsearch.common.bytes;
|
||||
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.apache.lucene.util.BytesRefIterator;
|
||||
import org.apache.lucene.util.CharsRefBuilder;
|
||||
import org.elasticsearch.common.io.Channels;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.netty.NettyUtils;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.common.util.ByteArray;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.buffer.ChannelBuffers;
|
||||
|
||||
import java.io.EOFException;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.channels.GatheringByteChannel;
|
||||
import java.util.Arrays;
|
||||
|
||||
/**
|
||||
|
@ -113,30 +109,6 @@ public class PagedBytesReference implements BytesReference {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(GatheringByteChannel channel) throws IOException {
|
||||
// nothing to do
|
||||
if (length == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
int currentLength = length;
|
||||
int currentOffset = offset;
|
||||
BytesRef ref = new BytesRef();
|
||||
|
||||
while (currentLength > 0) {
|
||||
// try to align to the underlying pages while writing, so no new arrays will be created.
|
||||
int fragmentSize = Math.min(currentLength, PAGE_SIZE - (currentOffset % PAGE_SIZE));
|
||||
boolean newArray = bytearray.get(currentOffset, fragmentSize, ref);
|
||||
assert !newArray : "PagedBytesReference failed to align with underlying bytearray. offset [" + currentOffset + "], size [" + fragmentSize + "]";
|
||||
Channels.writeToChannel(ref.bytes, ref.offset, ref.length, channel);
|
||||
currentLength -= ref.length;
|
||||
currentOffset += ref.length;
|
||||
}
|
||||
|
||||
assert currentLength == 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] toBytes() {
|
||||
if (length == 0) {
|
||||
|
@ -178,60 +150,6 @@ public class PagedBytesReference implements BytesReference {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelBuffer toChannelBuffer() {
|
||||
// nothing to do
|
||||
if (length == 0) {
|
||||
return ChannelBuffers.EMPTY_BUFFER;
|
||||
}
|
||||
|
||||
ChannelBuffer[] buffers;
|
||||
ChannelBuffer currentBuffer = null;
|
||||
BytesRef ref = new BytesRef();
|
||||
int pos = 0;
|
||||
|
||||
// are we a slice?
|
||||
if (offset != 0) {
|
||||
// remaining size of page fragment at offset
|
||||
int fragmentSize = Math.min(length, PAGE_SIZE - (offset % PAGE_SIZE));
|
||||
bytearray.get(offset, fragmentSize, ref);
|
||||
currentBuffer = ChannelBuffers.wrappedBuffer(ref.bytes, ref.offset, fragmentSize);
|
||||
pos += fragmentSize;
|
||||
}
|
||||
|
||||
// no need to create a composite buffer for a single page
|
||||
if (pos == length && currentBuffer != null) {
|
||||
return currentBuffer;
|
||||
}
|
||||
|
||||
// a slice > pagesize will likely require extra buffers for initial/trailing fragments
|
||||
int numBuffers = countRequiredBuffers((currentBuffer != null ? 1 : 0), length - pos);
|
||||
|
||||
buffers = new ChannelBuffer[numBuffers];
|
||||
int bufferSlot = 0;
|
||||
|
||||
if (currentBuffer != null) {
|
||||
buffers[bufferSlot] = currentBuffer;
|
||||
bufferSlot++;
|
||||
}
|
||||
|
||||
// handle remainder of pages + trailing fragment
|
||||
while (pos < length) {
|
||||
int remaining = length - pos;
|
||||
int bulkSize = (remaining > PAGE_SIZE) ? PAGE_SIZE : remaining;
|
||||
bytearray.get(offset + pos, bulkSize, ref);
|
||||
currentBuffer = ChannelBuffers.wrappedBuffer(ref.bytes, ref.offset, bulkSize);
|
||||
buffers[bufferSlot] = currentBuffer;
|
||||
bufferSlot++;
|
||||
pos += bulkSize;
|
||||
}
|
||||
|
||||
// this would indicate that our numBuffer calculation is off by one.
|
||||
assert (numBuffers == bufferSlot);
|
||||
|
||||
return ChannelBuffers.wrappedBuffer(NettyUtils.DEFAULT_GATHERING, buffers);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasArray() {
|
||||
return (offset + length <= PAGE_SIZE);
|
||||
|
@ -338,17 +256,6 @@ public class PagedBytesReference implements BytesReference {
|
|||
return true;
|
||||
}
|
||||
|
||||
private int countRequiredBuffers(int initialCount, int numBytes) {
|
||||
int numBuffers = initialCount;
|
||||
// an "estimate" of how many pages remain - rounded down
|
||||
int pages = numBytes / PAGE_SIZE;
|
||||
// a remaining fragment < pagesize needs at least one buffer
|
||||
numBuffers += (pages == 0) ? 1 : pages;
|
||||
// a remainder that is not a multiple of pagesize also needs an extra buffer
|
||||
numBuffers += (pages > 0 && numBytes % PAGE_SIZE > 0) ? 1 : 0;
|
||||
return numBuffers;
|
||||
}
|
||||
|
||||
private static class PagedBytesReferenceStreamInput extends StreamInput {
|
||||
|
||||
private final ByteArray bytearray;
|
||||
|
@ -451,4 +358,36 @@ public class PagedBytesReference implements BytesReference {
|
|||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public final BytesRefIterator iterator() {
|
||||
final int offset = this.offset;
|
||||
final int length = this.length;
|
||||
// this iteration is page aligned to ensure we do NOT materialize the pages from the ByteArray
|
||||
// we calculate the initial fragment size here to ensure that if this reference is a slice we are still page aligned
|
||||
// across the entire iteration. The first page is smaller if our offset != 0 then we start in the middle of the page
|
||||
// otherwise we iterate full pages until we reach the last chunk which also might end within a page.
|
||||
final int initialFragmentSize = offset != 0 ? PAGE_SIZE - (offset % PAGE_SIZE) : PAGE_SIZE;
|
||||
return new BytesRefIterator() {
|
||||
int position = 0;
|
||||
int nextFragmentSize = Math.min(length, initialFragmentSize);
|
||||
// this BytesRef is reused across the iteration on purpose - BytesRefIterator interface was designed for this
|
||||
final BytesRef slice = new BytesRef();
|
||||
|
||||
@Override
|
||||
public BytesRef next() throws IOException {
|
||||
if (nextFragmentSize != 0) {
|
||||
final boolean materialized = bytearray.get(offset + position, nextFragmentSize, slice);
|
||||
assert materialized == false : "iteration should be page aligned but array got materialized";
|
||||
position += nextFragmentSize;
|
||||
final int remaining = length - position;
|
||||
nextFragmentSize = Math.min(remaining, PAGE_SIZE);
|
||||
return slice;
|
||||
} else {
|
||||
assert nextFragmentSize == 0 : "fragmentSize expected [0] but was: [" + nextFragmentSize + "]";
|
||||
return null; // we are done with this iteration
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
|
|
@ -28,7 +28,7 @@ import org.elasticsearch.common.util.ByteArray;
|
|||
* An extension to {@link PagedBytesReference} that requires releasing its content. This
|
||||
* class exists to make it explicit when a bytes reference needs to be released, and when not.
|
||||
*/
|
||||
public class ReleasablePagedBytesReference extends PagedBytesReference implements Releasable {
|
||||
public final class ReleasablePagedBytesReference extends PagedBytesReference implements Releasable {
|
||||
|
||||
public ReleasablePagedBytesReference(BigArrays bigarrays, ByteArray bytearray, int length) {
|
||||
super(bigarrays, bytearray, length);
|
||||
|
|
|
@ -16,26 +16,26 @@
|
|||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.common.bytes;
|
||||
package org.elasticsearch.common.netty;
|
||||
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.elasticsearch.common.io.Channels;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.transport.netty.ChannelBufferStreamInputFactory;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.channels.GatheringByteChannel;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class ChannelBufferBytesReference implements BytesReference {
|
||||
final class ChannelBufferBytesReference implements BytesReference {
|
||||
|
||||
private final ChannelBuffer buffer;
|
||||
|
||||
public ChannelBufferBytesReference(ChannelBuffer buffer) {
|
||||
ChannelBufferBytesReference(ChannelBuffer buffer) {
|
||||
this.buffer = buffer;
|
||||
}
|
||||
|
||||
|
@ -64,11 +64,6 @@ public class ChannelBufferBytesReference implements BytesReference {
|
|||
buffer.getBytes(buffer.readerIndex(), os, length());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(GatheringByteChannel channel) throws IOException {
|
||||
Channels.writeToChannel(buffer, buffer.readerIndex(), length(), channel);
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] toBytes() {
|
||||
return copyBytesArray().toBytes();
|
||||
|
@ -89,7 +84,6 @@ public class ChannelBufferBytesReference implements BytesReference {
|
|||
return new BytesArray(copy);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelBuffer toChannelBuffer() {
|
||||
return buffer.duplicate();
|
||||
}
|
|
@ -18,12 +18,20 @@
|
|||
*/
|
||||
package org.elasticsearch.common.netty;
|
||||
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.apache.lucene.util.BytesRefIterator;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.transport.netty.NettyInternalESLoggerFactory;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.buffer.ChannelBuffers;
|
||||
import org.jboss.netty.logging.InternalLogger;
|
||||
import org.jboss.netty.logging.InternalLoggerFactory;
|
||||
import org.jboss.netty.util.ThreadNameDeterminer;
|
||||
import org.jboss.netty.util.ThreadRenamingRunnable;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class NettyUtils {
|
||||
|
@ -98,4 +106,36 @@ public class NettyUtils {
|
|||
public static void setup() {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Turns the given BytesReference into a ChannelBuffer. Note: the returned ChannelBuffer will reference the internal
|
||||
* pages of the BytesReference. Don't free the bytes of reference before the ChannelBuffer goes out of scope.
|
||||
*/
|
||||
public static ChannelBuffer toChannelBuffer(BytesReference reference) {
|
||||
if (reference.length() == 0) {
|
||||
return ChannelBuffers.EMPTY_BUFFER;
|
||||
}
|
||||
if (reference instanceof ChannelBufferBytesReference) {
|
||||
return ((ChannelBufferBytesReference) reference).toChannelBuffer();
|
||||
} else {
|
||||
final BytesRefIterator iterator = reference.iterator();
|
||||
BytesRef slice;
|
||||
final ArrayList<ChannelBuffer> buffers = new ArrayList<>();
|
||||
try {
|
||||
while ((slice = iterator.next()) != null) {
|
||||
buffers.add(ChannelBuffers.wrappedBuffer(slice.bytes, slice.offset, slice.length));
|
||||
}
|
||||
return ChannelBuffers.wrappedBuffer(DEFAULT_GATHERING, buffers.toArray(new ChannelBuffer[buffers.size()]));
|
||||
} catch (IOException ex) {
|
||||
throw new AssertionError("no IO happens here", ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Wraps the given ChannelBuffer with a BytesReference
|
||||
*/
|
||||
public static BytesReference toBytesReference(ChannelBuffer channelBuffer) {
|
||||
return new ChannelBufferBytesReference(channelBuffer);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -29,28 +29,28 @@ public interface ByteArray extends BigArray {
|
|||
/**
|
||||
* Get an element given its index.
|
||||
*/
|
||||
public abstract byte get(long index);
|
||||
byte get(long index);
|
||||
|
||||
/**
|
||||
* Set a value at the given index and return the previous value.
|
||||
*/
|
||||
public abstract byte set(long index, byte value);
|
||||
byte set(long index, byte value);
|
||||
|
||||
/**
|
||||
* Get a reference to a slice.
|
||||
*
|
||||
* @return <code>true</code> when a byte[] was materialized, <code>false</code> otherwise.
|
||||
*/
|
||||
public abstract boolean get(long index, int len, BytesRef ref);
|
||||
boolean get(long index, int len, BytesRef ref);
|
||||
|
||||
/**
|
||||
* Bulk set.
|
||||
*/
|
||||
public abstract void set(long index, byte[] buf, int offset, int len);
|
||||
void set(long index, byte[] buf, int offset, int len);
|
||||
|
||||
/**
|
||||
* Fill slots between <code>fromIndex</code> inclusive to <code>toIndex</code> exclusive with <code>value</code>.
|
||||
*/
|
||||
public abstract void fill(long fromIndex, long toIndex, byte value);
|
||||
void fill(long fromIndex, long toIndex, byte value);
|
||||
|
||||
}
|
||||
|
|
|
@ -24,6 +24,7 @@ import org.elasticsearch.common.bytes.BytesReference;
|
|||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.common.netty.NettyUtils;
|
||||
import org.elasticsearch.common.netty.ReleaseChannelFutureListener;
|
||||
import org.elasticsearch.http.netty.cors.CorsHandler;
|
||||
import org.elasticsearch.http.netty.pipelining.OrderedDownstreamChannelEvent;
|
||||
|
@ -105,7 +106,7 @@ public final class NettyHttpChannel extends AbstractRestChannel {
|
|||
ChannelBuffer buffer;
|
||||
boolean addedReleaseListener = false;
|
||||
try {
|
||||
buffer = content.toChannelBuffer();
|
||||
buffer = NettyUtils.toChannelBuffer(content);
|
||||
resp.setContent(buffer);
|
||||
|
||||
// If our response doesn't specify a content-type header, set one
|
||||
|
|
|
@ -21,7 +21,7 @@ package org.elasticsearch.http.netty;
|
|||
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.bytes.ChannelBufferBytesReference;
|
||||
import org.elasticsearch.common.netty.NettyUtils;
|
||||
import org.elasticsearch.rest.RestRequest;
|
||||
import org.elasticsearch.rest.support.RestUtils;
|
||||
import org.jboss.netty.channel.Channel;
|
||||
|
@ -47,7 +47,7 @@ public class NettyHttpRequest extends RestRequest {
|
|||
this.channel = channel;
|
||||
this.params = new HashMap<>();
|
||||
if (request.getContent().readable()) {
|
||||
this.content = new ChannelBufferBytesReference(request.getContent());
|
||||
this.content = NettyUtils.toBytesReference(request.getContent());
|
||||
} else {
|
||||
this.content = BytesArray.EMPTY;
|
||||
}
|
||||
|
|
|
@ -21,8 +21,8 @@ package org.elasticsearch.transport.netty;
|
|||
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.bytes.ChannelBufferBytesReference;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.netty.NettyUtils;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
|
||||
import java.io.EOFException;
|
||||
|
@ -37,10 +37,6 @@ public class ChannelBufferStreamInput extends StreamInput {
|
|||
private final int startIndex;
|
||||
private final int endIndex;
|
||||
|
||||
public ChannelBufferStreamInput(ChannelBuffer buffer) {
|
||||
this(buffer, buffer.readableBytes());
|
||||
}
|
||||
|
||||
public ChannelBufferStreamInput(ChannelBuffer buffer, int length) {
|
||||
if (length > buffer.readableBytes()) {
|
||||
throw new IndexOutOfBoundsException();
|
||||
|
@ -53,7 +49,7 @@ public class ChannelBufferStreamInput extends StreamInput {
|
|||
|
||||
@Override
|
||||
public BytesReference readBytesReference(int length) throws IOException {
|
||||
ChannelBufferBytesReference ref = new ChannelBufferBytesReference(buffer.slice(buffer.readerIndex(), length));
|
||||
BytesReference ref = NettyUtils.toBytesReference(buffer.slice(buffer.readerIndex(), length));
|
||||
buffer.skipBytes(length);
|
||||
return ref;
|
||||
}
|
||||
|
|
|
@ -21,7 +21,6 @@ package org.elasticsearch.transport.netty;
|
|||
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.bytes.ChannelBufferBytesReference;
|
||||
import org.elasticsearch.common.component.Lifecycle;
|
||||
import org.elasticsearch.common.compress.Compressor;
|
||||
import org.elasticsearch.common.compress.CompressorFactory;
|
||||
|
@ -29,6 +28,7 @@ import org.elasticsearch.common.compress.NotCompressedException;
|
|||
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.netty.NettyUtils;
|
||||
import org.elasticsearch.common.transport.InetSocketTransportAddress;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
|
@ -111,7 +111,7 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
|
|||
if (TransportStatus.isCompress(status) && hasMessageBytesToRead && buffer.readable()) {
|
||||
Compressor compressor;
|
||||
try {
|
||||
compressor = CompressorFactory.compressor(new ChannelBufferBytesReference(buffer));
|
||||
compressor = CompressorFactory.compressor(NettyUtils.toBytesReference(buffer));
|
||||
} catch (NotCompressedException ex) {
|
||||
int maxToRead = Math.min(buffer.readableBytes(), 10);
|
||||
int offset = buffer.readerIndex();
|
||||
|
|
|
@ -909,14 +909,14 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
|
|||
bRequest.writeThin(stream);
|
||||
stream.close();
|
||||
bytes = bStream.bytes();
|
||||
ChannelBuffer headerBuffer = bytes.toChannelBuffer();
|
||||
ChannelBuffer contentBuffer = bRequest.bytes().toChannelBuffer();
|
||||
ChannelBuffer headerBuffer = NettyUtils.toChannelBuffer(bytes);
|
||||
ChannelBuffer contentBuffer = NettyUtils.toChannelBuffer(bRequest.bytes());
|
||||
buffer = ChannelBuffers.wrappedBuffer(NettyUtils.DEFAULT_GATHERING, headerBuffer, contentBuffer);
|
||||
} else {
|
||||
request.writeTo(stream);
|
||||
stream.close();
|
||||
bytes = bStream.bytes();
|
||||
buffer = bytes.toChannelBuffer();
|
||||
buffer = NettyUtils.toChannelBuffer(bytes);
|
||||
}
|
||||
NettyHeader.writeHeader(buffer, requestId, status, version);
|
||||
ChannelFuture future = targetChannel.write(buffer);
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
|||
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.lease.Releasables;
|
||||
import org.elasticsearch.common.netty.NettyUtils;
|
||||
import org.elasticsearch.common.netty.ReleaseChannelFutureListener;
|
||||
import org.elasticsearch.transport.RemoteTransportException;
|
||||
import org.elasticsearch.transport.TransportChannel;
|
||||
|
@ -106,7 +107,7 @@ public class NettyTransportChannel implements TransportChannel {
|
|||
stream.close();
|
||||
|
||||
ReleasablePagedBytesReference bytes = bStream.bytes();
|
||||
ChannelBuffer buffer = bytes.toChannelBuffer();
|
||||
ChannelBuffer buffer = NettyUtils.toChannelBuffer(bytes);
|
||||
NettyHeader.writeHeader(buffer, requestId, status, version);
|
||||
ChannelFuture future = channel.write(buffer);
|
||||
ReleaseChannelFutureListener listener = new ReleaseChannelFutureListener(bytes);
|
||||
|
@ -136,7 +137,7 @@ public class NettyTransportChannel implements TransportChannel {
|
|||
status = TransportStatus.setError(status);
|
||||
|
||||
BytesReference bytes = stream.bytes();
|
||||
ChannelBuffer buffer = bytes.toChannelBuffer();
|
||||
ChannelBuffer buffer = NettyUtils.toChannelBuffer(bytes);
|
||||
NettyHeader.writeHeader(buffer, requestId, status, version);
|
||||
ChannelFuture future = channel.write(buffer);
|
||||
ChannelFutureListener onResponseSentListener =
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.elasticsearch.common.bytes;
|
|||
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.apache.lucene.util.BytesRefBuilder;
|
||||
import org.apache.lucene.util.BytesRefIterator;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
|
@ -29,38 +30,19 @@ import org.elasticsearch.common.util.ByteArray;
|
|||
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.hamcrest.Matchers;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.io.EOFException;
|
||||
import java.io.IOException;
|
||||
import java.nio.channels.FileChannel;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.StandardOpenOption;
|
||||
import java.util.Arrays;
|
||||
|
||||
public class PagedBytesReferenceTests extends ESTestCase {
|
||||
|
||||
private static final int PAGE_SIZE = BigArrays.BYTE_PAGE_SIZE;
|
||||
private BigArrays bigarrays = new BigArrays(null, new NoneCircuitBreakerService(), false);
|
||||
|
||||
private BigArrays bigarrays;
|
||||
|
||||
@Override
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
bigarrays = new BigArrays(null, new NoneCircuitBreakerService(), false);
|
||||
}
|
||||
|
||||
@Override
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
super.tearDown();
|
||||
}
|
||||
|
||||
public void testGet() {
|
||||
public void testGet() throws IOException {
|
||||
int length = randomIntBetween(1, PAGE_SIZE * 3);
|
||||
BytesReference pbr = getRandomizedPagedBytesReference(length);
|
||||
int sliceOffset = randomIntBetween(0, length / 2);
|
||||
|
@ -70,7 +52,7 @@ public class PagedBytesReferenceTests extends ESTestCase {
|
|||
assertEquals(pbr.get(sliceOffset + sliceLength - 1), slice.get(sliceLength - 1));
|
||||
}
|
||||
|
||||
public void testLength() {
|
||||
public void testLength() throws IOException {
|
||||
int[] sizes = {0, randomInt(PAGE_SIZE), PAGE_SIZE, randomInt(PAGE_SIZE * 3)};
|
||||
|
||||
for (int i = 0; i < sizes.length; i++) {
|
||||
|
@ -79,7 +61,7 @@ public class PagedBytesReferenceTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testSlice() {
|
||||
public void testSlice() throws IOException {
|
||||
int length = randomInt(PAGE_SIZE * 3);
|
||||
BytesReference pbr = getRandomizedPagedBytesReference(length);
|
||||
int sliceOffset = randomIntBetween(0, length / 2);
|
||||
|
@ -265,17 +247,6 @@ public class PagedBytesReferenceTests extends ESTestCase {
|
|||
out.close();
|
||||
}
|
||||
|
||||
public void testWriteToChannel() throws IOException {
|
||||
int length = randomIntBetween(10, PAGE_SIZE * 4);
|
||||
BytesReference pbr = getRandomizedPagedBytesReference(length);
|
||||
Path tFile = createTempFile();
|
||||
try (FileChannel channel = FileChannel.open(tFile, StandardOpenOption.WRITE)) {
|
||||
pbr.writeTo(channel);
|
||||
assertEquals(pbr.length(), channel.position());
|
||||
}
|
||||
assertArrayEquals(pbr.toBytes(), Files.readAllBytes(tFile));
|
||||
}
|
||||
|
||||
public void testSliceWriteToOutputStream() throws IOException {
|
||||
int length = randomIntBetween(10, PAGE_SIZE * randomIntBetween(2, 5));
|
||||
BytesReference pbr = getRandomizedPagedBytesReference(length);
|
||||
|
@ -289,21 +260,7 @@ public class PagedBytesReferenceTests extends ESTestCase {
|
|||
sliceOut.close();
|
||||
}
|
||||
|
||||
public void testSliceWriteToChannel() throws IOException {
|
||||
int length = randomIntBetween(10, PAGE_SIZE * randomIntBetween(2, 5));
|
||||
BytesReference pbr = getRandomizedPagedBytesReference(length);
|
||||
int sliceOffset = randomIntBetween(1, length / 2);
|
||||
int sliceLength = length - sliceOffset;
|
||||
BytesReference slice = pbr.slice(sliceOffset, sliceLength);
|
||||
Path tFile = createTempFile();
|
||||
try (FileChannel channel = FileChannel.open(tFile, StandardOpenOption.WRITE)) {
|
||||
slice.writeTo(channel);
|
||||
assertEquals(slice.length(), channel.position());
|
||||
}
|
||||
assertArrayEquals(slice.toBytes(), Files.readAllBytes(tFile));
|
||||
}
|
||||
|
||||
public void testToBytes() {
|
||||
public void testToBytes() throws IOException {
|
||||
int[] sizes = {0, randomInt(PAGE_SIZE), PAGE_SIZE, randomIntBetween(2, PAGE_SIZE * randomIntBetween(2, 5))};
|
||||
|
||||
for (int i = 0; i < sizes.length; i++) {
|
||||
|
@ -319,7 +276,7 @@ public class PagedBytesReferenceTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testToBytesArraySharedPage() {
|
||||
public void testToBytesArraySharedPage() throws IOException {
|
||||
int length = randomIntBetween(10, PAGE_SIZE);
|
||||
BytesReference pbr = getRandomizedPagedBytesReference(length);
|
||||
BytesArray ba = pbr.toBytesArray();
|
||||
|
@ -332,7 +289,7 @@ public class PagedBytesReferenceTests extends ESTestCase {
|
|||
assertSame(ba.array(), ba2.array());
|
||||
}
|
||||
|
||||
public void testToBytesArrayMaterializedPages() {
|
||||
public void testToBytesArrayMaterializedPages() throws IOException {
|
||||
// we need a length != (n * pagesize) to avoid page sharing at boundaries
|
||||
int length = 0;
|
||||
while ((length % PAGE_SIZE) == 0) {
|
||||
|
@ -349,7 +306,7 @@ public class PagedBytesReferenceTests extends ESTestCase {
|
|||
assertNotSame(ba.array(), ba2.array());
|
||||
}
|
||||
|
||||
public void testCopyBytesArray() {
|
||||
public void testCopyBytesArray() throws IOException {
|
||||
// small PBR which would normally share the first page
|
||||
int length = randomIntBetween(10, PAGE_SIZE);
|
||||
BytesReference pbr = getRandomizedPagedBytesReference(length);
|
||||
|
@ -360,7 +317,7 @@ public class PagedBytesReferenceTests extends ESTestCase {
|
|||
assertNotSame(ba.array(), ba2.array());
|
||||
}
|
||||
|
||||
public void testSliceCopyBytesArray() {
|
||||
public void testSliceCopyBytesArray() throws IOException {
|
||||
int length = randomIntBetween(10, PAGE_SIZE * randomIntBetween(2, 8));
|
||||
BytesReference pbr = getRandomizedPagedBytesReference(length);
|
||||
int sliceOffset = randomIntBetween(0, pbr.length());
|
||||
|
@ -377,45 +334,67 @@ public class PagedBytesReferenceTests extends ESTestCase {
|
|||
assertArrayEquals(ba1.array(), ba2.array());
|
||||
}
|
||||
|
||||
public void testToChannelBuffer() {
|
||||
public void testEmptyToBytesRefIterator() throws IOException {
|
||||
BytesReference pbr = getRandomizedPagedBytesReference(0);
|
||||
assertNull(pbr.iterator().next());
|
||||
}
|
||||
|
||||
public void testIterator() throws IOException {
|
||||
int length = randomIntBetween(10, PAGE_SIZE * randomIntBetween(2, 8));
|
||||
BytesReference pbr = getRandomizedPagedBytesReference(length);
|
||||
ChannelBuffer cb = pbr.toChannelBuffer();
|
||||
assertNotNull(cb);
|
||||
byte[] bufferBytes = new byte[length];
|
||||
cb.getBytes(0, bufferBytes);
|
||||
assertArrayEquals(pbr.toBytes(), bufferBytes);
|
||||
BytesRefIterator iterator = pbr.iterator();
|
||||
BytesRef ref;
|
||||
BytesRefBuilder builder = new BytesRefBuilder();
|
||||
while((ref = iterator.next()) != null) {
|
||||
builder.append(ref);
|
||||
}
|
||||
assertArrayEquals(pbr.toBytes(), BytesRef.deepCopyOf(builder.toBytesRef()).bytes);
|
||||
}
|
||||
|
||||
public void testEmptyToChannelBuffer() {
|
||||
BytesReference pbr = getRandomizedPagedBytesReference(0);
|
||||
ChannelBuffer cb = pbr.toChannelBuffer();
|
||||
assertNotNull(cb);
|
||||
assertEquals(0, pbr.length());
|
||||
assertEquals(0, cb.capacity());
|
||||
}
|
||||
|
||||
public void testSliceToChannelBuffer() {
|
||||
public void testSliceIterator() throws IOException {
|
||||
int length = randomIntBetween(10, PAGE_SIZE * randomIntBetween(2, 8));
|
||||
BytesReference pbr = getRandomizedPagedBytesReference(length);
|
||||
int sliceOffset = randomIntBetween(0, pbr.length());
|
||||
int sliceLength = randomIntBetween(pbr.length() - sliceOffset, pbr.length() - sliceOffset);
|
||||
BytesReference slice = pbr.slice(sliceOffset, sliceLength);
|
||||
ChannelBuffer cbSlice = slice.toChannelBuffer();
|
||||
assertNotNull(cbSlice);
|
||||
byte[] sliceBufferBytes = new byte[sliceLength];
|
||||
cbSlice.getBytes(0, sliceBufferBytes);
|
||||
assertArrayEquals(slice.toBytes(), sliceBufferBytes);
|
||||
BytesRefIterator iterator = slice.iterator();
|
||||
BytesRef ref = null;
|
||||
BytesRefBuilder builder = new BytesRefBuilder();
|
||||
while((ref = iterator.next()) != null) {
|
||||
builder.append(ref);
|
||||
}
|
||||
assertArrayEquals(slice.toBytes(), BytesRef.deepCopyOf(builder.toBytesRef()).bytes);
|
||||
}
|
||||
|
||||
public void testHasArray() {
|
||||
public void testIteratorRandom() throws IOException {
|
||||
int length = randomIntBetween(10, PAGE_SIZE * randomIntBetween(2, 8));
|
||||
BytesReference pbr = getRandomizedPagedBytesReference(length);
|
||||
if (randomBoolean()) {
|
||||
int sliceOffset = randomIntBetween(0, pbr.length());
|
||||
int sliceLength = randomIntBetween(pbr.length() - sliceOffset, pbr.length() - sliceOffset);
|
||||
pbr = pbr.slice(sliceOffset, sliceLength);
|
||||
}
|
||||
|
||||
if (randomBoolean()) {
|
||||
pbr = pbr.toBytesArray();
|
||||
}
|
||||
BytesRefIterator iterator = pbr.iterator();
|
||||
BytesRef ref = null;
|
||||
BytesRefBuilder builder = new BytesRefBuilder();
|
||||
while((ref = iterator.next()) != null) {
|
||||
builder.append(ref);
|
||||
}
|
||||
assertArrayEquals(pbr.toBytes(), BytesRef.deepCopyOf(builder.toBytesRef()).bytes);
|
||||
}
|
||||
|
||||
public void testHasArray() throws IOException {
|
||||
int length = randomIntBetween(10, PAGE_SIZE * randomIntBetween(1, 3));
|
||||
BytesReference pbr = getRandomizedPagedBytesReference(length);
|
||||
// must return true for <= pagesize
|
||||
assertEquals(length <= PAGE_SIZE, pbr.hasArray());
|
||||
}
|
||||
|
||||
public void testArray() {
|
||||
public void testArray() throws IOException {
|
||||
int[] sizes = {0, randomInt(PAGE_SIZE), PAGE_SIZE, randomIntBetween(2, PAGE_SIZE * randomIntBetween(2, 5))};
|
||||
|
||||
for (int i = 0; i < sizes.length; i++) {
|
||||
|
@ -437,7 +416,7 @@ public class PagedBytesReferenceTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testArrayOffset() {
|
||||
public void testArrayOffset() throws IOException {
|
||||
int length = randomInt(PAGE_SIZE * randomIntBetween(2, 5));
|
||||
BytesReference pbr = getRandomizedPagedBytesReference(length);
|
||||
if (pbr.hasArray()) {
|
||||
|
@ -452,7 +431,7 @@ public class PagedBytesReferenceTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testSliceArrayOffset() {
|
||||
public void testSliceArrayOffset() throws IOException {
|
||||
int length = randomInt(PAGE_SIZE * randomIntBetween(2, 5));
|
||||
BytesReference pbr = getRandomizedPagedBytesReference(length);
|
||||
int sliceOffset = randomIntBetween(0, pbr.length());
|
||||
|
@ -477,7 +456,7 @@ public class PagedBytesReferenceTests extends ESTestCase {
|
|||
// TODO: good way to test?
|
||||
}
|
||||
|
||||
public void testToBytesRef() {
|
||||
public void testToBytesRef() throws IOException {
|
||||
int length = randomIntBetween(0, PAGE_SIZE);
|
||||
BytesReference pbr = getRandomizedPagedBytesReference(length);
|
||||
BytesRef ref = pbr.toBytesRef();
|
||||
|
@ -486,7 +465,7 @@ public class PagedBytesReferenceTests extends ESTestCase {
|
|||
assertEquals(pbr.length(), ref.length);
|
||||
}
|
||||
|
||||
public void testSliceToBytesRef() {
|
||||
public void testSliceToBytesRef() throws IOException {
|
||||
int length = randomIntBetween(0, PAGE_SIZE);
|
||||
BytesReference pbr = getRandomizedPagedBytesReference(length);
|
||||
// get a BytesRef from a slice
|
||||
|
@ -498,7 +477,7 @@ public class PagedBytesReferenceTests extends ESTestCase {
|
|||
assertEquals(sliceLength, sliceRef.length);
|
||||
}
|
||||
|
||||
public void testCopyBytesRef() {
|
||||
public void testCopyBytesRef() throws IOException {
|
||||
int length = randomIntBetween(0, PAGE_SIZE * randomIntBetween(2, 5));
|
||||
BytesReference pbr = getRandomizedPagedBytesReference(length);
|
||||
BytesRef ref = pbr.copyBytesRef();
|
||||
|
@ -506,7 +485,7 @@ public class PagedBytesReferenceTests extends ESTestCase {
|
|||
assertEquals(pbr.length(), ref.length);
|
||||
}
|
||||
|
||||
public void testHashCode() {
|
||||
public void testHashCode() throws IOException {
|
||||
// empty content must have hash 1 (JDK compat)
|
||||
BytesReference pbr = getRandomizedPagedBytesReference(0);
|
||||
assertEquals(Arrays.hashCode(BytesRef.EMPTY_BYTES), pbr.hashCode());
|
||||
|
@ -542,7 +521,7 @@ public class PagedBytesReferenceTests extends ESTestCase {
|
|||
assertEquals(pbr, pbr2);
|
||||
}
|
||||
|
||||
public void testEqualsPeerClass() {
|
||||
public void testEqualsPeerClass() throws IOException {
|
||||
int length = randomIntBetween(100, PAGE_SIZE * randomIntBetween(2, 5));
|
||||
BytesReference pbr = getRandomizedPagedBytesReference(length);
|
||||
BytesReference ba = new BytesArray(pbr.toBytes());
|
||||
|
@ -569,16 +548,12 @@ public class PagedBytesReferenceTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
private BytesReference getRandomizedPagedBytesReference(int length) {
|
||||
private BytesReference getRandomizedPagedBytesReference(int length) throws IOException {
|
||||
// we know bytes stream output always creates a paged bytes reference, we use it to create randomized content
|
||||
ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(length, bigarrays);
|
||||
try {
|
||||
for (int i = 0; i < length; i++) {
|
||||
out.writeByte((byte) random().nextInt(1 << 8));
|
||||
}
|
||||
} catch (IOException e) {
|
||||
fail("should not happen " + e.getMessage());
|
||||
}
|
||||
assertThat(out.size(), Matchers.equalTo(length));
|
||||
BytesReference ref = out.bytes();
|
||||
assertThat(ref.length(), Matchers.equalTo(length));
|
||||
|
|
|
@ -0,0 +1,95 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.common.netty;
|
||||
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.buffer.ChannelBuffers;
|
||||
import org.jboss.netty.buffer.CompositeChannelBuffer;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class NettyUtilsTests extends ESTestCase {
|
||||
|
||||
private static final int PAGE_SIZE = BigArrays.BYTE_PAGE_SIZE;
|
||||
private final BigArrays bigarrays = new BigArrays(null, new NoneCircuitBreakerService(), false);
|
||||
|
||||
public void testToChannelBufferWithEmptyRef() throws IOException {
|
||||
ChannelBuffer channelBuffer = NettyUtils.toChannelBuffer(getRandomizedBytesReference(0));
|
||||
assertSame(ChannelBuffers.EMPTY_BUFFER, channelBuffer);
|
||||
}
|
||||
|
||||
public void testToChannelBufferWithSlice() throws IOException {
|
||||
BytesReference ref = getRandomizedBytesReference(randomIntBetween(1, 3 * PAGE_SIZE));
|
||||
int sliceOffset = randomIntBetween(0, ref.length());
|
||||
int sliceLength = randomIntBetween(ref.length() - sliceOffset, ref.length() - sliceOffset);
|
||||
BytesReference slice = ref.slice(sliceOffset, sliceLength);
|
||||
ChannelBuffer channelBuffer = NettyUtils.toChannelBuffer(slice);
|
||||
BytesReference bytesReference = NettyUtils.toBytesReference(channelBuffer);
|
||||
assertArrayEquals(slice.toBytes(), bytesReference.toBytes());
|
||||
}
|
||||
|
||||
public void testToChannelBufferWithSliceAfter() throws IOException {
|
||||
BytesReference ref = getRandomizedBytesReference(randomIntBetween(1, 3 * PAGE_SIZE));
|
||||
int sliceOffset = randomIntBetween(0, ref.length());
|
||||
int sliceLength = randomIntBetween(ref.length() - sliceOffset, ref.length() - sliceOffset);
|
||||
ChannelBuffer channelBuffer = NettyUtils.toChannelBuffer(ref);
|
||||
BytesReference bytesReference = NettyUtils.toBytesReference(channelBuffer);
|
||||
assertArrayEquals(ref.slice(sliceOffset, sliceLength).toBytes(), bytesReference.slice(sliceOffset, sliceLength).toBytes());
|
||||
}
|
||||
|
||||
public void testToChannelBuffer() throws IOException {
|
||||
BytesReference ref = getRandomizedBytesReference(randomIntBetween(1, 3 * PAGE_SIZE));
|
||||
ChannelBuffer channelBuffer = NettyUtils.toChannelBuffer(ref);
|
||||
BytesReference bytesReference = NettyUtils.toBytesReference(channelBuffer);
|
||||
if (ref instanceof ChannelBufferBytesReference) {
|
||||
assertEquals(channelBuffer, ((ChannelBufferBytesReference) ref).toChannelBuffer());
|
||||
} else if (ref.hasArray() == false) { // we gather the buffers into a channel buffer
|
||||
assertTrue(channelBuffer instanceof CompositeChannelBuffer);
|
||||
}
|
||||
assertArrayEquals(ref.toBytes(), bytesReference.toBytes());
|
||||
}
|
||||
|
||||
private BytesReference getRandomizedBytesReference(int length) throws IOException {
|
||||
// TODO we should factor out a BaseBytesReferenceTestCase
|
||||
// we know bytes stream output always creates a paged bytes reference, we use it to create randomized content
|
||||
ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(length, bigarrays);
|
||||
for (int i = 0; i < length; i++) {
|
||||
out.writeByte((byte) random().nextInt(1 << 8));
|
||||
}
|
||||
assertEquals(out.size(), length);
|
||||
BytesReference ref = out.bytes();
|
||||
assertEquals(ref.length(), length);
|
||||
if (randomBoolean()) {
|
||||
return ref.toBytesArray();
|
||||
} else if (randomBoolean()) {
|
||||
BytesArray bytesArray = ref.toBytesArray();
|
||||
return NettyUtils.toBytesReference(ChannelBuffers.wrappedBuffer(bytesArray.array(), bytesArray.arrayOffset(),
|
||||
bytesArray.length()));
|
||||
} else {
|
||||
return ref;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -20,18 +20,14 @@
|
|||
package org.elasticsearch.common.bytes;
|
||||
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.elasticsearch.common.io.Channels;
|
||||
import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.buffer.ChannelBuffers;
|
||||
import org.jboss.netty.util.CharsetUtil;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.CharBuffer;
|
||||
import java.nio.channels.GatheringByteChannel;
|
||||
import java.nio.charset.CharacterCodingException;
|
||||
import java.nio.charset.CharsetDecoder;
|
||||
import java.nio.charset.CoderResult;
|
||||
|
@ -85,11 +81,6 @@ public class ByteBufferBytesReference implements BytesReference {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(GatheringByteChannel channel) throws IOException {
|
||||
Channels.writeToChannel(buffer, channel);
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] toBytes() {
|
||||
if (!buffer.hasRemaining()) {
|
||||
|
@ -113,11 +104,6 @@ public class ByteBufferBytesReference implements BytesReference {
|
|||
return new BytesArray(toBytes());
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelBuffer toChannelBuffer() {
|
||||
return ChannelBuffers.wrappedBuffer(buffer);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasArray() {
|
||||
return buffer.hasArray();
|
Loading…
Reference in New Issue