From 53c2288dc649c0b9d021c81998a33e0bd687d896 Mon Sep 17 00:00:00 2001 From: Haohui Mai Date: Wed, 24 Sep 2014 12:05:19 -0700 Subject: [PATCH] HDFS-7049. TestByteRangeInputStream.testPropagatedClose fails and throw NPE on branch-2. Contributed by Juan Yu. --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../hdfs/web/TestByteRangeInputStream.java | 241 ++++++++---------- 2 files changed, 104 insertions(+), 140 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index ddc143ac5b4..6a0911550bd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -456,6 +456,9 @@ Release 2.6.0 - UNRELEASED HDFS-7132. hdfs namenode -metadataVersion command does not honor configured name dirs. (Charles Lamb via wang) + HDFS-7049. TestByteRangeInputStream.testPropagatedClose fails and throw + NPE on branch-2. (Juan Yu via wheat9) + BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS HDFS-6387. HDFS CLI admin tool for creating & deleting an diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestByteRangeInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestByteRangeInputStream.java index 9e93166c398..11deab8f8de 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestByteRangeInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestByteRangeInputStream.java @@ -18,12 +18,13 @@ package org.apache.hadoop.hdfs.web; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.CALLS_REAL_METHODS; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -33,217 +34,177 @@ import java.io.InputStream; import java.net.HttpURLConnection; import java.net.URL; -import org.apache.hadoop.hdfs.server.namenode.StreamFile; -import org.apache.hadoop.hdfs.web.HftpFileSystem; +import com.google.common.net.HttpHeaders; import org.junit.Test; +import org.mockito.internal.util.reflection.Whitebox; public class TestByteRangeInputStream { -public static class MockHttpURLConnection extends HttpURLConnection { - public MockHttpURLConnection(URL u) { - super(u); - } - - @Override - public boolean usingProxy(){ - return false; - } - - @Override - public void disconnect() { - } - - @Override - public void connect() { - } - - @Override - public InputStream getInputStream() throws IOException { - return new ByteArrayInputStream("asdf".getBytes()); - } - - @Override - public URL getURL() { - URL u = null; - try { - u = new URL("http://resolvedurl/"); - } catch (Exception e) { - System.out.println(e.getMessage()); + private class ByteRangeInputStreamImpl extends ByteRangeInputStream { + public ByteRangeInputStreamImpl(URLOpener o, URLOpener r) + throws IOException { + super(o, r); } - return u; - } - @Override - public int getResponseCode() { - if (responseCode != -1) { - return responseCode; - } else { - if (getRequestProperty("Range") == null) { - return 200; - } else { - return 206; - } + @Override + protected URL getResolvedUrl(HttpURLConnection connection) + throws IOException { + return new URL("http://resolvedurl/"); } } - public void setResponseCode(int resCode) { - responseCode = resCode; + private ByteRangeInputStream.URLOpener getMockURLOpener(URL url) + throws IOException { + ByteRangeInputStream.URLOpener opener = + mock(ByteRangeInputStream.URLOpener.class, CALLS_REAL_METHODS); + opener.setURL(url); + doReturn(getMockConnection("65535")) + .when(opener).connect(anyLong(), anyBoolean()); + return opener; } - @Override - public String getHeaderField(String field) { - return (field.equalsIgnoreCase(StreamFile.CONTENT_LENGTH)) ? "65535" : null; + private HttpURLConnection getMockConnection(String length) + throws IOException { + HttpURLConnection mockConnection = mock(HttpURLConnection.class); + doReturn(new ByteArrayInputStream("asdf".getBytes())) + .when(mockConnection).getInputStream(); + doReturn(length).when(mockConnection) + .getHeaderField(HttpHeaders.CONTENT_LENGTH); + return mockConnection; } -} @Test public void testByteRange() throws IOException { - URLConnectionFactory factory = mock(URLConnectionFactory.class); - HftpFileSystem.RangeHeaderUrlOpener ospy = spy( - new HftpFileSystem.RangeHeaderUrlOpener(factory, new URL("http://test/"))); - doReturn(new MockHttpURLConnection(ospy.getURL())).when(ospy) - .openConnection(); - HftpFileSystem.RangeHeaderUrlOpener rspy = spy( - new HftpFileSystem.RangeHeaderUrlOpener(factory, null)); - doReturn(new MockHttpURLConnection(rspy.getURL())).when(rspy) - .openConnection(); - ByteRangeInputStream is = new HftpFileSystem.RangeHeaderInputStream(ospy, rspy); + ByteRangeInputStream.URLOpener oMock = getMockURLOpener( + new URL("http://test")); + ByteRangeInputStream.URLOpener rMock = getMockURLOpener(null); + ByteRangeInputStream bris = new ByteRangeInputStreamImpl(oMock, rMock); - assertEquals("getPos wrong", 0, is.getPos()); + bris.seek(0); - is.read(); + assertEquals("getPos wrong", 0, bris.getPos()); - assertNull("Initial call made incorrectly (Range Check)", ospy - .openConnection().getRequestProperty("Range")); + bris.read(); - assertEquals("getPos should be 1 after reading one byte", 1, is.getPos()); + assertEquals("Initial call made incorrectly (offset check)", + 0, bris.startPos); + assertEquals("getPos should return 1 after reading one byte", 1, + bris.getPos()); + verify(oMock, times(1)).connect(0, false); - is.read(); - - assertEquals("getPos should be 2 after reading two bytes", 2, is.getPos()); + bris.read(); + assertEquals("getPos should return 2 after reading two bytes", 2, + bris.getPos()); // No additional connections should have been made (no seek) + verify(oMock, times(1)).connect(0, false); - rspy.setURL(new URL("http://resolvedurl/")); + rMock.setURL(new URL("http://resolvedurl/")); - is.seek(100); - is.read(); + bris.seek(100); + bris.read(); - assertEquals("Seek to 100 bytes made incorrectly (Range Check)", - "bytes=100-", rspy.openConnection().getRequestProperty("Range")); + assertEquals("Seek to 100 bytes made incorrectly (offset Check)", + 100, bris.startPos); + assertEquals("getPos should return 101 after reading one byte", 101, + bris.getPos()); + verify(rMock, times(1)).connect(100, true); - assertEquals("getPos should be 101 after reading one byte", 101, - is.getPos()); + bris.seek(101); + bris.read(); - verify(rspy, times(2)).openConnection(); + // Seek to 101 should not result in another request + verify(rMock, times(1)).connect(100, true); + verify(rMock, times(0)).connect(101, true); - is.seek(101); - is.read(); + bris.seek(2500); + bris.read(); - verify(rspy, times(2)).openConnection(); - - // Seek to 101 should not result in another request" - - is.seek(2500); - is.read(); - - assertEquals("Seek to 2500 bytes made incorrectly (Range Check)", - "bytes=2500-", rspy.openConnection().getRequestProperty("Range")); - - ((MockHttpURLConnection) rspy.openConnection()).setResponseCode(200); - is.seek(500); + assertEquals("Seek to 2500 bytes made incorrectly (offset Check)", + 2500, bris.startPos); + doReturn(getMockConnection(null)) + .when(rMock).connect(anyLong(), anyBoolean()); + bris.seek(500); try { - is.read(); - fail("Exception should be thrown when 200 response is given " - + "but 206 is expected"); + bris.read(); + fail("Exception should be thrown when content-length is not given"); } catch (IOException e) { - assertEquals("Should fail because incorrect response code was sent", - "HTTP_PARTIAL expected, received 200", e.getMessage()); + assertTrue("Incorrect response message: " + e.getMessage(), + e.getMessage().startsWith(HttpHeaders.CONTENT_LENGTH + + " is missing: ")); } - - ((MockHttpURLConnection) rspy.openConnection()).setResponseCode(206); - is.seek(0); - - try { - is.read(); - fail("Exception should be thrown when 206 response is given " - + "but 200 is expected"); - } catch (IOException e) { - assertEquals("Should fail because incorrect response code was sent", - "HTTP_OK expected, received 206", e.getMessage()); - } - is.close(); + bris.close(); } @Test public void testPropagatedClose() throws IOException { - URLConnectionFactory factory = mock(URLConnectionFactory.class); - - ByteRangeInputStream brs = spy(new HftpFileSystem.RangeHeaderInputStream( - factory, new URL("http://test/"))); - + ByteRangeInputStream bris = + mock(ByteRangeInputStream.class, CALLS_REAL_METHODS); InputStream mockStream = mock(InputStream.class); - doReturn(mockStream).when(brs).openInputStream(); + doReturn(mockStream).when(bris).openInputStream(); + Whitebox.setInternalState(bris, "status", + ByteRangeInputStream.StreamStatus.SEEK); int brisOpens = 0; int brisCloses = 0; int isCloses = 0; // first open, shouldn't close underlying stream - brs.getInputStream(); - verify(brs, times(++brisOpens)).openInputStream(); - verify(brs, times(brisCloses)).close(); + bris.getInputStream(); + verify(bris, times(++brisOpens)).openInputStream(); + verify(bris, times(brisCloses)).close(); verify(mockStream, times(isCloses)).close(); // stream is open, shouldn't close underlying stream - brs.getInputStream(); - verify(brs, times(brisOpens)).openInputStream(); - verify(brs, times(brisCloses)).close(); + bris.getInputStream(); + verify(bris, times(brisOpens)).openInputStream(); + verify(bris, times(brisCloses)).close(); verify(mockStream, times(isCloses)).close(); // seek forces a reopen, should close underlying stream - brs.seek(1); - brs.getInputStream(); - verify(brs, times(++brisOpens)).openInputStream(); - verify(brs, times(brisCloses)).close(); + bris.seek(1); + bris.getInputStream(); + verify(bris, times(++brisOpens)).openInputStream(); + verify(bris, times(brisCloses)).close(); verify(mockStream, times(++isCloses)).close(); // verify that the underlying stream isn't closed after a seek // ie. the state was correctly updated - brs.getInputStream(); - verify(brs, times(brisOpens)).openInputStream(); - verify(brs, times(brisCloses)).close(); + bris.getInputStream(); + verify(bris, times(brisOpens)).openInputStream(); + verify(bris, times(brisCloses)).close(); verify(mockStream, times(isCloses)).close(); // seeking to same location should be a no-op - brs.seek(1); - brs.getInputStream(); - verify(brs, times(brisOpens)).openInputStream(); - verify(brs, times(brisCloses)).close(); + bris.seek(1); + bris.getInputStream(); + verify(bris, times(brisOpens)).openInputStream(); + verify(bris, times(brisCloses)).close(); verify(mockStream, times(isCloses)).close(); // close should of course close - brs.close(); - verify(brs, times(++brisCloses)).close(); + bris.close(); + verify(bris, times(++brisCloses)).close(); verify(mockStream, times(++isCloses)).close(); // it's already closed, underlying stream should not close - brs.close(); - verify(brs, times(++brisCloses)).close(); + bris.close(); + verify(bris, times(++brisCloses)).close(); verify(mockStream, times(isCloses)).close(); // it's closed, don't reopen it boolean errored = false; try { - brs.getInputStream(); + bris.getInputStream(); } catch (IOException e) { errored = true; assertEquals("Stream closed", e.getMessage()); } finally { assertTrue("Read a closed steam", errored); } - verify(brs, times(brisOpens)).openInputStream(); - verify(brs, times(brisCloses)).close(); + verify(bris, times(brisOpens)).openInputStream(); + verify(bris, times(brisCloses)).close(); + verify(mockStream, times(isCloses)).close(); } }