diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index f14e0d035e6..02c41e73ce8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -296,6 +296,9 @@ Release 2.5.0 - UNRELEASED HDFS-6224. Add a unit test to TestAuditLogger for file permissions passed to logAuditEvent. (Charles Lamb via wang) + HDFS-6194. Create new tests for ByteRangeInputStream. + (Akira Ajisaka via wheat9) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestByteRangeInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestByteRangeInputStream.java new file mode 100644 index 00000000000..11deab8f8de --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestByteRangeInputStream.java @@ -0,0 +1,210 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.web; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.CALLS_REAL_METHODS; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.HttpURLConnection; +import java.net.URL; + +import com.google.common.net.HttpHeaders; +import org.junit.Test; +import org.mockito.internal.util.reflection.Whitebox; + +public class TestByteRangeInputStream { + private class ByteRangeInputStreamImpl extends ByteRangeInputStream { + public ByteRangeInputStreamImpl(URLOpener o, URLOpener r) + throws IOException { + super(o, r); + } + + @Override + protected URL getResolvedUrl(HttpURLConnection connection) + throws IOException { + return new URL("http://resolvedurl/"); + } + } + + private ByteRangeInputStream.URLOpener getMockURLOpener(URL url) + throws IOException { + ByteRangeInputStream.URLOpener opener = + mock(ByteRangeInputStream.URLOpener.class, CALLS_REAL_METHODS); + opener.setURL(url); + doReturn(getMockConnection("65535")) + .when(opener).connect(anyLong(), anyBoolean()); + return opener; + } + + private HttpURLConnection getMockConnection(String length) + throws IOException { + HttpURLConnection mockConnection = mock(HttpURLConnection.class); + doReturn(new ByteArrayInputStream("asdf".getBytes())) + .when(mockConnection).getInputStream(); + doReturn(length).when(mockConnection) + .getHeaderField(HttpHeaders.CONTENT_LENGTH); + return mockConnection; + } + + @Test + public void testByteRange() throws IOException { + ByteRangeInputStream.URLOpener oMock = getMockURLOpener( + new URL("http://test")); + ByteRangeInputStream.URLOpener rMock = getMockURLOpener(null); + ByteRangeInputStream bris = new ByteRangeInputStreamImpl(oMock, rMock); + + bris.seek(0); + + assertEquals("getPos wrong", 0, bris.getPos()); + + bris.read(); + + assertEquals("Initial call made incorrectly (offset check)", + 0, bris.startPos); + assertEquals("getPos should return 1 after reading one byte", 1, + bris.getPos()); + verify(oMock, times(1)).connect(0, false); + + bris.read(); + + assertEquals("getPos should return 2 after reading two bytes", 2, + bris.getPos()); + // No additional connections should have been made (no seek) + verify(oMock, times(1)).connect(0, false); + + rMock.setURL(new URL("http://resolvedurl/")); + + bris.seek(100); + bris.read(); + + assertEquals("Seek to 100 bytes made incorrectly (offset Check)", + 100, bris.startPos); + assertEquals("getPos should return 101 after reading one byte", 101, + bris.getPos()); + verify(rMock, times(1)).connect(100, true); + + bris.seek(101); + bris.read(); + + // Seek to 101 should not result in another request + verify(rMock, times(1)).connect(100, true); + verify(rMock, times(0)).connect(101, true); + + bris.seek(2500); + bris.read(); + + assertEquals("Seek to 2500 bytes made incorrectly (offset Check)", + 2500, bris.startPos); + + doReturn(getMockConnection(null)) + .when(rMock).connect(anyLong(), anyBoolean()); + bris.seek(500); + try { + bris.read(); + fail("Exception should be thrown when content-length is not given"); + } catch (IOException e) { + assertTrue("Incorrect response message: " + e.getMessage(), + e.getMessage().startsWith(HttpHeaders.CONTENT_LENGTH + + " is missing: ")); + } + bris.close(); + } + + @Test + public void testPropagatedClose() throws IOException { + ByteRangeInputStream bris = + mock(ByteRangeInputStream.class, CALLS_REAL_METHODS); + InputStream mockStream = mock(InputStream.class); + doReturn(mockStream).when(bris).openInputStream(); + Whitebox.setInternalState(bris, "status", + ByteRangeInputStream.StreamStatus.SEEK); + + int brisOpens = 0; + int brisCloses = 0; + int isCloses = 0; + + // first open, shouldn't close underlying stream + bris.getInputStream(); + verify(bris, times(++brisOpens)).openInputStream(); + verify(bris, times(brisCloses)).close(); + verify(mockStream, times(isCloses)).close(); + + // stream is open, shouldn't close underlying stream + bris.getInputStream(); + verify(bris, times(brisOpens)).openInputStream(); + verify(bris, times(brisCloses)).close(); + verify(mockStream, times(isCloses)).close(); + + // seek forces a reopen, should close underlying stream + bris.seek(1); + bris.getInputStream(); + verify(bris, times(++brisOpens)).openInputStream(); + verify(bris, times(brisCloses)).close(); + verify(mockStream, times(++isCloses)).close(); + + // verify that the underlying stream isn't closed after a seek + // ie. the state was correctly updated + bris.getInputStream(); + verify(bris, times(brisOpens)).openInputStream(); + verify(bris, times(brisCloses)).close(); + verify(mockStream, times(isCloses)).close(); + + // seeking to same location should be a no-op + bris.seek(1); + bris.getInputStream(); + verify(bris, times(brisOpens)).openInputStream(); + verify(bris, times(brisCloses)).close(); + verify(mockStream, times(isCloses)).close(); + + // close should of course close + bris.close(); + verify(bris, times(++brisCloses)).close(); + verify(mockStream, times(++isCloses)).close(); + + // it's already closed, underlying stream should not close + bris.close(); + verify(bris, times(++brisCloses)).close(); + verify(mockStream, times(isCloses)).close(); + + // it's closed, don't reopen it + boolean errored = false; + try { + bris.getInputStream(); + } catch (IOException e) { + errored = true; + assertEquals("Stream closed", e.getMessage()); + } finally { + assertTrue("Read a closed steam", errored); + } + verify(bris, times(brisOpens)).openInputStream(); + verify(bris, times(brisCloses)).close(); + + verify(mockStream, times(isCloses)).close(); + } +}