HADOOP-15870. S3AInputStream.remainingInFile should use nextReadPos.

Contributed by lqjacklee.

Change-Id: I32bb00a683102e7ff8ff8ce0b8d9c3195ca7381c
This commit is contained in:
lqjacklee 2019-10-10 21:58:42 +01:00 committed by Steve Loughran
parent 957253fea6
commit 7a4b3d42c4
No known key found for this signature in database
GPG Key ID: D22CF846DBB162A0
4 changed files with 143 additions and 16 deletions

View File

@ -119,6 +119,59 @@ Return the data at the current position.
else else
result = -1 result = -1
### <a name="InputStream.available"></a> `InputStream.available()`
Returns the number of bytes "estimated" to be readable on a stream before `read()`
blocks on any IO (i.e. the thread is potentially suspended for some time).
That is: for all values `v` returned by `available()`, `read(buffer, 0, v)`
is should not block.
#### Postconditions
```python
if len(data) == 0:
result = 0
elif pos >= len(data):
result = 0
else:
d = "the amount of data known to be already buffered/cached locally"
result = min(1, d) # optional but recommended: see below.
```
As `0` is a number which is always meets this condition, it is nominally
possible for an implementation to simply return `0`. However, this is not
considered useful, and some applications/libraries expect a positive number.
#### The GZip problem.
[JDK-7036144](http://bugs.java.com/bugdatabase/view_bug.do?bug_id=7036144),
"GZIPInputStream readTrailer uses faulty available() test for end-of-stream"
discusses how the JDK's GZip code it uses `available()` to detect an EOF,
in a loop similar to the the following
```java
while(instream.available()) {
process(instream.read());
}
```
The correct loop would have been:
```java
int r;
while((r=instream.read()) >= 0) {
process(r);
}
```
If `available()` ever returns 0, then the gzip loop halts prematurely.
For this reason, implementations *should* return a value &gt;=1, even
if it breaks that requirement of `available()` returning the amount guaranteed
not to block on reads.
### <a name="InputStream.read.buffer[]"></a> `InputStream.read(buffer[], offset, length)` ### <a name="InputStream.read.buffer[]"></a> `InputStream.read(buffer[], offset, length)`

View File

@ -32,6 +32,7 @@ import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import java.util.Random; import java.util.Random;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile; import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.skip; import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
@ -99,14 +100,18 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
describe("seek and read a 0 byte file"); describe("seek and read a 0 byte file");
instream = getFileSystem().open(zeroByteFile); instream = getFileSystem().open(zeroByteFile);
assertEquals(0, instream.getPos()); assertEquals(0, instream.getPos());
assertAvailableIsZero(instream);
//expect initial read to fai; //expect initial read to fai;
int result = instream.read(); int result = instream.read();
assertMinusOne("initial byte read", result); assertMinusOne("initial byte read", result);
assertAvailableIsZero(instream);
byte[] buffer = new byte[1]; byte[] buffer = new byte[1];
//expect that seek to 0 works //expect that seek to 0 works
instream.seek(0); instream.seek(0);
assertAvailableIsZero(instream);
//reread, expect same exception //reread, expect same exception
result = instream.read(); result = instream.read();
assertAvailableIsZero(instream);
assertMinusOne("post-seek byte read", result); assertMinusOne("post-seek byte read", result);
result = instream.read(buffer, 0, 1); result = instream.read(buffer, 0, 1);
assertMinusOne("post-seek buffer read", result); assertMinusOne("post-seek buffer read", result);
@ -132,8 +137,8 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
@Test @Test
public void testSeekReadClosedFile() throws Throwable { public void testSeekReadClosedFile() throws Throwable {
instream = getFileSystem().open(smallSeekFile); instream = getFileSystem().open(smallSeekFile);
getLogger().debug( getLogger().debug("Stream is of type {}",
"Stream is of type " + instream.getClass().getCanonicalName()); instream.getClass().getCanonicalName());
instream.close(); instream.close();
try { try {
instream.seek(0); instream.seek(0);
@ -168,10 +173,26 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
try { try {
long offset = instream.getPos(); long offset = instream.getPos();
} catch (IOException e) { } catch (IOException e) {
// its valid to raise error here; but the test is applied to make // it is valid to raise error here; but the test is applied to make
// sure there's no other exception like an NPE. // sure there's no other exception like an NPE.
} }
// a closed stream should either fail or return 0 bytes.
try {
int a = instream.available();
LOG.info("available() returns a value on a closed file: {}", a);
assertAvailableIsZero(instream);
} catch (IOException | IllegalStateException expected) {
// expected
}
// a closed stream should either fail or return 0 bytes.
try {
int a = instream.available();
LOG.info("available() returns a value on a closed file: {}", a);
assertAvailableIsZero(instream);
} catch (IOException | IllegalStateException expected) {
// expected
}
//and close again //and close again
instream.close(); instream.close();
} }
@ -205,6 +226,7 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
//expect that seek to 0 works //expect that seek to 0 works
instream.seek(0); instream.seek(0);
int result = instream.read(); int result = instream.read();
assertAvailableIsPositive(instream);
assertEquals(0, result); assertEquals(0, result);
assertEquals(1, instream.read()); assertEquals(1, instream.read());
assertEquals(2, instream.getPos()); assertEquals(2, instream.getPos());
@ -226,13 +248,24 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
//go just before the end //go just before the end
instream.seek(TEST_FILE_LEN - 2); instream.seek(TEST_FILE_LEN - 2);
assertTrue("Premature EOF", instream.read() != -1); assertTrue("Premature EOF", instream.read() != -1);
assertAvailableIsPositive(instream);
assertTrue("Premature EOF", instream.read() != -1); assertTrue("Premature EOF", instream.read() != -1);
checkAvailabilityAtEOF();
assertMinusOne("read past end of file", instream.read()); assertMinusOne("read past end of file", instream.read());
} }
/**
* This can be overridden if a filesystem always returns 01
* @throws IOException
*/
protected void checkAvailabilityAtEOF() throws IOException {
assertAvailableIsZero(instream);
}
@Test @Test
public void testSeekPastEndOfFileThenReseekAndRead() throws Throwable { public void testSeekPastEndOfFileThenReseekAndRead() throws Throwable {
describe("do a seek past the EOF, then verify the stream recovers"); describe("do a seek past the EOF, " +
"then verify the stream recovers");
instream = getFileSystem().open(smallSeekFile); instream = getFileSystem().open(smallSeekFile);
//go just before the end. This may or may not fail; it may be delayed until the //go just before the end. This may or may not fail; it may be delayed until the
//read //read
@ -261,6 +294,7 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
//now go back and try to read from a valid point in the file //now go back and try to read from a valid point in the file
instream.seek(1); instream.seek(1);
assertTrue("Premature EOF", instream.read() != -1); assertTrue("Premature EOF", instream.read() != -1);
assertAvailableIsPositive(instream);
} }
/** /**
@ -278,6 +312,7 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
//expect that seek to 0 works //expect that seek to 0 works
instream.seek(0); instream.seek(0);
int result = instream.read(); int result = instream.read();
assertAvailableIsPositive(instream);
assertEquals(0, result); assertEquals(0, result);
assertEquals(1, instream.read()); assertEquals(1, instream.read());
assertEquals(2, instream.read()); assertEquals(2, instream.read());
@ -296,6 +331,7 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
instream.seek(0); instream.seek(0);
assertEquals(0, instream.getPos()); assertEquals(0, instream.getPos());
instream.read(); instream.read();
assertAvailableIsPositive(instream);
assertEquals(1, instream.getPos()); assertEquals(1, instream.getPos());
byte[] buf = new byte[80 * 1024]; byte[] buf = new byte[80 * 1024];
instream.readFully(1, buf, 0, buf.length); instream.readFully(1, buf, 0, buf.length);
@ -314,7 +350,7 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
instream.seek(39999); instream.seek(39999);
assertTrue(-1 != instream.read()); assertTrue(-1 != instream.read());
assertEquals(40000, instream.getPos()); assertEquals(40000, instream.getPos());
assertAvailableIsPositive(instream);
int v = 256; int v = 256;
byte[] readBuffer = new byte[v]; byte[] readBuffer = new byte[v];
assertEquals(v, instream.read(128, readBuffer, 0, v)); assertEquals(v, instream.read(128, readBuffer, 0, v));
@ -322,6 +358,7 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
assertEquals(40000, instream.getPos()); assertEquals(40000, instream.getPos());
//content is the same too //content is the same too
assertEquals("@40000", block[40000], (byte) instream.read()); assertEquals("@40000", block[40000], (byte) instream.read());
assertAvailableIsPositive(instream);
//now verify the picked up data //now verify the picked up data
for (int i = 0; i < 256; i++) { for (int i = 0; i < 256; i++) {
assertEquals("@" + i, block[i + 128], readBuffer[i]); assertEquals("@" + i, block[i + 128], readBuffer[i]);
@ -376,6 +413,7 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
assertEquals(0, instream.getPos()); assertEquals(0, instream.getPos());
byte[] buffer = new byte[1]; byte[] buffer = new byte[1];
instream.readFully(0, buffer, 0, 0); instream.readFully(0, buffer, 0, 0);
assertAvailableIsZero(instream);
assertEquals(0, instream.getPos()); assertEquals(0, instream.getPos());
// seek to 0 read 0 bytes from it // seek to 0 read 0 bytes from it
instream.seek(0); instream.seek(0);
@ -551,7 +589,9 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
fail("Expected an exception, got " + r); fail("Expected an exception, got " + r);
} catch (EOFException e) { } catch (EOFException e) {
handleExpectedException(e); handleExpectedException(e);
} catch (IOException | IllegalArgumentException | IndexOutOfBoundsException e) { } catch (IOException
| IllegalArgumentException
| IndexOutOfBoundsException e) {
handleRelaxedException("read() with a negative position ", handleRelaxedException("read() with a negative position ",
"EOFException", "EOFException",
e); e);
@ -587,6 +627,29 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
instream = getFileSystem().open(smallSeekFile); instream = getFileSystem().open(smallSeekFile);
instream.seek(TEST_FILE_LEN -1); instream.seek(TEST_FILE_LEN -1);
assertTrue("read at last byte", instream.read() > 0); assertTrue("read at last byte", instream.read() > 0);
assertAvailableIsZero(instream);
assertEquals("read just past EOF", -1, instream.read()); assertEquals("read just past EOF", -1, instream.read());
} }
/**
* Assert that the number of bytes available is zero.
* @param in input stream
*/
protected static void assertAvailableIsZero(FSDataInputStream in)
throws IOException {
assertEquals("stream.available() should be zero",
0, in.available());
}
/**
* Assert that the number of bytes available is greater than zero.
* @param in input stream
*/
protected static void assertAvailableIsPositive(FSDataInputStream in)
throws IOException {
int available = in.available();
assertTrue("stream.available() should be positive but is "
+ available,
available > 0);
}
} }

