Take stream position into account when calculating remaining length
Currently `PagedBytesReferenceStreamInput#read(byte[],int,int)` ignores the current pos and that causes to read past EOF Closes #5667
This commit is contained in:
parent
8f324d50b2
commit
cd552e7413
|
@ -431,27 +431,26 @@ public final class PagedBytesReference implements BytesReference {
|
|||
return -1;
|
||||
}
|
||||
|
||||
// we need to stop at the end
|
||||
int todo = Math.min(len, length);
|
||||
final int numBytesToCopy = Math.min(len, length - pos); // copy the full lenth or the remaining part
|
||||
|
||||
// current offset into the underlying ByteArray
|
||||
long bytearrayOffset = offset + pos;
|
||||
long byteArrayOffset = offset + pos;
|
||||
|
||||
// bytes already copied
|
||||
int written = 0;
|
||||
int copiedBytes = 0;
|
||||
|
||||
while (written < todo) {
|
||||
long pagefragment = PAGE_SIZE - (bytearrayOffset % PAGE_SIZE); // how much can we read until hitting N*PAGE_SIZE?
|
||||
int bulksize = (int)Math.min(pagefragment, todo - written); // we cannot copy more than a page fragment
|
||||
boolean copied = bytearray.get(bytearrayOffset, bulksize, ref); // get the fragment
|
||||
while (copiedBytes < numBytesToCopy) {
|
||||
long pageFragment = PAGE_SIZE - (byteArrayOffset % PAGE_SIZE); // how much can we read until hitting N*PAGE_SIZE?
|
||||
int bulkSize = (int)Math.min(pageFragment, numBytesToCopy - copiedBytes); // we cannot copy more than a page fragment
|
||||
boolean copied = bytearray.get(byteArrayOffset, bulkSize, ref); // get the fragment
|
||||
assert (copied == false); // we should never ever get back a materialized byte[]
|
||||
System.arraycopy(ref.bytes, ref.offset, b, bOffset + written, bulksize); // copy fragment contents
|
||||
written += bulksize; // count how much we copied
|
||||
bytearrayOffset += bulksize; // advance ByteArray index
|
||||
System.arraycopy(ref.bytes, ref.offset, b, bOffset + copiedBytes, bulkSize); // copy fragment contents
|
||||
copiedBytes += bulkSize; // count how much we copied
|
||||
byteArrayOffset += bulkSize; // advance ByteArray index
|
||||
}
|
||||
|
||||
pos += written; // finally advance our stream position
|
||||
return written;
|
||||
pos += copiedBytes; // finally advance our stream position
|
||||
return copiedBytes;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -178,6 +178,34 @@ public class PagedBytesReferenceTest extends ElasticsearchTestCase {
|
|||
assertArrayEquals(pbrBytesWithOffset, targetBytes);
|
||||
}
|
||||
|
||||
public void testRandomReads() throws IOException {
|
||||
int length = randomIntBetween(10, scaledRandomIntBetween(PAGE_SIZE * 2, PAGE_SIZE * 20));
|
||||
BytesReference pbr = getRandomizedPagedBytesReference(length);
|
||||
StreamInput streamInput = pbr.streamInput();
|
||||
BytesRef target = new BytesRef();
|
||||
while(target.length < pbr.length()) {
|
||||
switch (randomIntBetween(0, 10)) {
|
||||
case 6:
|
||||
case 5:
|
||||
target.append(new BytesRef(new byte[] {streamInput.readByte()}));
|
||||
break;
|
||||
case 4:
|
||||
case 3:
|
||||
BytesRef bytesRef = streamInput.readBytesRef(scaledRandomIntBetween(1, pbr.length() - target.length));
|
||||
target.append(bytesRef);
|
||||
break;
|
||||
default:
|
||||
byte[] buffer = new byte[scaledRandomIntBetween(1, pbr.length() - target.length)];
|
||||
int offset = scaledRandomIntBetween(0, buffer.length - 1);
|
||||
int read = streamInput.read(buffer, offset, buffer.length - offset);
|
||||
target.append(new BytesRef(buffer, offset, read));
|
||||
break;
|
||||
}
|
||||
}
|
||||
assertEquals(pbr.length(), target.length);
|
||||
assertArrayEquals(pbr.toBytes(), Arrays.copyOfRange(target.bytes, target.offset, target.length));
|
||||
}
|
||||
|
||||
public void testSliceStreamInput() throws IOException {
|
||||
int length = randomIntBetween(10, scaledRandomIntBetween(PAGE_SIZE * 2, PAGE_SIZE * 20));
|
||||
BytesReference pbr = getRandomizedPagedBytesReference(length);
|
||||
|
@ -208,6 +236,13 @@ public class PagedBytesReferenceTest extends ElasticsearchTestCase {
|
|||
byte[] sliceToBytes = slice.toBytes();
|
||||
assertEquals(sliceBytes.length, sliceToBytes.length);
|
||||
assertArrayEquals(sliceBytes, sliceToBytes);
|
||||
|
||||
sliceInput.reset();
|
||||
byte[] buffer = new byte[sliceLength + scaledRandomIntBetween(1, 100)];
|
||||
int offset = scaledRandomIntBetween(0, Math.max(1, buffer.length - sliceLength - 1));
|
||||
int read = sliceInput.read(buffer, offset, sliceLength / 2);
|
||||
sliceInput.read(buffer, offset + read, sliceLength);
|
||||
assertArrayEquals(sliceBytes, Arrays.copyOfRange(buffer, offset, offset + sliceLength));
|
||||
}
|
||||
|
||||
public void testWriteTo() throws IOException {
|
||||
|
|
|
@ -23,10 +23,7 @@ import com.google.common.collect.Lists;
|
|||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.io.FastCharArrayWriter;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.common.xcontent.XContentGenerator;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.common.xcontent.*;
|
||||
import org.elasticsearch.test.ElasticsearchTestCase;
|
||||
import org.junit.Test;
|
||||
|
||||
|
@ -201,4 +198,59 @@ public class XContentBuilderTests extends ElasticsearchTestCase {
|
|||
builder.map(map);
|
||||
assertThat(builder.string(), equalTo("{\"calendar\":\"" + expectedCalendar + "\"}"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCopyCurrentStructure() throws Exception {
|
||||
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON);
|
||||
builder.startObject()
|
||||
.field("test", "test field")
|
||||
.startObject("filter")
|
||||
.startObject("terms");
|
||||
|
||||
// up to 20k random terms
|
||||
int numTerms = randomInt(20000) + 1;
|
||||
List<String> terms = new ArrayList<>(numTerms);
|
||||
for (int i = 0; i < numTerms; i++) {
|
||||
terms.add("test" + i);
|
||||
}
|
||||
|
||||
builder.field("fakefield", terms).endObject().endObject().endObject();
|
||||
|
||||
XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(builder.bytes());
|
||||
|
||||
XContentBuilder filterBuilder = null;
|
||||
XContentParser.Token token;
|
||||
String currentFieldName = null;
|
||||
assertThat(parser.nextToken(), equalTo(XContentParser.Token.START_OBJECT));
|
||||
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
|
||||
if (token == XContentParser.Token.FIELD_NAME) {
|
||||
currentFieldName = parser.currentName();
|
||||
} else if (token.isValue()) {
|
||||
if ("test".equals(currentFieldName)) {
|
||||
assertThat(parser.text(), equalTo("test field"));
|
||||
}
|
||||
} else if (token == XContentParser.Token.START_OBJECT) {
|
||||
if ("filter".equals(currentFieldName)) {
|
||||
filterBuilder = XContentFactory.contentBuilder(parser.contentType());
|
||||
filterBuilder.copyCurrentStructure(parser);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
assertNotNull(filterBuilder);
|
||||
parser = XContentFactory.xContent(XContentType.JSON).createParser(filterBuilder.bytes());
|
||||
assertThat(parser.nextToken(), equalTo(XContentParser.Token.START_OBJECT));
|
||||
assertThat(parser.nextToken(), equalTo(XContentParser.Token.FIELD_NAME));
|
||||
assertThat(parser.currentName(), equalTo("terms"));
|
||||
assertThat(parser.nextToken(), equalTo(XContentParser.Token.START_OBJECT));
|
||||
assertThat(parser.nextToken(), equalTo(XContentParser.Token.FIELD_NAME));
|
||||
assertThat(parser.currentName(), equalTo("fakefield"));
|
||||
assertThat(parser.nextToken(), equalTo(XContentParser.Token.START_ARRAY));
|
||||
int i = 0;
|
||||
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
|
||||
assertThat(parser.text(), equalTo(terms.get(i++)));
|
||||
}
|
||||
|
||||
assertThat(i, equalTo(terms.size()));
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue