svn merge -c 1367841 from trunk for HDFS-3667. Add retry support to WebHdfsFileSystem.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1367842 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
d40b6be2f5
commit
15a9b9829a
|
@ -178,6 +178,8 @@ Release 2.0.1-alpha - UNRELEASED
|
||||||
HDFS-3650. Use MutableQuantiles to provide latency histograms for various
|
HDFS-3650. Use MutableQuantiles to provide latency histograms for various
|
||||||
operations. (Andrew Wang via atm)
|
operations. (Andrew Wang via atm)
|
||||||
|
|
||||||
|
HDFS-3667. Add retry support to WebHdfsFileSystem. (szetszwo)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-2982. Startup performance suffers when there are many edit log
|
HDFS-2982. Startup performance suffers when there are many edit log
|
||||||
|
|
|
@ -57,9 +57,9 @@ public abstract class ByteRangeInputStream extends FSInputStream {
|
||||||
return url;
|
return url;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected abstract HttpURLConnection openConnection() throws IOException;
|
/** Connect to server with a data offset. */
|
||||||
|
protected abstract HttpURLConnection connect(final long offset,
|
||||||
protected abstract HttpURLConnection openConnection(final long offset) throws IOException;
|
final boolean resolved) throws IOException;
|
||||||
}
|
}
|
||||||
|
|
||||||
enum StreamStatus {
|
enum StreamStatus {
|
||||||
|
@ -85,9 +85,6 @@ public abstract class ByteRangeInputStream extends FSInputStream {
|
||||||
this.resolvedURL = r;
|
this.resolvedURL = r;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected abstract void checkResponseCode(final HttpURLConnection connection
|
|
||||||
) throws IOException;
|
|
||||||
|
|
||||||
protected abstract URL getResolvedUrl(final HttpURLConnection connection
|
protected abstract URL getResolvedUrl(final HttpURLConnection connection
|
||||||
) throws IOException;
|
) throws IOException;
|
||||||
|
|
||||||
|
@ -113,13 +110,10 @@ public abstract class ByteRangeInputStream extends FSInputStream {
|
||||||
protected InputStream openInputStream() throws IOException {
|
protected InputStream openInputStream() throws IOException {
|
||||||
// Use the original url if no resolved url exists, eg. if
|
// Use the original url if no resolved url exists, eg. if
|
||||||
// it's the first time a request is made.
|
// it's the first time a request is made.
|
||||||
final URLOpener opener =
|
final boolean resolved = resolvedURL.getURL() != null;
|
||||||
(resolvedURL.getURL() == null) ? originalURL : resolvedURL;
|
final URLOpener opener = resolved? resolvedURL: originalURL;
|
||||||
|
|
||||||
final HttpURLConnection connection = opener.openConnection(startPos);
|
|
||||||
connection.connect();
|
|
||||||
checkResponseCode(connection);
|
|
||||||
|
|
||||||
|
final HttpURLConnection connection = opener.connect(startPos, resolved);
|
||||||
final String cl = connection.getHeaderField(StreamFile.CONTENT_LENGTH);
|
final String cl = connection.getHeaderField(StreamFile.CONTENT_LENGTH);
|
||||||
if (cl == null) {
|
if (cl == null) {
|
||||||
throw new IOException(StreamFile.CONTENT_LENGTH+" header is missing");
|
throw new IOException(StreamFile.CONTENT_LENGTH+" header is missing");
|
||||||
|
|
|
@ -342,19 +342,28 @@ public class HftpFileSystem extends FileSystem
|
||||||
super(url);
|
super(url);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
protected HttpURLConnection openConnection() throws IOException {
|
protected HttpURLConnection openConnection() throws IOException {
|
||||||
return (HttpURLConnection)URLUtils.openConnection(url);
|
return (HttpURLConnection)URLUtils.openConnection(url);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Use HTTP Range header for specifying offset. */
|
/** Use HTTP Range header for specifying offset. */
|
||||||
@Override
|
@Override
|
||||||
protected HttpURLConnection openConnection(final long offset) throws IOException {
|
protected HttpURLConnection connect(final long offset,
|
||||||
|
final boolean resolved) throws IOException {
|
||||||
final HttpURLConnection conn = openConnection();
|
final HttpURLConnection conn = openConnection();
|
||||||
conn.setRequestMethod("GET");
|
conn.setRequestMethod("GET");
|
||||||
if (offset != 0L) {
|
if (offset != 0L) {
|
||||||
conn.setRequestProperty("Range", "bytes=" + offset + "-");
|
conn.setRequestProperty("Range", "bytes=" + offset + "-");
|
||||||
}
|
}
|
||||||
|
conn.connect();
|
||||||
|
|
||||||
|
//Expects HTTP_OK or HTTP_PARTIAL response codes.
|
||||||
|
final int code = conn.getResponseCode();
|
||||||
|
if (offset != 0L && code != HttpURLConnection.HTTP_PARTIAL) {
|
||||||
|
throw new IOException("HTTP_PARTIAL expected, received " + code);
|
||||||
|
} else if (offset == 0L && code != HttpURLConnection.HTTP_OK) {
|
||||||
|
throw new IOException("HTTP_OK expected, received " + code);
|
||||||
|
}
|
||||||
return conn;
|
return conn;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -368,22 +377,6 @@ public class HftpFileSystem extends FileSystem
|
||||||
this(new RangeHeaderUrlOpener(url), new RangeHeaderUrlOpener(null));
|
this(new RangeHeaderUrlOpener(url), new RangeHeaderUrlOpener(null));
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Expects HTTP_OK and HTTP_PARTIAL response codes. */
|
|
||||||
@Override
|
|
||||||
protected void checkResponseCode(final HttpURLConnection connection
|
|
||||||
) throws IOException {
|
|
||||||
final int code = connection.getResponseCode();
|
|
||||||
if (startPos != 0 && code != HttpURLConnection.HTTP_PARTIAL) {
|
|
||||||
// We asked for a byte range but did not receive a partial content
|
|
||||||
// response...
|
|
||||||
throw new IOException("HTTP_PARTIAL expected, received " + code);
|
|
||||||
} else if (startPos == 0 && code != HttpURLConnection.HTTP_OK) {
|
|
||||||
// We asked for all bytes from the beginning but didn't receive a 200
|
|
||||||
// response (none of the other 2xx codes are valid here)
|
|
||||||
throw new IOException("HTTP_OK expected, received " + code);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected URL getResolvedUrl(final HttpURLConnection connection) {
|
protected URL getResolvedUrl(final HttpURLConnection connection) {
|
||||||
return connection.getURL();
|
return connection.getURL();
|
||||||
|
|
|
@ -259,7 +259,7 @@ public class NameNodeProxies {
|
||||||
*
|
*
|
||||||
* Note that dfs.client.retry.max < 0 is not allowed.
|
* Note that dfs.client.retry.max < 0 is not allowed.
|
||||||
*/
|
*/
|
||||||
private static RetryPolicy getDefaultRpcRetryPolicy(Configuration conf) {
|
public static RetryPolicy getDefaultRetryPolicy(Configuration conf) {
|
||||||
final RetryPolicy multipleLinearRandomRetry = getMultipleLinearRandomRetry(conf);
|
final RetryPolicy multipleLinearRandomRetry = getMultipleLinearRandomRetry(conf);
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("multipleLinearRandomRetry = " + multipleLinearRandomRetry);
|
LOG.debug("multipleLinearRandomRetry = " + multipleLinearRandomRetry);
|
||||||
|
@ -300,6 +300,13 @@ public class NameNodeProxies {
|
||||||
+ p.getClass().getSimpleName() + ", exception=" + e);
|
+ p.getClass().getSimpleName() + ", exception=" + e);
|
||||||
return p.shouldRetry(e, retries, failovers, isMethodIdempotent);
|
return p.shouldRetry(e, retries, failovers, isMethodIdempotent);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "RetryPolicy[" + multipleLinearRandomRetry + ", "
|
||||||
|
+ RetryPolicies.TRY_ONCE_THEN_FAIL.getClass().getSimpleName()
|
||||||
|
+ "]";
|
||||||
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -335,7 +342,7 @@ public class NameNodeProxies {
|
||||||
boolean withRetries) throws IOException {
|
boolean withRetries) throws IOException {
|
||||||
RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);
|
RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);
|
||||||
|
|
||||||
final RetryPolicy defaultPolicy = getDefaultRpcRetryPolicy(conf);
|
final RetryPolicy defaultPolicy = getDefaultRetryPolicy(conf);
|
||||||
final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class);
|
final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class);
|
||||||
ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(
|
ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(
|
||||||
ClientNamenodeProtocolPB.class, version, address, ugi, conf,
|
ClientNamenodeProtocolPB.class, version, address, ugi, conf,
|
||||||
|
|
|
@ -55,6 +55,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.hdfs.ByteRangeInputStream;
|
import org.apache.hadoop.hdfs.ByteRangeInputStream;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.DFSUtil;
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
|
import org.apache.hadoop.hdfs.NameNodeProxies;
|
||||||
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
|
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
|
||||||
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
|
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
|
||||||
|
@ -88,6 +89,7 @@ import org.apache.hadoop.hdfs.web.resources.ReplicationParam;
|
||||||
import org.apache.hadoop.hdfs.web.resources.TokenArgumentParam;
|
import org.apache.hadoop.hdfs.web.resources.TokenArgumentParam;
|
||||||
import org.apache.hadoop.hdfs.web.resources.UserParam;
|
import org.apache.hadoop.hdfs.web.resources.UserParam;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hadoop.io.retry.RetryPolicy;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.security.AccessControlException;
|
import org.apache.hadoop.security.AccessControlException;
|
||||||
|
@ -147,6 +149,7 @@ public class WebHdfsFileSystem extends FileSystem
|
||||||
private URI uri;
|
private URI uri;
|
||||||
private Token<?> delegationToken;
|
private Token<?> delegationToken;
|
||||||
private final AuthenticatedURL.Token authToken = new AuthenticatedURL.Token();
|
private final AuthenticatedURL.Token authToken = new AuthenticatedURL.Token();
|
||||||
|
private RetryPolicy retryPolicy = null;
|
||||||
private Path workingDir;
|
private Path workingDir;
|
||||||
|
|
||||||
{
|
{
|
||||||
|
@ -179,6 +182,7 @@ public class WebHdfsFileSystem extends FileSystem
|
||||||
throw new IllegalArgumentException(e);
|
throw new IllegalArgumentException(e);
|
||||||
}
|
}
|
||||||
this.nnAddr = NetUtils.createSocketAddr(uri.getAuthority(), getDefaultPort());
|
this.nnAddr = NetUtils.createSocketAddr(uri.getAuthority(), getDefaultPort());
|
||||||
|
this.retryPolicy = NameNodeProxies.getDefaultRetryPolicy(conf);
|
||||||
this.workingDir = getHomeDirectory();
|
this.workingDir = getHomeDirectory();
|
||||||
|
|
||||||
if (UserGroupInformation.isSecurityEnabled()) {
|
if (UserGroupInformation.isSecurityEnabled()) {
|
||||||
|
@ -276,24 +280,51 @@ public class WebHdfsFileSystem extends FileSystem
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Map<?, ?> validateResponse(final HttpOpParam.Op op,
|
private static Map<?, ?> validateResponse(final HttpOpParam.Op op,
|
||||||
final HttpURLConnection conn) throws IOException {
|
final HttpURLConnection conn, boolean unwrapException) throws IOException {
|
||||||
final int code = conn.getResponseCode();
|
final int code = conn.getResponseCode();
|
||||||
if (code != op.getExpectedHttpResponseCode()) {
|
if (code != op.getExpectedHttpResponseCode()) {
|
||||||
final Map<?, ?> m;
|
final Map<?, ?> m;
|
||||||
try {
|
try {
|
||||||
m = jsonParse(conn, true);
|
m = jsonParse(conn, true);
|
||||||
} catch(IOException e) {
|
} catch(Exception e) {
|
||||||
throw new IOException("Unexpected HTTP response: code=" + code + " != "
|
throw new IOException("Unexpected HTTP response: code=" + code + " != "
|
||||||
+ op.getExpectedHttpResponseCode() + ", " + op.toQueryString()
|
+ op.getExpectedHttpResponseCode() + ", " + op.toQueryString()
|
||||||
+ ", message=" + conn.getResponseMessage(), e);
|
+ ", message=" + conn.getResponseMessage(), e);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (m.get(RemoteException.class.getSimpleName()) == null) {
|
if (m == null) {
|
||||||
|
throw new IOException("Unexpected HTTP response: code=" + code + " != "
|
||||||
|
+ op.getExpectedHttpResponseCode() + ", " + op.toQueryString()
|
||||||
|
+ ", message=" + conn.getResponseMessage());
|
||||||
|
} else if (m.get(RemoteException.class.getSimpleName()) == null) {
|
||||||
return m;
|
return m;
|
||||||
}
|
}
|
||||||
|
|
||||||
final RemoteException re = JsonUtil.toRemoteException(m);
|
final RemoteException re = JsonUtil.toRemoteException(m);
|
||||||
throw re.unwrapRemoteException(AccessControlException.class,
|
throw unwrapException? toIOException(re): re;
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Covert an exception to an IOException.
|
||||||
|
*
|
||||||
|
* For a non-IOException, wrap it with IOException.
|
||||||
|
* For a RemoteException, unwrap it.
|
||||||
|
* For an IOException which is not a RemoteException, return it.
|
||||||
|
*/
|
||||||
|
private static IOException toIOException(Exception e) {
|
||||||
|
if (!(e instanceof IOException)) {
|
||||||
|
return new IOException(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
final IOException ioe = (IOException)e;
|
||||||
|
if (!(ioe instanceof RemoteException)) {
|
||||||
|
return ioe;
|
||||||
|
}
|
||||||
|
|
||||||
|
final RemoteException re = (RemoteException)ioe;
|
||||||
|
return re.unwrapRemoteException(AccessControlException.class,
|
||||||
InvalidToken.class,
|
InvalidToken.class,
|
||||||
AuthenticationException.class,
|
AuthenticationException.class,
|
||||||
AuthorizationException.class,
|
AuthorizationException.class,
|
||||||
|
@ -305,8 +336,6 @@ public class WebHdfsFileSystem extends FileSystem
|
||||||
DSQuotaExceededException.class,
|
DSQuotaExceededException.class,
|
||||||
NSQuotaExceededException.class);
|
NSQuotaExceededException.class);
|
||||||
}
|
}
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return a URL pointing to given path on the namenode.
|
* Return a URL pointing to given path on the namenode.
|
||||||
|
@ -362,68 +391,13 @@ public class WebHdfsFileSystem extends FileSystem
|
||||||
}
|
}
|
||||||
|
|
||||||
private HttpURLConnection getHttpUrlConnection(URL url)
|
private HttpURLConnection getHttpUrlConnection(URL url)
|
||||||
throws IOException {
|
throws IOException, AuthenticationException {
|
||||||
final HttpURLConnection conn;
|
final HttpURLConnection conn;
|
||||||
try {
|
|
||||||
if (ugi.hasKerberosCredentials()) {
|
if (ugi.hasKerberosCredentials()) {
|
||||||
conn = new AuthenticatedURL(AUTH).openConnection(url, authToken);
|
conn = new AuthenticatedURL(AUTH).openConnection(url, authToken);
|
||||||
} else {
|
} else {
|
||||||
conn = (HttpURLConnection)url.openConnection();
|
conn = (HttpURLConnection)url.openConnection();
|
||||||
}
|
}
|
||||||
} catch (AuthenticationException e) {
|
|
||||||
throw new IOException("Authentication failed, url=" + url, e);
|
|
||||||
}
|
|
||||||
return conn;
|
|
||||||
}
|
|
||||||
|
|
||||||
private HttpURLConnection httpConnect(final HttpOpParam.Op op, final Path fspath,
|
|
||||||
final Param<?,?>... parameters) throws IOException {
|
|
||||||
final URL url = toUrl(op, fspath, parameters);
|
|
||||||
|
|
||||||
//connect and get response
|
|
||||||
HttpURLConnection conn = getHttpUrlConnection(url);
|
|
||||||
try {
|
|
||||||
conn.setRequestMethod(op.getType().toString());
|
|
||||||
if (op.getDoOutput()) {
|
|
||||||
conn = twoStepWrite(conn, op);
|
|
||||||
conn.setRequestProperty("Content-Type", "application/octet-stream");
|
|
||||||
}
|
|
||||||
conn.setDoOutput(op.getDoOutput());
|
|
||||||
conn.connect();
|
|
||||||
return conn;
|
|
||||||
} catch (IOException e) {
|
|
||||||
conn.disconnect();
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Two-step Create/Append:
|
|
||||||
* Step 1) Submit a Http request with neither auto-redirect nor data.
|
|
||||||
* Step 2) Submit another Http request with the URL from the Location header with data.
|
|
||||||
*
|
|
||||||
* The reason of having two-step create/append is for preventing clients to
|
|
||||||
* send out the data before the redirect. This issue is addressed by the
|
|
||||||
* "Expect: 100-continue" header in HTTP/1.1; see RFC 2616, Section 8.2.3.
|
|
||||||
* Unfortunately, there are software library bugs (e.g. Jetty 6 http server
|
|
||||||
* and Java 6 http client), which do not correctly implement "Expect:
|
|
||||||
* 100-continue". The two-step create/append is a temporary workaround for
|
|
||||||
* the software library bugs.
|
|
||||||
*/
|
|
||||||
static HttpURLConnection twoStepWrite(HttpURLConnection conn,
|
|
||||||
final HttpOpParam.Op op) throws IOException {
|
|
||||||
//Step 1) Submit a Http request with neither auto-redirect nor data.
|
|
||||||
conn.setInstanceFollowRedirects(false);
|
|
||||||
conn.setDoOutput(false);
|
|
||||||
conn.connect();
|
|
||||||
validateResponse(HttpOpParam.TemporaryRedirectOp.valueOf(op), conn);
|
|
||||||
final String redirect = conn.getHeaderField("Location");
|
|
||||||
conn.disconnect();
|
|
||||||
|
|
||||||
//Step 2) Submit another Http request with the URL from the Location header with data.
|
|
||||||
conn = (HttpURLConnection)new URL(redirect).openConnection();
|
|
||||||
conn.setRequestMethod(op.getType().toString());
|
|
||||||
conn.setChunkedStreamingMode(32 << 10); //32kB-chunk
|
|
||||||
return conn;
|
return conn;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -439,12 +413,161 @@ public class WebHdfsFileSystem extends FileSystem
|
||||||
*/
|
*/
|
||||||
private Map<?, ?> run(final HttpOpParam.Op op, final Path fspath,
|
private Map<?, ?> run(final HttpOpParam.Op op, final Path fspath,
|
||||||
final Param<?,?>... parameters) throws IOException {
|
final Param<?,?>... parameters) throws IOException {
|
||||||
final HttpURLConnection conn = httpConnect(op, fspath, parameters);
|
return new Runner(op, fspath, parameters).run().json;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class is for initialing a HTTP connection, connecting to server,
|
||||||
|
* obtaining a response, and also handling retry on failures.
|
||||||
|
*/
|
||||||
|
class Runner {
|
||||||
|
private final HttpOpParam.Op op;
|
||||||
|
private final URL url;
|
||||||
|
private final boolean redirected;
|
||||||
|
|
||||||
|
private boolean checkRetry;
|
||||||
|
private HttpURLConnection conn = null;
|
||||||
|
private Map<?, ?> json = null;
|
||||||
|
|
||||||
|
Runner(final HttpOpParam.Op op, final URL url, final boolean redirected) {
|
||||||
|
this.op = op;
|
||||||
|
this.url = url;
|
||||||
|
this.redirected = redirected;
|
||||||
|
}
|
||||||
|
|
||||||
|
Runner(final HttpOpParam.Op op, final Path fspath,
|
||||||
|
final Param<?,?>... parameters) throws IOException {
|
||||||
|
this(op, toUrl(op, fspath, parameters), false);
|
||||||
|
}
|
||||||
|
|
||||||
|
Runner(final HttpOpParam.Op op, final HttpURLConnection conn) {
|
||||||
|
this(op, null, false);
|
||||||
|
this.conn = conn;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void init() throws IOException {
|
||||||
|
checkRetry = !redirected;
|
||||||
try {
|
try {
|
||||||
final Map<?, ?> m = validateResponse(op, conn);
|
conn = getHttpUrlConnection(url);
|
||||||
return m != null? m: jsonParse(conn, false);
|
} catch(AuthenticationException ae) {
|
||||||
} finally {
|
checkRetry = false;
|
||||||
|
throw new IOException("Authentication failed, url=" + url, ae);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void connect() throws IOException {
|
||||||
|
connect(op.getDoOutput());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void connect(boolean doOutput) throws IOException {
|
||||||
|
conn.setRequestMethod(op.getType().toString());
|
||||||
|
conn.setDoOutput(doOutput);
|
||||||
|
conn.setInstanceFollowRedirects(false);
|
||||||
|
conn.connect();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void disconnect() {
|
||||||
|
if (conn != null) {
|
||||||
conn.disconnect();
|
conn.disconnect();
|
||||||
|
conn = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Runner run() throws IOException {
|
||||||
|
for(int retry = 0; ; retry++) {
|
||||||
|
try {
|
||||||
|
init();
|
||||||
|
if (op.getDoOutput()) {
|
||||||
|
twoStepWrite();
|
||||||
|
} else {
|
||||||
|
getResponse(op != GetOpParam.Op.OPEN);
|
||||||
|
}
|
||||||
|
return this;
|
||||||
|
} catch(IOException ioe) {
|
||||||
|
shouldRetry(ioe, retry);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void shouldRetry(final IOException ioe, final int retry
|
||||||
|
) throws IOException {
|
||||||
|
if (checkRetry) {
|
||||||
|
try {
|
||||||
|
final RetryPolicy.RetryAction a = retryPolicy.shouldRetry(
|
||||||
|
ioe, retry, 0, true);
|
||||||
|
if (a.action == RetryPolicy.RetryAction.RetryDecision.RETRY) {
|
||||||
|
LOG.info("Retrying connect to namenode: " + nnAddr
|
||||||
|
+ ". Already tried " + retry + " time(s); retry policy is "
|
||||||
|
+ retryPolicy + ", delay " + a.delayMillis + "ms.");
|
||||||
|
Thread.sleep(a.delayMillis);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
} catch(Exception e) {
|
||||||
|
LOG.warn("Original exception is ", ioe);
|
||||||
|
throw toIOException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
throw toIOException(ioe);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Two-step Create/Append:
|
||||||
|
* Step 1) Submit a Http request with neither auto-redirect nor data.
|
||||||
|
* Step 2) Submit another Http request with the URL from the Location header with data.
|
||||||
|
*
|
||||||
|
* The reason of having two-step create/append is for preventing clients to
|
||||||
|
* send out the data before the redirect. This issue is addressed by the
|
||||||
|
* "Expect: 100-continue" header in HTTP/1.1; see RFC 2616, Section 8.2.3.
|
||||||
|
* Unfortunately, there are software library bugs (e.g. Jetty 6 http server
|
||||||
|
* and Java 6 http client), which do not correctly implement "Expect:
|
||||||
|
* 100-continue". The two-step create/append is a temporary workaround for
|
||||||
|
* the software library bugs.
|
||||||
|
*/
|
||||||
|
HttpURLConnection twoStepWrite() throws IOException {
|
||||||
|
//Step 1) Submit a Http request with neither auto-redirect nor data.
|
||||||
|
connect(false);
|
||||||
|
validateResponse(HttpOpParam.TemporaryRedirectOp.valueOf(op), conn, false);
|
||||||
|
final String redirect = conn.getHeaderField("Location");
|
||||||
|
disconnect();
|
||||||
|
checkRetry = false;
|
||||||
|
|
||||||
|
//Step 2) Submit another Http request with the URL from the Location header with data.
|
||||||
|
conn = (HttpURLConnection)new URL(redirect).openConnection();
|
||||||
|
conn.setRequestProperty("Content-Type", MediaType.APPLICATION_OCTET_STREAM);
|
||||||
|
conn.setChunkedStreamingMode(32 << 10); //32kB-chunk
|
||||||
|
connect();
|
||||||
|
return conn;
|
||||||
|
}
|
||||||
|
|
||||||
|
FSDataOutputStream write(final int bufferSize) throws IOException {
|
||||||
|
return WebHdfsFileSystem.this.write(op, conn, bufferSize);
|
||||||
|
}
|
||||||
|
|
||||||
|
void getResponse(boolean getJsonAndDisconnect) throws IOException {
|
||||||
|
try {
|
||||||
|
connect();
|
||||||
|
final int code = conn.getResponseCode();
|
||||||
|
if (!redirected && op.getRedirect()
|
||||||
|
&& code != op.getExpectedHttpResponseCode()) {
|
||||||
|
final String redirect = conn.getHeaderField("Location");
|
||||||
|
json = validateResponse(HttpOpParam.TemporaryRedirectOp.valueOf(op),
|
||||||
|
conn, false);
|
||||||
|
disconnect();
|
||||||
|
|
||||||
|
checkRetry = false;
|
||||||
|
conn = (HttpURLConnection)new URL(redirect).openConnection();
|
||||||
|
connect();
|
||||||
|
}
|
||||||
|
|
||||||
|
json = validateResponse(op, conn, false);
|
||||||
|
if (json == null && getJsonAndDisconnect) {
|
||||||
|
json = jsonParse(conn, false);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
if (getJsonAndDisconnect) {
|
||||||
|
disconnect();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -578,7 +701,7 @@ public class WebHdfsFileSystem extends FileSystem
|
||||||
super.close();
|
super.close();
|
||||||
} finally {
|
} finally {
|
||||||
try {
|
try {
|
||||||
validateResponse(op, conn);
|
validateResponse(op, conn, true);
|
||||||
} finally {
|
} finally {
|
||||||
conn.disconnect();
|
conn.disconnect();
|
||||||
}
|
}
|
||||||
|
@ -594,13 +717,14 @@ public class WebHdfsFileSystem extends FileSystem
|
||||||
statistics.incrementWriteOps(1);
|
statistics.incrementWriteOps(1);
|
||||||
|
|
||||||
final HttpOpParam.Op op = PutOpParam.Op.CREATE;
|
final HttpOpParam.Op op = PutOpParam.Op.CREATE;
|
||||||
final HttpURLConnection conn = httpConnect(op, f,
|
return new Runner(op, f,
|
||||||
new PermissionParam(applyUMask(permission)),
|
new PermissionParam(applyUMask(permission)),
|
||||||
new OverwriteParam(overwrite),
|
new OverwriteParam(overwrite),
|
||||||
new BufferSizeParam(bufferSize),
|
new BufferSizeParam(bufferSize),
|
||||||
new ReplicationParam(replication),
|
new ReplicationParam(replication),
|
||||||
new BlockSizeParam(blockSize));
|
new BlockSizeParam(blockSize))
|
||||||
return write(op, conn, bufferSize);
|
.run()
|
||||||
|
.write(bufferSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -609,9 +733,9 @@ public class WebHdfsFileSystem extends FileSystem
|
||||||
statistics.incrementWriteOps(1);
|
statistics.incrementWriteOps(1);
|
||||||
|
|
||||||
final HttpOpParam.Op op = PostOpParam.Op.APPEND;
|
final HttpOpParam.Op op = PostOpParam.Op.APPEND;
|
||||||
final HttpURLConnection conn = httpConnect(op, f,
|
return new Runner(op, f, new BufferSizeParam(bufferSize))
|
||||||
new BufferSizeParam(bufferSize));
|
.run()
|
||||||
return write(op, conn, bufferSize);
|
.write(bufferSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("deprecation")
|
@SuppressWarnings("deprecation")
|
||||||
|
@ -638,26 +762,17 @@ public class WebHdfsFileSystem extends FileSystem
|
||||||
}
|
}
|
||||||
|
|
||||||
class OffsetUrlOpener extends ByteRangeInputStream.URLOpener {
|
class OffsetUrlOpener extends ByteRangeInputStream.URLOpener {
|
||||||
/** The url with offset parameter */
|
|
||||||
private URL offsetUrl;
|
|
||||||
|
|
||||||
OffsetUrlOpener(final URL url) {
|
OffsetUrlOpener(final URL url) {
|
||||||
super(url);
|
super(url);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Open connection with offset url. */
|
/** Setup offset url and connect. */
|
||||||
@Override
|
@Override
|
||||||
protected HttpURLConnection openConnection() throws IOException {
|
protected HttpURLConnection connect(final long offset,
|
||||||
return getHttpUrlConnection(offsetUrl);
|
final boolean resolved) throws IOException {
|
||||||
}
|
final URL offsetUrl = offset == 0L? url
|
||||||
|
: new URL(url + "&" + new OffsetParam(offset));
|
||||||
/** Setup offset url before open connection. */
|
return new Runner(GetOpParam.Op.OPEN, offsetUrl, resolved).run().conn;
|
||||||
@Override
|
|
||||||
protected HttpURLConnection openConnection(final long offset) throws IOException {
|
|
||||||
offsetUrl = offset == 0L? url: new URL(url + "&" + new OffsetParam(offset));
|
|
||||||
final HttpURLConnection conn = openConnection();
|
|
||||||
conn.setRequestMethod("GET");
|
|
||||||
return conn;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -699,12 +814,6 @@ public class WebHdfsFileSystem extends FileSystem
|
||||||
super(o, r);
|
super(o, r);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void checkResponseCode(final HttpURLConnection connection
|
|
||||||
) throws IOException {
|
|
||||||
validateResponse(GetOpParam.Op.OPEN, connection);
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Remove offset parameter before returning the resolved url. */
|
/** Remove offset parameter before returning the resolved url. */
|
||||||
@Override
|
@Override
|
||||||
protected URL getResolvedUrl(final HttpURLConnection connection
|
protected URL getResolvedUrl(final HttpURLConnection connection
|
||||||
|
|
|
@ -43,6 +43,11 @@ public class DeleteOpParam extends HttpOpParam<DeleteOpParam.Op> {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean getRedirect() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getExpectedHttpResponseCode() {
|
public int getExpectedHttpResponseCode() {
|
||||||
return expectedHttpResponseCode;
|
return expectedHttpResponseCode;
|
||||||
|
|
|
@ -23,25 +23,27 @@ import java.net.HttpURLConnection;
|
||||||
public class GetOpParam extends HttpOpParam<GetOpParam.Op> {
|
public class GetOpParam extends HttpOpParam<GetOpParam.Op> {
|
||||||
/** Get operations. */
|
/** Get operations. */
|
||||||
public static enum Op implements HttpOpParam.Op {
|
public static enum Op implements HttpOpParam.Op {
|
||||||
OPEN(HttpURLConnection.HTTP_OK),
|
OPEN(true, HttpURLConnection.HTTP_OK),
|
||||||
|
|
||||||
GETFILESTATUS(HttpURLConnection.HTTP_OK),
|
GETFILESTATUS(false, HttpURLConnection.HTTP_OK),
|
||||||
LISTSTATUS(HttpURLConnection.HTTP_OK),
|
LISTSTATUS(false, HttpURLConnection.HTTP_OK),
|
||||||
GETCONTENTSUMMARY(HttpURLConnection.HTTP_OK),
|
GETCONTENTSUMMARY(false, HttpURLConnection.HTTP_OK),
|
||||||
GETFILECHECKSUM(HttpURLConnection.HTTP_OK),
|
GETFILECHECKSUM(true, HttpURLConnection.HTTP_OK),
|
||||||
|
|
||||||
GETHOMEDIRECTORY(HttpURLConnection.HTTP_OK),
|
GETHOMEDIRECTORY(false, HttpURLConnection.HTTP_OK),
|
||||||
GETDELEGATIONTOKEN(HttpURLConnection.HTTP_OK),
|
GETDELEGATIONTOKEN(false, HttpURLConnection.HTTP_OK),
|
||||||
GETDELEGATIONTOKENS(HttpURLConnection.HTTP_OK),
|
GETDELEGATIONTOKENS(false, HttpURLConnection.HTTP_OK),
|
||||||
|
|
||||||
/** GET_BLOCK_LOCATIONS is a private unstable op. */
|
/** GET_BLOCK_LOCATIONS is a private unstable op. */
|
||||||
GET_BLOCK_LOCATIONS(HttpURLConnection.HTTP_OK),
|
GET_BLOCK_LOCATIONS(false, HttpURLConnection.HTTP_OK),
|
||||||
|
|
||||||
NULL(HttpURLConnection.HTTP_NOT_IMPLEMENTED);
|
NULL(false, HttpURLConnection.HTTP_NOT_IMPLEMENTED);
|
||||||
|
|
||||||
|
final boolean redirect;
|
||||||
final int expectedHttpResponseCode;
|
final int expectedHttpResponseCode;
|
||||||
|
|
||||||
Op(final int expectedHttpResponseCode) {
|
Op(final boolean redirect, final int expectedHttpResponseCode) {
|
||||||
|
this.redirect = redirect;
|
||||||
this.expectedHttpResponseCode = expectedHttpResponseCode;
|
this.expectedHttpResponseCode = expectedHttpResponseCode;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -55,6 +57,11 @@ public class GetOpParam extends HttpOpParam<GetOpParam.Op> {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean getRedirect() {
|
||||||
|
return redirect;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getExpectedHttpResponseCode() {
|
public int getExpectedHttpResponseCode() {
|
||||||
return expectedHttpResponseCode;
|
return expectedHttpResponseCode;
|
||||||
|
|
|
@ -17,6 +17,10 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.web.resources;
|
package org.apache.hadoop.hdfs.web.resources;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
import javax.ws.rs.core.Response;
|
import javax.ws.rs.core.Response;
|
||||||
|
|
||||||
|
|
||||||
|
@ -42,6 +46,9 @@ public abstract class HttpOpParam<E extends Enum<E> & HttpOpParam.Op>
|
||||||
/** @return true if the operation will do output. */
|
/** @return true if the operation will do output. */
|
||||||
public boolean getDoOutput();
|
public boolean getDoOutput();
|
||||||
|
|
||||||
|
/** @return true if the operation will be redirected. */
|
||||||
|
public boolean getRedirect();
|
||||||
|
|
||||||
/** @return true the expected http response code. */
|
/** @return true the expected http response code. */
|
||||||
public int getExpectedHttpResponseCode();
|
public int getExpectedHttpResponseCode();
|
||||||
|
|
||||||
|
@ -51,15 +58,25 @@ public abstract class HttpOpParam<E extends Enum<E> & HttpOpParam.Op>
|
||||||
|
|
||||||
/** Expects HTTP response 307 "Temporary Redirect". */
|
/** Expects HTTP response 307 "Temporary Redirect". */
|
||||||
public static class TemporaryRedirectOp implements Op {
|
public static class TemporaryRedirectOp implements Op {
|
||||||
static final TemporaryRedirectOp CREATE = new TemporaryRedirectOp(PutOpParam.Op.CREATE);
|
static final TemporaryRedirectOp CREATE = new TemporaryRedirectOp(
|
||||||
static final TemporaryRedirectOp APPEND = new TemporaryRedirectOp(PostOpParam.Op.APPEND);
|
PutOpParam.Op.CREATE);
|
||||||
|
static final TemporaryRedirectOp APPEND = new TemporaryRedirectOp(
|
||||||
|
PostOpParam.Op.APPEND);
|
||||||
|
static final TemporaryRedirectOp OPEN = new TemporaryRedirectOp(
|
||||||
|
GetOpParam.Op.OPEN);
|
||||||
|
static final TemporaryRedirectOp GETFILECHECKSUM = new TemporaryRedirectOp(
|
||||||
|
GetOpParam.Op.GETFILECHECKSUM);
|
||||||
|
|
||||||
|
static final List<TemporaryRedirectOp> values
|
||||||
|
= Collections.unmodifiableList(Arrays.asList(
|
||||||
|
new TemporaryRedirectOp[]{CREATE, APPEND, OPEN, GETFILECHECKSUM}));
|
||||||
|
|
||||||
/** Get an object for the given op. */
|
/** Get an object for the given op. */
|
||||||
public static TemporaryRedirectOp valueOf(final Op op) {
|
public static TemporaryRedirectOp valueOf(final Op op) {
|
||||||
if (op == CREATE.op) {
|
for(TemporaryRedirectOp t : values) {
|
||||||
return CREATE;
|
if (op == t.op) {
|
||||||
} else if (op == APPEND.op) {
|
return t;
|
||||||
return APPEND;
|
}
|
||||||
}
|
}
|
||||||
throw new IllegalArgumentException(op + " not found.");
|
throw new IllegalArgumentException(op + " not found.");
|
||||||
}
|
}
|
||||||
|
@ -80,6 +97,11 @@ public abstract class HttpOpParam<E extends Enum<E> & HttpOpParam.Op>
|
||||||
return op.getDoOutput();
|
return op.getDoOutput();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean getRedirect() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
/** Override the original expected response with "Temporary Redirect". */
|
/** Override the original expected response with "Temporary Redirect". */
|
||||||
@Override
|
@Override
|
||||||
public int getExpectedHttpResponseCode() {
|
public int getExpectedHttpResponseCode() {
|
||||||
|
|
|
@ -43,6 +43,11 @@ public class PostOpParam extends HttpOpParam<PostOpParam.Op> {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean getRedirect() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getExpectedHttpResponseCode() {
|
public int getExpectedHttpResponseCode() {
|
||||||
return expectedHttpResponseCode;
|
return expectedHttpResponseCode;
|
||||||
|
|
|
@ -39,11 +39,11 @@ public class PutOpParam extends HttpOpParam<PutOpParam.Op> {
|
||||||
|
|
||||||
NULL(false, HttpURLConnection.HTTP_NOT_IMPLEMENTED);
|
NULL(false, HttpURLConnection.HTTP_NOT_IMPLEMENTED);
|
||||||
|
|
||||||
final boolean doOutput;
|
final boolean doOutputAndRedirect;
|
||||||
final int expectedHttpResponseCode;
|
final int expectedHttpResponseCode;
|
||||||
|
|
||||||
Op(final boolean doOutput, final int expectedHttpResponseCode) {
|
Op(final boolean doOutputAndRedirect, final int expectedHttpResponseCode) {
|
||||||
this.doOutput = doOutput;
|
this.doOutputAndRedirect = doOutputAndRedirect;
|
||||||
this.expectedHttpResponseCode = expectedHttpResponseCode;
|
this.expectedHttpResponseCode = expectedHttpResponseCode;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -54,7 +54,12 @@ public class PutOpParam extends HttpOpParam<PutOpParam.Op> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean getDoOutput() {
|
public boolean getDoOutput() {
|
||||||
return doOutput;
|
return doOutputAndRedirect;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean getRedirect() {
|
||||||
|
return doOutputAndRedirect;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -47,6 +47,7 @@ import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.commons.logging.impl.Log4JLogger;
|
import org.apache.commons.logging.impl.Log4JLogger;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||||
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.FileChecksum;
|
import org.apache.hadoop.fs.FileChecksum;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
@ -66,6 +67,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
|
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
||||||
|
import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.io.LongWritable;
|
import org.apache.hadoop.io.LongWritable;
|
||||||
import org.apache.hadoop.io.Writable;
|
import org.apache.hadoop.io.Writable;
|
||||||
|
@ -74,6 +76,7 @@ import org.apache.hadoop.ipc.RPC;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
import org.apache.hadoop.ipc.Server;
|
import org.apache.hadoop.ipc.Server;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Time;
|
||||||
import org.apache.log4j.Level;
|
import org.apache.log4j.Level;
|
||||||
|
@ -772,13 +775,17 @@ public class TestDFSClientRetries {
|
||||||
/** Test client retry with namenode restarting. */
|
/** Test client retry with namenode restarting. */
|
||||||
@Test
|
@Test
|
||||||
public void testNamenodeRestart() throws Exception {
|
public void testNamenodeRestart() throws Exception {
|
||||||
|
namenodeRestartTest(new Configuration(), false);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void namenodeRestartTest(final Configuration conf,
|
||||||
|
final boolean isWebHDFS) throws Exception {
|
||||||
((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
|
((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
|
||||||
|
|
||||||
final List<Exception> exceptions = new ArrayList<Exception>();
|
final List<Exception> exceptions = new ArrayList<Exception>();
|
||||||
|
|
||||||
final Path dir = new Path("/testNamenodeRestart");
|
final Path dir = new Path("/testNamenodeRestart");
|
||||||
|
|
||||||
final Configuration conf = new Configuration();
|
|
||||||
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY, true);
|
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY, true);
|
||||||
|
|
||||||
final short numDatanodes = 3;
|
final short numDatanodes = 3;
|
||||||
|
@ -788,16 +795,18 @@ public class TestDFSClientRetries {
|
||||||
try {
|
try {
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
final DistributedFileSystem dfs = cluster.getFileSystem();
|
final DistributedFileSystem dfs = cluster.getFileSystem();
|
||||||
|
final FileSystem fs = isWebHDFS?
|
||||||
|
WebHdfsTestUtil.getWebHdfsFileSystem(conf): dfs;
|
||||||
final URI uri = dfs.getUri();
|
final URI uri = dfs.getUri();
|
||||||
assertTrue(HdfsUtils.isHealthy(uri));
|
assertTrue(HdfsUtils.isHealthy(uri));
|
||||||
|
|
||||||
//create a file
|
//create a file
|
||||||
final long length = 1L << 20;
|
final long length = 1L << 20;
|
||||||
final Path file1 = new Path(dir, "foo");
|
final Path file1 = new Path(dir, "foo");
|
||||||
DFSTestUtil.createFile(dfs, file1, length, numDatanodes, 20120406L);
|
DFSTestUtil.createFile(fs, file1, length, numDatanodes, 20120406L);
|
||||||
|
|
||||||
//get file status
|
//get file status
|
||||||
final FileStatus s1 = dfs.getFileStatus(file1);
|
final FileStatus s1 = fs.getFileStatus(file1);
|
||||||
assertEquals(length, s1.getLen());
|
assertEquals(length, s1.getLen());
|
||||||
|
|
||||||
//shutdown namenode
|
//shutdown namenode
|
||||||
|
@ -805,6 +814,25 @@ public class TestDFSClientRetries {
|
||||||
cluster.shutdownNameNode(0);
|
cluster.shutdownNameNode(0);
|
||||||
assertFalse(HdfsUtils.isHealthy(uri));
|
assertFalse(HdfsUtils.isHealthy(uri));
|
||||||
|
|
||||||
|
//namenode is down, read the file in a thread
|
||||||
|
final Thread reader = new Thread(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
//it should retry till namenode is up.
|
||||||
|
final FileSystem fs = createFsWithDifferentUsername(conf, isWebHDFS);
|
||||||
|
final FSDataInputStream in = fs.open(file1);
|
||||||
|
int count = 0;
|
||||||
|
for(; in.read() != -1; count++);
|
||||||
|
in.close();
|
||||||
|
assertEquals(s1.getLen(), count);
|
||||||
|
} catch (Exception e) {
|
||||||
|
exceptions.add(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
reader.start();
|
||||||
|
|
||||||
//namenode is down, create another file in a thread
|
//namenode is down, create another file in a thread
|
||||||
final Path file3 = new Path(dir, "file");
|
final Path file3 = new Path(dir, "file");
|
||||||
final Thread thread = new Thread(new Runnable() {
|
final Thread thread = new Thread(new Runnable() {
|
||||||
|
@ -812,7 +840,7 @@ public class TestDFSClientRetries {
|
||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
//it should retry till namenode is up.
|
//it should retry till namenode is up.
|
||||||
final FileSystem fs = AppendTestUtil.createHdfsWithDifferentUsername(conf);
|
final FileSystem fs = createFsWithDifferentUsername(conf, isWebHDFS);
|
||||||
DFSTestUtil.createFile(fs, file3, length, numDatanodes, 20120406L);
|
DFSTestUtil.createFile(fs, file3, length, numDatanodes, 20120406L);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
exceptions.add(e);
|
exceptions.add(e);
|
||||||
|
@ -839,12 +867,15 @@ public class TestDFSClientRetries {
|
||||||
}).start();
|
}).start();
|
||||||
|
|
||||||
//namenode is down, it should retry until namenode is up again.
|
//namenode is down, it should retry until namenode is up again.
|
||||||
final FileStatus s2 = dfs.getFileStatus(file1);
|
final FileStatus s2 = fs.getFileStatus(file1);
|
||||||
assertEquals(s1, s2);
|
assertEquals(s1, s2);
|
||||||
|
|
||||||
//check file1 and file3
|
//check file1 and file3
|
||||||
thread.join();
|
thread.join();
|
||||||
assertEquals(dfs.getFileChecksum(file1), dfs.getFileChecksum(file3));
|
assertEquals(s1.getLen(), fs.getFileStatus(file3).getLen());
|
||||||
|
assertEquals(fs.getFileChecksum(file1), fs.getFileChecksum(file3));
|
||||||
|
|
||||||
|
reader.join();
|
||||||
|
|
||||||
//enter safe mode
|
//enter safe mode
|
||||||
assertTrue(HdfsUtils.isHealthy(uri));
|
assertTrue(HdfsUtils.isHealthy(uri));
|
||||||
|
@ -869,8 +900,8 @@ public class TestDFSClientRetries {
|
||||||
|
|
||||||
//namenode is in safe mode, create should retry until it leaves safe mode.
|
//namenode is in safe mode, create should retry until it leaves safe mode.
|
||||||
final Path file2 = new Path(dir, "bar");
|
final Path file2 = new Path(dir, "bar");
|
||||||
DFSTestUtil.createFile(dfs, file2, length, numDatanodes, 20120406L);
|
DFSTestUtil.createFile(fs, file2, length, numDatanodes, 20120406L);
|
||||||
assertEquals(dfs.getFileChecksum(file1), dfs.getFileChecksum(file2));
|
assertEquals(fs.getFileChecksum(file1), fs.getFileChecksum(file2));
|
||||||
|
|
||||||
assertTrue(HdfsUtils.isHealthy(uri));
|
assertTrue(HdfsUtils.isHealthy(uri));
|
||||||
|
|
||||||
|
@ -878,7 +909,7 @@ public class TestDFSClientRetries {
|
||||||
final Path nonExisting = new Path(dir, "nonExisting");
|
final Path nonExisting = new Path(dir, "nonExisting");
|
||||||
LOG.info("setPermission: " + nonExisting);
|
LOG.info("setPermission: " + nonExisting);
|
||||||
try {
|
try {
|
||||||
dfs.setPermission(nonExisting, new FsPermission((short)0));
|
fs.setPermission(nonExisting, new FsPermission((short)0));
|
||||||
fail();
|
fail();
|
||||||
} catch(FileNotFoundException fnfe) {
|
} catch(FileNotFoundException fnfe) {
|
||||||
LOG.info("GOOD!", fnfe);
|
LOG.info("GOOD!", fnfe);
|
||||||
|
@ -896,6 +927,18 @@ public class TestDFSClientRetries {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static FileSystem createFsWithDifferentUsername(
|
||||||
|
final Configuration conf, final boolean isWebHDFS
|
||||||
|
) throws IOException, InterruptedException {
|
||||||
|
final String username = UserGroupInformation.getCurrentUser(
|
||||||
|
).getShortUserName() + "_XXX";
|
||||||
|
final UserGroupInformation ugi = UserGroupInformation.createUserForTesting(
|
||||||
|
username, new String[]{"supergroup"});
|
||||||
|
|
||||||
|
return isWebHDFS? WebHdfsTestUtil.getWebHdfsFileSystemAs(ugi, conf)
|
||||||
|
: DFSTestUtil.getFileSystemAs(ugi, conf);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testMultipleLinearRandomRetry() {
|
public void testMultipleLinearRandomRetry() {
|
||||||
parseMultipleLinearRandomRetry(null, "");
|
parseMultipleLinearRandomRetry(null, "");
|
||||||
|
|
|
@ -44,7 +44,6 @@ import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||||
|
@ -140,9 +139,7 @@ public class TestDelegationTokenForProxyUser {
|
||||||
.doAs(new PrivilegedExceptionAction<Token<DelegationTokenIdentifier>>() {
|
.doAs(new PrivilegedExceptionAction<Token<DelegationTokenIdentifier>>() {
|
||||||
@Override
|
@Override
|
||||||
public Token<DelegationTokenIdentifier> run() throws IOException {
|
public Token<DelegationTokenIdentifier> run() throws IOException {
|
||||||
DistributedFileSystem dfs = (DistributedFileSystem) cluster
|
return cluster.getFileSystem().getDelegationToken("RenewerUser");
|
||||||
.getFileSystem();
|
|
||||||
return dfs.getDelegationToken("RenewerUser");
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
DelegationTokenIdentifier identifier = new DelegationTokenIdentifier();
|
DelegationTokenIdentifier identifier = new DelegationTokenIdentifier();
|
||||||
|
@ -206,7 +203,7 @@ public class TestDelegationTokenForProxyUser {
|
||||||
final PutOpParam.Op op = PutOpParam.Op.CREATE;
|
final PutOpParam.Op op = PutOpParam.Op.CREATE;
|
||||||
final URL url = WebHdfsTestUtil.toUrl(webhdfs, op, f, new DoAsParam(PROXY_USER));
|
final URL url = WebHdfsTestUtil.toUrl(webhdfs, op, f, new DoAsParam(PROXY_USER));
|
||||||
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
|
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
|
||||||
conn = WebHdfsTestUtil.twoStepWrite(conn, op);
|
conn = WebHdfsTestUtil.twoStepWrite(webhdfs, op, conn);
|
||||||
final FSDataOutputStream out = WebHdfsTestUtil.write(webhdfs, op, conn, 4096);
|
final FSDataOutputStream out = WebHdfsTestUtil.write(webhdfs, op, conn, 4096);
|
||||||
out.write("Hello, webhdfs user!".getBytes());
|
out.write("Hello, webhdfs user!".getBytes());
|
||||||
out.close();
|
out.close();
|
||||||
|
@ -221,7 +218,7 @@ public class TestDelegationTokenForProxyUser {
|
||||||
final PostOpParam.Op op = PostOpParam.Op.APPEND;
|
final PostOpParam.Op op = PostOpParam.Op.APPEND;
|
||||||
final URL url = WebHdfsTestUtil.toUrl(webhdfs, op, f, new DoAsParam(PROXY_USER));
|
final URL url = WebHdfsTestUtil.toUrl(webhdfs, op, f, new DoAsParam(PROXY_USER));
|
||||||
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
|
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
|
||||||
conn = WebHdfsTestUtil.twoStepWrite(conn, op);
|
conn = WebHdfsTestUtil.twoStepWrite(webhdfs, op, conn);
|
||||||
final FSDataOutputStream out = WebHdfsTestUtil.write(webhdfs, op, conn, 4096);
|
final FSDataOutputStream out = WebHdfsTestUtil.write(webhdfs, op, conn, 4096);
|
||||||
out.write("\nHello again!".getBytes());
|
out.write("\nHello again!".getBytes());
|
||||||
out.close();
|
out.close();
|
||||||
|
|
|
@ -18,22 +18,10 @@
|
||||||
package org.apache.hadoop.hdfs.web;
|
package org.apache.hadoop.hdfs.web;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertNull;
|
|
||||||
import static org.junit.Assert.fail;
|
|
||||||
import static org.mockito.Mockito.doReturn;
|
|
||||||
import static org.mockito.Mockito.spy;
|
|
||||||
import static org.mockito.Mockito.times;
|
|
||||||
import static org.mockito.Mockito.verify;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
|
||||||
import org.apache.hadoop.hdfs.TestByteRangeInputStream.MockHttpURLConnection;
|
|
||||||
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem.OffsetUrlInputStream;
|
|
||||||
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem.OffsetUrlOpener;
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
public class TestOffsetUrlInputStream {
|
public class TestOffsetUrlInputStream {
|
||||||
|
@ -73,65 +61,4 @@ public class TestOffsetUrlInputStream {
|
||||||
WebHdfsFileSystem.removeOffsetParam(new URL(s)).toString());
|
WebHdfsFileSystem.removeOffsetParam(new URL(s)).toString());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testByteRange() throws Exception {
|
|
||||||
final Configuration conf = new Configuration();
|
|
||||||
final String uri = WebHdfsFileSystem.SCHEME + "://localhost:50070/";
|
|
||||||
final WebHdfsFileSystem webhdfs = (WebHdfsFileSystem)FileSystem.get(new URI(uri), conf);
|
|
||||||
|
|
||||||
OffsetUrlOpener ospy = spy(webhdfs.new OffsetUrlOpener(new URL("http://test/")));
|
|
||||||
doReturn(new MockHttpURLConnection(ospy.getURL())).when(ospy)
|
|
||||||
.openConnection();
|
|
||||||
OffsetUrlOpener rspy = spy(webhdfs.new OffsetUrlOpener((URL) null));
|
|
||||||
doReturn(new MockHttpURLConnection(rspy.getURL())).when(rspy)
|
|
||||||
.openConnection();
|
|
||||||
final OffsetUrlInputStream is = new OffsetUrlInputStream(ospy, rspy);
|
|
||||||
|
|
||||||
assertEquals("getPos wrong", 0, is.getPos());
|
|
||||||
|
|
||||||
is.read();
|
|
||||||
|
|
||||||
assertNull("Initial call made incorrectly (Range Check)", ospy
|
|
||||||
.openConnection().getRequestProperty("Range"));
|
|
||||||
|
|
||||||
assertEquals("getPos should be 1 after reading one byte", 1, is.getPos());
|
|
||||||
|
|
||||||
is.read();
|
|
||||||
|
|
||||||
assertEquals("getPos should be 2 after reading two bytes", 2, is.getPos());
|
|
||||||
|
|
||||||
// No additional connections should have been made (no seek)
|
|
||||||
|
|
||||||
rspy.setURL(new URL("http://resolvedurl/"));
|
|
||||||
|
|
||||||
is.seek(100);
|
|
||||||
is.read();
|
|
||||||
|
|
||||||
assertEquals("getPos should be 101 after reading one byte", 101,
|
|
||||||
is.getPos());
|
|
||||||
|
|
||||||
verify(rspy, times(1)).openConnection();
|
|
||||||
|
|
||||||
is.seek(101);
|
|
||||||
is.read();
|
|
||||||
|
|
||||||
verify(rspy, times(1)).openConnection();
|
|
||||||
|
|
||||||
// Seek to 101 should not result in another request"
|
|
||||||
|
|
||||||
is.seek(2500);
|
|
||||||
is.read();
|
|
||||||
|
|
||||||
((MockHttpURLConnection) rspy.openConnection()).setResponseCode(206);
|
|
||||||
is.seek(0);
|
|
||||||
|
|
||||||
try {
|
|
||||||
is.read();
|
|
||||||
fail("Exception should be thrown when 206 response is given "
|
|
||||||
+ "but 200 is expected");
|
|
||||||
} catch (IOException e) {
|
|
||||||
WebHdfsFileSystem.LOG.info(e.toString());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,12 +23,16 @@ import java.util.Random;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.commons.logging.impl.Log4JLogger;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FSDataInputStream;
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
import org.apache.hadoop.hdfs.TestDFSClientRetries;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
|
||||||
|
import org.apache.log4j.Level;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -196,4 +200,12 @@ public class TestWebHDFS {
|
||||||
in.close();
|
in.close();
|
||||||
t.end(checked);
|
t.end(checked);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Test client retry with namenode restarting. */
|
||||||
|
@Test
|
||||||
|
public void testNamenodeRestart() throws Exception {
|
||||||
|
((Log4JLogger)NamenodeWebHdfsMethods.LOG).getLogger().setLevel(Level.ALL);
|
||||||
|
final Configuration conf = WebHdfsTestUtil.createConf();
|
||||||
|
TestDFSClientRetries.namenodeRestartTest(conf, true);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -79,13 +79,9 @@ public class WebHdfsTestUtil {
|
||||||
return WebHdfsFileSystem.jsonParse(conn, false);
|
return WebHdfsFileSystem.jsonParse(conn, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static HttpURLConnection twoStepWrite(HttpURLConnection conn,
|
public static HttpURLConnection twoStepWrite(final WebHdfsFileSystem webhdfs,
|
||||||
final HttpOpParam.Op op) throws IOException {
|
final HttpOpParam.Op op, HttpURLConnection conn) throws IOException {
|
||||||
conn.setRequestMethod(op.getType().toString());
|
return webhdfs.new Runner(op, conn).twoStepWrite();
|
||||||
conn = WebHdfsFileSystem.twoStepWrite(conn, op);
|
|
||||||
conn.setDoOutput(true);
|
|
||||||
conn.connect();
|
|
||||||
return conn;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static FSDataOutputStream write(final WebHdfsFileSystem webhdfs,
|
public static FSDataOutputStream write(final WebHdfsFileSystem webhdfs,
|
||||||
|
|
Loading…
Reference in New Issue