View File

@ -218,7 +218,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
} }
@Override @Override
public synchronized long getPos() throws IOException { public synchronized long getPos() {
return (nextReadPos < 0) ? 0 : nextReadPos; return (nextReadPos < 0) ? 0 : nextReadPos;
} }
@ -620,15 +620,26 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
return isObjectStreamOpen(); return isObjectStreamOpen();
} }
/**
* Return the number of bytes available.
* If the inner stream is closed, the value is 1 for consistency
* with S3ObjectStream -and so address the GZip bug
* http://bugs.java.com/bugdatabase/view_bug.do?bug_id=7036144 .
* If the stream is open, then it is the amount returned by the
* wrapped stream.
* @return a value greater than or equal to zero.
* @throws IOException IO failure.
*/
@Override @Override
public synchronized int available() throws IOException { public synchronized int available() throws IOException {
checkNotClosed(); checkNotClosed();
if (contentLength == 0 || (nextReadPos >= contentLength)) {
long remaining = remainingInFile(); return 0;
if (remaining > Integer.MAX_VALUE) {
return Integer.MAX_VALUE;
} }
return (int)remaining;
return wrappedStream == null
? 1
: wrappedStream.available();
} }
/** /**
@ -637,8 +648,8 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Unstable @InterfaceStability.Unstable
public synchronized long remainingInFile() { public synchronized long remainingInFile() throws IOException {
return this.contentLength - this.pos; return contentLength - getPos();
} }
/** /**
@ -649,7 +660,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Unstable @InterfaceStability.Unstable
public synchronized long remainingInCurrentRequest() { public synchronized long remainingInCurrentRequest() {
return this.contentRangeFinish - this.pos; return contentRangeFinish - getPos();
} }
@InterfaceAudience.Private @InterfaceAudience.Private

View File

@ -80,7 +80,7 @@ public class ITestS3AContractSeek extends AbstractContractSeekTest {
* which S3A Supports. * which S3A Supports.
* @return a list of seek policies to test. * @return a list of seek policies to test.
*/ */
@Parameterized.Parameters @Parameterized.Parameters(name = "{0}-{1}")
public static Collection<Object[]> params() { public static Collection<Object[]> params() {
return Arrays.asList(new Object[][]{ return Arrays.asList(new Object[][]{
{INPUT_FADV_SEQUENTIAL, Default_JSSE}, {INPUT_FADV_SEQUENTIAL, Default_JSSE},