From 98123e97754a6db24c57606a0d14f6401b7b792b Mon Sep 17 00:00:00 2001 From: jaymode Date: Mon, 21 Mar 2016 09:30:20 -0400 Subject: [PATCH] Implement available for all StreamInput classes There are some implementation of StreamInput that implement the available method and there are others that do not implement this method. This change makes the available method abstract in the StreamInput class and implements the method where it was not previously implemented. --- .../common/bytes/PagedBytesReference.java | 5 ++++ .../common/io/stream/FilterStreamInput.java | 5 ++++ .../io/stream/InputStreamStreamInput.java | 5 ++++ .../common/io/stream/StreamInput.java | 6 ++-- .../bytes/PagedBytesReferenceTests.java | 11 +++++++- .../common/io/stream/BytesStreamsTests.java | 7 ++++- .../common/io/stream/StreamTests.java | 28 +++++++++++++++++++ 7 files changed, 62 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/common/bytes/PagedBytesReference.java b/core/src/main/java/org/elasticsearch/common/bytes/PagedBytesReference.java index a41b0388af7..16ce91dc38f 100644 --- a/core/src/main/java/org/elasticsearch/common/bytes/PagedBytesReference.java +++ b/core/src/main/java/org/elasticsearch/common/bytes/PagedBytesReference.java @@ -445,5 +445,10 @@ public class PagedBytesReference implements BytesReference { // do nothing } + @Override + public int available() throws IOException { + return length - pos; + } + } } diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/FilterStreamInput.java b/core/src/main/java/org/elasticsearch/common/io/stream/FilterStreamInput.java index 5f3bd011dd9..b8132b4e870 100644 --- a/core/src/main/java/org/elasticsearch/common/io/stream/FilterStreamInput.java +++ b/core/src/main/java/org/elasticsearch/common/io/stream/FilterStreamInput.java @@ -59,6 +59,11 @@ public abstract class FilterStreamInput extends StreamInput { delegate.close(); } + @Override + public int available() throws IOException { + return delegate.available(); + } + @Override public Version getVersion() { return delegate.getVersion(); diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/InputStreamStreamInput.java b/core/src/main/java/org/elasticsearch/common/io/stream/InputStreamStreamInput.java index e9aa52cf4d0..d786041af49 100644 --- a/core/src/main/java/org/elasticsearch/common/io/stream/InputStreamStreamInput.java +++ b/core/src/main/java/org/elasticsearch/common/io/stream/InputStreamStreamInput.java @@ -74,6 +74,11 @@ public class InputStreamStreamInput extends StreamInput { is.close(); } + @Override + public int available() throws IOException { + return is.available(); + } + @Override public int read() throws IOException { return is.read(); diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java b/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java index aca136a2a9a..9bb10fa677f 100644 --- a/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java +++ b/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java @@ -37,10 +37,8 @@ import org.elasticsearch.common.geo.builders.ShapeBuilder; import org.elasticsearch.common.text.Text; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.functionscore.ScoreFunctionBuilder; -import org.elasticsearch.ingest.IngestStats; import org.elasticsearch.search.rescore.RescoreBuilder; import org.elasticsearch.search.suggest.SuggestionBuilder; -import org.elasticsearch.search.suggest.completion.context.QueryContext; import org.elasticsearch.search.suggest.phrase.SmoothingModel; import org.elasticsearch.tasks.Task; import org.elasticsearch.search.aggregations.AggregatorBuilder; @@ -68,7 +66,6 @@ import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.function.Function; import java.util.function.Supplier; import static org.elasticsearch.ElasticsearchException.readException; @@ -375,6 +372,9 @@ public abstract class StreamInput extends InputStream { @Override public abstract void close() throws IOException; + @Override + public abstract int available() throws IOException; + public String[] readStringArray() throws IOException { int size = readVInt(); if (size == 0) { diff --git a/core/src/test/java/org/elasticsearch/common/bytes/PagedBytesReferenceTests.java b/core/src/test/java/org/elasticsearch/common/bytes/PagedBytesReferenceTests.java index 95a65f82924..7b34685da7d 100644 --- a/core/src/test/java/org/elasticsearch/common/bytes/PagedBytesReferenceTests.java +++ b/core/src/test/java/org/elasticsearch/common/bytes/PagedBytesReferenceTests.java @@ -154,7 +154,7 @@ public class PagedBytesReferenceTests extends ESTestCase { } public void testStreamInputBulkReadWithOffset() throws IOException { - int length = randomIntBetween(10, scaledRandomIntBetween(PAGE_SIZE * 2, PAGE_SIZE * 20)); + final int length = randomIntBetween(10, scaledRandomIntBetween(PAGE_SIZE * 2, PAGE_SIZE * 20)); BytesReference pbr = getRandomizedPagedBytesReference(length); StreamInput si = pbr.streamInput(); assertNotNull(si); @@ -162,6 +162,7 @@ public class PagedBytesReferenceTests extends ESTestCase { // read a bunch of single bytes one by one int offset = randomIntBetween(1, length / 2); for (int i = 0; i < offset; i++) { + assertEquals(si.available(), length - i); assertEquals(pbr.get(i), si.readByte()); } @@ -176,6 +177,7 @@ public class PagedBytesReferenceTests extends ESTestCase { // bulk-read all si.readFully(targetBytes); assertArrayEquals(pbrBytesWithOffset, targetBytes); + assertEquals(si.available(), 0); } public void testRandomReads() throws IOException { @@ -216,18 +218,22 @@ public class PagedBytesReferenceTests extends ESTestCase { int sliceLength = length - sliceOffset; BytesReference slice = pbr.slice(sliceOffset, sliceLength); StreamInput sliceInput = slice.streamInput(); + assertEquals(sliceInput.available(), sliceLength); // single reads assertEquals(slice.get(0), sliceInput.readByte()); assertEquals(slice.get(1), sliceInput.readByte()); assertEquals(slice.get(2), sliceInput.readByte()); + assertEquals(sliceInput.available(), sliceLength - 3); // reset the slice stream for bulk reading sliceInput.reset(); + assertEquals(sliceInput.available(), sliceLength); // bulk read byte[] sliceBytes = new byte[sliceLength]; sliceInput.readFully(sliceBytes); + assertEquals(sliceInput.available(), 0); // compare slice content with upper half of original byte[] pbrSliceBytes = Arrays.copyOfRange(pbr.toBytes(), sliceOffset, length); @@ -239,11 +245,14 @@ public class PagedBytesReferenceTests extends ESTestCase { assertArrayEquals(sliceBytes, sliceToBytes); sliceInput.reset(); + assertEquals(sliceInput.available(), sliceLength); byte[] buffer = new byte[sliceLength + scaledRandomIntBetween(1, 100)]; int offset = scaledRandomIntBetween(0, Math.max(1, buffer.length - sliceLength - 1)); int read = sliceInput.read(buffer, offset, sliceLength / 2); + assertEquals(sliceInput.available(), sliceLength - read); sliceInput.read(buffer, offset + read, sliceLength); assertArrayEquals(sliceBytes, Arrays.copyOfRange(buffer, offset, offset + sliceLength)); + assertEquals(sliceInput.available(), 0); } public void testWriteToOutputStream() throws IOException { diff --git a/core/src/test/java/org/elasticsearch/common/io/stream/BytesStreamsTests.java b/core/src/test/java/org/elasticsearch/common/io/stream/BytesStreamsTests.java index 7f232363f73..80bad2e1ecc 100644 --- a/core/src/test/java/org/elasticsearch/common/io/stream/BytesStreamsTests.java +++ b/core/src/test/java/org/elasticsearch/common/io/stream/BytesStreamsTests.java @@ -273,7 +273,9 @@ public class BytesStreamsTests extends ESTestCase { out.writeString("hello"); out.writeString("goodbye"); out.writeGenericValue(BytesRefs.toBytesRef("bytesref")); + final byte[] bytes = out.bytes().toBytes(); StreamInput in = StreamInput.wrap(out.bytes().toBytes()); + assertEquals(in.available(), bytes.length); assertThat(in.readBoolean(), equalTo(false)); assertThat(in.readByte(), equalTo((byte)1)); assertThat(in.readShort(), equalTo((short)-1)); @@ -302,9 +304,12 @@ public class BytesStreamsTests extends ESTestCase { namedWriteableRegistry.registerPrototype(BaseNamedWriteable.class, new TestNamedWriteable(null, null)); TestNamedWriteable namedWriteableIn = new TestNamedWriteable(randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10)); out.writeNamedWriteable(namedWriteableIn); - StreamInput in = new NamedWriteableAwareStreamInput(StreamInput.wrap(out.bytes().toBytes()), namedWriteableRegistry); + byte[] bytes = out.bytes().toBytes(); + StreamInput in = new NamedWriteableAwareStreamInput(StreamInput.wrap(bytes), namedWriteableRegistry); + assertEquals(in.available(), bytes.length); BaseNamedWriteable namedWriteableOut = in.readNamedWriteable(BaseNamedWriteable.class); assertEquals(namedWriteableOut, namedWriteableIn); + assertEquals(in.available(), 0); } public void testNamedWriteableDuplicates() throws IOException { diff --git a/core/src/test/java/org/elasticsearch/common/io/stream/StreamTests.java b/core/src/test/java/org/elasticsearch/common/io/stream/StreamTests.java index b5f26dba8a5..72f933462e0 100644 --- a/core/src/test/java/org/elasticsearch/common/io/stream/StreamTests.java +++ b/core/src/test/java/org/elasticsearch/common/io/stream/StreamTests.java @@ -23,7 +23,10 @@ import org.elasticsearch.common.bytes.ByteBufferBytesReference; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.test.ESTestCase; +import java.io.ByteArrayInputStream; +import java.io.FilterInputStream; import java.io.IOException; +import java.io.InputStream; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; @@ -93,4 +96,29 @@ public class StreamTests extends ESTestCase { index++; } } + + public void testFilterStreamInputDelegatesAvailable() throws IOException { + final int length = randomIntBetween(1, 1024); + StreamInput delegate = StreamInput.wrap(new byte[length]); + + FilterStreamInput filterInputStream = new FilterStreamInput(delegate) {}; + assertEquals(filterInputStream.available(), length); + + // read some bytes + final int bytesToRead = randomIntBetween(1, length); + filterInputStream.readBytes(new byte[bytesToRead], 0, bytesToRead); + assertEquals(filterInputStream.available(), length - bytesToRead); + } + + public void testInputStreamStreamInputDelegatesAvailable() throws IOException { + final int length = randomIntBetween(1, 1024); + ByteArrayInputStream is = new ByteArrayInputStream(new byte[length]); + InputStreamStreamInput streamInput = new InputStreamStreamInput(is); + assertEquals(streamInput.available(), length); + + // read some bytes + final int bytesToRead = randomIntBetween(1, length); + streamInput.readBytes(new byte[bytesToRead], 0, bytesToRead); + assertEquals(streamInput.available(), length - bytesToRead); + } }