Implement available for all StreamInput classes

There are some implementation of StreamInput that implement the available method
and there are others that do not implement this method. This change makes the
available method abstract in the StreamInput class and implements the method where
it was not previously implemented.
This commit is contained in:
jaymode 2016-03-21 09:30:20 -04:00
parent 032678f0c3
commit 98123e9775
7 changed files with 62 additions and 5 deletions

View File

@ -445,5 +445,10 @@ public class PagedBytesReference implements BytesReference {
// do nothing
}
@Override
public int available() throws IOException {
return length - pos;
}
}
}

View File

@ -59,6 +59,11 @@ public abstract class FilterStreamInput extends StreamInput {
delegate.close();
}
@Override
public int available() throws IOException {
return delegate.available();
}
@Override
public Version getVersion() {
return delegate.getVersion();

View File

@ -74,6 +74,11 @@ public class InputStreamStreamInput extends StreamInput {
is.close();
}
@Override
public int available() throws IOException {
return is.available();
}
@Override
public int read() throws IOException {
return is.read();

View File

@ -37,10 +37,8 @@ import org.elasticsearch.common.geo.builders.ShapeBuilder;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.functionscore.ScoreFunctionBuilder;
import org.elasticsearch.ingest.IngestStats;
import org.elasticsearch.search.rescore.RescoreBuilder;
import org.elasticsearch.search.suggest.SuggestionBuilder;
import org.elasticsearch.search.suggest.completion.context.QueryContext;
import org.elasticsearch.search.suggest.phrase.SmoothingModel;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.search.aggregations.AggregatorBuilder;
@ -68,7 +66,6 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;
import static org.elasticsearch.ElasticsearchException.readException;
@ -375,6 +372,9 @@ public abstract class StreamInput extends InputStream {
@Override
public abstract void close() throws IOException;
@Override
public abstract int available() throws IOException;
public String[] readStringArray() throws IOException {
int size = readVInt();
if (size == 0) {

View File

@ -154,7 +154,7 @@ public class PagedBytesReferenceTests extends ESTestCase {
}
public void testStreamInputBulkReadWithOffset() throws IOException {
int length = randomIntBetween(10, scaledRandomIntBetween(PAGE_SIZE * 2, PAGE_SIZE * 20));
final int length = randomIntBetween(10, scaledRandomIntBetween(PAGE_SIZE * 2, PAGE_SIZE * 20));
BytesReference pbr = getRandomizedPagedBytesReference(length);
StreamInput si = pbr.streamInput();
assertNotNull(si);
@ -162,6 +162,7 @@ public class PagedBytesReferenceTests extends ESTestCase {
// read a bunch of single bytes one by one
int offset = randomIntBetween(1, length / 2);
for (int i = 0; i < offset; i++) {
assertEquals(si.available(), length - i);
assertEquals(pbr.get(i), si.readByte());
}
@ -176,6 +177,7 @@ public class PagedBytesReferenceTests extends ESTestCase {
// bulk-read all
si.readFully(targetBytes);
assertArrayEquals(pbrBytesWithOffset, targetBytes);
assertEquals(si.available(), 0);
}
public void testRandomReads() throws IOException {
@ -216,18 +218,22 @@ public class PagedBytesReferenceTests extends ESTestCase {
int sliceLength = length - sliceOffset;
BytesReference slice = pbr.slice(sliceOffset, sliceLength);
StreamInput sliceInput = slice.streamInput();
assertEquals(sliceInput.available(), sliceLength);
// single reads
assertEquals(slice.get(0), sliceInput.readByte());
assertEquals(slice.get(1), sliceInput.readByte());
assertEquals(slice.get(2), sliceInput.readByte());
assertEquals(sliceInput.available(), sliceLength - 3);
// reset the slice stream for bulk reading
sliceInput.reset();
assertEquals(sliceInput.available(), sliceLength);
// bulk read
byte[] sliceBytes = new byte[sliceLength];
sliceInput.readFully(sliceBytes);
assertEquals(sliceInput.available(), 0);
// compare slice content with upper half of original
byte[] pbrSliceBytes = Arrays.copyOfRange(pbr.toBytes(), sliceOffset, length);
@ -239,11 +245,14 @@ public class PagedBytesReferenceTests extends ESTestCase {
assertArrayEquals(sliceBytes, sliceToBytes);
sliceInput.reset();
assertEquals(sliceInput.available(), sliceLength);
byte[] buffer = new byte[sliceLength + scaledRandomIntBetween(1, 100)];
int offset = scaledRandomIntBetween(0, Math.max(1, buffer.length - sliceLength - 1));
int read = sliceInput.read(buffer, offset, sliceLength / 2);
assertEquals(sliceInput.available(), sliceLength - read);
sliceInput.read(buffer, offset + read, sliceLength);
assertArrayEquals(sliceBytes, Arrays.copyOfRange(buffer, offset, offset + sliceLength));
assertEquals(sliceInput.available(), 0);
}
public void testWriteToOutputStream() throws IOException {

View File

@ -273,7 +273,9 @@ public class BytesStreamsTests extends ESTestCase {
out.writeString("hello");
out.writeString("goodbye");
out.writeGenericValue(BytesRefs.toBytesRef("bytesref"));
final byte[] bytes = out.bytes().toBytes();
StreamInput in = StreamInput.wrap(out.bytes().toBytes());
assertEquals(in.available(), bytes.length);
assertThat(in.readBoolean(), equalTo(false));
assertThat(in.readByte(), equalTo((byte)1));
assertThat(in.readShort(), equalTo((short)-1));
@ -302,9 +304,12 @@ public class BytesStreamsTests extends ESTestCase {
namedWriteableRegistry.registerPrototype(BaseNamedWriteable.class, new TestNamedWriteable(null, null));
TestNamedWriteable namedWriteableIn = new TestNamedWriteable(randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10));
out.writeNamedWriteable(namedWriteableIn);
StreamInput in = new NamedWriteableAwareStreamInput(StreamInput.wrap(out.bytes().toBytes()), namedWriteableRegistry);
byte[] bytes = out.bytes().toBytes();
StreamInput in = new NamedWriteableAwareStreamInput(StreamInput.wrap(bytes), namedWriteableRegistry);
assertEquals(in.available(), bytes.length);
BaseNamedWriteable namedWriteableOut = in.readNamedWriteable(BaseNamedWriteable.class);
assertEquals(namedWriteableOut, namedWriteableIn);
assertEquals(in.available(), 0);
}
public void testNamedWriteableDuplicates() throws IOException {

View File

@ -23,7 +23,10 @@ import org.elasticsearch.common.bytes.ByteBufferBytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.test.ESTestCase;
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
@ -93,4 +96,29 @@ public class StreamTests extends ESTestCase {
index++;
}
}
public void testFilterStreamInputDelegatesAvailable() throws IOException {
final int length = randomIntBetween(1, 1024);
StreamInput delegate = StreamInput.wrap(new byte[length]);
FilterStreamInput filterInputStream = new FilterStreamInput(delegate) {};
assertEquals(filterInputStream.available(), length);
// read some bytes
final int bytesToRead = randomIntBetween(1, length);
filterInputStream.readBytes(new byte[bytesToRead], 0, bytesToRead);
assertEquals(filterInputStream.available(), length - bytesToRead);
}
public void testInputStreamStreamInputDelegatesAvailable() throws IOException {
final int length = randomIntBetween(1, 1024);
ByteArrayInputStream is = new ByteArrayInputStream(new byte[length]);
InputStreamStreamInput streamInput = new InputStreamStreamInput(is);
assertEquals(streamInput.available(), length);
// read some bytes
final int bytesToRead = randomIntBetween(1, length);
streamInput.readBytes(new byte[bytesToRead], 0, bytesToRead);
assertEquals(streamInput.available(), length - bytesToRead);
}
}