diff --git a/src/main/java/org/elasticsearch/common/bytes/PagedBytesReference.java b/src/main/java/org/elasticsearch/common/bytes/PagedBytesReference.java new file mode 100644 index 00000000000..d9dd17db370 --- /dev/null +++ b/src/main/java/org/elasticsearch/common/bytes/PagedBytesReference.java @@ -0,0 +1,443 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.bytes; + +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.CharsRef; +import org.apache.lucene.util.UnicodeUtil; +import org.elasticsearch.ElasticsearchIllegalArgumentException; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.ByteArray; +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; + +import java.io.EOFException; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.channels.GatheringByteChannel; +import java.util.Arrays; + +public final class PagedBytesReference implements BytesReference { + + private static final int PAGE_SIZE = BigArrays.BYTE_PAGE_SIZE; + private static final int NIO_GATHERING_LIMIT = 524288; + + private final BigArrays bigarrays; + private final ByteArray bytearray; + private final int offset; + private final int length; + private int hash = 0; + + public PagedBytesReference(BigArrays bigarrays, ByteArray bytearray, int length) { + this(bigarrays, bytearray, 0, length); + } + + public PagedBytesReference(BigArrays bigarrays, ByteArray bytearray, int from, int length) { + this.bigarrays = bigarrays; + this.bytearray = bytearray; + this.offset = from; + this.length = length; + } + + @Override + public byte get(int index) { + return bytearray.get(offset + index); + } + + @Override + public int length() { + return length; + } + + @Override + public BytesReference slice(int from, int length) { + if (from < 0 || (from + length) > length()) { + throw new ElasticsearchIllegalArgumentException("can't slice a buffer with length [" + length() + "], with slice parameters from [" + from + "], length [" + length + "]"); + } + + return new PagedBytesReference(bigarrays, bytearray, offset + from, length); + } + + @Override + public StreamInput streamInput() { + return new PagedBytesReferenceStreamInput(bytearray, offset, length); + } + + @Override + public void writeTo(OutputStream os) throws IOException { + BytesRef ref = new BytesRef(); + int written = 0; + + // are we a slice? + if (offset != 0) { + // remaining size of page fragment at offset + int fragmentSize = Math.min(length, PAGE_SIZE - (offset % PAGE_SIZE)); + bytearray.get(offset, fragmentSize, ref); + os.write(ref.bytes, ref.offset, fragmentSize); + written += fragmentSize; + } + + // handle remainder of pages + trailing fragment + while (written < length) { + int remaining = length - written; + int bulkSize = (remaining > PAGE_SIZE) ? PAGE_SIZE : remaining; + bytearray.get(offset + written, bulkSize, ref); + os.write(ref.bytes, ref.offset, bulkSize); + written += bulkSize; + } + } + + @Override + public void writeTo(GatheringByteChannel channel) throws IOException { + ByteBuffer[] buffers; + ByteBuffer currentBuffer = null; + BytesRef ref = new BytesRef(); + int pos = 0; + + // are we a slice? + if (offset != 0) { + // remaining size of page fragment at offset + int fragmentSize = Math.min(length, PAGE_SIZE - (offset % PAGE_SIZE)); + bytearray.get(offset, fragmentSize, ref); + currentBuffer = ByteBuffer.wrap(ref.bytes, ref.offset, fragmentSize); + pos += fragmentSize; + } + + // we only have a single page + if (pos == length && currentBuffer != null) { + channel.write(currentBuffer); + return; + } + + // a slice > pagesize will likely require extra buffers for initial/trailing fragments + int numBuffers = countRequiredBuffers((currentBuffer != null ? 1 : 0), length - pos); + + buffers = new ByteBuffer[numBuffers]; + int bufferSlot = 0; + + if (currentBuffer != null) { + buffers[bufferSlot] = currentBuffer; + bufferSlot++; + } + + // handle remainder of pages + trailing fragment + while (pos < length) { + int remaining = length - pos; + int bulkSize = (remaining > PAGE_SIZE) ? PAGE_SIZE : remaining; + bytearray.get(offset + pos, bulkSize, ref); + currentBuffer = ByteBuffer.wrap(ref.bytes, ref.offset, bulkSize); + buffers[bufferSlot] = currentBuffer; + bufferSlot++; + pos += bulkSize; + } + + // this would indicate that our numBuffer calculation is off by one. + assert (numBuffers == bufferSlot); + + // finally write all buffers + channel.write(buffers); + } + + @Override + public byte[] toBytes() { + if (length == 0) { + return BytesRef.EMPTY_BYTES; + } + + BytesRef ref = new BytesRef(); + bytearray.get(offset, length, ref); + + // undo the single-page optimization by ByteArray.get(), otherwise + // a materialized stream will contain traling garbage/zeros + byte[] result = ref.bytes; + if (result.length != length || ref.offset != 0) { + result = Arrays.copyOfRange(result, ref.offset, ref.offset + length); + } + + return result; + } + + @Override + public BytesArray toBytesArray() { + BytesRef ref = new BytesRef(); + bytearray.get(offset, length, ref); + return new BytesArray(ref); + } + + @Override + public BytesArray copyBytesArray() { + BytesRef ref = new BytesRef(); + boolean copied = bytearray.get(offset, length, ref); + + if (copied) { + // BigArray has materialized for us, no need to do it again + return new BytesArray(ref.bytes, ref.offset, ref.length); + } + else { + // here we need to copy the bytes even when shared + byte[] copy = Arrays.copyOfRange(ref.bytes, ref.offset, ref.offset + ref.length); + return new BytesArray(copy); + } + } + + @Override + public ChannelBuffer toChannelBuffer() { + ChannelBuffer[] buffers; + ChannelBuffer currentBuffer = null; + BytesRef ref = new BytesRef(); + int pos = 0; + + // are we a slice? + if (offset != 0) { + // remaining size of page fragment at offset + int fragmentSize = Math.min(length, PAGE_SIZE - (offset % PAGE_SIZE)); + bytearray.get(offset, fragmentSize, ref); + currentBuffer = ChannelBuffers.wrappedBuffer(ref.bytes, ref.offset, fragmentSize); + pos += fragmentSize; + } + + // no need to create a composite buffer for a single page + if (pos == length && currentBuffer != null) { + return currentBuffer; + } + + // a slice > pagesize will likely require extra buffers for initial/trailing fragments + int numBuffers = countRequiredBuffers((currentBuffer != null ? 1 : 0), length - pos); + + buffers = new ChannelBuffer[numBuffers]; + int bufferSlot = 0; + + if (currentBuffer != null) { + buffers[bufferSlot] = currentBuffer; + bufferSlot++; + } + + // handle remainder of pages + trailing fragment + while (pos < length) { + int remaining = length - pos; + int bulkSize = (remaining > PAGE_SIZE) ? PAGE_SIZE : remaining; + bytearray.get(offset + pos, bulkSize, ref); + currentBuffer = ChannelBuffers.wrappedBuffer(ref.bytes, ref.offset, bulkSize); + buffers[bufferSlot] = currentBuffer; + bufferSlot++; + pos += bulkSize; + } + + // this would indicate that our numBuffer calculation is off by one. + assert (numBuffers == bufferSlot); + + // we can use gathering writes from the ChannelBuffers, but only if they are + // moderately small to prevent OOMs due to DirectBuffer allocations. + return ChannelBuffers.wrappedBuffer(length <= NIO_GATHERING_LIMIT, buffers); + } + + @Override + public boolean hasArray() { + return (offset + length <= PAGE_SIZE); + } + + @Override + public byte[] array() { + if (hasArray()) { + if (length == 0) { + return BytesRef.EMPTY_BYTES; + } + + BytesRef ref = new BytesRef(); + bytearray.get(offset, length, ref); + return ref.bytes; + } + + throw new IllegalStateException("array not available"); + } + + @Override + public int arrayOffset() { + if (hasArray()) { + BytesRef ref = new BytesRef(); + bytearray.get(offset, length, ref); + return ref.offset; + } + + throw new IllegalStateException("array not available"); + } + + @Override + public String toUtf8() { + if (length() == 0) { + return ""; + } + + byte[] bytes = toBytes(); + final CharsRef ref = new CharsRef(length); + UnicodeUtil.UTF8toUTF16(bytes, offset, length, ref); + return ref.toString(); + } + + @Override + public BytesRef toBytesRef() { + BytesRef bref = new BytesRef(); + // if length <= pagesize this will dereference the page, or materialize the byte[] + bytearray.get(offset, length, bref); + return bref; + } + + @Override + public BytesRef copyBytesRef() { + byte[] bytes = toBytes(); + return new BytesRef(bytes, offset, length); + } + + @Override + public int hashCode() { + if (hash == 0) { + // TODO: delegate to BigArrays via: + // hash = bigarrays.hashCode(bytearray); + // and for slices: + // hash = bigarrays.hashCode(bytearray, offset, length); + int tmphash = 1; + for (int i = 0; i < length; i++) { + tmphash = 31 * tmphash + bytearray.get(offset + i); + } + hash = tmphash; + } + return hash; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + + if (!(obj instanceof PagedBytesReference)) { + return BytesReference.Helper.bytesEqual(this, (BytesReference)obj); + } + + PagedBytesReference other = (PagedBytesReference)obj; + if (length != other.length) { + return false; + } + + // TODO: delegate to BigArrays via: + // return bigarrays.equals(bytearray, other.bytearray); + // and for slices: + // return bigarrays.equals(bytearray, start, other.bytearray, otherstart, len); + ByteArray otherArray = other.bytearray; + int otherOffset = other.offset; + for (int i = 0; i < length; i++) { + if (bytearray.get(offset + i) != otherArray.get(otherOffset + i)) { + return false; + } + } + return true; + } + + private int countRequiredBuffers(int initialCount, int numBytes) { + int numBuffers = initialCount; + // an "estimate" of how many pages remain - rounded down + int pages = numBytes / PAGE_SIZE; + // a remaining fragment < pagesize needs at least one buffer + numBuffers += (pages == 0) ? 1 : pages; + // a remainder that is not a multiple of pagesize also needs an extra buffer + numBuffers += (pages > 0 && numBuffers % PAGE_SIZE > 0) ? 1 : 0; + return numBuffers; + } + + private static class PagedBytesReferenceStreamInput extends StreamInput { + + private final ByteArray bytearray; + private final int offset; + private final int length; + private int pos; + + public PagedBytesReferenceStreamInput(ByteArray bytearray, int offset, int length) { + this.bytearray = bytearray; + this.offset = offset; + this.length = length; + this.pos = 0; + + if (offset + length > bytearray.size()) { + throw new IndexOutOfBoundsException("offset+length >= bytearray.size()"); + } + } + + @Override + public byte readByte() throws IOException { + if (pos >= length) { + throw new EOFException(); + } + + return bytearray.get(offset + pos++); + } + + @Override + public void readBytes(byte[] b, int bOffset, int len) throws IOException { + if (len > offset + length) { + throw new IndexOutOfBoundsException("Cannot read " + len + " bytes from stream with length " + length + " at pos " + pos); + } + + read(b, bOffset, len); + } + + @Override + public int read() throws IOException { + return (pos < length) ? bytearray.get(offset + pos++) : -1; + } + + @Override + public int read(byte[] b, int bOffset, int len) throws IOException { + if (len == 0) { + return 0; + } + + if (pos >= offset + length) { + return -1; + } + + // we need to stop at the end + len = Math.min(length, len); + + // ByteArray.get(BytesRef) does not work since it flips the + // ref's byte[] pointer, so for now we copy byte-by-byte + int written = 0; + while (written < len) { + b[bOffset + written] = bytearray.get(offset + written); + written++; + } + + pos += written; + return written; + } + + @Override + public void reset() throws IOException { + pos = 0; + } + + @Override + public void close() throws IOException { + // do nothing + } + + } +} diff --git a/src/main/java/org/elasticsearch/common/io/stream/BytesStreamOutput.java b/src/main/java/org/elasticsearch/common/io/stream/BytesStreamOutput.java index f323b71dba0..899ecf2d873 100644 --- a/src/main/java/org/elasticsearch/common/io/stream/BytesStreamOutput.java +++ b/src/main/java/org/elasticsearch/common/io/stream/BytesStreamOutput.java @@ -19,9 +19,8 @@ package org.elasticsearch.common.io.stream; -import org.apache.lucene.util.BytesRef; -import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.bytes.PagedBytesReference; import org.elasticsearch.common.io.BytesStream; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.ByteArray; @@ -154,9 +153,7 @@ public class BytesStreamOutput extends StreamOutput implements BytesStream { @Override public BytesReference bytes() { - BytesRef bref = new BytesRef(); - bytes.get(0, count, bref); - return new BytesArray(bref, false); + return new PagedBytesReference(bigarrays, bytes, count); } private void ensureCapacity(int offset) { diff --git a/src/test/java/org/elasticsearch/common/bytes/PagedBytesReferenceTest.java b/src/test/java/org/elasticsearch/common/bytes/PagedBytesReferenceTest.java new file mode 100644 index 00000000000..741e0e06a31 --- /dev/null +++ b/src/test/java/org/elasticsearch/common/bytes/PagedBytesReferenceTest.java @@ -0,0 +1,467 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.bytes; + +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.ByteArray; +import org.elasticsearch.test.ElasticsearchTestCase; +import org.elasticsearch.test.cache.recycler.MockBigArrays; +import org.jboss.netty.buffer.ChannelBuffer; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.EOFException; +import java.io.IOException; +import java.util.Arrays; + +public class PagedBytesReferenceTest extends ElasticsearchTestCase { + + private static final int PAGE_SIZE = BigArrays.BYTE_PAGE_SIZE; + + private MockBigArrays bigarrays; + + @Before + public void setUp() throws Exception { + super.setUp(); + bigarrays = new MockBigArrays(ImmutableSettings.EMPTY, null); + } + + @After + public void tearDown() throws Exception { + // necessary since we currently never release BigArrays + MockBigArrays.reset(); + super.tearDown(); + } + + @Test + public void testGet() { + int length = randomInt(PAGE_SIZE * 3); + BytesReference pbr = getRandomizedPagedBytesReference(length); + int sliceOffset = randomIntBetween(0, length / 2); + int sliceLength = Math.max(1, length - sliceOffset - 1); + BytesReference slice = pbr.slice(sliceOffset, sliceLength); + assertEquals(pbr.get(sliceOffset), slice.get(0)); + assertEquals(pbr.get(sliceOffset + sliceLength), slice.get(sliceLength)); + } + + public void testLength() { + int[] sizes = {0, randomInt(PAGE_SIZE), PAGE_SIZE, randomInt(PAGE_SIZE * 3)}; + + for (int i = 0; i < sizes.length; i++) { + BytesReference pbr = getRandomizedPagedBytesReference(sizes[i]); + assertEquals(sizes[i], pbr.length()); + } + } + + public void testSlice() { + int length = randomInt(PAGE_SIZE * 3); + BytesReference pbr = getRandomizedPagedBytesReference(length); + int sliceOffset = randomIntBetween(0, length / 2); + int sliceLength = Math.max(1, length - sliceOffset - 1); + BytesReference slice = pbr.slice(sliceOffset, sliceLength); + assertEquals(sliceLength, slice.length()); + + if (slice.hasArray()) { + assertEquals(sliceOffset, slice.arrayOffset()); + } + else { + try { + slice.arrayOffset(); + fail("expected IllegalStateException"); + } + catch (IllegalStateException ise) { + // expected + } + } + } + + public void testStreamInput() throws IOException { + int length = randomIntBetween(10, PAGE_SIZE * 3); + BytesReference pbr = getRandomizedPagedBytesReference(length); + StreamInput si = pbr.streamInput(); + assertNotNull(si); + + // read single bytes one by one + assertEquals(pbr.get(0), si.readByte()); + assertEquals(pbr.get(1), si.readByte()); + assertEquals(pbr.get(2), si.readByte()); + si.reset(); + + // buffer for bulk reads + byte[] origBuf = new byte[length]; + getRandom().nextBytes(origBuf); + byte[] targetBuf = Arrays.copyOf(origBuf, origBuf.length); + + // bulk-read 0 bytes: must not modify buffer + si.readBytes(targetBuf, 0, 0); + assertEquals(origBuf[0], targetBuf[0]); + si.reset(); + + // read an int + int i = si.read(); + assertFalse(i == 0); + si.reset(); + + // bulk-read all + si.readFully(targetBuf); + assertArrayEquals(pbr.toBytes(), targetBuf); + + // continuing to read should now fail with EOFException + try { + si.readByte(); + fail("expected EOF"); + } + catch (EOFException eof) { + // yay + } + + // try to read more than the stream contains + si.reset(); + try { + si.readBytes(targetBuf, 0, length * 2); + fail("expected IndexOutOfBoundsException: le > stream.length"); + } + catch (IndexOutOfBoundsException ioob) { + // expected + } + } + + public void testSliceStreamInput() throws IOException { + int length = randomIntBetween(10, PAGE_SIZE * 3); + BytesReference pbr = getRandomizedPagedBytesReference(length); + StreamInput si = pbr.streamInput(); + + // test stream input over slice (upper half of original) + int sliceOffset = randomIntBetween(1, length / 2); + int sliceLength = length - sliceOffset; + BytesReference slice = pbr.slice(sliceOffset, sliceLength); + StreamInput sliceInput = slice.streamInput(); + + // single reads + assertEquals(slice.get(0), sliceInput.readByte()); + assertEquals(slice.get(1), sliceInput.readByte()); + assertEquals(slice.get(2), sliceInput.readByte()); + si.reset(); + + // bulk read + byte[] sliceBytes = new byte[sliceLength]; + sliceInput.readFully(sliceBytes); + + // compare slice content with upper half of original + byte[] pbrSliceBytes = Arrays.copyOfRange(pbr.toBytes(), sliceOffset, length); + assertArrayEquals(pbrSliceBytes, sliceBytes); + + // compare slice bytes with bytes read from slice via streamInput :D + byte[] sliceToBytes = slice.toBytes(); + assertEquals(sliceBytes.length, sliceToBytes.length); + assertArrayEquals(sliceBytes, sliceToBytes); + } + + public void testWriteTo() throws IOException { + int length = randomIntBetween(10, PAGE_SIZE * 4); + BytesReference pbr = getRandomizedPagedBytesReference(length); + BytesStreamOutput out = new BytesStreamOutput(); + pbr.writeTo(out); + assertEquals(pbr.length(), out.size()); + assertArrayEquals(pbr.toBytes(), out.bytes().toBytes()); + out.close(); + } + + public void testSliceWriteTo() throws IOException { + int length = randomIntBetween(10, PAGE_SIZE * randomIntBetween(2,5)); + BytesReference pbr = getRandomizedPagedBytesReference(length); + int sliceOffset = randomIntBetween(1, length / 2); + int sliceLength = length - sliceOffset; + BytesReference slice = pbr.slice(sliceOffset, sliceLength); + BytesStreamOutput sliceOut = new BytesStreamOutput(sliceLength); + slice.writeTo(sliceOut); + assertEquals(slice.length(), sliceOut.size()); + assertArrayEquals(slice.toBytes(), sliceOut.bytes().toBytes()); + sliceOut.close(); + } + + public void testToBytes() { + int[] sizes = {0, randomInt(PAGE_SIZE), PAGE_SIZE, randomIntBetween(2, PAGE_SIZE * randomIntBetween(2,5))}; + + for (int i = 0; i < sizes.length; i++) { + BytesReference pbr = getRandomizedPagedBytesReference(sizes[i]); + byte[] bytes = pbr.toBytes(); + assertEquals(sizes[i], bytes.length); + // verify that toBytes() is cheap for small payloads + if (sizes[i] <= PAGE_SIZE) { + assertSame(bytes, pbr.toBytes()); + } + else { + assertNotSame(bytes, pbr.toBytes()); + } + } + } + + public void testToBytesArraySharedPage() { + int length = randomIntBetween(10, PAGE_SIZE); + BytesReference pbr = getRandomizedPagedBytesReference(length); + BytesArray ba = pbr.toBytesArray(); + BytesArray ba2 = pbr.toBytesArray(); + assertNotNull(ba); + assertNotNull(ba2); + assertEquals(pbr.length(), ba.length()); + assertEquals(ba.length(), ba2.length()); + // single-page optimization + assertSame(ba.array(), ba2.array()); + } + + public void testToBytesArrayMaterializedPages() { + int length = randomIntBetween(PAGE_SIZE, PAGE_SIZE * randomIntBetween(2,5)); + BytesReference pbr = getRandomizedPagedBytesReference(length); + BytesArray ba = pbr.toBytesArray(); + BytesArray ba2 = pbr.toBytesArray(); + assertNotNull(ba); + assertNotNull(ba2); + assertEquals(pbr.length(), ba.length()); + assertEquals(ba.length(), ba2.length()); + // ensure no single-page optimization + assertNotSame(ba.array(), ba2.array()); + } + + public void testCopyBytesArray() { + // small PBR which would normally share the first page + int length = randomIntBetween(10, PAGE_SIZE); + BytesReference pbr = getRandomizedPagedBytesReference(length); + BytesArray ba = pbr.copyBytesArray(); + BytesArray ba2 = pbr.copyBytesArray(); + assertNotNull(ba); + assertNotSame(ba, ba2); + assertNotSame(ba.array(), ba2.array()); + } + + public void testSliceCopyBytesArray() { + int length = randomIntBetween(10, PAGE_SIZE * randomIntBetween(2,8)); + BytesReference pbr = getRandomizedPagedBytesReference(length); + int sliceOffset = randomIntBetween(0, pbr.length()); + int sliceLength = randomIntBetween(pbr.length() - sliceOffset, pbr.length() - sliceOffset); + BytesReference slice = pbr.slice(sliceOffset, sliceLength); + + BytesArray ba1 = slice.copyBytesArray(); + BytesArray ba2 = slice.copyBytesArray(); + assertNotNull(ba1); + assertNotNull(ba2); + assertNotSame(ba1.array(), ba2.array()); + assertArrayEquals(slice.toBytes(), ba1.array()); + assertArrayEquals(slice.toBytes(), ba2.array()); + assertArrayEquals(ba1.array(), ba2.array()); + } + + public void testToChannelBuffer() { + int length = randomIntBetween(10, PAGE_SIZE * randomIntBetween(2,8)); + BytesReference pbr = getRandomizedPagedBytesReference(length); + ChannelBuffer cb = pbr.toChannelBuffer(); + assertNotNull(cb); + byte[] bufferBytes = new byte[length]; + cb.getBytes(0, bufferBytes); + assertArrayEquals(pbr.toBytes(), bufferBytes); + } + + public void testSliceToChannelBuffer() { + int length = randomIntBetween(10, PAGE_SIZE * randomIntBetween(2,8)); + BytesReference pbr = getRandomizedPagedBytesReference(length); + int sliceOffset = randomIntBetween(0, pbr.length()); + int sliceLength = randomIntBetween(pbr.length() - sliceOffset, pbr.length() - sliceOffset); + BytesReference slice = pbr.slice(sliceOffset, sliceLength); + ChannelBuffer cbSlice = slice.toChannelBuffer(); + assertNotNull(cbSlice); + byte[] sliceBufferBytes = new byte[sliceLength]; + cbSlice.getBytes(0, sliceBufferBytes); + assertArrayEquals(slice.toBytes(), sliceBufferBytes); + } + + public void testHasArray() { + int length = randomIntBetween(10, PAGE_SIZE * randomIntBetween(1,3)); + BytesReference pbr = getRandomizedPagedBytesReference(length); + // must return true for <= pagesize + assertEquals(length <= PAGE_SIZE, pbr.hasArray()); + } + + public void testArray() { + int[] sizes = {0, randomInt(PAGE_SIZE), PAGE_SIZE, randomIntBetween(2, PAGE_SIZE * randomIntBetween(2,5))}; + + for (int i = 0; i < sizes.length; i++) { + BytesReference pbr = getRandomizedPagedBytesReference(sizes[i]); + // verify that array() is cheap for small payloads + if (sizes[i] <= PAGE_SIZE) { + byte[] array = pbr.array(); + assertNotNull(array); + assertEquals(sizes[i], array.length); + assertSame(array, pbr.array()); + } + else { + try { + pbr.array(); + fail("expected IllegalStateException"); + } + catch (IllegalStateException isx) { + // expected + } + } + } + } + + public void testArrayOffset() { + int length = randomInt(PAGE_SIZE * randomIntBetween(2,5)); + BytesReference pbr = getRandomizedPagedBytesReference(length); + if (pbr.hasArray()) { + assertEquals(0, pbr.arrayOffset()); + } + else { + try { + pbr.arrayOffset(); + fail("expected IllegalStateException"); + } + catch (IllegalStateException ise) { + // expected + } + } + } + + public void testSliceArrayOffset() { + int length = randomInt(PAGE_SIZE * randomIntBetween(2,5)); + BytesReference pbr = getRandomizedPagedBytesReference(length); + int sliceOffset = randomIntBetween(0, pbr.length()); + int sliceLength = randomIntBetween(pbr.length() - sliceOffset, pbr.length() - sliceOffset); + BytesReference slice = pbr.slice(sliceOffset, sliceLength); + if (slice.hasArray()) { + assertEquals(sliceOffset, slice.arrayOffset()); + } + else { + try { + slice.arrayOffset(); + fail("expected IllegalStateException"); + } + catch (IllegalStateException ise) { + // expected + } + } + } + + public void testToUtf8() throws IOException { + // test empty + BytesReference pbr = getRandomizedPagedBytesReference(0); + assertEquals("", pbr.toUtf8()); + // TODO: good way to test? + } + + public void testToBytesRef() { + int length = randomIntBetween(0, PAGE_SIZE); + BytesReference pbr = getRandomizedPagedBytesReference(length); + BytesRef ref = pbr.toBytesRef(); + assertNotNull(ref); + assertEquals(pbr.arrayOffset(), ref.offset); + assertEquals(pbr.length(), ref.length); + } + + public void testSliceToBytesRef() { + int length = randomIntBetween(0, PAGE_SIZE); + BytesReference pbr = getRandomizedPagedBytesReference(length); + // get a BytesRef from a slice + int sliceOffset = randomIntBetween(0, pbr.length()); + int sliceLength = randomIntBetween(pbr.length() - sliceOffset, pbr.length() - sliceOffset); + BytesRef sliceRef = pbr.slice(sliceOffset, sliceLength).toBytesRef(); + // note that these are only true if we have <= than a page, otherwise offset/length are shifted + assertEquals(sliceOffset, sliceRef.offset); + assertEquals(sliceLength, sliceRef.length); + } + + public void testCopyBytesRef() { + int length = randomIntBetween(0, PAGE_SIZE * randomIntBetween(2,5)); + BytesReference pbr = getRandomizedPagedBytesReference(length); + BytesRef ref = pbr.copyBytesRef(); + assertNotNull(ref); + assertEquals(pbr.length(), ref.length); + } + + public void testHashCode() { + // empty content must have hash 1 (JDK compat) + BytesReference pbr = getRandomizedPagedBytesReference(0); + assertEquals(Arrays.hashCode(BytesRef.EMPTY_BYTES), pbr.hashCode()); + + // test with content + pbr = getRandomizedPagedBytesReference(randomIntBetween(0, PAGE_SIZE * randomIntBetween(2,5))); + int jdkHash = Arrays.hashCode(pbr.toBytes()); + int pbrHash = pbr.hashCode(); + assertEquals(jdkHash, pbrHash); + + // test hashes of slices + int sliceFrom = randomIntBetween(0, pbr.length()); + int sliceLength = randomIntBetween(pbr.length() - sliceFrom, pbr.length() - sliceFrom); + BytesReference slice = pbr.slice(sliceFrom, sliceLength); + int sliceJdkHash = Arrays.hashCode(slice.toBytes()); + int sliceHash = slice.hashCode(); + assertEquals(sliceJdkHash, sliceHash); + } + + public void testEquals() { + int length = randomIntBetween(100, PAGE_SIZE * randomIntBetween(2,5)); + ByteArray ba1 = bigarrays.newByteArray(length, false); + ByteArray ba2 = bigarrays.newByteArray(length, false); + + // copy contents + for (long i = 0; i < length; i++) { + ba2.set(i, ba1.get(i)); + } + + // get refs & compare + BytesReference pbr = new PagedBytesReference(bigarrays, ba1, length); + BytesReference pbr2 = new PagedBytesReference(bigarrays, ba2, length); + assertEquals(pbr, pbr2); + } + + public void testEqualsPeerClass() { + int length = randomIntBetween(100, PAGE_SIZE * randomIntBetween(2,5)); + BytesReference pbr = getRandomizedPagedBytesReference(length); + BytesReference ba = new BytesArray(pbr.toBytes()); + assertEquals(pbr, ba); + } + + public void testSliceEquals() { + int length = randomIntBetween(100, PAGE_SIZE * randomIntBetween(2,5)); + ByteArray ba1 = bigarrays.newByteArray(length, false); + BytesReference pbr = new PagedBytesReference(bigarrays, ba1, length); + + // test equality of slices + int sliceFrom = randomIntBetween(0, pbr.length()); + int sliceLength = randomIntBetween(pbr.length() - sliceFrom, pbr.length() - sliceFrom); + BytesReference slice1 = pbr.slice(sliceFrom, sliceLength); + BytesReference slice2 = pbr.slice(sliceFrom, sliceLength); + assertArrayEquals(slice1.toBytes(), slice2.toBytes()); + + // test a slice with same offset but different length + BytesReference slice3 = pbr.slice(sliceFrom, sliceLength / 2); + assertFalse(Arrays.equals(slice1.toBytes(), slice3.toBytes())); + } + + private BytesReference getRandomizedPagedBytesReference(int length) { + return new PagedBytesReference(bigarrays, bigarrays.newByteArray(length, false), length); + } + +}