diff --git a/src/main/java/org/elasticsearch/common/io/FileChannelInputStream.java b/src/main/java/org/elasticsearch/common/io/FileChannelInputStream.java index e075c00f60a..abfa5af6c4f 100644 --- a/src/main/java/org/elasticsearch/common/io/FileChannelInputStream.java +++ b/src/main/java/org/elasticsearch/common/io/FileChannelInputStream.java @@ -39,6 +39,8 @@ public class FileChannelInputStream extends InputStream { private byte[] bs = null; // Invoker's previous array private byte[] b1 = null; + private long markPosition; + /** * @param channel The channel to read from * @param position The position to start reading from @@ -47,6 +49,7 @@ public class FileChannelInputStream extends InputStream { public FileChannelInputStream(FileChannel channel, long position, long length) { this.channel = channel; this.position = position; + this.markPosition = position; this.length = position + length; // easier to work with total length } @@ -88,4 +91,19 @@ public class FileChannelInputStream extends InputStream { } return read; } + + @Override + public boolean markSupported() { + return true; + } + + @Override + public void mark(int readlimit) { + this.markPosition = position; + } + + @Override + public void reset() throws IOException { + position = markPosition; + } } diff --git a/src/main/java/org/elasticsearch/common/lucene/store/InputStreamIndexInput.java b/src/main/java/org/elasticsearch/common/lucene/store/InputStreamIndexInput.java index 09c09d75211..cb57f3f71dd 100644 --- a/src/main/java/org/elasticsearch/common/lucene/store/InputStreamIndexInput.java +++ b/src/main/java/org/elasticsearch/common/lucene/store/InputStreamIndexInput.java @@ -37,6 +37,9 @@ public class InputStreamIndexInput extends InputStream { private long counter = 0; + private long markPointer; + private long markCounter; + public InputStreamIndexInput(IndexInput indexInput, long limit) { this.indexInput = indexInput; this.limit = limit; @@ -82,4 +85,21 @@ public class InputStreamIndexInput extends InputStream { } return (indexInput.getFilePointer() < indexInput.length()) ? (indexInput.readByte() & 0xff) : -1; } + + @Override + public boolean markSupported() { + return true; + } + + @Override + public synchronized void mark(int readlimit) { + markPointer = indexInput.getFilePointer(); + markCounter = counter; + } + + @Override + public synchronized void reset() throws IOException { + indexInput.seek(markPointer); + counter = markCounter; + } } diff --git a/src/test/java/org/elasticsearch/test/unit/common/lucene/store/InputStreamIndexInputTests.java b/src/test/java/org/elasticsearch/test/unit/common/lucene/store/InputStreamIndexInputTests.java index e88b542e141..68e8d6e4dd3 100644 --- a/src/test/java/org/elasticsearch/test/unit/common/lucene/store/InputStreamIndexInputTests.java +++ b/src/test/java/org/elasticsearch/test/unit/common/lucene/store/InputStreamIndexInputTests.java @@ -236,4 +236,30 @@ public class InputStreamIndexInputTests { assertThat(is.actualSizeToRead(), equalTo(0l)); assertThat(is.read(read), equalTo(-1)); } + + @Test + public void testMarkRest() throws Exception { + RAMDirectory dir = new RAMDirectory(); + IndexOutput output = dir.createOutput("test"); + for (int i = 0; i < 3; i++) { + output.writeByte((byte) 1); + } + for (int i = 0; i < 3; i++) { + output.writeByte((byte) 2); + } + + output.close(); + + IndexInput input = dir.openInput("test"); + InputStreamIndexInput is = new InputStreamIndexInput(input, 4); + assertThat(is.markSupported(), equalTo(true)); + assertThat(is.read(), equalTo(1)); + assertThat(is.read(), equalTo(1)); + is.mark(0); + assertThat(is.read(), equalTo(1)); + assertThat(is.read(), equalTo(2)); + is.reset(); + assertThat(is.read(), equalTo(1)); + assertThat(is.read(), equalTo(2)); + } }