HDFS-7163. WebHdfsFileSystem should retry reads according to the configured retry policy. Contributed by Eric Payne.
This commit is contained in:
parent
5c0ff69618
commit
867048c3e4
|
@ -18,7 +18,9 @@
|
|||
|
||||
package org.apache.hadoop.hdfs.web;
|
||||
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.BufferedOutputStream;
|
||||
import java.io.EOFException;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
|
@ -34,8 +36,11 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.StringTokenizer;
|
||||
|
||||
import javax.ws.rs.core.HttpHeaders;
|
||||
import javax.ws.rs.core.MediaType;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.io.input.BoundedInputStream;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.BlockLocation;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
|
@ -44,6 +49,7 @@ import org.apache.hadoop.fs.CreateFlag;
|
|||
import org.apache.hadoop.fs.DelegationTokenRenewer;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FSInputStream;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
|
||||
|
@ -545,7 +551,7 @@ public class WebHdfsFileSystem extends FileSystem
|
|||
* Also implements two-step connects for other operations redirected to
|
||||
* a DN such as open and checksum
|
||||
*/
|
||||
private HttpURLConnection connect(URL url) throws IOException {
|
||||
protected HttpURLConnection connect(URL url) throws IOException {
|
||||
//redirect hostname and port
|
||||
String redirectHost = null;
|
||||
|
||||
|
@ -698,7 +704,7 @@ public class WebHdfsFileSystem extends FileSystem
|
|||
*/
|
||||
abstract class AbstractFsPathRunner<T> extends AbstractRunner<T> {
|
||||
private final Path fspath;
|
||||
private final Param<?,?>[] parameters;
|
||||
private Param<?,?>[] parameters;
|
||||
|
||||
AbstractFsPathRunner(final HttpOpParam.Op op, final Path fspath,
|
||||
Param<?,?>... parameters) {
|
||||
|
@ -714,6 +720,10 @@ public class WebHdfsFileSystem extends FileSystem
|
|||
this.parameters = parameters;
|
||||
}
|
||||
|
||||
protected void updateURLParameters(Param<?, ?>... p) {
|
||||
this.parameters = p;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected URL getUrl() throws IOException {
|
||||
if (excludeDatanodes.getValue() != null) {
|
||||
|
@ -1235,15 +1245,10 @@ public class WebHdfsFileSystem extends FileSystem
|
|||
}
|
||||
|
||||
@Override
|
||||
public FSDataInputStream open(final Path f, final int buffersize
|
||||
public FSDataInputStream open(final Path f, final int bufferSize
|
||||
) throws IOException {
|
||||
statistics.incrementReadOps(1);
|
||||
final HttpOpParam.Op op = GetOpParam.Op.OPEN;
|
||||
// use a runner so the open can recover from an invalid token
|
||||
FsPathConnectionRunner runner =
|
||||
new FsPathConnectionRunner(op, f, new BufferSizeParam(buffersize));
|
||||
return new FSDataInputStream(new OffsetUrlInputStream(
|
||||
new UnresolvedUrlOpener(runner), new OffsetUrlOpener(null)));
|
||||
return new FSDataInputStream(new WebHdfsInputStream(f, bufferSize));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1524,4 +1529,346 @@ public class WebHdfsFileSystem extends FileSystem
|
|||
InetSocketAddress[] getResolvedNNAddr() {
|
||||
return nnAddrs;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public void setRetryPolicy(RetryPolicy rp) {
|
||||
this.retryPolicy = rp;
|
||||
}
|
||||
|
||||
/**
|
||||
* This class is used for opening, reading, and seeking files while using the
|
||||
* WebHdfsFileSystem. This class will invoke the retry policy when performing
|
||||
* any of these actions.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
public class WebHdfsInputStream extends FSInputStream {
|
||||
private ReadRunner readRunner = null;
|
||||
|
||||
WebHdfsInputStream(Path path, int buffersize) throws IOException {
|
||||
// Only create the ReadRunner once. Each read's byte array and position
|
||||
// will be updated within the ReadRunner object before every read.
|
||||
readRunner = new ReadRunner(path, buffersize);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read() throws IOException {
|
||||
final byte[] b = new byte[1];
|
||||
return (read(b, 0, 1) == -1) ? -1 : (b[0] & 0xff);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read(byte b[], int off, int len) throws IOException {
|
||||
return readRunner.read(b, off, len);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void seek(long newPos) throws IOException {
|
||||
readRunner.seek(newPos);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getPos() throws IOException {
|
||||
return readRunner.getPos();
|
||||
}
|
||||
|
||||
protected int getBufferSize() throws IOException {
|
||||
return readRunner.getBufferSize();
|
||||
}
|
||||
|
||||
protected Path getPath() throws IOException {
|
||||
return readRunner.getPath();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean seekToNewSource(long targetPos) throws IOException {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
readRunner.close();
|
||||
}
|
||||
|
||||
public void setFileLength(long len) {
|
||||
readRunner.setFileLength(len);
|
||||
}
|
||||
|
||||
public long getFileLength() {
|
||||
return readRunner.getFileLength();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
ReadRunner getReadRunner() {
|
||||
return readRunner;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
void setReadRunner(ReadRunner rr) {
|
||||
this.readRunner = rr;
|
||||
}
|
||||
}
|
||||
|
||||
enum RunnerState {
|
||||
DISCONNECTED, // Connection is closed programmatically by ReadRunner
|
||||
OPEN, // Connection has been established by ReadRunner
|
||||
SEEK, // Calling code has explicitly called seek()
|
||||
CLOSED // Calling code has explicitly called close()
|
||||
}
|
||||
|
||||
/**
|
||||
* This class will allow retries to occur for both open and read operations.
|
||||
* The first WebHdfsFileSystem#open creates a new WebHdfsInputStream object,
|
||||
* which creates a new ReadRunner object that will be used to open a
|
||||
* connection and read or seek into the input stream.
|
||||
*
|
||||
* ReadRunner is a subclass of the AbstractRunner class, which will run the
|
||||
* ReadRunner#getUrl(), ReadRunner#connect(URL), and ReadRunner#getResponse
|
||||
* methods within a retry loop, based on the configured retry policy.
|
||||
* ReadRunner#connect will create a connection if one has not already been
|
||||
* created. Otherwise, it will return the previously created connection
|
||||
* object. This is necessary because a new connection should not be created
|
||||
* for every read.
|
||||
* Likewise, ReadRunner#getUrl will construct a new URL object only if the
|
||||
* connection has not previously been established. Otherwise, it will return
|
||||
* the previously created URL object.
|
||||
* ReadRunner#getResponse will initialize the input stream if it has not
|
||||
* already been initialized and read the requested data from the specified
|
||||
* input stream.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
protected class ReadRunner extends AbstractFsPathRunner<Integer> {
|
||||
private InputStream in = null;
|
||||
private HttpURLConnection cachedConnection = null;
|
||||
private byte[] readBuffer;
|
||||
private int readOffset;
|
||||
private int readLength;
|
||||
private RunnerState runnerState = RunnerState.DISCONNECTED;
|
||||
private URL originalUrl = null;
|
||||
private URL resolvedUrl = null;
|
||||
|
||||
private final Path path;
|
||||
private final int bufferSize;
|
||||
private long pos = 0;
|
||||
private long fileLength = 0;
|
||||
|
||||
/* The following methods are WebHdfsInputStream helpers. */
|
||||
|
||||
ReadRunner(Path p, int bs) throws IOException {
|
||||
super(GetOpParam.Op.OPEN, p, new BufferSizeParam(bs));
|
||||
this.path = p;
|
||||
this.bufferSize = bs;
|
||||
}
|
||||
|
||||
int read(byte[] b, int off, int len) throws IOException {
|
||||
if (runnerState == RunnerState.CLOSED) {
|
||||
throw new IOException("Stream closed");
|
||||
}
|
||||
|
||||
// Before the first read, pos and fileLength will be 0 and readBuffer
|
||||
// will all be null. They will be initialized once the first connection
|
||||
// is made. Only after that it makes sense to compare pos and fileLength.
|
||||
if (pos >= fileLength && readBuffer != null) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
// If a seek is occurring, the input stream will have been closed, so it
|
||||
// needs to be reopened. Use the URLRunner to call AbstractRunner#connect
|
||||
// with the previously-cached resolved URL and with the 'redirected' flag
|
||||
// set to 'true'. The resolved URL contains the URL of the previously
|
||||
// opened DN as opposed to the NN. It is preferable to use the resolved
|
||||
// URL when creating a connection because it does not hit the NN or every
|
||||
// seek, nor does it open a connection to a new DN after every seek.
|
||||
// The redirect flag is needed so that AbstractRunner#connect knows the
|
||||
// URL is already resolved.
|
||||
// Note that when the redirected flag is set, retries are not attempted.
|
||||
// So, if the connection fails using URLRunner, clear out the connection
|
||||
// and fall through to establish the connection using ReadRunner.
|
||||
if (runnerState == RunnerState.SEEK) {
|
||||
try {
|
||||
final URL rurl = new URL(resolvedUrl + "&" + new OffsetParam(pos));
|
||||
cachedConnection = new URLRunner(GetOpParam.Op.OPEN, rurl, true).run();
|
||||
} catch (IOException ioe) {
|
||||
closeInputStream(RunnerState.DISCONNECTED);
|
||||
}
|
||||
}
|
||||
|
||||
readBuffer = b;
|
||||
readOffset = off;
|
||||
readLength = len;
|
||||
|
||||
int count = -1;
|
||||
count = this.run();
|
||||
if (count >= 0) {
|
||||
statistics.incrementBytesRead(count);
|
||||
pos += count;
|
||||
} else if (pos < fileLength) {
|
||||
throw new EOFException(
|
||||
"Premature EOF: pos=" + pos + " < filelength=" + fileLength);
|
||||
}
|
||||
return count;
|
||||
}
|
||||
|
||||
void seek(long newPos) throws IOException {
|
||||
if (pos != newPos) {
|
||||
pos = newPos;
|
||||
closeInputStream(RunnerState.SEEK);
|
||||
}
|
||||
}
|
||||
|
||||
public void close() throws IOException {
|
||||
closeInputStream(RunnerState.CLOSED);
|
||||
}
|
||||
|
||||
/* The following methods are overriding AbstractRunner methods,
|
||||
* to be called within the retry policy context by runWithRetry.
|
||||
*/
|
||||
|
||||
@Override
|
||||
protected URL getUrl() throws IOException {
|
||||
// This method is called every time either a read is executed.
|
||||
// The check for connection == null is to ensure that a new URL is only
|
||||
// created upon a new connection and not for every read.
|
||||
if (cachedConnection == null) {
|
||||
// Update URL with current offset. BufferSize doesn't change, but it
|
||||
// still must be included when creating the new URL.
|
||||
updateURLParameters(new BufferSizeParam(bufferSize),
|
||||
new OffsetParam(pos));
|
||||
originalUrl = super.getUrl();
|
||||
}
|
||||
return originalUrl;
|
||||
}
|
||||
|
||||
/* Only make the connection if it is not already open. Don't cache the
|
||||
* connection here. After this method is called, runWithRetry will call
|
||||
* validateResponse, and then call the below ReadRunner#getResponse. If
|
||||
* the code path makes it that far, then we can cache the connection.
|
||||
*/
|
||||
@Override
|
||||
protected HttpURLConnection connect(URL url) throws IOException {
|
||||
HttpURLConnection conn = cachedConnection;
|
||||
if (conn == null) {
|
||||
try {
|
||||
conn = super.connect(url);
|
||||
} catch (IOException e) {
|
||||
closeInputStream(RunnerState.DISCONNECTED);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
return conn;
|
||||
}
|
||||
|
||||
/*
|
||||
* This method is used to perform reads within the retry policy context.
|
||||
* This code is relying on runWithRetry to always call the above connect
|
||||
* method and the verifyResponse method prior to calling getResponse.
|
||||
*/
|
||||
@Override
|
||||
Integer getResponse(final HttpURLConnection conn)
|
||||
throws IOException {
|
||||
try {
|
||||
// In the "open-then-read" use case, runWithRetry will have executed
|
||||
// ReadRunner#connect to make the connection and then executed
|
||||
// validateResponse to validate the response code. Only then do we want
|
||||
// to cache the connection.
|
||||
// In the "read-after-seek" use case, the connection is made and the
|
||||
// response is validated by the URLRunner. ReadRunner#read then caches
|
||||
// the connection and the ReadRunner#connect will pass on the cached
|
||||
// connection
|
||||
// In either case, stream initialization is done here if necessary.
|
||||
cachedConnection = conn;
|
||||
if (in == null) {
|
||||
in = initializeInputStream(conn);
|
||||
}
|
||||
|
||||
int count = in.read(readBuffer, readOffset, readLength);
|
||||
if (count < 0 && pos < fileLength) {
|
||||
throw new EOFException(
|
||||
"Premature EOF: pos=" + pos + " < filelength=" + fileLength);
|
||||
}
|
||||
return Integer.valueOf(count);
|
||||
} catch (IOException e) {
|
||||
String redirectHost = resolvedUrl.getAuthority();
|
||||
if (excludeDatanodes.getValue() != null) {
|
||||
excludeDatanodes = new ExcludeDatanodesParam(redirectHost + ","
|
||||
+ excludeDatanodes.getValue());
|
||||
} else {
|
||||
excludeDatanodes = new ExcludeDatanodesParam(redirectHost);
|
||||
}
|
||||
|
||||
// If an exception occurs, close the input stream and null it out so
|
||||
// that if the abstract runner decides to retry, it will reconnect.
|
||||
closeInputStream(RunnerState.DISCONNECTED);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
InputStream initializeInputStream(HttpURLConnection conn)
|
||||
throws IOException {
|
||||
// Cache the resolved URL so that it can be used in the event of
|
||||
// a future seek operation.
|
||||
resolvedUrl = removeOffsetParam(conn.getURL());
|
||||
final String cl = conn.getHeaderField(HttpHeaders.CONTENT_LENGTH);
|
||||
InputStream inStream = conn.getInputStream();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("open file: " + conn.getURL());
|
||||
}
|
||||
if (cl != null) {
|
||||
long streamLength = Long.parseLong(cl);
|
||||
fileLength = pos + streamLength;
|
||||
// Java has a bug with >2GB request streams. It won't bounds check
|
||||
// the reads so the transfer blocks until the server times out
|
||||
inStream = new BoundedInputStream(inStream, streamLength);
|
||||
} else {
|
||||
fileLength = getHdfsFileStatus(path).getLen();
|
||||
}
|
||||
// Wrapping in BufferedInputStream because it is more performant than
|
||||
// BoundedInputStream by itself.
|
||||
runnerState = RunnerState.OPEN;
|
||||
return new BufferedInputStream(inStream, bufferSize);
|
||||
}
|
||||
|
||||
// Close both the InputStream and the connection.
|
||||
@VisibleForTesting
|
||||
void closeInputStream(RunnerState rs) throws IOException {
|
||||
if (in != null) {
|
||||
IOUtils.close(cachedConnection);
|
||||
in = null;
|
||||
}
|
||||
cachedConnection = null;
|
||||
runnerState = rs;
|
||||
}
|
||||
|
||||
/* Getters and Setters */
|
||||
|
||||
@VisibleForTesting
|
||||
protected InputStream getInputStream() {
|
||||
return in;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
protected void setInputStream(InputStream inStream) {
|
||||
in = inStream;
|
||||
}
|
||||
|
||||
Path getPath() {
|
||||
return path;
|
||||
}
|
||||
|
||||
int getBufferSize() {
|
||||
return bufferSize;
|
||||
}
|
||||
|
||||
long getFileLength() {
|
||||
return fileLength;
|
||||
}
|
||||
|
||||
void setFileLength(long len) {
|
||||
fileLength = len;
|
||||
}
|
||||
|
||||
long getPos() {
|
||||
return pos;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2555,6 +2555,9 @@ Release 2.7.3 - UNRELEASED
|
|||
|
||||
IMPROVEMENTS
|
||||
|
||||
HDFS-7163. WebHdfsFileSystem should retry reads according to the configured
|
||||
retry policy. (Eric Payne via kihwal)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
|
|
@ -31,7 +31,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
|||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
|
||||
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
|
||||
import org.apache.hadoop.hdfs.web.ByteRangeInputStream;
|
||||
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem.WebHdfsInputStream;
|
||||
import org.apache.hadoop.io.erasurecode.CodecUtil;
|
||||
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
|
||||
import org.junit.Assert;
|
||||
|
@ -186,7 +186,7 @@ public class StripedFileTestUtil {
|
|||
assertSeekAndRead(in, pos, fileLength);
|
||||
}
|
||||
|
||||
if (!(in.getWrappedStream() instanceof ByteRangeInputStream)) {
|
||||
if (!(in.getWrappedStream() instanceof WebHdfsInputStream)) {
|
||||
try {
|
||||
in.seek(-1);
|
||||
Assert.fail("Should be failed if seek to negative offset");
|
||||
|
|
|
@ -1234,7 +1234,7 @@ public class FSXAttrBaseTest {
|
|||
throws Exception {
|
||||
// Test that a file with the xattr can or can't be opened.
|
||||
try {
|
||||
userFs.open(filePath);
|
||||
userFs.open(filePath).read();
|
||||
assertFalse("open succeeded but expected it to fail", expectOpenFailure);
|
||||
} catch (AccessControlException e) {
|
||||
assertTrue("open failed but expected it to succeed", expectOpenFailure);
|
||||
|
|
|
@ -260,7 +260,7 @@ public class TestAuditLogs {
|
|||
setupAuditLogs();
|
||||
|
||||
WebHdfsFileSystem webfs = WebHdfsTestUtil.getWebHdfsFileSystemAs(userGroupInfo, conf, WebHdfsConstants.WEBHDFS_SCHEME);
|
||||
webfs.open(file);
|
||||
webfs.open(file).read();
|
||||
|
||||
verifyAuditLogsCheckPattern(true, 3, webOpenPattern);
|
||||
}
|
||||
|
|
|
@ -21,13 +21,17 @@ package org.apache.hadoop.hdfs.web;
|
|||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotEquals;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.EOFException;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketException;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.URL;
|
||||
|
@ -43,12 +47,14 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
|
|||
import org.apache.hadoop.fs.ContentSummary;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.StorageType;
|
||||
import org.apache.hadoop.fs.permission.FsAction;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.TestDFSClientRetries;
|
||||
|
@ -59,11 +65,16 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
|||
import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
|
||||
import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
||||
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem.WebHdfsInputStream;
|
||||
import org.apache.hadoop.hdfs.web.resources.LengthParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.OffsetParam;
|
||||
import org.apache.hadoop.hdfs.web.resources.Param;
|
||||
import org.apache.hadoop.io.retry.RetryPolicy;
|
||||
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
|
||||
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction.RetryDecision;
|
||||
import org.apache.hadoop.ipc.RetriableException;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.log4j.Level;
|
||||
|
@ -71,6 +82,15 @@ import org.junit.Assert;
|
|||
import org.junit.Test;
|
||||
import org.mockito.internal.util.reflection.Whitebox;
|
||||
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyInt;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
/** Test WebHDFS */
|
||||
public class TestWebHDFS {
|
||||
static final Log LOG = LogFactory.getLog(TestWebHDFS.class);
|
||||
|
@ -791,4 +811,142 @@ public class TestWebHDFS {
|
|||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Test(timeout=90000)
|
||||
public void testWebHdfsReadRetries() throws Exception {
|
||||
// ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
|
||||
final Configuration conf = WebHdfsTestUtil.createConf();
|
||||
final Path dir = new Path("/testWebHdfsReadRetries");
|
||||
|
||||
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY, true);
|
||||
conf.setInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY, 1);
|
||||
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024*512);
|
||||
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
|
||||
|
||||
final short numDatanodes = 1;
|
||||
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
||||
.numDataNodes(numDatanodes)
|
||||
.build();
|
||||
try {
|
||||
cluster.waitActive();
|
||||
final FileSystem fs = WebHdfsTestUtil
|
||||
.getWebHdfsFileSystem(conf, WebHdfsConstants.WEBHDFS_SCHEME);
|
||||
|
||||
//create a file
|
||||
final long length = 1L << 20;
|
||||
final Path file1 = new Path(dir, "testFile");
|
||||
|
||||
DFSTestUtil.createFile(fs, file1, length, numDatanodes, 20120406L);
|
||||
|
||||
//get file status and check that it was written properly.
|
||||
final FileStatus s1 = fs.getFileStatus(file1);
|
||||
assertEquals("Write failed for file " + file1, length, s1.getLen());
|
||||
|
||||
// Ensure file can be read through WebHdfsInputStream
|
||||
FSDataInputStream in = fs.open(file1);
|
||||
assertTrue("Input stream is not an instance of class WebHdfsInputStream",
|
||||
in.getWrappedStream() instanceof WebHdfsInputStream);
|
||||
int count = 0;
|
||||
for(; in.read() != -1; count++);
|
||||
assertEquals("Read failed for file " + file1, s1.getLen(), count);
|
||||
assertEquals("Sghould not be able to read beyond end of file",
|
||||
in.read(), -1);
|
||||
in.close();
|
||||
try {
|
||||
in.read();
|
||||
fail("Read after close should have failed");
|
||||
} catch(IOException ioe) { }
|
||||
|
||||
WebHdfsFileSystem wfs = (WebHdfsFileSystem)fs;
|
||||
// Read should not be retried if AccessControlException is encountered.
|
||||
String msg = "ReadRetries: Test Access Control Exception";
|
||||
testReadRetryExceptionHelper(wfs, file1,
|
||||
new AccessControlException(msg), msg, false, 1);
|
||||
|
||||
// Retry policy should be invoked if IOExceptions are thrown.
|
||||
msg = "ReadRetries: Test SocketTimeoutException";
|
||||
testReadRetryExceptionHelper(wfs, file1,
|
||||
new SocketTimeoutException(msg), msg, true, 5);
|
||||
msg = "ReadRetries: Test SocketException";
|
||||
testReadRetryExceptionHelper(wfs, file1,
|
||||
new SocketException(msg), msg, true, 5);
|
||||
msg = "ReadRetries: Test EOFException";
|
||||
testReadRetryExceptionHelper(wfs, file1,
|
||||
new EOFException(msg), msg, true, 5);
|
||||
msg = "ReadRetries: Test Generic IO Exception";
|
||||
testReadRetryExceptionHelper(wfs, file1,
|
||||
new IOException(msg), msg, true, 5);
|
||||
|
||||
// If InvalidToken exception occurs, WebHdfs only retries if the
|
||||
// delegation token was replaced. Do that twice, then verify by checking
|
||||
// the number of times it tried.
|
||||
WebHdfsFileSystem spyfs = spy(wfs);
|
||||
when(spyfs.replaceExpiredDelegationToken()).thenReturn(true, true, false);
|
||||
msg = "ReadRetries: Test Invalid Token Exception";
|
||||
testReadRetryExceptionHelper(spyfs, file1,
|
||||
new InvalidToken(msg), msg, false, 3);
|
||||
} finally {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
public boolean attemptedRetry;
|
||||
private void testReadRetryExceptionHelper(WebHdfsFileSystem fs, Path fn,
|
||||
final IOException ex, String msg, boolean shouldAttemptRetry,
|
||||
int numTimesTried)
|
||||
throws Exception {
|
||||
// Ovverride WebHdfsInputStream#getInputStream so that it returns
|
||||
// an input stream that throws the specified exception when read
|
||||
// is called.
|
||||
FSDataInputStream in = fs.open(fn);
|
||||
in.read(); // Connection is made only when the first read() occurs.
|
||||
final WebHdfsInputStream webIn =
|
||||
(WebHdfsInputStream)(in.getWrappedStream());
|
||||
|
||||
final InputStream spyInputStream =
|
||||
spy(webIn.getReadRunner().getInputStream());
|
||||
doThrow(ex).when(spyInputStream).read((byte[])any(), anyInt(), anyInt());
|
||||
final WebHdfsFileSystem.ReadRunner rr = spy(webIn.getReadRunner());
|
||||
doReturn(spyInputStream)
|
||||
.when(rr).initializeInputStream((HttpURLConnection) any());
|
||||
rr.setInputStream(spyInputStream);
|
||||
webIn.setReadRunner(rr);
|
||||
|
||||
// Override filesystem's retry policy in order to verify that
|
||||
// WebHdfsInputStream is calling shouldRetry for the appropriate
|
||||
// exceptions.
|
||||
final RetryAction retryAction = new RetryAction(RetryDecision.RETRY);
|
||||
final RetryAction failAction = new RetryAction(RetryDecision.FAIL);
|
||||
RetryPolicy rp = new RetryPolicy() {
|
||||
@Override
|
||||
public RetryAction shouldRetry(Exception e, int retries, int failovers,
|
||||
boolean isIdempotentOrAtMostOnce) throws Exception {
|
||||
attemptedRetry = true;
|
||||
if (retries > 3) {
|
||||
return failAction;
|
||||
} else {
|
||||
return retryAction;
|
||||
}
|
||||
}
|
||||
};
|
||||
fs.setRetryPolicy(rp);
|
||||
|
||||
// If the retry logic is exercised, attemptedRetry will be true. Some
|
||||
// exceptions should exercise the retry logic and others should not.
|
||||
// Either way, the value of attemptedRetry should match shouldAttemptRetry.
|
||||
attemptedRetry = false;
|
||||
try {
|
||||
webIn.read();
|
||||
fail(msg + ": Read should have thrown exception.");
|
||||
} catch (Exception e) {
|
||||
assertTrue(e.getMessage().contains(msg));
|
||||
}
|
||||
assertEquals(msg + ": Read should " + (shouldAttemptRetry ? "" : "not ")
|
||||
+ "have called shouldRetry. ",
|
||||
attemptedRetry, shouldAttemptRetry);
|
||||
|
||||
verify(rr, times(numTimesTried)).getResponse((HttpURLConnection) any());
|
||||
webIn.close();
|
||||
in.close();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -182,7 +182,7 @@ public class TestWebHdfsFileSystemContract extends FileSystemContractBaseTest {
|
|||
final Path p = new Path("/test/testOpenNonExistFile");
|
||||
//open it as a file, should get FileNotFoundException
|
||||
try {
|
||||
fs.open(p);
|
||||
fs.open(p).read();
|
||||
fail("Expected FileNotFoundException was not thrown");
|
||||
} catch(FileNotFoundException fnfe) {
|
||||
WebHdfsFileSystem.LOG.info("This is expected.", fnfe);
|
||||
|
|
|
@ -383,7 +383,9 @@ public class TestWebHdfsTokens {
|
|||
reset(fs);
|
||||
|
||||
// verify an expired token is replaced with a new token
|
||||
fs.open(p).close();
|
||||
InputStream is = fs.open(p);
|
||||
is.read();
|
||||
is.close();
|
||||
verify(fs, times(2)).getDelegationToken(); // first bad, then good
|
||||
verify(fs, times(1)).replaceExpiredDelegationToken();
|
||||
verify(fs, times(1)).getDelegationToken(null);
|
||||
|
@ -398,7 +400,7 @@ public class TestWebHdfsTokens {
|
|||
// verify with open because it's a little different in how it
|
||||
// opens connections
|
||||
fs.cancelDelegationToken(fs.getRenewToken());
|
||||
InputStream is = fs.open(p);
|
||||
is = fs.open(p);
|
||||
is.read();
|
||||
is.close();
|
||||
verify(fs, times(2)).getDelegationToken(); // first bad, then good
|
||||
|
|
Loading…
Reference in New Issue