diff --git a/core/src/main/java/org/elasticsearch/common/bytes/PagedBytesReference.java b/core/src/main/java/org/elasticsearch/common/bytes/PagedBytesReference.java index a41b0388af7..16ce91dc38f 100644 --- a/core/src/main/java/org/elasticsearch/common/bytes/PagedBytesReference.java +++ b/core/src/main/java/org/elasticsearch/common/bytes/PagedBytesReference.java @@ -445,5 +445,10 @@ public class PagedBytesReference implements BytesReference { // do nothing } + @Override + public int available() throws IOException { + return length - pos; + } + } } diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/FilterStreamInput.java b/core/src/main/java/org/elasticsearch/common/io/stream/FilterStreamInput.java index 5f3bd011dd9..b8132b4e870 100644 --- a/core/src/main/java/org/elasticsearch/common/io/stream/FilterStreamInput.java +++ b/core/src/main/java/org/elasticsearch/common/io/stream/FilterStreamInput.java @@ -59,6 +59,11 @@ public abstract class FilterStreamInput extends StreamInput { delegate.close(); } + @Override + public int available() throws IOException { + return delegate.available(); + } + @Override public Version getVersion() { return delegate.getVersion(); diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/InputStreamStreamInput.java b/core/src/main/java/org/elasticsearch/common/io/stream/InputStreamStreamInput.java index e9aa52cf4d0..d786041af49 100644 --- a/core/src/main/java/org/elasticsearch/common/io/stream/InputStreamStreamInput.java +++ b/core/src/main/java/org/elasticsearch/common/io/stream/InputStreamStreamInput.java @@ -74,6 +74,11 @@ public class InputStreamStreamInput extends StreamInput { is.close(); } + @Override + public int available() throws IOException { + return is.available(); + } + @Override public int read() throws IOException { return is.read(); diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java b/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java index 1fa96a0f2c2..c5709db5363 100644 --- a/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java +++ b/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java @@ -373,6 +373,9 @@ public abstract class StreamInput extends InputStream { @Override public abstract void close() throws IOException; + @Override + public abstract int available() throws IOException; + public String[] readStringArray() throws IOException { int size = readVInt(); if (size == 0) { @@ -685,21 +688,21 @@ public abstract class StreamInput extends InputStream { /** * Reads a {@link AggregatorBuilder} from the current stream */ - public AggregatorBuilder readAggregatorFactory() throws IOException { + public AggregatorBuilder readAggregatorFactory() throws IOException { return readNamedWriteable(AggregatorBuilder.class); } /** * Reads a {@link PipelineAggregatorBuilder} from the current stream */ - public PipelineAggregatorBuilder readPipelineAggregatorFactory() throws IOException { + public PipelineAggregatorBuilder readPipelineAggregatorFactory() throws IOException { return readNamedWriteable(PipelineAggregatorBuilder.class); } /** * Reads a {@link QueryBuilder} from the current stream */ - public QueryBuilder readQuery() throws IOException { + public QueryBuilder readQuery() throws IOException { return readNamedWriteable(QueryBuilder.class); } diff --git a/core/src/test/java/org/elasticsearch/common/bytes/PagedBytesReferenceTests.java b/core/src/test/java/org/elasticsearch/common/bytes/PagedBytesReferenceTests.java index 95a65f82924..7b34685da7d 100644 --- a/core/src/test/java/org/elasticsearch/common/bytes/PagedBytesReferenceTests.java +++ b/core/src/test/java/org/elasticsearch/common/bytes/PagedBytesReferenceTests.java @@ -154,7 +154,7 @@ public class PagedBytesReferenceTests extends ESTestCase { } public void testStreamInputBulkReadWithOffset() throws IOException { - int length = randomIntBetween(10, scaledRandomIntBetween(PAGE_SIZE * 2, PAGE_SIZE * 20)); + final int length = randomIntBetween(10, scaledRandomIntBetween(PAGE_SIZE * 2, PAGE_SIZE * 20)); BytesReference pbr = getRandomizedPagedBytesReference(length); StreamInput si = pbr.streamInput(); assertNotNull(si); @@ -162,6 +162,7 @@ public class PagedBytesReferenceTests extends ESTestCase { // read a bunch of single bytes one by one int offset = randomIntBetween(1, length / 2); for (int i = 0; i < offset; i++) { + assertEquals(si.available(), length - i); assertEquals(pbr.get(i), si.readByte()); } @@ -176,6 +177,7 @@ public class PagedBytesReferenceTests extends ESTestCase { // bulk-read all si.readFully(targetBytes); assertArrayEquals(pbrBytesWithOffset, targetBytes); + assertEquals(si.available(), 0); } public void testRandomReads() throws IOException { @@ -216,18 +218,22 @@ public class PagedBytesReferenceTests extends ESTestCase { int sliceLength = length - sliceOffset; BytesReference slice = pbr.slice(sliceOffset, sliceLength); StreamInput sliceInput = slice.streamInput(); + assertEquals(sliceInput.available(), sliceLength); // single reads assertEquals(slice.get(0), sliceInput.readByte()); assertEquals(slice.get(1), sliceInput.readByte()); assertEquals(slice.get(2), sliceInput.readByte()); + assertEquals(sliceInput.available(), sliceLength - 3); // reset the slice stream for bulk reading sliceInput.reset(); + assertEquals(sliceInput.available(), sliceLength); // bulk read byte[] sliceBytes = new byte[sliceLength]; sliceInput.readFully(sliceBytes); + assertEquals(sliceInput.available(), 0); // compare slice content with upper half of original byte[] pbrSliceBytes = Arrays.copyOfRange(pbr.toBytes(), sliceOffset, length); @@ -239,11 +245,14 @@ public class PagedBytesReferenceTests extends ESTestCase { assertArrayEquals(sliceBytes, sliceToBytes); sliceInput.reset(); + assertEquals(sliceInput.available(), sliceLength); byte[] buffer = new byte[sliceLength + scaledRandomIntBetween(1, 100)]; int offset = scaledRandomIntBetween(0, Math.max(1, buffer.length - sliceLength - 1)); int read = sliceInput.read(buffer, offset, sliceLength / 2); + assertEquals(sliceInput.available(), sliceLength - read); sliceInput.read(buffer, offset + read, sliceLength); assertArrayEquals(sliceBytes, Arrays.copyOfRange(buffer, offset, offset + sliceLength)); + assertEquals(sliceInput.available(), 0); } public void testWriteToOutputStream() throws IOException { diff --git a/core/src/test/java/org/elasticsearch/common/io/stream/BytesStreamsTests.java b/core/src/test/java/org/elasticsearch/common/io/stream/BytesStreamsTests.java index 7f232363f73..80bad2e1ecc 100644 --- a/core/src/test/java/org/elasticsearch/common/io/stream/BytesStreamsTests.java +++ b/core/src/test/java/org/elasticsearch/common/io/stream/BytesStreamsTests.java @@ -273,7 +273,9 @@ public class BytesStreamsTests extends ESTestCase { out.writeString("hello"); out.writeString("goodbye"); out.writeGenericValue(BytesRefs.toBytesRef("bytesref")); + final byte[] bytes = out.bytes().toBytes(); StreamInput in = StreamInput.wrap(out.bytes().toBytes()); + assertEquals(in.available(), bytes.length); assertThat(in.readBoolean(), equalTo(false)); assertThat(in.readByte(), equalTo((byte)1)); assertThat(in.readShort(), equalTo((short)-1)); @@ -302,9 +304,12 @@ public class BytesStreamsTests extends ESTestCase { namedWriteableRegistry.registerPrototype(BaseNamedWriteable.class, new TestNamedWriteable(null, null)); TestNamedWriteable namedWriteableIn = new TestNamedWriteable(randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10)); out.writeNamedWriteable(namedWriteableIn); - StreamInput in = new NamedWriteableAwareStreamInput(StreamInput.wrap(out.bytes().toBytes()), namedWriteableRegistry); + byte[] bytes = out.bytes().toBytes(); + StreamInput in = new NamedWriteableAwareStreamInput(StreamInput.wrap(bytes), namedWriteableRegistry); + assertEquals(in.available(), bytes.length); BaseNamedWriteable namedWriteableOut = in.readNamedWriteable(BaseNamedWriteable.class); assertEquals(namedWriteableOut, namedWriteableIn); + assertEquals(in.available(), 0); } public void testNamedWriteableDuplicates() throws IOException { diff --git a/core/src/test/java/org/elasticsearch/common/io/stream/StreamTests.java b/core/src/test/java/org/elasticsearch/common/io/stream/StreamTests.java index b5f26dba8a5..72f933462e0 100644 --- a/core/src/test/java/org/elasticsearch/common/io/stream/StreamTests.java +++ b/core/src/test/java/org/elasticsearch/common/io/stream/StreamTests.java @@ -23,7 +23,10 @@ import org.elasticsearch.common.bytes.ByteBufferBytesReference; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.test.ESTestCase; +import java.io.ByteArrayInputStream; +import java.io.FilterInputStream; import java.io.IOException; +import java.io.InputStream; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; @@ -93,4 +96,29 @@ public class StreamTests extends ESTestCase { index++; } } + + public void testFilterStreamInputDelegatesAvailable() throws IOException { + final int length = randomIntBetween(1, 1024); + StreamInput delegate = StreamInput.wrap(new byte[length]); + + FilterStreamInput filterInputStream = new FilterStreamInput(delegate) {}; + assertEquals(filterInputStream.available(), length); + + // read some bytes + final int bytesToRead = randomIntBetween(1, length); + filterInputStream.readBytes(new byte[bytesToRead], 0, bytesToRead); + assertEquals(filterInputStream.available(), length - bytesToRead); + } + + public void testInputStreamStreamInputDelegatesAvailable() throws IOException { + final int length = randomIntBetween(1, 1024); + ByteArrayInputStream is = new ByteArrayInputStream(new byte[length]); + InputStreamStreamInput streamInput = new InputStreamStreamInput(is); + assertEquals(streamInput.available(), length); + + // read some bytes + final int bytesToRead = randomIntBetween(1, length); + streamInput.readBytes(new byte[bytesToRead], 0, bytesToRead); + assertEquals(streamInput.available(), length - bytesToRead); + } }