Restore streamInput() performance over PagedBytesReference.

Closes #5589
Holger Hoffstätte 2014-03-28 08:30:24 +01:00
parent e78bbbf3ec
commit 0c1b9a6670
2 changed files with 50 additions and 12 deletions

PagedBytesReference.java

@@ -381,12 +381,14 @@ public final class PagedBytesReference implements BytesReference {
     private static class PagedBytesReferenceStreamInput extends StreamInput {
 
         private final ByteArray bytearray;
+        private final BytesRef ref;
         private final int offset;
         private final int length;
         private int pos;
 
         public PagedBytesReferenceStreamInput(ByteArray bytearray, int offset, int length) {
             this.bytearray = bytearray;
+            this.ref = new BytesRef();
             this.offset = offset;
             this.length = length;
             this.pos = 0;
@@ -420,7 +422,7 @@ public final class PagedBytesReference implements BytesReference {
         }
 
         @Override
-        public int read(byte[] b, int bOffset, int len) throws IOException {
+        public int read(final byte[] b, final int bOffset, final int len) throws IOException {
             if (len == 0) {
                 return 0;
             }
@@ -430,17 +432,25 @@ public final class PagedBytesReference implements BytesReference {
             }
 
             // we need to stop at the end
-            len = Math.min(length, len);
+            int todo = Math.min(len, length);
 
-            // ByteArray.get(BytesRef) does not work since it flips the
-            // ref's byte[] pointer, so for now we copy byte-by-byte
+            // current offset into the underlying ByteArray
+            long bytearrayOffset = offset + pos;
 
             // bytes already copied
             int written = 0;
-            while (written < len) {
-                b[bOffset + written] = bytearray.get(offset + written);
-                written++;
+
+            while (written < todo) {
+                long pagefragment = PAGE_SIZE - (bytearrayOffset % PAGE_SIZE); // how much can we read until hitting N*PAGE_SIZE?
+                int bulksize = (int)Math.min(pagefragment, todo - written); // we cannot copy more than a page fragment
+                boolean copied = bytearray.get(bytearrayOffset, bulksize, ref); // get the fragment
+                assert (copied == false); // we should never ever get back a materialized byte[]
+                System.arraycopy(ref.bytes, ref.offset, b, bOffset + written, bulksize); // copy fragment contents
+                written += bulksize; // count how much we copied
+                bytearrayOffset += bulksize; // advance ByteArray index
             }
 
-            pos += written;
+            pos += written; // finally advance our stream position
             return written;
         }
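The heart of the change is the loop above: instead of pulling one byte at a time out of the ByteArray, the new read() computes how many bytes remain before the next page boundary and copies that whole fragment with a single System.arraycopy. A minimal, self-contained sketch of the same page-aligned copy idea, written against a hypothetical PagedStore class rather than the Elasticsearch ByteArray/BytesRef API:

import java.util.Arrays;

public class PagedCopyDemo {

    static final int PAGE_SIZE = 16; // deliberately tiny so page boundaries get exercised

    // hypothetical paged storage: the data lives in fixed-size pages
    static final class PagedStore {
        private final byte[][] pages;

        PagedStore(byte[] source) {
            int pageCount = (source.length + PAGE_SIZE - 1) / PAGE_SIZE;
            pages = new byte[pageCount][];
            for (int p = 0; p < pageCount; p++) {
                int from = p * PAGE_SIZE;
                pages[p] = Arrays.copyOfRange(source, from, Math.min(from + PAGE_SIZE, source.length));
            }
        }

        // copy len bytes starting at pos, one page fragment at a time
        int read(long pos, byte[] dest, int destOffset, int len) {
            int written = 0;
            long storeOffset = pos;
            while (written < len) {
                // how much can we read before hitting the next page boundary?
                long pageFragment = PAGE_SIZE - (storeOffset % PAGE_SIZE);
                int bulkSize = (int) Math.min(pageFragment, len - written);
                byte[] page = pages[(int) (storeOffset / PAGE_SIZE)];
                System.arraycopy(page, (int) (storeOffset % PAGE_SIZE), dest, destOffset + written, bulkSize);
                written += bulkSize;     // count how much we copied
                storeOffset += bulkSize; // advance the store index
            }
            return written;
        }
    }

    public static void main(String[] args) {
        byte[] source = new byte[100];
        for (int i = 0; i < source.length; i++) {
            source[i] = (byte) i;
        }
        PagedStore store = new PagedStore(source);

        byte[] dest = new byte[40];
        store.read(7, dest, 0, dest.length); // an unaligned read touching three pages
        System.out.println(Arrays.equals(Arrays.copyOfRange(source, 7, 47), dest)); // true
    }
}

The invariant is that every iteration either reaches a page boundary or finishes the request, so the loop body runs a handful of times per read instead of once per byte, however the read is aligned.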

PagedBytesReferenceTest.java

@@ -98,7 +98,7 @@ public class PagedBytesReferenceTest extends ElasticsearchTestCase {
     }
 
     public void testStreamInput() throws IOException {
-        int length = randomIntBetween(10, PAGE_SIZE * 3);
+        int length = randomIntBetween(10, scaledRandomIntBetween(PAGE_SIZE * 2, PAGE_SIZE * 20));
         BytesReference pbr = getRandomizedPagedBytesReference(length);
         StreamInput si = pbr.streamInput();
         assertNotNull(si);
@@ -107,6 +107,8 @@ public class PagedBytesReferenceTest extends ElasticsearchTestCase {
         assertEquals(pbr.get(0), si.readByte());
         assertEquals(pbr.get(1), si.readByte());
         assertEquals(pbr.get(2), si.readByte());
+
+        // reset the stream for bulk reading
         si.reset();
 
         // buffer for bulk reads
@@ -151,10 +153,34 @@ public class PagedBytesReferenceTest extends ElasticsearchTestCase {
         }
     }
 
-    public void testSliceStreamInput() throws IOException {
-        int length = randomIntBetween(10, PAGE_SIZE * 3);
+    public void testStreamInputBulkReadWithOffset() throws IOException {
+        int length = randomIntBetween(10, scaledRandomIntBetween(PAGE_SIZE * 2, PAGE_SIZE * 20));
         BytesReference pbr = getRandomizedPagedBytesReference(length);
+        StreamInput si = pbr.streamInput();
+        assertNotNull(si);
+
+        // read a bunch of single bytes one by one
+        int offset = randomIntBetween(1, length / 2);
+        for (int i = 0; i < offset ; i++) {
+            assertEquals(pbr.get(i), si.readByte());
+        }
+
+        // now do NOT reset the stream - keep the stream's offset!
+
+        // buffer to compare remaining bytes against bulk read
+        byte[] pbrBytesWithOffset = Arrays.copyOfRange(pbr.toBytes(), offset, length);
+
+        // randomized target buffer to ensure no stale slots
+        byte[] targetBytes = new byte[pbrBytesWithOffset.length];
+        getRandom().nextBytes(targetBytes);
+
+        // bulk-read all
+        si.readFully(targetBytes);
+        assertArrayEquals(pbrBytesWithOffset, targetBytes);
+    }
+
+    public void testSliceStreamInput() throws IOException {
+        int length = randomIntBetween(10, scaledRandomIntBetween(PAGE_SIZE * 2, PAGE_SIZE * 20));
+        BytesReference pbr = getRandomizedPagedBytesReference(length);
 
         // test stream input over slice (upper half of original)
         int sliceOffset = randomIntBetween(1, length / 2);
@@ -166,7 +192,9 @@ public class PagedBytesReferenceTest extends ElasticsearchTestCase {
         assertEquals(slice.get(0), sliceInput.readByte());
         assertEquals(slice.get(1), sliceInput.readByte());
         assertEquals(slice.get(2), sliceInput.readByte());
-        si.reset();
+
+        // reset the slice stream for bulk reading
+        sliceInput.reset();
 
         // bulk read
         byte[] sliceBytes = new byte[sliceLength];
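The new testStreamInputBulkReadWithOffset pins down exactly the case the rewritten loop has to get right: the stream position accumulated by single-byte reads must carry over correctly into a subsequent bulk read. A self-contained analogue of that check, sketched with a plain DataInputStream over a byte[] standing in for pbr.streamInput() (class and variable names here are illustrative, not from the commit):

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Random;

public class OffsetBulkReadDemo {
    public static void main(String[] args) throws IOException {
        Random random = new Random();
        byte[] data = new byte[1024];
        random.nextBytes(data);

        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));

        // read a bunch of single bytes one by one, as the test does with si.readByte()
        int offset = 1 + random.nextInt(data.length / 2);
        for (int i = 0; i < offset; i++) {
            if (in.readByte() != data[i]) {
                throw new AssertionError("single-byte read mismatch at " + i);
            }
        }

        // do NOT reset - bulk-read the remainder and compare against the source
        byte[] expected = Arrays.copyOfRange(data, offset, data.length);
        byte[] target = new byte[expected.length];
        random.nextBytes(target); // randomize the target so stale slots cannot pass
        in.readFully(target);
        System.out.println(Arrays.equals(expected, target)); // true
    }
}

Randomizing the target buffer before the bulk read mirrors the test's trick: if the read failed to overwrite every slot, the comparison would almost certainly fail.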