HDFS-7163. WebHdfsFileSystem should retry reads according to the configured retry policy. Contributed by Eric Payne.

(cherry picked from commit 867048c3e4)
(cherry picked from commit 131260f0a7)
This commit is contained in:
Kihwal Lee 2015-12-22 14:20:10 -06:00
parent 8fad4c780a
commit 263c544021
7 changed files with 525 additions and 14 deletions

View File

@ -18,7 +18,9 @@
package org.apache.hadoop.hdfs.web;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
@ -34,8 +36,11 @@
import java.util.Map;
import java.util.StringTokenizer;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.input.BoundedInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.CommonConfigurationKeys;
@ -44,6 +49,7 @@
import org.apache.hadoop.fs.DelegationTokenRenewer;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
@ -554,7 +560,7 @@ public T run() throws IOException {
* Also implements two-step connects for other operations redirected to
* a DN such as open and checksum
*/
private HttpURLConnection connect(URL url) throws IOException {
protected HttpURLConnection connect(URL url) throws IOException {
//redirect hostname and port
String redirectHost = null;
@ -707,7 +713,7 @@ private void shouldRetry(final IOException ioe, final int retry
*/
abstract class AbstractFsPathRunner<T> extends AbstractRunner<T> {
private final Path fspath;
private final Param<?,?>[] parameters;
private Param<?,?>[] parameters;
AbstractFsPathRunner(final HttpOpParam.Op op, final Path fspath,
Param<?,?>... parameters) {
@ -723,6 +729,10 @@ abstract class AbstractFsPathRunner<T> extends AbstractRunner<T> {
this.parameters = parameters;
}
protected void updateURLParameters(Param<?, ?>... p) {
this.parameters = p;
}
@Override
protected URL getUrl() throws IOException {
if (excludeDatanodes.getValue() != null) {
@ -1232,15 +1242,10 @@ public boolean delete(Path f, boolean recursive) throws IOException {
}
@Override
public FSDataInputStream open(final Path f, final int buffersize
public FSDataInputStream open(final Path f, final int bufferSize
) throws IOException {
statistics.incrementReadOps(1);
final HttpOpParam.Op op = GetOpParam.Op.OPEN;
// use a runner so the open can recover from an invalid token
FsPathConnectionRunner runner =
new FsPathConnectionRunner(op, f, new BufferSizeParam(buffersize));
return new FSDataInputStream(new OffsetUrlInputStream(
new UnresolvedUrlOpener(runner), new OffsetUrlOpener(null)));
return new FSDataInputStream(new WebHdfsInputStream(f, bufferSize));
}
@Override
@ -1521,4 +1526,346 @@ public String getCanonicalServiceName() {
InetSocketAddress[] getResolvedNNAddr() {
return nnAddrs;
}
@VisibleForTesting
public void setRetryPolicy(RetryPolicy rp) {
this.retryPolicy = rp;
}
/**
* This class is used for opening, reading, and seeking files while using the
* WebHdfsFileSystem. This class will invoke the retry policy when performing
* any of these actions.
*/
@VisibleForTesting
public class WebHdfsInputStream extends FSInputStream {
private ReadRunner readRunner = null;
WebHdfsInputStream(Path path, int buffersize) throws IOException {
// Only create the ReadRunner once. Each read's byte array and position
// will be updated within the ReadRunner object before every read.
readRunner = new ReadRunner(path, buffersize);
}
@Override
public int read() throws IOException {
final byte[] b = new byte[1];
return (read(b, 0, 1) == -1) ? -1 : (b[0] & 0xff);
}
@Override
public int read(byte b[], int off, int len) throws IOException {
return readRunner.read(b, off, len);
}
@Override
public void seek(long newPos) throws IOException {
readRunner.seek(newPos);
}
@Override
public long getPos() throws IOException {
return readRunner.getPos();
}
protected int getBufferSize() throws IOException {
return readRunner.getBufferSize();
}
protected Path getPath() throws IOException {
return readRunner.getPath();
}
@Override
public boolean seekToNewSource(long targetPos) throws IOException {
return false;
}
@Override
public void close() throws IOException {
readRunner.close();
}
public void setFileLength(long len) {
readRunner.setFileLength(len);
}
public long getFileLength() {
return readRunner.getFileLength();
}
@VisibleForTesting
ReadRunner getReadRunner() {
return readRunner;
}
@VisibleForTesting
void setReadRunner(ReadRunner rr) {
this.readRunner = rr;
}
}
enum RunnerState {
DISCONNECTED, // Connection is closed programmatically by ReadRunner
OPEN, // Connection has been established by ReadRunner
SEEK, // Calling code has explicitly called seek()
CLOSED // Calling code has explicitly called close()
}
/**
* This class will allow retries to occur for both open and read operations.
* The first WebHdfsFileSystem#open creates a new WebHdfsInputStream object,
* which creates a new ReadRunner object that will be used to open a
* connection and read or seek into the input stream.
*
* ReadRunner is a subclass of the AbstractRunner class, which will run the
* ReadRunner#getUrl(), ReadRunner#connect(URL), and ReadRunner#getResponse
* methods within a retry loop, based on the configured retry policy.
* ReadRunner#connect will create a connection if one has not already been
* created. Otherwise, it will return the previously created connection
* object. This is necessary because a new connection should not be created
* for every read.
* Likewise, ReadRunner#getUrl will construct a new URL object only if the
* connection has not previously been established. Otherwise, it will return
* the previously created URL object.
* ReadRunner#getResponse will initialize the input stream if it has not
* already been initialized and read the requested data from the specified
* input stream.
*/
@VisibleForTesting
protected class ReadRunner extends AbstractFsPathRunner<Integer> {
private InputStream in = null;
private HttpURLConnection cachedConnection = null;
private byte[] readBuffer;
private int readOffset;
private int readLength;
private RunnerState runnerState = RunnerState.DISCONNECTED;
private URL originalUrl = null;
private URL resolvedUrl = null;
private final Path path;
private final int bufferSize;
private long pos = 0;
private long fileLength = 0;
/* The following methods are WebHdfsInputStream helpers. */
ReadRunner(Path p, int bs) throws IOException {
super(GetOpParam.Op.OPEN, p, new BufferSizeParam(bs));
this.path = p;
this.bufferSize = bs;
}
int read(byte[] b, int off, int len) throws IOException {
if (runnerState == RunnerState.CLOSED) {
throw new IOException("Stream closed");
}
// Before the first read, pos and fileLength will be 0 and readBuffer
// will all be null. They will be initialized once the first connection
// is made. Only after that it makes sense to compare pos and fileLength.
if (pos >= fileLength && readBuffer != null) {
return -1;
}
// If a seek is occurring, the input stream will have been closed, so it
// needs to be reopened. Use the URLRunner to call AbstractRunner#connect
// with the previously-cached resolved URL and with the 'redirected' flag
// set to 'true'. The resolved URL contains the URL of the previously
// opened DN as opposed to the NN. It is preferable to use the resolved
// URL when creating a connection because it does not hit the NN or every
// seek, nor does it open a connection to a new DN after every seek.
// The redirect flag is needed so that AbstractRunner#connect knows the
// URL is already resolved.
// Note that when the redirected flag is set, retries are not attempted.
// So, if the connection fails using URLRunner, clear out the connection
// and fall through to establish the connection using ReadRunner.
if (runnerState == RunnerState.SEEK) {
try {
final URL rurl = new URL(resolvedUrl + "&" + new OffsetParam(pos));
cachedConnection = new URLRunner(GetOpParam.Op.OPEN, rurl, true).run();
} catch (IOException ioe) {
closeInputStream(RunnerState.DISCONNECTED);
}
}
readBuffer = b;
readOffset = off;
readLength = len;
int count = -1;
count = this.run();
if (count >= 0) {
statistics.incrementBytesRead(count);
pos += count;
} else if (pos < fileLength) {
throw new EOFException(
"Premature EOF: pos=" + pos + " < filelength=" + fileLength);
}
return count;
}
void seek(long newPos) throws IOException {
if (pos != newPos) {
pos = newPos;
closeInputStream(RunnerState.SEEK);
}
}
public void close() throws IOException {
closeInputStream(RunnerState.CLOSED);
}
/* The following methods are overriding AbstractRunner methods,
* to be called within the retry policy context by runWithRetry.
*/
@Override
protected URL getUrl() throws IOException {
// This method is called every time either a read is executed.
// The check for connection == null is to ensure that a new URL is only
// created upon a new connection and not for every read.
if (cachedConnection == null) {
// Update URL with current offset. BufferSize doesn't change, but it
// still must be included when creating the new URL.
updateURLParameters(new BufferSizeParam(bufferSize),
new OffsetParam(pos));
originalUrl = super.getUrl();
}
return originalUrl;
}
/* Only make the connection if it is not already open. Don't cache the
* connection here. After this method is called, runWithRetry will call
* validateResponse, and then call the below ReadRunner#getResponse. If
* the code path makes it that far, then we can cache the connection.
*/
@Override
protected HttpURLConnection connect(URL url) throws IOException {
HttpURLConnection conn = cachedConnection;
if (conn == null) {
try {
conn = super.connect(url);
} catch (IOException e) {
closeInputStream(RunnerState.DISCONNECTED);
throw e;
}
}
return conn;
}
/*
* This method is used to perform reads within the retry policy context.
* This code is relying on runWithRetry to always call the above connect
* method and the verifyResponse method prior to calling getResponse.
*/
@Override
Integer getResponse(final HttpURLConnection conn)
throws IOException {
try {
// In the "open-then-read" use case, runWithRetry will have executed
// ReadRunner#connect to make the connection and then executed
// validateResponse to validate the response code. Only then do we want
// to cache the connection.
// In the "read-after-seek" use case, the connection is made and the
// response is validated by the URLRunner. ReadRunner#read then caches
// the connection and the ReadRunner#connect will pass on the cached
// connection
// In either case, stream initialization is done here if necessary.
cachedConnection = conn;
if (in == null) {
in = initializeInputStream(conn);
}
int count = in.read(readBuffer, readOffset, readLength);
if (count < 0 && pos < fileLength) {
throw new EOFException(
"Premature EOF: pos=" + pos + " < filelength=" + fileLength);
}
return Integer.valueOf(count);
} catch (IOException e) {
String redirectHost = resolvedUrl.getAuthority();
if (excludeDatanodes.getValue() != null) {
excludeDatanodes = new ExcludeDatanodesParam(redirectHost + ","
+ excludeDatanodes.getValue());
} else {
excludeDatanodes = new ExcludeDatanodesParam(redirectHost);
}
// If an exception occurs, close the input stream and null it out so
// that if the abstract runner decides to retry, it will reconnect.
closeInputStream(RunnerState.DISCONNECTED);
throw e;
}
}
@VisibleForTesting
InputStream initializeInputStream(HttpURLConnection conn)
throws IOException {
// Cache the resolved URL so that it can be used in the event of
// a future seek operation.
resolvedUrl = removeOffsetParam(conn.getURL());
final String cl = conn.getHeaderField(HttpHeaders.CONTENT_LENGTH);
InputStream inStream = conn.getInputStream();
if (LOG.isDebugEnabled()) {
LOG.debug("open file: " + conn.getURL());
}
if (cl != null) {
long streamLength = Long.parseLong(cl);
fileLength = pos + streamLength;
// Java has a bug with >2GB request streams. It won't bounds check
// the reads so the transfer blocks until the server times out
inStream = new BoundedInputStream(inStream, streamLength);
} else {
fileLength = getHdfsFileStatus(path).getLen();
}
// Wrapping in BufferedInputStream because it is more performant than
// BoundedInputStream by itself.
runnerState = RunnerState.OPEN;
return new BufferedInputStream(inStream, bufferSize);
}
// Close both the InputStream and the connection.
@VisibleForTesting
void closeInputStream(RunnerState rs) throws IOException {
if (in != null) {
IOUtils.close(cachedConnection);
in = null;
}
cachedConnection = null;
runnerState = rs;
}
/* Getters and Setters */
@VisibleForTesting
protected InputStream getInputStream() {
return in;
}
@VisibleForTesting
protected void setInputStream(InputStream inStream) {
in = inStream;
}
Path getPath() {
return path;
}
int getBufferSize() {
return bufferSize;
}
long getFileLength() {
return fileLength;
}
void setFileLength(long len) {
fileLength = len;
}
long getPos() {
return pos;
}
}
}

View File

@ -1634,6 +1634,9 @@ Release 2.7.3 - UNRELEASED
IMPROVEMENTS
HDFS-7163. WebHdfsFileSystem should retry reads according to the configured
retry policy. (Eric Payne via kihwal)
OPTIMIZATIONS
BUG FIXES

View File

@ -1234,7 +1234,7 @@ private void verifyFileAccess(FileSystem userFs, boolean expectOpenFailure)
throws Exception {
// Test that a file with the xattr can or can't be opened.
try {
userFs.open(filePath);
userFs.open(filePath).read();
assertFalse("open succeeded but expected it to fail", expectOpenFailure);
} catch (AccessControlException e) {
assertTrue("open failed but expected it to succeed", expectOpenFailure);

View File

@ -287,7 +287,7 @@ public void testAuditWebHdfsOpen() throws Exception {
setupAuditLogs();
WebHdfsFileSystem webfs = WebHdfsTestUtil.getWebHdfsFileSystemAs(userGroupInfo, conf, WebHdfsConstants.WEBHDFS_SCHEME);
webfs.open(file);
webfs.open(file).read();
verifyAuditLogsCheckPattern(true, 3, webOpenPattern);
}

View File

@ -20,13 +20,18 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
@ -42,12 +47,14 @@
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
@ -58,11 +65,16 @@
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem.WebHdfsInputStream;
import org.apache.hadoop.hdfs.web.resources.LengthParam;
import org.apache.hadoop.hdfs.web.resources.OffsetParam;
import org.apache.hadoop.hdfs.web.resources.Param;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction.RetryDecision;
import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
@ -70,6 +82,15 @@
import org.junit.Test;
import org.mockito.internal.util.reflection.Whitebox;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/** Test WebHDFS */
public class TestWebHDFS {
static final Log LOG = LogFactory.getLog(TestWebHDFS.class);
@ -748,4 +769,142 @@ public WebHdfsFileSystem run() throws IOException {
}
});
}
@Test(timeout=90000)
public void testWebHdfsReadRetries() throws Exception {
// ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
final Configuration conf = WebHdfsTestUtil.createConf();
final Path dir = new Path("/testWebHdfsReadRetries");
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY, true);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024*512);
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
final short numDatanodes = 1;
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(numDatanodes)
.build();
try {
cluster.waitActive();
final FileSystem fs = WebHdfsTestUtil
.getWebHdfsFileSystem(conf, WebHdfsConstants.WEBHDFS_SCHEME);
//create a file
final long length = 1L << 20;
final Path file1 = new Path(dir, "testFile");
DFSTestUtil.createFile(fs, file1, length, numDatanodes, 20120406L);
//get file status and check that it was written properly.
final FileStatus s1 = fs.getFileStatus(file1);
assertEquals("Write failed for file " + file1, length, s1.getLen());
// Ensure file can be read through WebHdfsInputStream
FSDataInputStream in = fs.open(file1);
assertTrue("Input stream is not an instance of class WebHdfsInputStream",
in.getWrappedStream() instanceof WebHdfsInputStream);
int count = 0;
for(; in.read() != -1; count++);
assertEquals("Read failed for file " + file1, s1.getLen(), count);
assertEquals("Sghould not be able to read beyond end of file",
in.read(), -1);
in.close();
try {
in.read();
fail("Read after close should have failed");
} catch(IOException ioe) { }
WebHdfsFileSystem wfs = (WebHdfsFileSystem)fs;
// Read should not be retried if AccessControlException is encountered.
String msg = "ReadRetries: Test Access Control Exception";
testReadRetryExceptionHelper(wfs, file1,
new AccessControlException(msg), msg, false, 1);
// Retry policy should be invoked if IOExceptions are thrown.
msg = "ReadRetries: Test SocketTimeoutException";
testReadRetryExceptionHelper(wfs, file1,
new SocketTimeoutException(msg), msg, true, 5);
msg = "ReadRetries: Test SocketException";
testReadRetryExceptionHelper(wfs, file1,
new SocketException(msg), msg, true, 5);
msg = "ReadRetries: Test EOFException";
testReadRetryExceptionHelper(wfs, file1,
new EOFException(msg), msg, true, 5);
msg = "ReadRetries: Test Generic IO Exception";
testReadRetryExceptionHelper(wfs, file1,
new IOException(msg), msg, true, 5);
// If InvalidToken exception occurs, WebHdfs only retries if the
// delegation token was replaced. Do that twice, then verify by checking
// the number of times it tried.
WebHdfsFileSystem spyfs = spy(wfs);
when(spyfs.replaceExpiredDelegationToken()).thenReturn(true, true, false);
msg = "ReadRetries: Test Invalid Token Exception";
testReadRetryExceptionHelper(spyfs, file1,
new InvalidToken(msg), msg, false, 3);
} finally {
cluster.shutdown();
}
}
public boolean attemptedRetry;
private void testReadRetryExceptionHelper(WebHdfsFileSystem fs, Path fn,
final IOException ex, String msg, boolean shouldAttemptRetry,
int numTimesTried)
throws Exception {
// Ovverride WebHdfsInputStream#getInputStream so that it returns
// an input stream that throws the specified exception when read
// is called.
FSDataInputStream in = fs.open(fn);
in.read(); // Connection is made only when the first read() occurs.
final WebHdfsInputStream webIn =
(WebHdfsInputStream)(in.getWrappedStream());
final InputStream spyInputStream =
spy(webIn.getReadRunner().getInputStream());
doThrow(ex).when(spyInputStream).read((byte[])any(), anyInt(), anyInt());
final WebHdfsFileSystem.ReadRunner rr = spy(webIn.getReadRunner());
doReturn(spyInputStream)
.when(rr).initializeInputStream((HttpURLConnection) any());
rr.setInputStream(spyInputStream);
webIn.setReadRunner(rr);
// Override filesystem's retry policy in order to verify that
// WebHdfsInputStream is calling shouldRetry for the appropriate
// exceptions.
final RetryAction retryAction = new RetryAction(RetryDecision.RETRY);
final RetryAction failAction = new RetryAction(RetryDecision.FAIL);
RetryPolicy rp = new RetryPolicy() {
@Override
public RetryAction shouldRetry(Exception e, int retries, int failovers,
boolean isIdempotentOrAtMostOnce) throws Exception {
attemptedRetry = true;
if (retries > 3) {
return failAction;
} else {
return retryAction;
}
}
};
fs.setRetryPolicy(rp);
// If the retry logic is exercised, attemptedRetry will be true. Some
// exceptions should exercise the retry logic and others should not.
// Either way, the value of attemptedRetry should match shouldAttemptRetry.
attemptedRetry = false;
try {
webIn.read();
fail(msg + ": Read should have thrown exception.");
} catch (Exception e) {
assertTrue(e.getMessage().contains(msg));
}
assertEquals(msg + ": Read should " + (shouldAttemptRetry ? "" : "not ")
+ "have called shouldRetry. ",
attemptedRetry, shouldAttemptRetry);
verify(rr, times(numTimesTried)).getResponse((HttpURLConnection) any());
webIn.close();
in.close();
}
}

View File

@ -184,7 +184,7 @@ public void testOpenNonExistFile() throws IOException {
final Path p = new Path("/test/testOpenNonExistFile");
//open it as a file, should get FileNotFoundException
try {
fs.open(p);
fs.open(p).read();
fail("Expected FileNotFoundException was not thrown");
} catch(FileNotFoundException fnfe) {
WebHdfsFileSystem.LOG.info("This is expected.", fnfe);

View File

@ -387,7 +387,9 @@ public WebHdfsFileSystem run() throws IOException {
reset(fs);
// verify an expired token is replaced with a new token
fs.open(p).close();
InputStream is = fs.open(p);
is.read();
is.close();
verify(fs, times(2)).getDelegationToken(); // first bad, then good
verify(fs, times(1)).replaceExpiredDelegationToken();
verify(fs, times(1)).getDelegationToken(null);
@ -402,7 +404,7 @@ public WebHdfsFileSystem run() throws IOException {
// verify with open because it's a little different in how it
// opens connections
fs.cancelDelegationToken(fs.getRenewToken());
InputStream is = fs.open(p);
is = fs.open(p);
is.read();
is.close();
verify(fs, times(2)).getDelegationToken(); // first bad, then good