HDFS-10789. Route webhdfs through the RPC call queue. Contributed by Daryn Sharp and Rushabh S Shah.

Kihwal Lee 2016-10-12 15:11:42 -05:00
parent 6476934ae5
commit 85cd06f663
10 changed files with 160 additions and 98 deletions

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ExternalCall.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.ipc;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.ipc.Server.Call;
@@ -37,14 +38,10 @@ public abstract class ExternalCall<T> extends Call {
public abstract UserGroupInformation getRemoteUser();
public final T get() throws IOException, InterruptedException {
public final T get() throws InterruptedException, ExecutionException {
waitForCompletion();
if (error != null) {
if (error instanceof IOException) {
throw (IOException)error;
} else {
throw new IOException(error);
}
throw new ExecutionException(error);
}
return result;
}
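
The signature change aligns ExternalCall.get() with java.util.concurrent.Future: instead of coercing every failure into an IOException, the original error is surfaced as the cause of an ExecutionException. A minimal caller-side sketch of the new contract (hedged: the wrapper class, method name, and 'server' handle are illustrative, not part of this patch):

    // Hedged sketch of the new ExternalCall.get() contract.
    import java.io.IOException;
    import java.security.PrivilegedExceptionAction;
    import java.util.concurrent.ExecutionException;
    import org.apache.hadoop.ipc.ExternalCall;
    import org.apache.hadoop.ipc.Server;
    import org.apache.hadoop.security.UserGroupInformation;

    class ExternalCallExample {
      static String runViaCallQueue(Server server)
          throws IOException, InterruptedException {
        final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
        ExternalCall<String> call = new ExternalCall<String>(
            new PrivilegedExceptionAction<String>() {
              @Override
              public String run() {
                return "done"; // executed later by an RPC handler thread
              }
            }) {
          @Override
          public UserGroupInformation getRemoteUser() {
            return ugi;
          }
        };
        server.queueCall(call);
        try {
          return call.get(); // blocks; failures now arrive as ExecutionException
        } catch (ExecutionException ee) {
          Throwable t = ee.getCause();
          throw (t instanceof IOException) ? (IOException) t : new IOException(t);
        }
      }
    }

Unwrapping the cause this way keeps existing IOException-based error handling intact, which is exactly what doAsExternalCall does later in this patch.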

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java

@@ -72,6 +72,7 @@ import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -989,8 +990,9 @@ public class TestRPC extends TestRpcBase {
try {
exceptionCall.get();
fail("didn't throw");
} catch (IOException ioe) {
assertEquals(expectedIOE.getMessage(), ioe.getMessage());
} catch (ExecutionException ee) {
assertTrue((ee.getCause()) instanceof IOException);
assertEquals(expectedIOE.getMessage(), ee.getCause().getMessage());
}
} finally {
server.stop();

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -70,6 +70,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
"dfs.webhdfs.ugi.expire.after.access";
public static final int DFS_WEBHDFS_UGI_EXPIRE_AFTER_ACCESS_DEFAULT =
10*60*1000; //10 minutes
public static final String DFS_WEBHDFS_USE_IPC_CALLQ =
"dfs.webhdfs.use.ipc.callq";
public static final boolean DFS_WEBHDFS_USE_IPC_CALLQ_DEFAULT = true;
// HA related configuration
public static final String DFS_DATANODE_RESTART_REPLICA_EXPIRY_KEY = "dfs.datanode.restart.replica.expiration";

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -242,7 +242,6 @@ import org.apache.hadoop.hdfs.server.namenode.top.TopAuditLogger;
import org.apache.hadoop.hdfs.server.namenode.top.TopConf;
import org.apache.hadoop.hdfs.server.namenode.top.metrics.TopMetrics;
import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager;
import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -338,7 +337,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
private void logAuditEvent(boolean succeeded, String cmd, String src,
String dst, HdfsFileStatus stat) throws IOException {
if (isAuditEnabled() && isExternalInvocation()) {
logAuditEvent(succeeded, getRemoteUser(), getRemoteIp(),
logAuditEvent(succeeded, Server.getRemoteUser(), Server.getRemoteIp(),
cmd, src, dst, stat);
}
}
@@ -5262,17 +5261,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
* RPC call context even if the client exits.
*/
boolean isExternalInvocation() {
return Server.isRpcInvocation() || NamenodeWebHdfsMethods.isWebHdfsInvocation();
return Server.isRpcInvocation();
}
private static InetAddress getRemoteIp() {
InetAddress ip = Server.getRemoteIp();
if (ip != null) {
return ip;
}
return NamenodeWebHdfsMethods.getRemoteIp();
}
// optimize ugi lookup for RPC operations to avoid a trip through
// UGI.getCurrentUser which is synch'ed
private static UserGroupInformation getRemoteUser() throws IOException {
@@ -6918,7 +6909,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
sb.append(trackingId);
}
sb.append("\t").append("proto=");
sb.append(NamenodeWebHdfsMethods.isWebHdfsInvocation() ? "webhdfs" : "rpc");
sb.append(Server.getProtocol());
if (isCallerContextEnabled &&
callerContext != null &&
callerContext.isContextValid()) {

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java

@@ -64,7 +64,9 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.ipc.ExternalCall;
import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@@ -407,7 +409,15 @@ public class NameNode extends ReconfigurableBase implements
public NamenodeProtocols getRpcServer() {
return rpcServer;
}
public void queueExternalCall(ExternalCall<?> extCall)
throws IOException, InterruptedException {
if (rpcServer == null) {
throw new RetriableException("Namenode is in startup mode");
}
rpcServer.getClientRpcServer().queueCall(extCall);
}
public static void initMetrics(Configuration conf, NamenodeRole role) {
metrics = NameNodeMetrics.create(conf, role);
}
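
queueExternalCall() gives in-process callers a public entry point into the client RPC server's call queue, so their work is scheduled, prioritized, and audited like ordinary RPCs. A hedged sketch of a caller (the 'nn' handle, user name, and path are illustrative; the real consumer is doAsExternalCall in NamenodeWebHdfsMethods below):

    // Illustrative fragment: route a getFileInfo through the RPC call queue.
    // Assumes 'nn' is a final reference to a running NameNode.
    final UserGroupInformation ugi =
        UserGroupInformation.createRemoteUser("web-user");
    ExternalCall<HdfsFileStatus> call = new ExternalCall<HdfsFileStatus>(
        new PrivilegedExceptionAction<HdfsFileStatus>() {
          @Override
          public HdfsFileStatus run() throws IOException {
            return nn.getRpcServer().getFileInfo("/tmp");
          }
        }) {
      @Override
      public UserGroupInformation getRemoteUser() {
        return ugi;
      }
    };
    nn.queueExternalCall(call); // RetriableException until the RPC server is up
    HdfsFileStatus status = call.get(); // may throw ExecutionException

Because the call carries its own UserGroupInformation, audit logging and the retry cache see the webhdfs user rather than the HTTP server's login user.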

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java

@@ -139,7 +139,6 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
@@ -1686,10 +1685,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
}
private static String getClientMachine() {
String clientMachine = NamenodeWebHdfsMethods.getRemoteAddress();
if (clientMachine == null) { //not a web client
clientMachine = Server.getRemoteAddress();
}
String clientMachine = Server.getRemoteAddress();
if (clientMachine == null) { //not an RPC client
clientMachine = "";
}

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java

@@ -25,10 +25,13 @@ import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.security.Principal;
import java.security.PrivilegedExceptionAction;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ExecutionException;
import javax.servlet.ServletContext;
import javax.servlet.http.HttpServletRequest;
@@ -60,6 +63,7 @@ import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsCreateModes;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.XAttrHelper;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -81,8 +85,8 @@ import org.apache.hadoop.hdfs.web.WebHdfsConstants;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
import org.apache.hadoop.hdfs.web.resources.*;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.ExternalCall;
import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.security.Credentials;
@@ -103,39 +107,39 @@ public class NamenodeWebHdfsMethods {
public static final Log LOG = LogFactory.getLog(NamenodeWebHdfsMethods.class);
private static final UriFsPathParam ROOT = new UriFsPathParam("");
private static final ThreadLocal<String> REMOTE_ADDRESS = new ThreadLocal<String>();
/** @return the remote client address. */
public static String getRemoteAddress() {
return REMOTE_ADDRESS.get();
}
public static InetAddress getRemoteIp() {
try {
return InetAddress.getByName(getRemoteAddress());
} catch (Exception e) {
return null;
}
}
/**
* Returns true if a WebHdfs request is in progress. Akin to
* {@link Server#isRpcInvocation()}.
*/
public static boolean isWebHdfsInvocation() {
return getRemoteAddress() != null;
}
private volatile Boolean useIpcCallq;
private String scheme;
private Principal userPrincipal;
private String remoteAddr;
private @Context ServletContext context;
private @Context HttpServletRequest request;
private @Context HttpServletResponse response;
public NamenodeWebHdfsMethods(@Context HttpServletRequest request) {
// the request object is a proxy to thread-locals so we have to extract
// what we want from it since the external call will be processed in a
// different thread.
scheme = request.getScheme();
userPrincipal = request.getUserPrincipal();
// get the remote address, if coming in via a trusted proxy server then
// the address will be that of the proxied client
remoteAddr = JspHelper.getRemoteAddr(request);
}
private void init(final UserGroupInformation ugi,
final DelegationParam delegation,
final UserParam username, final DoAsParam doAsUser,
final UriFsPathParam path, final HttpOpParam<?> op,
final Param<?, ?>... parameters) {
if (useIpcCallq == null) {
Configuration conf =
(Configuration)context.getAttribute(JspHelper.CURRENT_CONF);
useIpcCallq = conf.getBoolean(
DFSConfigKeys.DFS_WEBHDFS_USE_IPC_CALLQ,
DFSConfigKeys.DFS_WEBHDFS_USE_IPC_CALLQ_DEFAULT);
}
if (LOG.isTraceEnabled()) {
LOG.trace("HTTP " + op.getValue().getType() + ": " + op + ", " + path
+ ", ugi=" + ugi + ", " + username + ", " + doAsUser
@@ -144,16 +148,8 @@ public class NamenodeWebHdfsMethods {
//clear content type
response.setContentType(null);
// set the remote address, if coming in via a trusted proxy server then
// the address will be that of the proxied client
REMOTE_ADDRESS.set(JspHelper.getRemoteAddr(request));
}
private void reset() {
REMOTE_ADDRESS.set(null);
}
private static NamenodeProtocols getRPCServer(NameNode namenode)
throws IOException {
final NamenodeProtocols np = namenode.getRpcServer();
@@ -162,11 +158,63 @@ public class NamenodeWebHdfsMethods {
}
return np;
}
private <T> T doAs(final UserGroupInformation ugi,
final PrivilegedExceptionAction<T> action)
throws IOException, InterruptedException {
return useIpcCallq ? doAsExternalCall(ugi, action) : ugi.doAs(action);
}
private <T> T doAsExternalCall(final UserGroupInformation ugi,
final PrivilegedExceptionAction<T> action)
throws IOException, InterruptedException {
// set the remote address, if coming in via a trusted proxy server then
// the address will be that of the proxied client
ExternalCall<T> call = new ExternalCall<T>(action){
@Override
public UserGroupInformation getRemoteUser() {
return ugi;
}
@Override
public String getProtocol() {
return "webhdfs";
}
@Override
public String getHostAddress() {
return remoteAddr;
}
@Override
public InetAddress getHostInetAddress() {
try {
return InetAddress.getByName(getHostAddress());
} catch (UnknownHostException e) {
return null;
}
}
};
final NameNode namenode = (NameNode)context.getAttribute("name.node");
namenode.queueExternalCall(call);
T result = null;
try {
result = call.get();
} catch (ExecutionException ee) {
Throwable t = ee.getCause();
if (t instanceof RuntimeException) {
throw (RuntimeException)t;
} else if (t instanceof IOException) {
throw (IOException)t;
} else {
throw new IOException(t);
}
}
return result;
}
@VisibleForTesting
static DatanodeInfo chooseDatanode(final NameNode namenode,
final String path, final HttpOpParam.Op op, final long openOffset,
final long blocksize, final String excludeDatanodes) throws IOException {
final long blocksize, final String excludeDatanodes,
final String remoteAddr) throws IOException {
FSNamesystem fsn = namenode.getNamesystem();
if (fsn == null) {
throw new IOException("Namesystem has not been intialized yet.");
@@ -190,7 +238,7 @@ public class NamenodeWebHdfsMethods {
if (op == PutOpParam.Op.CREATE) {
//choose a datanode near to client
final DatanodeDescriptor clientNode = bm.getDatanodeManager(
).getDatanodeByHost(getRemoteAddress());
).getDatanodeByHost(remoteAddr);
if (clientNode != null) {
final DatanodeStorageInfo[] storages = bm.chooseTarget4WebHDFS(
path, clientNode, excludes, blocksize);
@@ -253,7 +301,8 @@ public class NamenodeWebHdfsMethods {
return null;
}
final Token<? extends TokenIdentifier> t = c.getAllTokens().iterator().next();
Text kind = request.getScheme().equals("http") ? WebHdfsConstants.WEBHDFS_TOKEN_KIND
Text kind = scheme.equals("http")
? WebHdfsConstants.WEBHDFS_TOKEN_KIND
: WebHdfsConstants.SWEBHDFS_TOKEN_KIND;
t.setKind(kind);
return t;
@@ -267,7 +316,7 @@ public class NamenodeWebHdfsMethods {
final Param<?, ?>... parameters) throws URISyntaxException, IOException {
final DatanodeInfo dn;
dn = chooseDatanode(namenode, path, op, openOffset, blocksize,
excludeDatanodes);
excludeDatanodes, remoteAddr);
if (dn == null) {
throw new IOException("Failed to find datanode, suggest to check cluster"
+ " health. excludeDatanodes=" + excludeDatanodes);
@@ -283,7 +332,7 @@ public class NamenodeWebHdfsMethods {
} else {
//generate a token
final Token<? extends TokenIdentifier> t = generateDelegationToken(
namenode, ugi, request.getUserPrincipal().getName());
namenode, ugi, userPrincipal.getName());
delegationQuery = "&" + new DelegationParam(t.encodeToUrlString());
}
final String query = op.toQueryString() + delegationQuery
@@ -291,7 +340,6 @@ public class NamenodeWebHdfsMethods {
+ Param.toSortedString("&", parameters);
final String uripath = WebHdfsFileSystem.PATH_PREFIX + path;
final String scheme = request.getScheme();
int port = "http".equals(scheme) ? dn.getInfoPort() : dn
.getInfoSecurePort();
final URI uri = new URI(scheme, null, dn.getHostName(), port, uripath,
@@ -446,10 +494,9 @@ public class NamenodeWebHdfsMethods {
xattrSetFlag, snapshotName, oldSnapshotName, excludeDatanodes,
createFlagParam, noredirect);
return ugi.doAs(new PrivilegedExceptionAction<Response>() {
return doAs(ugi, new PrivilegedExceptionAction<Response>() {
@Override
public Response run() throws IOException, URISyntaxException {
try {
return put(ugi, delegation, username, doAsUser,
path.getAbsolutePath(), op, destination, owner, group,
permission, unmaskedPermission, overwrite, bufferSize,
@@ -458,9 +505,6 @@ public class NamenodeWebHdfsMethods {
aclPermission, xattrName, xattrValue, xattrSetFlag,
snapshotName, oldSnapshotName, excludeDatanodes,
createFlagParam, noredirect);
} finally {
reset();
}
}
});
}
@@ -703,16 +747,12 @@ public class NamenodeWebHdfsMethods {
init(ugi, delegation, username, doAsUser, path, op, concatSrcs, bufferSize,
excludeDatanodes, newLength);
return ugi.doAs(new PrivilegedExceptionAction<Response>() {
return doAs(ugi, new PrivilegedExceptionAction<Response>() {
@Override
public Response run() throws IOException, URISyntaxException {
try {
return post(ugi, delegation, username, doAsUser,
path.getAbsolutePath(), op, concatSrcs, bufferSize,
excludeDatanodes, newLength, noredirect);
} finally {
reset();
}
}
});
}
@@ -858,17 +898,13 @@ public class NamenodeWebHdfsMethods {
renewer, bufferSize, xattrEncoding, excludeDatanodes, fsAction,
tokenKind, tokenService, startAfter);
return ugi.doAs(new PrivilegedExceptionAction<Response>() {
return doAs(ugi, new PrivilegedExceptionAction<Response>() {
@Override
public Response run() throws IOException, URISyntaxException {
try {
return get(ugi, delegation, username, doAsUser,
path.getAbsolutePath(), op, offset, length, renewer, bufferSize,
xattrNames, xattrEncoding, excludeDatanodes, fsAction, tokenKind,
tokenService, noredirect, startAfter);
} finally {
reset();
}
}
});
}
@@ -1138,15 +1174,11 @@ public class NamenodeWebHdfsMethods {
init(ugi, delegation, username, doAsUser, path, op, recursive, snapshotName);
return ugi.doAs(new PrivilegedExceptionAction<Response>() {
return doAs(ugi, new PrivilegedExceptionAction<Response>() {
@Override
public Response run() throws IOException {
try {
return delete(ugi, delegation, username, doAsUser,
path.getAbsolutePath(), op, recursive, snapshotName);
} finally {
reset();
}
}
});
}

hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

@@ -4281,4 +4281,11 @@
</description>
</property>
<property>
<name>dfs.webhdfs.use.ipc.callq</name>
<value>true</value>
<description>Enables routing of webhdfs calls through rpc
call queue</description>
</property>
</configuration>

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java

@@ -55,6 +55,7 @@ import org.apache.hadoop.ipc.RpcConstants;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.LightWeightCache;
import org.junit.After;
@@ -111,19 +112,33 @@ public class TestNamenodeRetryCache {
}
}
static class DummyCall extends Server.Call {
private UserGroupInformation ugi;
DummyCall(int callId, byte[] clientId) {
super(callId, 1, null, null, RpcKind.RPC_PROTOCOL_BUFFER, clientId);
try {
ugi = UserGroupInformation.getCurrentUser();
} catch (IOException ioe) {
}
}
@Override
public UserGroupInformation getRemoteUser() {
return ugi;
}
}
/** Set the current Server RPC call */
public static void newCall() {
Server.Call call = new Server.Call(++callId, 1, null, null,
RpcKind.RPC_PROTOCOL_BUFFER, CLIENT_ID);
Server.Call call = new DummyCall(++callId, CLIENT_ID);
Server.getCurCall().set(call);
}
public static void resetCall() {
Server.Call call = new Server.Call(RpcConstants.INVALID_CALL_ID, 1, null,
null, RpcKind.RPC_PROTOCOL_BUFFER, RpcConstants.DUMMY_CLIENT_ID);
Server.Call call = new DummyCall(RpcConstants.INVALID_CALL_ID,
RpcConstants.DUMMY_CLIENT_ID);
Server.getCurCall().set(call);
}
private void concatSetup(String file1, String file2) throws Exception {
DFSTestUtil.createFile(filesystem, new Path(file1), BlockSize, (short)1, 0L);
DFSTestUtil.createFile(filesystem, new Path(file2), BlockSize, (short)1, 0L);

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/web/resources/TestWebHdfsDataLocality.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode.web.resources;
import static org.mockito.Mockito.*;
import java.io.IOException;
import java.net.InetAddress;
import java.util.Arrays;
import java.util.List;
@@ -62,6 +63,9 @@ public class TestWebHdfsDataLocality {
private static final String RACK1 = "/rack1";
private static final String RACK2 = "/rack2";
private static final String LOCALHOST =
InetAddress.getLoopbackAddress().getHostName();
@Rule
public final ExpectedException exception = ExpectedException.none();
@@ -96,7 +100,8 @@ public class TestWebHdfsDataLocality {
//The chosen datanode must be the same as the client address
final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
namenode, f, PutOpParam.Op.CREATE, -1L, blocksize, null);
namenode, f, PutOpParam.Op.CREATE, -1L, blocksize, null,
LOCALHOST);
Assert.assertEquals(ipAddr, chosen.getIpAddr());
}
}
@@ -121,19 +126,22 @@ public class TestWebHdfsDataLocality {
{ //test GETFILECHECKSUM
final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
namenode, f, GetOpParam.Op.GETFILECHECKSUM, -1L, blocksize, null);
namenode, f, GetOpParam.Op.GETFILECHECKSUM, -1L, blocksize, null,
LOCALHOST);
Assert.assertEquals(expected, chosen);
}
{ //test OPEN
final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
namenode, f, GetOpParam.Op.OPEN, 0, blocksize, null);
namenode, f, GetOpParam.Op.OPEN, 0, blocksize, null,
LOCALHOST);
Assert.assertEquals(expected, chosen);
}
{ //test APPEND
final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
namenode, f, PostOpParam.Op.APPEND, -1L, blocksize, null);
namenode, f, PostOpParam.Op.APPEND, -1L, blocksize, null,
LOCALHOST);
Assert.assertEquals(expected, chosen);
}
} finally {
@@ -189,7 +197,7 @@ public class TestWebHdfsDataLocality {
{ // test GETFILECHECKSUM
final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
namenode, f, GetOpParam.Op.GETFILECHECKSUM, -1L, blocksize,
sb.toString());
sb.toString(), LOCALHOST);
for (int j = 0; j <= i; j++) {
Assert.assertNotEquals(locations[j].getHostName(),
chosen.getHostName());
@@ -198,7 +206,8 @@ public class TestWebHdfsDataLocality {
{ // test OPEN
final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
namenode, f, GetOpParam.Op.OPEN, 0, blocksize, sb.toString());
namenode, f, GetOpParam.Op.OPEN, 0, blocksize, sb.toString(),
LOCALHOST);
for (int j = 0; j <= i; j++) {
Assert.assertNotEquals(locations[j].getHostName(),
chosen.getHostName());
@@ -208,7 +217,7 @@ public class TestWebHdfsDataLocality {
{ // test APPEND
final DatanodeInfo chosen = NamenodeWebHdfsMethods
.chooseDatanode(namenode, f, PostOpParam.Op.APPEND, -1L,
blocksize, sb.toString());
blocksize, sb.toString(), LOCALHOST);
for (int j = 0; j <= i; j++) {
Assert.assertNotEquals(locations[j].getHostName(),
chosen.getHostName());
@@ -229,6 +238,6 @@ public class TestWebHdfsDataLocality {
exception.expect(IOException.class);
exception.expectMessage("Namesystem has not been intialized yet.");
NamenodeWebHdfsMethods.chooseDatanode(nn, "/path", PutOpParam.Op.CREATE, 0,
DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT, null);
DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT, null, LOCALHOST);
}
}