From 772fe0f478351a7e02b25edfe8737f637bae2183 Mon Sep 17 00:00:00 2001 From: Akira Ajisaka Date: Thu, 3 Sep 2015 19:40:34 +0900 Subject: [PATCH] HDFS-8885. ByteRangeInputStream used in webhdfs does not override available(). Contributed by Shradha Revankar. (cherry picked from commit c92e31bd659e95c8baa0f3b2bf0cd7f6f72278e6) --- .../hadoop/hdfs/web/ByteRangeInputStream.java | 11 +++ hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../hdfs/web/TestByteRangeInputStream.java | 79 +++++++++++++++++++ 3 files changed, 93 insertions(+) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/ByteRangeInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/ByteRangeInputStream.java index 8e21b77f132..e7f704387ba 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/ByteRangeInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/ByteRangeInputStream.java @@ -300,4 +300,15 @@ public void close() throws IOException { } status = StreamStatus.CLOSED; } + + @Override + public synchronized int available() throws IOException{ + getInputStream(); + if(fileLength != null){ + long remaining = fileLength - currentPos; + return remaining <= Integer.MAX_VALUE ? (int) remaining : Integer.MAX_VALUE; + }else { + return Integer.MAX_VALUE; + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 9553ec36d79..dbe0fa0eeed 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -940,6 +940,9 @@ Release 2.8.0 - UNRELEASED HDFS-9003. ForkJoin thread pool leaks. (Kihwal Lee via jing9) + HDFS-8885. ByteRangeInputStream used in webhdfs does not override + available(). (Shradha Revankar via aajisaka) + Release 2.7.2 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestByteRangeInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestByteRangeInputStream.java index 40f2b9c15b4..7f1f00f13cf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestByteRangeInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestByteRangeInputStream.java @@ -210,4 +210,83 @@ public void testPropagatedClose() throws IOException { verify(mockStream.in, times(isCloses)).close(); } + + + @Test + public void testAvailable() throws IOException { + ByteRangeInputStream bris = + mock(ByteRangeInputStream.class, CALLS_REAL_METHODS); + InputStreamAndFileLength mockStream = new InputStreamAndFileLength(65535L, + mock(InputStream.class)); + doReturn(mockStream).when(bris).openInputStream(Mockito.anyLong()); + Whitebox.setInternalState(bris, "status", + ByteRangeInputStream.StreamStatus.SEEK); + + + assertEquals("Before read or seek, available should be same as filelength", + 65535, bris.available()); + verify(bris, times(1)).openInputStream(Mockito.anyLong()); + + bris.seek(10); + assertEquals("Seek 10 bytes, available should return filelength - 10" + , 65525, + bris.available()); + + //no more bytes available + bris.seek(65535); + assertEquals("Seek till end of file, available should return 0 bytes", 0, + bris.available()); + + //test reads, seek back to 0 and start reading + bris.seek(0); + bris.read(); + assertEquals("Read 1 byte, available must return filelength - 1", + 65534, bris.available()); + + bris.read(); + assertEquals("Read another 1 byte, available must return filelength - 2", + 65533, bris.available()); + + //seek and read + bris.seek(100); + bris.read(); + assertEquals("Seek to offset 100 and read 1 byte, available should return filelength - 101", + 65434, bris.available()); + bris.close(); + } + + @Test + public void testAvailableLengthNotKnown() throws IOException { + ByteRangeInputStream bris = + mock(ByteRangeInputStream.class, CALLS_REAL_METHODS); + //Length is null for chunked transfer-encoding + InputStreamAndFileLength mockStream = new InputStreamAndFileLength(null, + mock(InputStream.class)); + doReturn(mockStream).when(bris).openInputStream(Mockito.anyLong()); + Whitebox.setInternalState(bris, "status", + ByteRangeInputStream.StreamStatus.SEEK); + + assertEquals(Integer.MAX_VALUE, bris.available()); + } + + @Test + public void testAvailableStreamClosed() throws IOException { + ByteRangeInputStream bris = + mock(ByteRangeInputStream.class, CALLS_REAL_METHODS); + InputStreamAndFileLength mockStream = new InputStreamAndFileLength(null, + mock(InputStream.class)); + doReturn(mockStream).when(bris).openInputStream(Mockito.anyLong()); + Whitebox.setInternalState(bris, "status", + ByteRangeInputStream.StreamStatus.SEEK); + + bris.close(); + try{ + bris.available(); + fail("Exception should be thrown when stream is closed"); + }catch(IOException e){ + assertTrue("Exception when stream is closed", + e.getMessage().equals("Stream closed")); + } + } + }