HDFS-10789. Route webhdfs through the RPC call queue. Contributed by Daryn Sharp and Rushabh S Shah.

(cherry picked from commit 5305a392c39d298ecf38ca2dfd2526adeee9cd38)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
commit 136c6f6f7d
parent 0f481e2022
Author: Kihwal Lee
Date:   2016-10-12 15:38:34 -05:00

10 changed files with 161 additions and 98 deletions

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ExternalCall.java

@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;

 import org.apache.hadoop.ipc.Server.Call;
@@ -37,14 +38,10 @@ public ExternalCall(PrivilegedExceptionAction<T> action) {

   public abstract UserGroupInformation getRemoteUser();

-  public final T get() throws IOException, InterruptedException {
+  public final T get() throws InterruptedException, ExecutionException {
     waitForCompletion();
     if (error != null) {
-      if (error instanceof IOException) {
-        throw (IOException)error;
-      } else {
-        throw new IOException(error);
-      }
+      throw new ExecutionException(error);
     }
     return result;
   }
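
ExternalCall.get() now follows the java.util.concurrent.Future convention: instead of rethrowing the underlying failure directly, it wraps it in an ExecutionException for the caller to unwrap. A minimal caller-side sketch of the unwrap idiom (the helper name getChecked is illustrative; the same pattern appears verbatim in NamenodeWebHdfsMethods below):

import java.io.IOException;
import java.util.concurrent.ExecutionException;

import org.apache.hadoop.ipc.ExternalCall;

public class ExternalCallGetSketch {
  // Blocks until an RPC handler has executed the queued call, then either
  // returns its result or rethrows the original failure.
  static <T> T getChecked(ExternalCall<T> call)
      throws IOException, InterruptedException {
    try {
      return call.get();
    } catch (ExecutionException ee) {
      Throwable t = ee.getCause();
      if (t instanceof RuntimeException) {
        throw (RuntimeException) t;
      } else if (t instanceof IOException) {
        throw (IOException) t;
      } else {
        throw new IOException(t);
      }
    }
  }
}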

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java

@@ -72,6 +72,7 @@
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -1001,8 +1002,9 @@ public Void run() throws Exception {
       try {
         exceptionCall.get();
         fail("didn't throw");
-      } catch (IOException ioe) {
-        assertEquals(expectedIOE.getMessage(), ioe.getMessage());
+      } catch (ExecutionException ee) {
+        assertTrue((ee.getCause()) instanceof IOException);
+        assertEquals(expectedIOE.getMessage(), ee.getCause().getMessage());
       }
     } finally {
       server.stop();

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -73,6 +73,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       "dfs.webhdfs.ugi.expire.after.access";
   public static final int DFS_WEBHDFS_UGI_EXPIRE_AFTER_ACCESS_DEFAULT =
     10*60*1000; //10 minutes
+  public static final String DFS_WEBHDFS_USE_IPC_CALLQ =
+      "dfs.webhdfs.use.ipc.callq";
+  public static final boolean DFS_WEBHDFS_USE_IPC_CALLQ_DEFAULT = true;

   // HA related configuration
   public static final String DFS_DATANODE_RESTART_REPLICA_EXPIRY_KEY = "dfs.datanode.restart.replica.expiration";
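
The two new constants name the switch that gates this whole change. A minimal sketch of how the flag is read (mirroring the lazy lookup this commit adds to NamenodeWebHdfsMethods.init(); the class name here is illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class WebHdfsCallqFlagSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Defaults to true: webhdfs requests ride the RPC call queue unless
    // dfs.webhdfs.use.ipc.callq is explicitly set to false.
    boolean useIpcCallq = conf.getBoolean(
        DFSConfigKeys.DFS_WEBHDFS_USE_IPC_CALLQ,
        DFSConfigKeys.DFS_WEBHDFS_USE_IPC_CALLQ_DEFAULT);
    System.out.println("route webhdfs via call queue: " + useIpcCallq);
  }
}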

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -255,7 +255,6 @@
 import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
 import org.apache.hadoop.hdfs.server.namenode.top.metrics.TopMetrics;
 import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager;
-import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
@@ -338,7 +337,7 @@ private void logAuditEvent(boolean succeeded, String cmd, String src)
   private void logAuditEvent(boolean succeeded, String cmd, String src,
       String dst, HdfsFileStatus stat) throws IOException {
     if (isAuditEnabled() && isExternalInvocation()) {
-      logAuditEvent(succeeded, getRemoteUser(), getRemoteIp(),
+      logAuditEvent(succeeded, Server.getRemoteUser(), Server.getRemoteIp(),
           cmd, src, dst, stat);
     }
   }
@@ -5939,15 +5938,7 @@ private AuthenticationMethod getConnectionAuthenticationMethod()
    * RPC call context even if the client exits.
    */
   boolean isExternalInvocation() {
-    return Server.isRpcInvocation() || NamenodeWebHdfsMethods.isWebHdfsInvocation();
-  }
-
-  private static InetAddress getRemoteIp() {
-    InetAddress ip = Server.getRemoteIp();
-    if (ip != null) {
-      return ip;
-    }
-    return NamenodeWebHdfsMethods.getRemoteIp();
+    return Server.isRpcInvocation();
   }

   // optimize ugi lookup for RPC operations to avoid a trip through
@@ -7478,7 +7469,7 @@ public void logAuditEvent(boolean succeeded, String userName,
         sb.append(trackingId);
       }
       sb.append("\t").append("proto=");
-      sb.append(NamenodeWebHdfsMethods.isWebHdfsInvocation() ? "webhdfs" : "rpc");
+      sb.append(Server.getProtocol());
       if (isCallerContextEnabled &&
           callerContext != null &&
           callerContext.isContextValid()) {
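
The net effect of these hunks is that FSNamesystem no longer special-cases webhdfs: since webhdfs work now runs inside a queued Server call, the Server thread-locals describe every external caller. A hedged sketch of the invariant the audit path relies on (the helper is illustrative, not part of the commit):

import java.net.InetAddress;

import org.apache.hadoop.ipc.Server;

public class AuditCallerSketch {
  // Plain RPC calls and queued webhdfs ExternalCalls populate the same
  // thread-locals, so a single code path can describe both.
  static String describeCaller() {
    InetAddress ip = Server.getRemoteIp();  // null outside a handler thread
    String proto = Server.getProtocol();    // "webhdfs" for queued webhdfs
                                            // calls, per the audit hunk above
    return (ip == null ? "internal" : ip.getHostAddress()) + " via " + proto;
  }
}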

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java

@@ -59,7 +59,9 @@
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.ipc.ExternalCall;
 import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
+import org.apache.hadoop.ipc.RetriableException;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@@ -383,6 +385,14 @@ public NamenodeProtocols getRpcServer() {
     return rpcServer;
   }

+  public void queueExternalCall(ExternalCall<?> extCall)
+      throws IOException, InterruptedException {
+    if (rpcServer == null) {
+      throw new RetriableException("Namenode is in startup mode");
+    }
+    rpcServer.getClientRpcServer().queueCall(extCall);
+  }
+
   static void initMetrics(Configuration conf, NamenodeRole role) {
     metrics = NameNodeMetrics.create(conf, role);
   }
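
queueExternalCall() is the NameNode's new entry point for work that arrives outside the RPC front door. A sketch of a caller (the webhdfs server is the only in-tree user; the action and names below are stand-ins):

import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.ipc.ExternalCall;
import org.apache.hadoop.security.UserGroupInformation;

public class QueueExternalCallSketch {
  static String runQueued(NameNode nn, final UserGroupInformation ugi)
      throws Exception {
    ExternalCall<String> call = new ExternalCall<String>(
        new PrivilegedExceptionAction<String>() {
          @Override
          public String run() {
            // Executes later on an RPC handler thread, subject to the same
            // call-queue ordering and backpressure as ordinary RPC requests.
            return "hello from the call queue";
          }
        }) {
      @Override
      public UserGroupInformation getRemoteUser() {
        return ugi;  // caller identity, used for authorization and auditing
      }
    };
    nn.queueExternalCall(call);  // RetriableException while still starting up
    return call.get();           // blocks; failures surface as ExecutionException
  }
}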

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java

@@ -134,7 +134,6 @@
 import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
 import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
-import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
@@ -1670,10 +1669,7 @@ private void verifySoftwareVersion(DatanodeRegistration dnReg)
   }

   private static String getClientMachine() {
-    String clientMachine = NamenodeWebHdfsMethods.getRemoteAddress();
-    if (clientMachine == null) { //not a web client
-      clientMachine = Server.getRemoteAddress();
-    }
+    String clientMachine = Server.getRemoteAddress();
     if (clientMachine == null) { //not a RPC client
       clientMachine = "";
     }

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java

@@ -25,10 +25,13 @@
 import java.net.InetAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.security.Principal;
 import java.security.PrivilegedExceptionAction;
 import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
+import java.util.concurrent.ExecutionException;

 import javax.servlet.ServletContext;
 import javax.servlet.http.HttpServletRequest;
@@ -58,6 +61,7 @@
 import org.apache.hadoop.fs.XAttr;
 import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.XAttrHelper;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -79,8 +83,8 @@
 import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
 import org.apache.hadoop.hdfs.web.resources.*;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.ExternalCall;
 import org.apache.hadoop.ipc.RetriableException;
-import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.security.Credentials;
@@ -102,38 +106,38 @@ public class NamenodeWebHdfsMethods {
   private static final UriFsPathParam ROOT = new UriFsPathParam("");

-  private static final ThreadLocal<String> REMOTE_ADDRESS = new ThreadLocal<String>();
-
-  /** @return the remote client address. */
-  public static String getRemoteAddress() {
-    return REMOTE_ADDRESS.get();
-  }
-
-  public static InetAddress getRemoteIp() {
-    try {
-      return InetAddress.getByName(getRemoteAddress());
-    } catch (Exception e) {
-      return null;
-    }
-  }
-
-  /**
-   * Returns true if a WebHdfs request is in progress. Akin to
-   * {@link Server#isRpcInvocation()}.
-   */
-  public static boolean isWebHdfsInvocation() {
-    return getRemoteAddress() != null;
-  }
+  private volatile Boolean useIpcCallq;
+  private String scheme;
+  private Principal userPrincipal;
+  private String remoteAddr;

   private @Context ServletContext context;
-  private @Context HttpServletRequest request;
   private @Context HttpServletResponse response;

+  public NamenodeWebHdfsMethods(@Context HttpServletRequest request) {
+    // the request object is a proxy to thread-locals so we have to extract
+    // what we want from it since the external call will be processed in a
+    // different thread.
+    scheme = request.getScheme();
+    userPrincipal = request.getUserPrincipal();
+    // get the remote address; if coming in via a trusted proxy server then
+    // the address will be that of the proxied client
+    remoteAddr = JspHelper.getRemoteAddr(request);
+  }
+
   private void init(final UserGroupInformation ugi,
       final DelegationParam delegation,
       final UserParam username, final DoAsParam doAsUser,
       final UriFsPathParam path, final HttpOpParam<?> op,
       final Param<?, ?>... parameters) {
+    if (useIpcCallq == null) {
+      Configuration conf =
+          (Configuration)context.getAttribute(JspHelper.CURRENT_CONF);
+      useIpcCallq = conf.getBoolean(
+          DFSConfigKeys.DFS_WEBHDFS_USE_IPC_CALLQ,
+          DFSConfigKeys.DFS_WEBHDFS_USE_IPC_CALLQ_DEFAULT);
+    }
     if (LOG.isTraceEnabled()) {
       LOG.trace("HTTP " + op.getValue().getType() + ": " + op + ", " + path
           + ", ugi=" + ugi + ", " + username + ", " + doAsUser
@@ -142,14 +146,6 @@ private void init(final UserGroupInformation ugi,

     //clear content type
     response.setContentType(null);
-
-    // set the remote address; if coming in via a trusted proxy server then
-    // the address will be that of the proxied client
-    REMOTE_ADDRESS.set(JspHelper.getRemoteAddr(request));
-  }
-
-  private void reset() {
-    REMOTE_ADDRESS.set(null);
   }

   private static NamenodeProtocols getRPCServer(NameNode namenode)
@@ -161,10 +157,62 @@ private static NamenodeProtocols getRPCServer(NameNode namenode)
     return np;
   }

+  private <T> T doAs(final UserGroupInformation ugi,
+      final PrivilegedExceptionAction<T> action)
+      throws IOException, InterruptedException {
+    return useIpcCallq ? doAsExternalCall(ugi, action) : ugi.doAs(action);
+  }
+
+  private <T> T doAsExternalCall(final UserGroupInformation ugi,
+      final PrivilegedExceptionAction<T> action)
+      throws IOException, InterruptedException {
+    ExternalCall<T> call = new ExternalCall<T>(action){
+      @Override
+      public UserGroupInformation getRemoteUser() {
+        return ugi;
+      }
+      @Override
+      public String getProtocol() {
+        return "webhdfs";
+      }
+      @Override
+      public String getHostAddress() {
+        return remoteAddr;
+      }
+      @Override
+      public InetAddress getHostInetAddress() {
+        try {
+          return InetAddress.getByName(getHostAddress());
+        } catch (UnknownHostException e) {
+          return null;
+        }
+      }
+    };
+    final NameNode namenode = (NameNode)context.getAttribute("name.node");
+    namenode.queueExternalCall(call);
+    T result = null;
+    try {
+      result = call.get();
+    } catch (ExecutionException ee) {
+      Throwable t = ee.getCause();
+      if (t instanceof RuntimeException) {
+        throw (RuntimeException)t;
+      } else if (t instanceof IOException) {
+        throw (IOException)t;
+      } else {
+        throw new IOException(t);
+      }
+    }
+    return result;
+  }
+
   @VisibleForTesting
   static DatanodeInfo chooseDatanode(final NameNode namenode,
       final String path, final HttpOpParam.Op op, final long openOffset,
-      final long blocksize, final String excludeDatanodes) throws IOException {
+      final long blocksize, final String excludeDatanodes,
+      final String remoteAddr) throws IOException {
     FSNamesystem fsn = namenode.getNamesystem();
     if (fsn == null) {
       throw new IOException("Namesystem has not been intialized yet.");
@@ -188,7 +236,7 @@ static DatanodeInfo chooseDatanode(final NameNode namenode,
     if (op == PutOpParam.Op.CREATE) {
       //choose a datanode near to client
       final DatanodeDescriptor clientNode = bm.getDatanodeManager(
-          ).getDatanodeByHost(getRemoteAddress());
+          ).getDatanodeByHost(remoteAddr);
       if (clientNode != null) {
         final DatanodeStorageInfo[] storages = bm.chooseTarget4WebHDFS(
             path, clientNode, excludes, blocksize);
@@ -251,7 +299,8 @@ private Token<? extends TokenIdentifier> generateDelegationToken(
       return null;
     }
     final Token<? extends TokenIdentifier> t = c.getAllTokens().iterator().next();
-    Text kind = request.getScheme().equals("http") ? WebHdfsConstants.WEBHDFS_TOKEN_KIND
+    Text kind = scheme.equals("http")
+        ? WebHdfsConstants.WEBHDFS_TOKEN_KIND
         : WebHdfsConstants.SWEBHDFS_TOKEN_KIND;
     t.setKind(kind);
     return t;
@@ -265,7 +314,7 @@ private URI redirectURI(final NameNode namenode,
       final Param<?, ?>... parameters) throws URISyntaxException, IOException {
     final DatanodeInfo dn;
     dn = chooseDatanode(namenode, path, op, openOffset, blocksize,
-        excludeDatanodes);
+        excludeDatanodes, remoteAddr);
     if (dn == null) {
       throw new IOException("Failed to find datanode, suggest to check cluster"
           + " health. excludeDatanodes=" + excludeDatanodes);
@@ -281,7 +330,7 @@ private URI redirectURI(final NameNode namenode,
     } else {
       //generate a token
       final Token<? extends TokenIdentifier> t = generateDelegationToken(
-          namenode, ugi, request.getUserPrincipal().getName());
+          namenode, ugi, userPrincipal.getName());
       delegationQuery = "&" + new DelegationParam(t.encodeToUrlString());
     }
     final String query = op.toQueryString() + delegationQuery
@@ -289,7 +338,6 @@ private URI redirectURI(final NameNode namenode,
         + Param.toSortedString("&", parameters);
     final String uripath = WebHdfsFileSystem.PATH_PREFIX + path;

-    final String scheme = request.getScheme();
     int port = "http".equals(scheme) ? dn.getInfoPort() : dn
         .getInfoSecurePort();
     final URI uri = new URI(scheme, null, dn.getHostName(), port, uripath,
@@ -432,10 +480,9 @@ public Response put(
         aclPermission, xattrName, xattrValue, xattrSetFlag, snapshotName,
         oldSnapshotName, excludeDatanodes, createFlagParam);

-    return ugi.doAs(new PrivilegedExceptionAction<Response>() {
+    return doAs(ugi, new PrivilegedExceptionAction<Response>() {
       @Override
       public Response run() throws IOException, URISyntaxException {
-        try {
           return put(ugi, delegation, username, doAsUser,
               path.getAbsolutePath(), op, destination, owner, group,
               permission, overwrite, bufferSize, replication, blockSize,
@@ -443,9 +490,6 @@ public Response run() throws IOException, URISyntaxException {
               delegationTokenArgument, aclPermission, xattrName, xattrValue,
               xattrSetFlag, snapshotName, oldSnapshotName, excludeDatanodes,
               createFlagParam);
-        } finally {
-          reset();
-        }
       }
     });
   }
@@ -662,16 +706,12 @@ public Response post(
     init(ugi, delegation, username, doAsUser, path, op, concatSrcs, bufferSize,
         excludeDatanodes, newLength);

-    return ugi.doAs(new PrivilegedExceptionAction<Response>() {
+    return doAs(ugi, new PrivilegedExceptionAction<Response>() {
       @Override
       public Response run() throws IOException, URISyntaxException {
-        try {
           return post(ugi, delegation, username, doAsUser,
               path.getAbsolutePath(), op, concatSrcs, bufferSize,
               excludeDatanodes, newLength);
-        } finally {
-          reset();
-        }
       }
     });
   }
@@ -802,17 +842,13 @@ public Response get(
         renewer, bufferSize, xattrEncoding, excludeDatanodes, fsAction,
         tokenKind, tokenService);

-    return ugi.doAs(new PrivilegedExceptionAction<Response>() {
+    return doAs(ugi, new PrivilegedExceptionAction<Response>() {
       @Override
       public Response run() throws IOException, URISyntaxException {
-        try {
           return get(ugi, delegation, username, doAsUser,
               path.getAbsolutePath(), op, offset, length, renewer, bufferSize,
               xattrNames, xattrEncoding, excludeDatanodes, fsAction, tokenKind,
               tokenService);
-        } finally {
-          reset();
-        }
       }
     });
   }
@@ -1058,15 +1094,11 @@ public Response delete(
     init(ugi, delegation, username, doAsUser, path, op, recursive, snapshotName);

-    return ugi.doAs(new PrivilegedExceptionAction<Response>() {
+    return doAs(ugi, new PrivilegedExceptionAction<Response>() {
       @Override
       public Response run() throws IOException {
-        try {
           return delete(ugi, delegation, username, doAsUser,
               path.getAbsolutePath(), op, recursive, snapshotName);
-        } finally {
-          reset();
-        }
       }
     });
   }
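
The constructor added above is the subtle piece of this file: JAX-RS injects a request object that is a proxy over thread-locals, and the privileged action now executes on an RPC handler thread where those thread-locals are unset. Everything the action needs must therefore be copied out while still on the request thread. A stripped-down illustration of the pattern (names are illustrative; the real code also resolves the address through JspHelper.getRemoteAddr() so trusted proxies report the proxied client):

import javax.servlet.http.HttpServletRequest;

public class RequestCaptureSketch {
  private final String scheme;
  private final String remoteAddr;

  public RequestCaptureSketch(HttpServletRequest request) {
    // Still on the servlet request thread: the proxy resolves correctly here.
    this.scheme = request.getScheme();
    this.remoteAddr = request.getRemoteAddr();
  }

  // Invoked later from an RPC handler thread: it touches only the captured
  // fields, never the request proxy.
  String describe() {
    return scheme + " request from " + remoteAddr;
  }
}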

hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

@@ -3045,4 +3045,12 @@
   <description>Instrumentation reporting long critical sections will suppress
     consecutive warnings within this interval.</description>
 </property>
+
+<property>
+  <name>dfs.webhdfs.use.ipc.callq</name>
+  <value>true</value>
+  <description>Enables routing of webhdfs calls through the rpc
+    call queue</description>
+</property>
+
 </configuration>
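
The feature therefore ships enabled. Setting the key to false in hdfs-site.xml restores the pre-change behavior, running webhdfs operations directly on the web thread via ugi.doAs() instead of queueing them. A hedged sketch of flipping it off programmatically, e.g. for a test cluster (class name illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class DisableWebHdfsCallqSketch {
  static Configuration legacyWebHdfsConf() {
    Configuration conf = new Configuration();
    // Bypasses the RPC call queue for webhdfs, reverting to direct
    // execution on the web server thread.
    conf.setBoolean(DFSConfigKeys.DFS_WEBHDFS_USE_IPC_CALLQ, false);
    return conf;
  }
}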

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java

@@ -55,6 +55,7 @@
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.LightWeightCache;
 import org.junit.After;
@@ -111,16 +112,30 @@ public void cleanup() throws IOException {
     }
   }

+  static class DummyCall extends Server.Call {
+    private UserGroupInformation ugi;
+
+    DummyCall(int callId, byte[] clientId) {
+      super(callId, 1, null, null, RpcKind.RPC_PROTOCOL_BUFFER, clientId);
+      try {
+        ugi = UserGroupInformation.getCurrentUser();
+      } catch (IOException ioe) {
+      }
+    }
+    @Override
+    public UserGroupInformation getRemoteUser() {
+      return ugi;
+    }
+  }
+
   /** Set the current Server RPC call */
   public static void newCall() {
-    Server.Call call = new Server.Call(++callId, 1, null, null,
-        RpcKind.RPC_PROTOCOL_BUFFER, CLIENT_ID);
+    Server.Call call = new DummyCall(++callId, CLIENT_ID);
     Server.getCurCall().set(call);
   }

   public static void resetCall() {
-    Server.Call call = new Server.Call(RpcConstants.INVALID_CALL_ID, 1, null,
-        null, RpcKind.RPC_PROTOCOL_BUFFER, RpcConstants.DUMMY_CLIENT_ID);
+    Server.Call call = new DummyCall(RpcConstants.INVALID_CALL_ID,
+        RpcConstants.DUMMY_CLIENT_ID);
     Server.getCurCall().set(call);
   }

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/web/resources/TestWebHdfsDataLocality.java

@@ -20,6 +20,7 @@
 import static org.mockito.Mockito.*;

 import java.io.IOException;
+import java.net.InetAddress;
 import java.util.Arrays;
 import java.util.List;

@@ -62,6 +63,9 @@ public class TestWebHdfsDataLocality {
   private static final String RACK1 = "/rack1";
   private static final String RACK2 = "/rack2";

+  private static final String LOCALHOST =
+      InetAddress.getLoopbackAddress().getHostName();
+
   @Rule
   public final ExpectedException exception = ExpectedException.none();

@@ -96,7 +100,8 @@ public void testDataLocality() throws Exception {
         //The chosen datanode must be the same as the client address
         final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
-            namenode, f, PutOpParam.Op.CREATE, -1L, blocksize, null);
+            namenode, f, PutOpParam.Op.CREATE, -1L, blocksize, null,
+            LOCALHOST);
         Assert.assertEquals(ipAddr, chosen.getIpAddr());
       }
     }
@@ -121,19 +126,22 @@ public void testDataLocality() throws Exception {
       { //test GETFILECHECKSUM
         final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
-            namenode, f, GetOpParam.Op.GETFILECHECKSUM, -1L, blocksize, null);
+            namenode, f, GetOpParam.Op.GETFILECHECKSUM, -1L, blocksize, null,
+            LOCALHOST);
         Assert.assertEquals(expected, chosen);
       }

       { //test OPEN
         final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
-            namenode, f, GetOpParam.Op.OPEN, 0, blocksize, null);
+            namenode, f, GetOpParam.Op.OPEN, 0, blocksize, null,
+            LOCALHOST);
         Assert.assertEquals(expected, chosen);
       }

       { //test APPEND
         final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
-            namenode, f, PostOpParam.Op.APPEND, -1L, blocksize, null);
+            namenode, f, PostOpParam.Op.APPEND, -1L, blocksize, null,
+            LOCALHOST);
         Assert.assertEquals(expected, chosen);
       }
     } finally {
@@ -189,7 +197,7 @@ public void testExcludeDataNodes() throws Exception {
       { // test GETFILECHECKSUM
         final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
             namenode, f, GetOpParam.Op.GETFILECHECKSUM, -1L, blocksize,
-            sb.toString());
+            sb.toString(), LOCALHOST);
         for (int j = 0; j <= i; j++) {
           Assert.assertNotEquals(locations[j].getHostName(),
               chosen.getHostName());
@@ -198,7 +206,8 @@ public void testExcludeDataNodes() throws Exception {
       { // test OPEN
         final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
-            namenode, f, GetOpParam.Op.OPEN, 0, blocksize, sb.toString());
+            namenode, f, GetOpParam.Op.OPEN, 0, blocksize, sb.toString(),
+            LOCALHOST);
         for (int j = 0; j <= i; j++) {
           Assert.assertNotEquals(locations[j].getHostName(),
               chosen.getHostName());
@@ -208,7 +217,7 @@ public void testExcludeDataNodes() throws Exception {
       { // test APPEND
         final DatanodeInfo chosen = NamenodeWebHdfsMethods
             .chooseDatanode(namenode, f, PostOpParam.Op.APPEND, -1L,
-                blocksize, sb.toString());
+                blocksize, sb.toString(), LOCALHOST);
         for (int j = 0; j <= i; j++) {
           Assert.assertNotEquals(locations[j].getHostName(),
               chosen.getHostName());
@@ -229,6 +238,6 @@ public void testChooseDatanodeBeforeNamesystemInit() throws Exception {
     exception.expect(IOException.class);
     exception.expectMessage("Namesystem has not been intialized yet.");
     NamenodeWebHdfsMethods.chooseDatanode(nn, "/path", PutOpParam.Op.CREATE, 0,
-        DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT, null);
+        DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT, null, LOCALHOST);
   }
 }