HDFS-7049. TestByteRangeInputStream.testPropagatedClose fails and throw NPE on branch-2. Contributed by Juan Yu.
This commit is contained in:
parent
5d251d99d6
commit
53c2288dc6
|
@ -456,6 +456,9 @@ Release 2.6.0 - UNRELEASED
|
||||||
HDFS-7132. hdfs namenode -metadataVersion command does not honor
|
HDFS-7132. hdfs namenode -metadataVersion command does not honor
|
||||||
configured name dirs. (Charles Lamb via wang)
|
configured name dirs. (Charles Lamb via wang)
|
||||||
|
|
||||||
|
HDFS-7049. TestByteRangeInputStream.testPropagatedClose fails and throw
|
||||||
|
NPE on branch-2. (Juan Yu via wheat9)
|
||||||
|
|
||||||
BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS
|
BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS
|
||||||
|
|
||||||
HDFS-6387. HDFS CLI admin tool for creating & deleting an
|
HDFS-6387. HDFS CLI admin tool for creating & deleting an
|
||||||
|
|
|
@ -18,12 +18,13 @@
|
||||||
package org.apache.hadoop.hdfs.web;
|
package org.apache.hadoop.hdfs.web;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertNull;
|
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
import static org.mockito.Matchers.anyBoolean;
|
||||||
|
import static org.mockito.Matchers.anyLong;
|
||||||
|
import static org.mockito.Mockito.CALLS_REAL_METHODS;
|
||||||
import static org.mockito.Mockito.doReturn;
|
import static org.mockito.Mockito.doReturn;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.spy;
|
|
||||||
import static org.mockito.Mockito.times;
|
import static org.mockito.Mockito.times;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
|
|
||||||
|
@ -33,217 +34,177 @@ import java.io.InputStream;
|
||||||
import java.net.HttpURLConnection;
|
import java.net.HttpURLConnection;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
|
|
||||||
import org.apache.hadoop.hdfs.server.namenode.StreamFile;
|
import com.google.common.net.HttpHeaders;
|
||||||
import org.apache.hadoop.hdfs.web.HftpFileSystem;
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.mockito.internal.util.reflection.Whitebox;
|
||||||
|
|
||||||
public class TestByteRangeInputStream {
|
public class TestByteRangeInputStream {
|
||||||
public static class MockHttpURLConnection extends HttpURLConnection {
|
private class ByteRangeInputStreamImpl extends ByteRangeInputStream {
|
||||||
public MockHttpURLConnection(URL u) {
|
public ByteRangeInputStreamImpl(URLOpener o, URLOpener r)
|
||||||
super(u);
|
throws IOException {
|
||||||
}
|
super(o, r);
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean usingProxy(){
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void disconnect() {
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void connect() {
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public InputStream getInputStream() throws IOException {
|
|
||||||
return new ByteArrayInputStream("asdf".getBytes());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public URL getURL() {
|
|
||||||
URL u = null;
|
|
||||||
try {
|
|
||||||
u = new URL("http://resolvedurl/");
|
|
||||||
} catch (Exception e) {
|
|
||||||
System.out.println(e.getMessage());
|
|
||||||
}
|
}
|
||||||
return u;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getResponseCode() {
|
protected URL getResolvedUrl(HttpURLConnection connection)
|
||||||
if (responseCode != -1) {
|
throws IOException {
|
||||||
return responseCode;
|
return new URL("http://resolvedurl/");
|
||||||
} else {
|
|
||||||
if (getRequestProperty("Range") == null) {
|
|
||||||
return 200;
|
|
||||||
} else {
|
|
||||||
return 206;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setResponseCode(int resCode) {
|
private ByteRangeInputStream.URLOpener getMockURLOpener(URL url)
|
||||||
responseCode = resCode;
|
throws IOException {
|
||||||
|
ByteRangeInputStream.URLOpener opener =
|
||||||
|
mock(ByteRangeInputStream.URLOpener.class, CALLS_REAL_METHODS);
|
||||||
|
opener.setURL(url);
|
||||||
|
doReturn(getMockConnection("65535"))
|
||||||
|
.when(opener).connect(anyLong(), anyBoolean());
|
||||||
|
return opener;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
private HttpURLConnection getMockConnection(String length)
|
||||||
public String getHeaderField(String field) {
|
throws IOException {
|
||||||
return (field.equalsIgnoreCase(StreamFile.CONTENT_LENGTH)) ? "65535" : null;
|
HttpURLConnection mockConnection = mock(HttpURLConnection.class);
|
||||||
|
doReturn(new ByteArrayInputStream("asdf".getBytes()))
|
||||||
|
.when(mockConnection).getInputStream();
|
||||||
|
doReturn(length).when(mockConnection)
|
||||||
|
.getHeaderField(HttpHeaders.CONTENT_LENGTH);
|
||||||
|
return mockConnection;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testByteRange() throws IOException {
|
public void testByteRange() throws IOException {
|
||||||
URLConnectionFactory factory = mock(URLConnectionFactory.class);
|
ByteRangeInputStream.URLOpener oMock = getMockURLOpener(
|
||||||
HftpFileSystem.RangeHeaderUrlOpener ospy = spy(
|
new URL("http://test"));
|
||||||
new HftpFileSystem.RangeHeaderUrlOpener(factory, new URL("http://test/")));
|
ByteRangeInputStream.URLOpener rMock = getMockURLOpener(null);
|
||||||
doReturn(new MockHttpURLConnection(ospy.getURL())).when(ospy)
|
ByteRangeInputStream bris = new ByteRangeInputStreamImpl(oMock, rMock);
|
||||||
.openConnection();
|
|
||||||
HftpFileSystem.RangeHeaderUrlOpener rspy = spy(
|
|
||||||
new HftpFileSystem.RangeHeaderUrlOpener(factory, null));
|
|
||||||
doReturn(new MockHttpURLConnection(rspy.getURL())).when(rspy)
|
|
||||||
.openConnection();
|
|
||||||
ByteRangeInputStream is = new HftpFileSystem.RangeHeaderInputStream(ospy, rspy);
|
|
||||||
|
|
||||||
assertEquals("getPos wrong", 0, is.getPos());
|
bris.seek(0);
|
||||||
|
|
||||||
is.read();
|
assertEquals("getPos wrong", 0, bris.getPos());
|
||||||
|
|
||||||
assertNull("Initial call made incorrectly (Range Check)", ospy
|
bris.read();
|
||||||
.openConnection().getRequestProperty("Range"));
|
|
||||||
|
|
||||||
assertEquals("getPos should be 1 after reading one byte", 1, is.getPos());
|
assertEquals("Initial call made incorrectly (offset check)",
|
||||||
|
0, bris.startPos);
|
||||||
|
assertEquals("getPos should return 1 after reading one byte", 1,
|
||||||
|
bris.getPos());
|
||||||
|
verify(oMock, times(1)).connect(0, false);
|
||||||
|
|
||||||
is.read();
|
bris.read();
|
||||||
|
|
||||||
assertEquals("getPos should be 2 after reading two bytes", 2, is.getPos());
|
|
||||||
|
|
||||||
|
assertEquals("getPos should return 2 after reading two bytes", 2,
|
||||||
|
bris.getPos());
|
||||||
// No additional connections should have been made (no seek)
|
// No additional connections should have been made (no seek)
|
||||||
|
verify(oMock, times(1)).connect(0, false);
|
||||||
|
|
||||||
rspy.setURL(new URL("http://resolvedurl/"));
|
rMock.setURL(new URL("http://resolvedurl/"));
|
||||||
|
|
||||||
is.seek(100);
|
bris.seek(100);
|
||||||
is.read();
|
bris.read();
|
||||||
|
|
||||||
assertEquals("Seek to 100 bytes made incorrectly (Range Check)",
|
assertEquals("Seek to 100 bytes made incorrectly (offset Check)",
|
||||||
"bytes=100-", rspy.openConnection().getRequestProperty("Range"));
|
100, bris.startPos);
|
||||||
|
assertEquals("getPos should return 101 after reading one byte", 101,
|
||||||
|
bris.getPos());
|
||||||
|
verify(rMock, times(1)).connect(100, true);
|
||||||
|
|
||||||
assertEquals("getPos should be 101 after reading one byte", 101,
|
bris.seek(101);
|
||||||
is.getPos());
|
bris.read();
|
||||||
|
|
||||||
verify(rspy, times(2)).openConnection();
|
// Seek to 101 should not result in another request
|
||||||
|
verify(rMock, times(1)).connect(100, true);
|
||||||
|
verify(rMock, times(0)).connect(101, true);
|
||||||
|
|
||||||
is.seek(101);
|
bris.seek(2500);
|
||||||
is.read();
|
bris.read();
|
||||||
|
|
||||||
verify(rspy, times(2)).openConnection();
|
assertEquals("Seek to 2500 bytes made incorrectly (offset Check)",
|
||||||
|
2500, bris.startPos);
|
||||||
// Seek to 101 should not result in another request"
|
|
||||||
|
|
||||||
is.seek(2500);
|
|
||||||
is.read();
|
|
||||||
|
|
||||||
assertEquals("Seek to 2500 bytes made incorrectly (Range Check)",
|
|
||||||
"bytes=2500-", rspy.openConnection().getRequestProperty("Range"));
|
|
||||||
|
|
||||||
((MockHttpURLConnection) rspy.openConnection()).setResponseCode(200);
|
|
||||||
is.seek(500);
|
|
||||||
|
|
||||||
|
doReturn(getMockConnection(null))
|
||||||
|
.when(rMock).connect(anyLong(), anyBoolean());
|
||||||
|
bris.seek(500);
|
||||||
try {
|
try {
|
||||||
is.read();
|
bris.read();
|
||||||
fail("Exception should be thrown when 200 response is given "
|
fail("Exception should be thrown when content-length is not given");
|
||||||
+ "but 206 is expected");
|
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
assertEquals("Should fail because incorrect response code was sent",
|
assertTrue("Incorrect response message: " + e.getMessage(),
|
||||||
"HTTP_PARTIAL expected, received 200", e.getMessage());
|
e.getMessage().startsWith(HttpHeaders.CONTENT_LENGTH +
|
||||||
|
" is missing: "));
|
||||||
}
|
}
|
||||||
|
bris.close();
|
||||||
((MockHttpURLConnection) rspy.openConnection()).setResponseCode(206);
|
|
||||||
is.seek(0);
|
|
||||||
|
|
||||||
try {
|
|
||||||
is.read();
|
|
||||||
fail("Exception should be thrown when 206 response is given "
|
|
||||||
+ "but 200 is expected");
|
|
||||||
} catch (IOException e) {
|
|
||||||
assertEquals("Should fail because incorrect response code was sent",
|
|
||||||
"HTTP_OK expected, received 206", e.getMessage());
|
|
||||||
}
|
|
||||||
is.close();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPropagatedClose() throws IOException {
|
public void testPropagatedClose() throws IOException {
|
||||||
URLConnectionFactory factory = mock(URLConnectionFactory.class);
|
ByteRangeInputStream bris =
|
||||||
|
mock(ByteRangeInputStream.class, CALLS_REAL_METHODS);
|
||||||
ByteRangeInputStream brs = spy(new HftpFileSystem.RangeHeaderInputStream(
|
|
||||||
factory, new URL("http://test/")));
|
|
||||||
|
|
||||||
InputStream mockStream = mock(InputStream.class);
|
InputStream mockStream = mock(InputStream.class);
|
||||||
doReturn(mockStream).when(brs).openInputStream();
|
doReturn(mockStream).when(bris).openInputStream();
|
||||||
|
Whitebox.setInternalState(bris, "status",
|
||||||
|
ByteRangeInputStream.StreamStatus.SEEK);
|
||||||
|
|
||||||
int brisOpens = 0;
|
int brisOpens = 0;
|
||||||
int brisCloses = 0;
|
int brisCloses = 0;
|
||||||
int isCloses = 0;
|
int isCloses = 0;
|
||||||
|
|
||||||
// first open, shouldn't close underlying stream
|
// first open, shouldn't close underlying stream
|
||||||
brs.getInputStream();
|
bris.getInputStream();
|
||||||
verify(brs, times(++brisOpens)).openInputStream();
|
verify(bris, times(++brisOpens)).openInputStream();
|
||||||
verify(brs, times(brisCloses)).close();
|
verify(bris, times(brisCloses)).close();
|
||||||
verify(mockStream, times(isCloses)).close();
|
verify(mockStream, times(isCloses)).close();
|
||||||
|
|
||||||
// stream is open, shouldn't close underlying stream
|
// stream is open, shouldn't close underlying stream
|
||||||
brs.getInputStream();
|
bris.getInputStream();
|
||||||
verify(brs, times(brisOpens)).openInputStream();
|
verify(bris, times(brisOpens)).openInputStream();
|
||||||
verify(brs, times(brisCloses)).close();
|
verify(bris, times(brisCloses)).close();
|
||||||
verify(mockStream, times(isCloses)).close();
|
verify(mockStream, times(isCloses)).close();
|
||||||
|
|
||||||
// seek forces a reopen, should close underlying stream
|
// seek forces a reopen, should close underlying stream
|
||||||
brs.seek(1);
|
bris.seek(1);
|
||||||
brs.getInputStream();
|
bris.getInputStream();
|
||||||
verify(brs, times(++brisOpens)).openInputStream();
|
verify(bris, times(++brisOpens)).openInputStream();
|
||||||
verify(brs, times(brisCloses)).close();
|
verify(bris, times(brisCloses)).close();
|
||||||
verify(mockStream, times(++isCloses)).close();
|
verify(mockStream, times(++isCloses)).close();
|
||||||
|
|
||||||
// verify that the underlying stream isn't closed after a seek
|
// verify that the underlying stream isn't closed after a seek
|
||||||
// ie. the state was correctly updated
|
// ie. the state was correctly updated
|
||||||
brs.getInputStream();
|
bris.getInputStream();
|
||||||
verify(brs, times(brisOpens)).openInputStream();
|
verify(bris, times(brisOpens)).openInputStream();
|
||||||
verify(brs, times(brisCloses)).close();
|
verify(bris, times(brisCloses)).close();
|
||||||
verify(mockStream, times(isCloses)).close();
|
verify(mockStream, times(isCloses)).close();
|
||||||
|
|
||||||
// seeking to same location should be a no-op
|
// seeking to same location should be a no-op
|
||||||
brs.seek(1);
|
bris.seek(1);
|
||||||
brs.getInputStream();
|
bris.getInputStream();
|
||||||
verify(brs, times(brisOpens)).openInputStream();
|
verify(bris, times(brisOpens)).openInputStream();
|
||||||
verify(brs, times(brisCloses)).close();
|
verify(bris, times(brisCloses)).close();
|
||||||
verify(mockStream, times(isCloses)).close();
|
verify(mockStream, times(isCloses)).close();
|
||||||
|
|
||||||
// close should of course close
|
// close should of course close
|
||||||
brs.close();
|
bris.close();
|
||||||
verify(brs, times(++brisCloses)).close();
|
verify(bris, times(++brisCloses)).close();
|
||||||
verify(mockStream, times(++isCloses)).close();
|
verify(mockStream, times(++isCloses)).close();
|
||||||
|
|
||||||
// it's already closed, underlying stream should not close
|
// it's already closed, underlying stream should not close
|
||||||
brs.close();
|
bris.close();
|
||||||
verify(brs, times(++brisCloses)).close();
|
verify(bris, times(++brisCloses)).close();
|
||||||
verify(mockStream, times(isCloses)).close();
|
verify(mockStream, times(isCloses)).close();
|
||||||
|
|
||||||
// it's closed, don't reopen it
|
// it's closed, don't reopen it
|
||||||
boolean errored = false;
|
boolean errored = false;
|
||||||
try {
|
try {
|
||||||
brs.getInputStream();
|
bris.getInputStream();
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
errored = true;
|
errored = true;
|
||||||
assertEquals("Stream closed", e.getMessage());
|
assertEquals("Stream closed", e.getMessage());
|
||||||
} finally {
|
} finally {
|
||||||
assertTrue("Read a closed steam", errored);
|
assertTrue("Read a closed steam", errored);
|
||||||
}
|
}
|
||||||
verify(brs, times(brisOpens)).openInputStream();
|
verify(bris, times(brisOpens)).openInputStream();
|
||||||
verify(brs, times(brisCloses)).close();
|
verify(bris, times(brisCloses)).close();
|
||||||
|
|
||||||
verify(mockStream, times(isCloses)).close();
|
verify(mockStream, times(isCloses)).close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue