HDFS-8885. ByteRangeInputStream used in webhdfs does not override available(). Contributed by Shradha Revankar.

(cherry picked from commit c92e31bd65)
This commit is contained in:
Akira Ajisaka 2015-09-03 19:40:34 +09:00
parent cd82fa2f83
commit 772fe0f478
3 changed files with 93 additions and 0 deletions

View File

@ -300,4 +300,15 @@ public void close() throws IOException {
}
status = StreamStatus.CLOSED;
}
@Override
public synchronized int available() throws IOException{
getInputStream();
if(fileLength != null){
long remaining = fileLength - currentPos;
return remaining <= Integer.MAX_VALUE ? (int) remaining : Integer.MAX_VALUE;
}else {
return Integer.MAX_VALUE;
}
}
}

View File

@ -940,6 +940,9 @@ Release 2.8.0 - UNRELEASED
HDFS-9003. ForkJoin thread pool leaks. (Kihwal Lee via jing9)
HDFS-8885. ByteRangeInputStream used in webhdfs does not override
available(). (Shradha Revankar via aajisaka)
Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -210,4 +210,83 @@ public void testPropagatedClose() throws IOException {
verify(mockStream.in, times(isCloses)).close();
}
@Test
public void testAvailable() throws IOException {
ByteRangeInputStream bris =
mock(ByteRangeInputStream.class, CALLS_REAL_METHODS);
InputStreamAndFileLength mockStream = new InputStreamAndFileLength(65535L,
mock(InputStream.class));
doReturn(mockStream).when(bris).openInputStream(Mockito.anyLong());
Whitebox.setInternalState(bris, "status",
ByteRangeInputStream.StreamStatus.SEEK);
assertEquals("Before read or seek, available should be same as filelength",
65535, bris.available());
verify(bris, times(1)).openInputStream(Mockito.anyLong());
bris.seek(10);
assertEquals("Seek 10 bytes, available should return filelength - 10"
, 65525,
bris.available());
//no more bytes available
bris.seek(65535);
assertEquals("Seek till end of file, available should return 0 bytes", 0,
bris.available());
//test reads, seek back to 0 and start reading
bris.seek(0);
bris.read();
assertEquals("Read 1 byte, available must return filelength - 1",
65534, bris.available());
bris.read();
assertEquals("Read another 1 byte, available must return filelength - 2",
65533, bris.available());
//seek and read
bris.seek(100);
bris.read();
assertEquals("Seek to offset 100 and read 1 byte, available should return filelength - 101",
65434, bris.available());
bris.close();
}
@Test
public void testAvailableLengthNotKnown() throws IOException {
ByteRangeInputStream bris =
mock(ByteRangeInputStream.class, CALLS_REAL_METHODS);
//Length is null for chunked transfer-encoding
InputStreamAndFileLength mockStream = new InputStreamAndFileLength(null,
mock(InputStream.class));
doReturn(mockStream).when(bris).openInputStream(Mockito.anyLong());
Whitebox.setInternalState(bris, "status",
ByteRangeInputStream.StreamStatus.SEEK);
assertEquals(Integer.MAX_VALUE, bris.available());
}
@Test
public void testAvailableStreamClosed() throws IOException {
ByteRangeInputStream bris =
mock(ByteRangeInputStream.class, CALLS_REAL_METHODS);
InputStreamAndFileLength mockStream = new InputStreamAndFileLength(null,
mock(InputStream.class));
doReturn(mockStream).when(bris).openInputStream(Mockito.anyLong());
Whitebox.setInternalState(bris, "status",
ByteRangeInputStream.StreamStatus.SEEK);
bris.close();
try{
bris.available();
fail("Exception should be thrown when stream is closed");
}catch(IOException e){
assertTrue("Exception when stream is closed",
e.getMessage().equals("Stream closed"));
}
}
}