HDFS-10789. Route webhdfs through the RPC call queue. Contributed by Daryn Sharp and Rushabh S Shah.
(cherry picked from commit 85cd06f663
)
Conflicts:
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
This commit is contained in:
parent
e341e5151a
commit
5305a392c3
|
@ -20,6 +20,7 @@ package org.apache.hadoop.ipc;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import org.apache.hadoop.ipc.Server.Call;
|
import org.apache.hadoop.ipc.Server.Call;
|
||||||
|
@ -37,14 +38,10 @@ public abstract class ExternalCall<T> extends Call {
|
||||||
|
|
||||||
public abstract UserGroupInformation getRemoteUser();
|
public abstract UserGroupInformation getRemoteUser();
|
||||||
|
|
||||||
public final T get() throws IOException, InterruptedException {
|
public final T get() throws InterruptedException, ExecutionException {
|
||||||
waitForCompletion();
|
waitForCompletion();
|
||||||
if (error != null) {
|
if (error != null) {
|
||||||
if (error instanceof IOException) {
|
throw new ExecutionException(error);
|
||||||
throw (IOException)error;
|
|
||||||
} else {
|
|
||||||
throw new IOException(error);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
|
@ -72,6 +72,7 @@ import java.util.List;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.CyclicBarrier;
|
import java.util.concurrent.CyclicBarrier;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
|
@ -1001,8 +1002,9 @@ public class TestRPC extends TestRpcBase {
|
||||||
try {
|
try {
|
||||||
exceptionCall.get();
|
exceptionCall.get();
|
||||||
fail("didn't throw");
|
fail("didn't throw");
|
||||||
} catch (IOException ioe) {
|
} catch (ExecutionException ee) {
|
||||||
assertEquals(expectedIOE.getMessage(), ioe.getMessage());
|
assertTrue((ee.getCause()) instanceof IOException);
|
||||||
|
assertEquals(expectedIOE.getMessage(), ee.getCause().getMessage());
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
server.stop();
|
server.stop();
|
||||||
|
|
|
@ -73,6 +73,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
||||||
"dfs.webhdfs.ugi.expire.after.access";
|
"dfs.webhdfs.ugi.expire.after.access";
|
||||||
public static final int DFS_WEBHDFS_UGI_EXPIRE_AFTER_ACCESS_DEFAULT =
|
public static final int DFS_WEBHDFS_UGI_EXPIRE_AFTER_ACCESS_DEFAULT =
|
||||||
10*60*1000; //10 minutes
|
10*60*1000; //10 minutes
|
||||||
|
public static final String DFS_WEBHDFS_USE_IPC_CALLQ =
|
||||||
|
"dfs.webhdfs.use.ipc.callq";
|
||||||
|
public static final boolean DFS_WEBHDFS_USE_IPC_CALLQ_DEFAULT = true;
|
||||||
|
|
||||||
// HA related configuration
|
// HA related configuration
|
||||||
public static final String DFS_DATANODE_RESTART_REPLICA_EXPIRY_KEY = "dfs.datanode.restart.replica.expiration";
|
public static final String DFS_DATANODE_RESTART_REPLICA_EXPIRY_KEY = "dfs.datanode.restart.replica.expiration";
|
||||||
|
|
|
@ -246,7 +246,6 @@ import org.apache.hadoop.hdfs.server.namenode.top.TopAuditLogger;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
|
import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.top.metrics.TopMetrics;
|
import org.apache.hadoop.hdfs.server.namenode.top.metrics.TopMetrics;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager;
|
import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
|
|
||||||
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
|
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
||||||
|
@ -327,7 +326,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
private void logAuditEvent(boolean succeeded, String cmd, String src,
|
private void logAuditEvent(boolean succeeded, String cmd, String src,
|
||||||
String dst, HdfsFileStatus stat) throws IOException {
|
String dst, HdfsFileStatus stat) throws IOException {
|
||||||
if (isAuditEnabled() && isExternalInvocation()) {
|
if (isAuditEnabled() && isExternalInvocation()) {
|
||||||
logAuditEvent(succeeded, getRemoteUser(), getRemoteIp(),
|
logAuditEvent(succeeded, Server.getRemoteUser(), Server.getRemoteIp(),
|
||||||
cmd, src, dst, stat);
|
cmd, src, dst, stat);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -5228,15 +5227,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
* RPC call context even if the client exits.
|
* RPC call context even if the client exits.
|
||||||
*/
|
*/
|
||||||
boolean isExternalInvocation() {
|
boolean isExternalInvocation() {
|
||||||
return Server.isRpcInvocation() || NamenodeWebHdfsMethods.isWebHdfsInvocation();
|
return Server.isRpcInvocation();
|
||||||
}
|
|
||||||
|
|
||||||
private static InetAddress getRemoteIp() {
|
|
||||||
InetAddress ip = Server.getRemoteIp();
|
|
||||||
if (ip != null) {
|
|
||||||
return ip;
|
|
||||||
}
|
|
||||||
return NamenodeWebHdfsMethods.getRemoteIp();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// optimize ugi lookup for RPC operations to avoid a trip through
|
// optimize ugi lookup for RPC operations to avoid a trip through
|
||||||
|
@ -6773,7 +6764,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
sb.append(trackingId);
|
sb.append(trackingId);
|
||||||
}
|
}
|
||||||
sb.append("\t").append("proto=");
|
sb.append("\t").append("proto=");
|
||||||
sb.append(NamenodeWebHdfsMethods.isWebHdfsInvocation() ? "webhdfs" : "rpc");
|
sb.append(Server.getProtocol());
|
||||||
if (isCallerContextEnabled &&
|
if (isCallerContextEnabled &&
|
||||||
callerContext != null &&
|
callerContext != null &&
|
||||||
callerContext.isContextValid()) {
|
callerContext.isContextValid()) {
|
||||||
|
|
|
@ -64,7 +64,9 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||||
|
import org.apache.hadoop.ipc.ExternalCall;
|
||||||
import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
|
import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
|
||||||
|
import org.apache.hadoop.ipc.RetriableException;
|
||||||
import org.apache.hadoop.ipc.Server;
|
import org.apache.hadoop.ipc.Server;
|
||||||
import org.apache.hadoop.ipc.StandbyException;
|
import org.apache.hadoop.ipc.StandbyException;
|
||||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||||
|
@ -409,6 +411,14 @@ public class NameNode extends ReconfigurableBase implements
|
||||||
return rpcServer;
|
return rpcServer;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void queueExternalCall(ExternalCall<?> extCall)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
if (rpcServer == null) {
|
||||||
|
throw new RetriableException("Namenode is in startup mode");
|
||||||
|
}
|
||||||
|
rpcServer.getClientRpcServer().queueCall(extCall);
|
||||||
|
}
|
||||||
|
|
||||||
public static void initMetrics(Configuration conf, NamenodeRole role) {
|
public static void initMetrics(Configuration conf, NamenodeRole role) {
|
||||||
metrics = NameNodeMetrics.create(conf, role);
|
metrics = NameNodeMetrics.create(conf, role);
|
||||||
}
|
}
|
||||||
|
|
|
@ -138,7 +138,6 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
|
||||||
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
|
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
|
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
|
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
|
|
||||||
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
|
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
|
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
|
||||||
|
@ -1683,10 +1682,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
|
||||||
}
|
}
|
||||||
|
|
||||||
private static String getClientMachine() {
|
private static String getClientMachine() {
|
||||||
String clientMachine = NamenodeWebHdfsMethods.getRemoteAddress();
|
String clientMachine = Server.getRemoteAddress();
|
||||||
if (clientMachine == null) { //not a web client
|
|
||||||
clientMachine = Server.getRemoteAddress();
|
|
||||||
}
|
|
||||||
if (clientMachine == null) { //not a RPC client
|
if (clientMachine == null) { //not a RPC client
|
||||||
clientMachine = "";
|
clientMachine = "";
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,10 +25,13 @@ import java.io.PrintWriter;
|
||||||
import java.net.InetAddress;
|
import java.net.InetAddress;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
|
import java.net.UnknownHostException;
|
||||||
|
import java.security.Principal;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
|
||||||
import javax.servlet.ServletContext;
|
import javax.servlet.ServletContext;
|
||||||
import javax.servlet.http.HttpServletRequest;
|
import javax.servlet.http.HttpServletRequest;
|
||||||
|
@ -58,6 +61,7 @@ import org.apache.hadoop.fs.Options;
|
||||||
import org.apache.hadoop.fs.XAttr;
|
import org.apache.hadoop.fs.XAttr;
|
||||||
import org.apache.hadoop.fs.permission.AclStatus;
|
import org.apache.hadoop.fs.permission.AclStatus;
|
||||||
import org.apache.hadoop.fs.permission.FsAction;
|
import org.apache.hadoop.fs.permission.FsAction;
|
||||||
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.DFSUtil;
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
import org.apache.hadoop.hdfs.XAttrHelper;
|
import org.apache.hadoop.hdfs.XAttrHelper;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
|
@ -79,8 +83,8 @@ import org.apache.hadoop.hdfs.web.WebHdfsConstants;
|
||||||
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
|
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
|
||||||
import org.apache.hadoop.hdfs.web.resources.*;
|
import org.apache.hadoop.hdfs.web.resources.*;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hadoop.ipc.ExternalCall;
|
||||||
import org.apache.hadoop.ipc.RetriableException;
|
import org.apache.hadoop.ipc.RetriableException;
|
||||||
import org.apache.hadoop.ipc.Server;
|
|
||||||
import org.apache.hadoop.net.Node;
|
import org.apache.hadoop.net.Node;
|
||||||
import org.apache.hadoop.net.NodeBase;
|
import org.apache.hadoop.net.NodeBase;
|
||||||
import org.apache.hadoop.security.Credentials;
|
import org.apache.hadoop.security.Credentials;
|
||||||
|
@ -102,38 +106,38 @@ public class NamenodeWebHdfsMethods {
|
||||||
|
|
||||||
private static final UriFsPathParam ROOT = new UriFsPathParam("");
|
private static final UriFsPathParam ROOT = new UriFsPathParam("");
|
||||||
|
|
||||||
private static final ThreadLocal<String> REMOTE_ADDRESS = new ThreadLocal<String>();
|
private volatile Boolean useIpcCallq;
|
||||||
|
private String scheme;
|
||||||
/** @return the remote client address. */
|
private Principal userPrincipal;
|
||||||
public static String getRemoteAddress() {
|
private String remoteAddr;
|
||||||
return REMOTE_ADDRESS.get();
|
|
||||||
}
|
|
||||||
|
|
||||||
public static InetAddress getRemoteIp() {
|
|
||||||
try {
|
|
||||||
return InetAddress.getByName(getRemoteAddress());
|
|
||||||
} catch (Exception e) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns true if a WebHdfs request is in progress. Akin to
|
|
||||||
* {@link Server#isRpcInvocation()}.
|
|
||||||
*/
|
|
||||||
public static boolean isWebHdfsInvocation() {
|
|
||||||
return getRemoteAddress() != null;
|
|
||||||
}
|
|
||||||
|
|
||||||
private @Context ServletContext context;
|
private @Context ServletContext context;
|
||||||
private @Context HttpServletRequest request;
|
|
||||||
private @Context HttpServletResponse response;
|
private @Context HttpServletResponse response;
|
||||||
|
|
||||||
|
public NamenodeWebHdfsMethods(@Context HttpServletRequest request) {
|
||||||
|
// the request object is a proxy to thread-locals so we have to extract
|
||||||
|
// what we want from it since the external call will be processed in a
|
||||||
|
// different thread.
|
||||||
|
scheme = request.getScheme();
|
||||||
|
userPrincipal = request.getUserPrincipal();
|
||||||
|
// get the remote address, if coming in via a trusted proxy server then
|
||||||
|
// the address with be that of the proxied client
|
||||||
|
remoteAddr = JspHelper.getRemoteAddr(request);
|
||||||
|
}
|
||||||
|
|
||||||
private void init(final UserGroupInformation ugi,
|
private void init(final UserGroupInformation ugi,
|
||||||
final DelegationParam delegation,
|
final DelegationParam delegation,
|
||||||
final UserParam username, final DoAsParam doAsUser,
|
final UserParam username, final DoAsParam doAsUser,
|
||||||
final UriFsPathParam path, final HttpOpParam<?> op,
|
final UriFsPathParam path, final HttpOpParam<?> op,
|
||||||
final Param<?, ?>... parameters) {
|
final Param<?, ?>... parameters) {
|
||||||
|
if (useIpcCallq == null) {
|
||||||
|
Configuration conf =
|
||||||
|
(Configuration)context.getAttribute(JspHelper.CURRENT_CONF);
|
||||||
|
useIpcCallq = conf.getBoolean(
|
||||||
|
DFSConfigKeys.DFS_WEBHDFS_USE_IPC_CALLQ,
|
||||||
|
DFSConfigKeys.DFS_WEBHDFS_USE_IPC_CALLQ_DEFAULT);
|
||||||
|
}
|
||||||
|
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
LOG.trace("HTTP " + op.getValue().getType() + ": " + op + ", " + path
|
LOG.trace("HTTP " + op.getValue().getType() + ": " + op + ", " + path
|
||||||
+ ", ugi=" + ugi + ", " + username + ", " + doAsUser
|
+ ", ugi=" + ugi + ", " + username + ", " + doAsUser
|
||||||
|
@ -142,14 +146,6 @@ public class NamenodeWebHdfsMethods {
|
||||||
|
|
||||||
//clear content type
|
//clear content type
|
||||||
response.setContentType(null);
|
response.setContentType(null);
|
||||||
|
|
||||||
// set the remote address, if coming in via a trust proxy server then
|
|
||||||
// the address with be that of the proxied client
|
|
||||||
REMOTE_ADDRESS.set(JspHelper.getRemoteAddr(request));
|
|
||||||
}
|
|
||||||
|
|
||||||
private void reset() {
|
|
||||||
REMOTE_ADDRESS.set(null);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static NamenodeProtocols getRPCServer(NameNode namenode)
|
private static NamenodeProtocols getRPCServer(NameNode namenode)
|
||||||
|
@ -161,10 +157,62 @@ public class NamenodeWebHdfsMethods {
|
||||||
return np;
|
return np;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private <T> T doAs(final UserGroupInformation ugi,
|
||||||
|
final PrivilegedExceptionAction<T> action)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
return useIpcCallq ? doAsExternalCall(ugi, action) : ugi.doAs(action);
|
||||||
|
}
|
||||||
|
|
||||||
|
private <T> T doAsExternalCall(final UserGroupInformation ugi,
|
||||||
|
final PrivilegedExceptionAction<T> action)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
// set the remote address, if coming in via a trust proxy server then
|
||||||
|
// the address with be that of the proxied client
|
||||||
|
ExternalCall<T> call = new ExternalCall<T>(action){
|
||||||
|
@Override
|
||||||
|
public UserGroupInformation getRemoteUser() {
|
||||||
|
return ugi;
|
||||||
|
}
|
||||||
|
@Override
|
||||||
|
public String getProtocol() {
|
||||||
|
return "webhdfs";
|
||||||
|
}
|
||||||
|
@Override
|
||||||
|
public String getHostAddress() {
|
||||||
|
return remoteAddr;
|
||||||
|
}
|
||||||
|
@Override
|
||||||
|
public InetAddress getHostInetAddress() {
|
||||||
|
try {
|
||||||
|
return InetAddress.getByName(getHostAddress());
|
||||||
|
} catch (UnknownHostException e) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
final NameNode namenode = (NameNode)context.getAttribute("name.node");
|
||||||
|
namenode.queueExternalCall(call);
|
||||||
|
T result = null;
|
||||||
|
try {
|
||||||
|
result = call.get();
|
||||||
|
} catch (ExecutionException ee) {
|
||||||
|
Throwable t = ee.getCause();
|
||||||
|
if (t instanceof RuntimeException) {
|
||||||
|
throw (RuntimeException)t;
|
||||||
|
} else if (t instanceof IOException) {
|
||||||
|
throw (IOException)t;
|
||||||
|
} else {
|
||||||
|
throw new IOException(t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
static DatanodeInfo chooseDatanode(final NameNode namenode,
|
static DatanodeInfo chooseDatanode(final NameNode namenode,
|
||||||
final String path, final HttpOpParam.Op op, final long openOffset,
|
final String path, final HttpOpParam.Op op, final long openOffset,
|
||||||
final long blocksize, final String excludeDatanodes) throws IOException {
|
final long blocksize, final String excludeDatanodes,
|
||||||
|
final String remoteAddr) throws IOException {
|
||||||
FSNamesystem fsn = namenode.getNamesystem();
|
FSNamesystem fsn = namenode.getNamesystem();
|
||||||
if (fsn == null) {
|
if (fsn == null) {
|
||||||
throw new IOException("Namesystem has not been intialized yet.");
|
throw new IOException("Namesystem has not been intialized yet.");
|
||||||
|
@ -188,7 +236,7 @@ public class NamenodeWebHdfsMethods {
|
||||||
if (op == PutOpParam.Op.CREATE) {
|
if (op == PutOpParam.Op.CREATE) {
|
||||||
//choose a datanode near to client
|
//choose a datanode near to client
|
||||||
final DatanodeDescriptor clientNode = bm.getDatanodeManager(
|
final DatanodeDescriptor clientNode = bm.getDatanodeManager(
|
||||||
).getDatanodeByHost(getRemoteAddress());
|
).getDatanodeByHost(remoteAddr);
|
||||||
if (clientNode != null) {
|
if (clientNode != null) {
|
||||||
final DatanodeStorageInfo[] storages = bm.chooseTarget4WebHDFS(
|
final DatanodeStorageInfo[] storages = bm.chooseTarget4WebHDFS(
|
||||||
path, clientNode, excludes, blocksize);
|
path, clientNode, excludes, blocksize);
|
||||||
|
@ -251,7 +299,8 @@ public class NamenodeWebHdfsMethods {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
final Token<? extends TokenIdentifier> t = c.getAllTokens().iterator().next();
|
final Token<? extends TokenIdentifier> t = c.getAllTokens().iterator().next();
|
||||||
Text kind = request.getScheme().equals("http") ? WebHdfsConstants.WEBHDFS_TOKEN_KIND
|
Text kind = scheme.equals("http")
|
||||||
|
? WebHdfsConstants.WEBHDFS_TOKEN_KIND
|
||||||
: WebHdfsConstants.SWEBHDFS_TOKEN_KIND;
|
: WebHdfsConstants.SWEBHDFS_TOKEN_KIND;
|
||||||
t.setKind(kind);
|
t.setKind(kind);
|
||||||
return t;
|
return t;
|
||||||
|
@ -265,7 +314,7 @@ public class NamenodeWebHdfsMethods {
|
||||||
final Param<?, ?>... parameters) throws URISyntaxException, IOException {
|
final Param<?, ?>... parameters) throws URISyntaxException, IOException {
|
||||||
final DatanodeInfo dn;
|
final DatanodeInfo dn;
|
||||||
dn = chooseDatanode(namenode, path, op, openOffset, blocksize,
|
dn = chooseDatanode(namenode, path, op, openOffset, blocksize,
|
||||||
excludeDatanodes);
|
excludeDatanodes, remoteAddr);
|
||||||
if (dn == null) {
|
if (dn == null) {
|
||||||
throw new IOException("Failed to find datanode, suggest to check cluster"
|
throw new IOException("Failed to find datanode, suggest to check cluster"
|
||||||
+ " health. excludeDatanodes=" + excludeDatanodes);
|
+ " health. excludeDatanodes=" + excludeDatanodes);
|
||||||
|
@ -281,7 +330,7 @@ public class NamenodeWebHdfsMethods {
|
||||||
} else {
|
} else {
|
||||||
//generate a token
|
//generate a token
|
||||||
final Token<? extends TokenIdentifier> t = generateDelegationToken(
|
final Token<? extends TokenIdentifier> t = generateDelegationToken(
|
||||||
namenode, ugi, request.getUserPrincipal().getName());
|
namenode, ugi, userPrincipal.getName());
|
||||||
delegationQuery = "&" + new DelegationParam(t.encodeToUrlString());
|
delegationQuery = "&" + new DelegationParam(t.encodeToUrlString());
|
||||||
}
|
}
|
||||||
final String query = op.toQueryString() + delegationQuery
|
final String query = op.toQueryString() + delegationQuery
|
||||||
|
@ -289,7 +338,6 @@ public class NamenodeWebHdfsMethods {
|
||||||
+ Param.toSortedString("&", parameters);
|
+ Param.toSortedString("&", parameters);
|
||||||
final String uripath = WebHdfsFileSystem.PATH_PREFIX + path;
|
final String uripath = WebHdfsFileSystem.PATH_PREFIX + path;
|
||||||
|
|
||||||
final String scheme = request.getScheme();
|
|
||||||
int port = "http".equals(scheme) ? dn.getInfoPort() : dn
|
int port = "http".equals(scheme) ? dn.getInfoPort() : dn
|
||||||
.getInfoSecurePort();
|
.getInfoSecurePort();
|
||||||
final URI uri = new URI(scheme, null, dn.getHostName(), port, uripath,
|
final URI uri = new URI(scheme, null, dn.getHostName(), port, uripath,
|
||||||
|
@ -437,10 +485,9 @@ public class NamenodeWebHdfsMethods {
|
||||||
aclPermission, xattrName, xattrValue, xattrSetFlag, snapshotName,
|
aclPermission, xattrName, xattrValue, xattrSetFlag, snapshotName,
|
||||||
oldSnapshotName, excludeDatanodes, createFlagParam, noredirect);
|
oldSnapshotName, excludeDatanodes, createFlagParam, noredirect);
|
||||||
|
|
||||||
return ugi.doAs(new PrivilegedExceptionAction<Response>() {
|
return doAs(ugi, new PrivilegedExceptionAction<Response>() {
|
||||||
@Override
|
@Override
|
||||||
public Response run() throws IOException, URISyntaxException {
|
public Response run() throws IOException, URISyntaxException {
|
||||||
try {
|
|
||||||
return put(ugi, delegation, username, doAsUser,
|
return put(ugi, delegation, username, doAsUser,
|
||||||
path.getAbsolutePath(), op, destination, owner, group,
|
path.getAbsolutePath(), op, destination, owner, group,
|
||||||
permission, overwrite, bufferSize, replication, blockSize,
|
permission, overwrite, bufferSize, replication, blockSize,
|
||||||
|
@ -448,9 +495,6 @@ public class NamenodeWebHdfsMethods {
|
||||||
delegationTokenArgument, aclPermission, xattrName, xattrValue,
|
delegationTokenArgument, aclPermission, xattrName, xattrValue,
|
||||||
xattrSetFlag, snapshotName, oldSnapshotName, excludeDatanodes,
|
xattrSetFlag, snapshotName, oldSnapshotName, excludeDatanodes,
|
||||||
createFlagParam, noredirect);
|
createFlagParam, noredirect);
|
||||||
} finally {
|
|
||||||
reset();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
@ -678,16 +722,12 @@ public class NamenodeWebHdfsMethods {
|
||||||
init(ugi, delegation, username, doAsUser, path, op, concatSrcs, bufferSize,
|
init(ugi, delegation, username, doAsUser, path, op, concatSrcs, bufferSize,
|
||||||
excludeDatanodes, newLength);
|
excludeDatanodes, newLength);
|
||||||
|
|
||||||
return ugi.doAs(new PrivilegedExceptionAction<Response>() {
|
return doAs(ugi, new PrivilegedExceptionAction<Response>() {
|
||||||
@Override
|
@Override
|
||||||
public Response run() throws IOException, URISyntaxException {
|
public Response run() throws IOException, URISyntaxException {
|
||||||
try {
|
|
||||||
return post(ugi, delegation, username, doAsUser,
|
return post(ugi, delegation, username, doAsUser,
|
||||||
path.getAbsolutePath(), op, concatSrcs, bufferSize,
|
path.getAbsolutePath(), op, concatSrcs, bufferSize,
|
||||||
excludeDatanodes, newLength, noredirect);
|
excludeDatanodes, newLength, noredirect);
|
||||||
} finally {
|
|
||||||
reset();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
@ -833,17 +873,13 @@ public class NamenodeWebHdfsMethods {
|
||||||
renewer, bufferSize, xattrEncoding, excludeDatanodes, fsAction,
|
renewer, bufferSize, xattrEncoding, excludeDatanodes, fsAction,
|
||||||
tokenKind, tokenService, startAfter);
|
tokenKind, tokenService, startAfter);
|
||||||
|
|
||||||
return ugi.doAs(new PrivilegedExceptionAction<Response>() {
|
return doAs(ugi, new PrivilegedExceptionAction<Response>() {
|
||||||
@Override
|
@Override
|
||||||
public Response run() throws IOException, URISyntaxException {
|
public Response run() throws IOException, URISyntaxException {
|
||||||
try {
|
|
||||||
return get(ugi, delegation, username, doAsUser,
|
return get(ugi, delegation, username, doAsUser,
|
||||||
path.getAbsolutePath(), op, offset, length, renewer, bufferSize,
|
path.getAbsolutePath(), op, offset, length, renewer, bufferSize,
|
||||||
xattrNames, xattrEncoding, excludeDatanodes, fsAction, tokenKind,
|
xattrNames, xattrEncoding, excludeDatanodes, fsAction, tokenKind,
|
||||||
tokenService, noredirect, startAfter);
|
tokenService, noredirect, startAfter);
|
||||||
} finally {
|
|
||||||
reset();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
@ -1113,15 +1149,11 @@ public class NamenodeWebHdfsMethods {
|
||||||
|
|
||||||
init(ugi, delegation, username, doAsUser, path, op, recursive, snapshotName);
|
init(ugi, delegation, username, doAsUser, path, op, recursive, snapshotName);
|
||||||
|
|
||||||
return ugi.doAs(new PrivilegedExceptionAction<Response>() {
|
return doAs(ugi, new PrivilegedExceptionAction<Response>() {
|
||||||
@Override
|
@Override
|
||||||
public Response run() throws IOException {
|
public Response run() throws IOException {
|
||||||
try {
|
|
||||||
return delete(ugi, delegation, username, doAsUser,
|
return delete(ugi, delegation, username, doAsUser,
|
||||||
path.getAbsolutePath(), op, recursive, snapshotName);
|
path.getAbsolutePath(), op, recursive, snapshotName);
|
||||||
} finally {
|
|
||||||
reset();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
|
@ -4093,4 +4093,12 @@
|
||||||
<description>Instrumentation reporting long critical sections will suppress
|
<description>Instrumentation reporting long critical sections will suppress
|
||||||
consecutive warnings within this interval.</description>
|
consecutive warnings within this interval.</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>dfs.webhdfs.use.ipc.callq</name>
|
||||||
|
<value>true</value>
|
||||||
|
<description>Enables routing of webhdfs calls through rpc
|
||||||
|
call queue</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
</configuration>
|
</configuration>
|
||||||
|
|
|
@ -55,6 +55,7 @@ import org.apache.hadoop.ipc.RpcConstants;
|
||||||
import org.apache.hadoop.ipc.Server;
|
import org.apache.hadoop.ipc.Server;
|
||||||
import org.apache.hadoop.ipc.StandbyException;
|
import org.apache.hadoop.ipc.StandbyException;
|
||||||
import org.apache.hadoop.security.AccessControlException;
|
import org.apache.hadoop.security.AccessControlException;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.apache.hadoop.util.LightWeightCache;
|
import org.apache.hadoop.util.LightWeightCache;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
|
@ -111,16 +112,30 @@ public class TestNamenodeRetryCache {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static class DummyCall extends Server.Call {
|
||||||
|
private UserGroupInformation ugi;
|
||||||
|
|
||||||
|
DummyCall(int callId, byte[] clientId) {
|
||||||
|
super(callId, 1, null, null, RpcKind.RPC_PROTOCOL_BUFFER, clientId);
|
||||||
|
try {
|
||||||
|
ugi = UserGroupInformation.getCurrentUser();
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@Override
|
||||||
|
public UserGroupInformation getRemoteUser() {
|
||||||
|
return ugi;
|
||||||
|
}
|
||||||
|
}
|
||||||
/** Set the current Server RPC call */
|
/** Set the current Server RPC call */
|
||||||
public static void newCall() {
|
public static void newCall() {
|
||||||
Server.Call call = new Server.Call(++callId, 1, null, null,
|
Server.Call call = new DummyCall(++callId, CLIENT_ID);
|
||||||
RpcKind.RPC_PROTOCOL_BUFFER, CLIENT_ID);
|
|
||||||
Server.getCurCall().set(call);
|
Server.getCurCall().set(call);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void resetCall() {
|
public static void resetCall() {
|
||||||
Server.Call call = new Server.Call(RpcConstants.INVALID_CALL_ID, 1, null,
|
Server.Call call = new DummyCall(RpcConstants.INVALID_CALL_ID,
|
||||||
null, RpcKind.RPC_PROTOCOL_BUFFER, RpcConstants.DUMMY_CLIENT_ID);
|
RpcConstants.DUMMY_CLIENT_ID);
|
||||||
Server.getCurCall().set(call);
|
Server.getCurCall().set(call);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode.web.resources;
|
||||||
import static org.mockito.Mockito.*;
|
import static org.mockito.Mockito.*;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.net.InetAddress;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
@ -62,6 +63,9 @@ public class TestWebHdfsDataLocality {
|
||||||
private static final String RACK1 = "/rack1";
|
private static final String RACK1 = "/rack1";
|
||||||
private static final String RACK2 = "/rack2";
|
private static final String RACK2 = "/rack2";
|
||||||
|
|
||||||
|
private static final String LOCALHOST =
|
||||||
|
InetAddress.getLoopbackAddress().getHostName();
|
||||||
|
|
||||||
@Rule
|
@Rule
|
||||||
public final ExpectedException exception = ExpectedException.none();
|
public final ExpectedException exception = ExpectedException.none();
|
||||||
|
|
||||||
|
@ -96,7 +100,8 @@ public class TestWebHdfsDataLocality {
|
||||||
|
|
||||||
//The chosen datanode must be the same as the client address
|
//The chosen datanode must be the same as the client address
|
||||||
final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
|
final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
|
||||||
namenode, f, PutOpParam.Op.CREATE, -1L, blocksize, null);
|
namenode, f, PutOpParam.Op.CREATE, -1L, blocksize, null,
|
||||||
|
LOCALHOST);
|
||||||
Assert.assertEquals(ipAddr, chosen.getIpAddr());
|
Assert.assertEquals(ipAddr, chosen.getIpAddr());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -121,19 +126,22 @@ public class TestWebHdfsDataLocality {
|
||||||
|
|
||||||
{ //test GETFILECHECKSUM
|
{ //test GETFILECHECKSUM
|
||||||
final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
|
final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
|
||||||
namenode, f, GetOpParam.Op.GETFILECHECKSUM, -1L, blocksize, null);
|
namenode, f, GetOpParam.Op.GETFILECHECKSUM, -1L, blocksize, null,
|
||||||
|
LOCALHOST);
|
||||||
Assert.assertEquals(expected, chosen);
|
Assert.assertEquals(expected, chosen);
|
||||||
}
|
}
|
||||||
|
|
||||||
{ //test OPEN
|
{ //test OPEN
|
||||||
final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
|
final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
|
||||||
namenode, f, GetOpParam.Op.OPEN, 0, blocksize, null);
|
namenode, f, GetOpParam.Op.OPEN, 0, blocksize, null,
|
||||||
|
LOCALHOST);
|
||||||
Assert.assertEquals(expected, chosen);
|
Assert.assertEquals(expected, chosen);
|
||||||
}
|
}
|
||||||
|
|
||||||
{ //test APPEND
|
{ //test APPEND
|
||||||
final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
|
final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
|
||||||
namenode, f, PostOpParam.Op.APPEND, -1L, blocksize, null);
|
namenode, f, PostOpParam.Op.APPEND, -1L, blocksize, null,
|
||||||
|
LOCALHOST);
|
||||||
Assert.assertEquals(expected, chosen);
|
Assert.assertEquals(expected, chosen);
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -189,7 +197,7 @@ public class TestWebHdfsDataLocality {
|
||||||
{ // test GETFILECHECKSUM
|
{ // test GETFILECHECKSUM
|
||||||
final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
|
final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
|
||||||
namenode, f, GetOpParam.Op.GETFILECHECKSUM, -1L, blocksize,
|
namenode, f, GetOpParam.Op.GETFILECHECKSUM, -1L, blocksize,
|
||||||
sb.toString());
|
sb.toString(), LOCALHOST);
|
||||||
for (int j = 0; j <= i; j++) {
|
for (int j = 0; j <= i; j++) {
|
||||||
Assert.assertNotEquals(locations[j].getHostName(),
|
Assert.assertNotEquals(locations[j].getHostName(),
|
||||||
chosen.getHostName());
|
chosen.getHostName());
|
||||||
|
@ -198,7 +206,8 @@ public class TestWebHdfsDataLocality {
|
||||||
|
|
||||||
{ // test OPEN
|
{ // test OPEN
|
||||||
final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
|
final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
|
||||||
namenode, f, GetOpParam.Op.OPEN, 0, blocksize, sb.toString());
|
namenode, f, GetOpParam.Op.OPEN, 0, blocksize, sb.toString(),
|
||||||
|
LOCALHOST);
|
||||||
for (int j = 0; j <= i; j++) {
|
for (int j = 0; j <= i; j++) {
|
||||||
Assert.assertNotEquals(locations[j].getHostName(),
|
Assert.assertNotEquals(locations[j].getHostName(),
|
||||||
chosen.getHostName());
|
chosen.getHostName());
|
||||||
|
@ -208,7 +217,7 @@ public class TestWebHdfsDataLocality {
|
||||||
{ // test APPEND
|
{ // test APPEND
|
||||||
final DatanodeInfo chosen = NamenodeWebHdfsMethods
|
final DatanodeInfo chosen = NamenodeWebHdfsMethods
|
||||||
.chooseDatanode(namenode, f, PostOpParam.Op.APPEND, -1L,
|
.chooseDatanode(namenode, f, PostOpParam.Op.APPEND, -1L,
|
||||||
blocksize, sb.toString());
|
blocksize, sb.toString(), LOCALHOST);
|
||||||
for (int j = 0; j <= i; j++) {
|
for (int j = 0; j <= i; j++) {
|
||||||
Assert.assertNotEquals(locations[j].getHostName(),
|
Assert.assertNotEquals(locations[j].getHostName(),
|
||||||
chosen.getHostName());
|
chosen.getHostName());
|
||||||
|
@ -229,6 +238,6 @@ public class TestWebHdfsDataLocality {
|
||||||
exception.expect(IOException.class);
|
exception.expect(IOException.class);
|
||||||
exception.expectMessage("Namesystem has not been intialized yet.");
|
exception.expectMessage("Namesystem has not been intialized yet.");
|
||||||
NamenodeWebHdfsMethods.chooseDatanode(nn, "/path", PutOpParam.Op.CREATE, 0,
|
NamenodeWebHdfsMethods.chooseDatanode(nn, "/path", PutOpParam.Op.CREATE, 0,
|
||||||
DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT, null);
|
DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT, null, LOCALHOST);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue