Merge pull request #17218 from jaymode/streaminput_availble

Implement available for all StreamInput classes
This commit is contained in:
Jay Modi 2016-03-23 09:15:24 -04:00
commit ae1a68c10a
7 changed files with 62 additions and 5 deletions

View File

@ -445,5 +445,10 @@ public class PagedBytesReference implements BytesReference {
// do nothing
}
@Override
public int available() throws IOException {
return length - pos;
}
}
}

View File

@ -59,6 +59,11 @@ public abstract class FilterStreamInput extends StreamInput {
delegate.close();
}
@Override
public int available() throws IOException {
return delegate.available();
}
@Override
public Version getVersion() {
return delegate.getVersion();

View File

@ -74,6 +74,11 @@ public class InputStreamStreamInput extends StreamInput {
is.close();
}
@Override
public int available() throws IOException {
return is.available();
}
@Override
public int read() throws IOException {
return is.read();

View File

@ -37,10 +37,8 @@ import org.elasticsearch.common.geo.builders.ShapeBuilder;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.functionscore.ScoreFunctionBuilder;
import org.elasticsearch.ingest.IngestStats;
import org.elasticsearch.search.rescore.RescoreBuilder;
import org.elasticsearch.search.suggest.SuggestionBuilder;
import org.elasticsearch.search.suggest.completion.context.QueryContext;
import org.elasticsearch.search.suggest.phrase.SmoothingModel;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.search.aggregations.AggregatorBuilder;
@ -68,7 +66,6 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;
import static org.elasticsearch.ElasticsearchException.readException;
@ -375,6 +372,9 @@ public abstract class StreamInput extends InputStream {
@Override
public abstract void close() throws IOException;
@Override
public abstract int available() throws IOException;
public String[] readStringArray() throws IOException {
int size = readVInt();
if (size == 0) {

View File

@ -154,7 +154,7 @@ public class PagedBytesReferenceTests extends ESTestCase {
}
public void testStreamInputBulkReadWithOffset() throws IOException {
int length = randomIntBetween(10, scaledRandomIntBetween(PAGE_SIZE * 2, PAGE_SIZE * 20));
final int length = randomIntBetween(10, scaledRandomIntBetween(PAGE_SIZE * 2, PAGE_SIZE * 20));
BytesReference pbr = getRandomizedPagedBytesReference(length);
StreamInput si = pbr.streamInput();
assertNotNull(si);
@ -162,6 +162,7 @@ public class PagedBytesReferenceTests extends ESTestCase {
// read a bunch of single bytes one by one
int offset = randomIntBetween(1, length / 2);
for (int i = 0; i < offset; i++) {
assertEquals(si.available(), length - i);
assertEquals(pbr.get(i), si.readByte());
}
@ -176,6 +177,7 @@ public class PagedBytesReferenceTests extends ESTestCase {
// bulk-read all
si.readFully(targetBytes);
assertArrayEquals(pbrBytesWithOffset, targetBytes);
assertEquals(si.available(), 0);
}
public void testRandomReads() throws IOException {
@ -216,18 +218,22 @@ public class PagedBytesReferenceTests extends ESTestCase {
int sliceLength = length - sliceOffset;
BytesReference slice = pbr.slice(sliceOffset, sliceLength);
StreamInput sliceInput = slice.streamInput();
assertEquals(sliceInput.available(), sliceLength);
// single reads
assertEquals(slice.get(0), sliceInput.readByte());
assertEquals(slice.get(1), sliceInput.readByte());
assertEquals(slice.get(2), sliceInput.readByte());
assertEquals(sliceInput.available(), sliceLength - 3);
// reset the slice stream for bulk reading
sliceInput.reset();
assertEquals(sliceInput.available(), sliceLength);
// bulk read
byte[] sliceBytes = new byte[sliceLength];
sliceInput.readFully(sliceBytes);
assertEquals(sliceInput.available(), 0);
// compare slice content with upper half of original
byte[] pbrSliceBytes = Arrays.copyOfRange(pbr.toBytes(), sliceOffset, length);
@ -239,11 +245,14 @@ public class PagedBytesReferenceTests extends ESTestCase {
assertArrayEquals(sliceBytes, sliceToBytes);
sliceInput.reset();
assertEquals(sliceInput.available(), sliceLength);
byte[] buffer = new byte[sliceLength + scaledRandomIntBetween(1, 100)];
int offset = scaledRandomIntBetween(0, Math.max(1, buffer.length - sliceLength - 1));
int read = sliceInput.read(buffer, offset, sliceLength / 2);
assertEquals(sliceInput.available(), sliceLength - read);
sliceInput.read(buffer, offset + read, sliceLength);
assertArrayEquals(sliceBytes, Arrays.copyOfRange(buffer, offset, offset + sliceLength));
assertEquals(sliceInput.available(), 0);
}
public void testWriteToOutputStream() throws IOException {

View File

@ -273,7 +273,9 @@ public class BytesStreamsTests extends ESTestCase {
out.writeString("hello");
out.writeString("goodbye");
out.writeGenericValue(BytesRefs.toBytesRef("bytesref"));
final byte[] bytes = out.bytes().toBytes();
StreamInput in = StreamInput.wrap(out.bytes().toBytes());
assertEquals(in.available(), bytes.length);
assertThat(in.readBoolean(), equalTo(false));
assertThat(in.readByte(), equalTo((byte)1));
assertThat(in.readShort(), equalTo((short)-1));
@ -302,9 +304,12 @@ public class BytesStreamsTests extends ESTestCase {
namedWriteableRegistry.registerPrototype(BaseNamedWriteable.class, new TestNamedWriteable(null, null));
TestNamedWriteable namedWriteableIn = new TestNamedWriteable(randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10));
out.writeNamedWriteable(namedWriteableIn);
StreamInput in = new NamedWriteableAwareStreamInput(StreamInput.wrap(out.bytes().toBytes()), namedWriteableRegistry);
byte[] bytes = out.bytes().toBytes();
StreamInput in = new NamedWriteableAwareStreamInput(StreamInput.wrap(bytes), namedWriteableRegistry);
assertEquals(in.available(), bytes.length);
BaseNamedWriteable namedWriteableOut = in.readNamedWriteable(BaseNamedWriteable.class);
assertEquals(namedWriteableOut, namedWriteableIn);
assertEquals(in.available(), 0);
}
public void testNamedWriteableDuplicates() throws IOException {

View File

@ -23,7 +23,10 @@ import org.elasticsearch.common.bytes.ByteBufferBytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.test.ESTestCase;
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
@ -93,4 +96,29 @@ public class StreamTests extends ESTestCase {
index++;
}
}
public void testFilterStreamInputDelegatesAvailable() throws IOException {
final int length = randomIntBetween(1, 1024);
StreamInput delegate = StreamInput.wrap(new byte[length]);
FilterStreamInput filterInputStream = new FilterStreamInput(delegate) {};
assertEquals(filterInputStream.available(), length);
// read some bytes
final int bytesToRead = randomIntBetween(1, length);
filterInputStream.readBytes(new byte[bytesToRead], 0, bytesToRead);
assertEquals(filterInputStream.available(), length - bytesToRead);
}
public void testInputStreamStreamInputDelegatesAvailable() throws IOException {
final int length = randomIntBetween(1, 1024);
ByteArrayInputStream is = new ByteArrayInputStream(new byte[length]);
InputStreamStreamInput streamInput = new InputStreamStreamInput(is);
assertEquals(streamInput.available(), length);
// read some bytes
final int bytesToRead = randomIntBetween(1, length);
streamInput.readBytes(new byte[bytesToRead], 0, bytesToRead);
assertEquals(streamInput.available(), length - bytesToRead);
}
}