Have streams provided to gateway (shared one) allow marking, closes #1803.

This commit is contained in:
Shay Banon 2012-03-22 12:20:00 +02:00
parent 211fad7a17
commit 348ed11450
3 changed files with 64 additions and 0 deletions

View File

@ -39,6 +39,8 @@ public class FileChannelInputStream extends InputStream {
private byte[] bs = null; // Invoker's previous array
private byte[] b1 = null;
private long markPosition;
/**
* @param channel The channel to read from
* @param position The position to start reading from
@ -47,6 +49,7 @@ public class FileChannelInputStream extends InputStream {
public FileChannelInputStream(FileChannel channel, long position, long length) {
this.channel = channel;
this.position = position;
this.markPosition = position;
this.length = position + length; // easier to work with total length
}
@ -88,4 +91,19 @@ public class FileChannelInputStream extends InputStream {
}
return read;
}
@Override
public boolean markSupported() {
return true;
}
@Override
public void mark(int readlimit) {
this.markPosition = position;
}
@Override
public void reset() throws IOException {
position = markPosition;
}
}

View File

@ -37,6 +37,9 @@ public class InputStreamIndexInput extends InputStream {
private long counter = 0;
private long markPointer;
private long markCounter;
public InputStreamIndexInput(IndexInput indexInput, long limit) {
this.indexInput = indexInput;
this.limit = limit;
@ -82,4 +85,21 @@ public class InputStreamIndexInput extends InputStream {
}
return (indexInput.getFilePointer() < indexInput.length()) ? (indexInput.readByte() & 0xff) : -1;
}
@Override
public boolean markSupported() {
return true;
}
@Override
public synchronized void mark(int readlimit) {
markPointer = indexInput.getFilePointer();
markCounter = counter;
}
@Override
public synchronized void reset() throws IOException {
indexInput.seek(markPointer);
counter = markCounter;
}
}

View File

@ -236,4 +236,30 @@ public class InputStreamIndexInputTests {
assertThat(is.actualSizeToRead(), equalTo(0l));
assertThat(is.read(read), equalTo(-1));
}
@Test
public void testMarkRest() throws Exception {
RAMDirectory dir = new RAMDirectory();
IndexOutput output = dir.createOutput("test");
for (int i = 0; i < 3; i++) {
output.writeByte((byte) 1);
}
for (int i = 0; i < 3; i++) {
output.writeByte((byte) 2);
}
output.close();
IndexInput input = dir.openInput("test");
InputStreamIndexInput is = new InputStreamIndexInput(input, 4);
assertThat(is.markSupported(), equalTo(true));
assertThat(is.read(), equalTo(1));
assertThat(is.read(), equalTo(1));
is.mark(0);
assertThat(is.read(), equalTo(1));
assertThat(is.read(), equalTo(2));
is.reset();
assertThat(is.read(), equalTo(1));
assertThat(is.read(), equalTo(2));
}
}