HDFS-10481. HTTPFS server should correctly impersonate as end user to open file. Contributed by Xiao Chen.

(cherry picked from commit 47e0321ee9)
This commit is contained in:
Andrew Wang 2016-06-03 17:21:17 -07:00
parent 03c4724c88
commit d5609e3499
1 changed files with 114 additions and 104 deletions

View File

@ -79,6 +79,7 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.net.URI; import java.net.URI;
import java.security.AccessControlException; import java.security.AccessControlException;
import java.security.PrivilegedExceptionAction;
import java.text.MessageFormat; import java.text.MessageFormat;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.List; import java.util.List;
@ -94,6 +95,7 @@ import java.util.Map;
@InterfaceAudience.Private @InterfaceAudience.Private
public class HttpFSServer { public class HttpFSServer {
private static Logger AUDIT_LOG = LoggerFactory.getLogger("httpfsaudit"); private static Logger AUDIT_LOG = LoggerFactory.getLogger("httpfsaudit");
private static final Logger LOG = LoggerFactory.getLogger(HttpFSServer.class);
/** /**
* Executes a {@link FileSystemAccess.FileSystemExecutor} using a filesystem for the effective * Executes a {@link FileSystemAccess.FileSystemExecutor} using a filesystem for the effective
@ -205,115 +207,123 @@ public class HttpFSServer {
MDC.put(HttpFSFileSystem.OP_PARAM, op.value().name()); MDC.put(HttpFSFileSystem.OP_PARAM, op.value().name());
MDC.put("hostname", request.getRemoteAddr()); MDC.put("hostname", request.getRemoteAddr());
switch (op.value()) { switch (op.value()) {
case OPEN: { case OPEN: {
//Invoking the command directly using an unmanaged FileSystem that is //Invoking the command directly using an unmanaged FileSystem that is
// released by the FileSystemReleaseFilter // released by the FileSystemReleaseFilter
FSOperations.FSOpen command = new FSOperations.FSOpen(path); final FSOperations.FSOpen command = new FSOperations.FSOpen(path);
FileSystem fs = createFileSystem(user); final FileSystem fs = createFileSystem(user);
InputStream is = command.execute(fs); InputStream is = null;
Long offset = params.get(OffsetParam.NAME, OffsetParam.class); UserGroupInformation ugi = UserGroupInformation
Long len = params.get(LenParam.NAME, LenParam.class); .createProxyUser(user.getShortUserName(),
AUDIT_LOG.info("[{}] offset [{}] len [{}]", UserGroupInformation.getLoginUser());
new Object[]{path, offset, len}); try {
InputStreamEntity entity = new InputStreamEntity(is, offset, len); is = ugi.doAs(new PrivilegedExceptionAction<InputStream>() {
response = @Override
public InputStream run() throws Exception {
return command.execute(fs);
}
});
} catch (InterruptedException ie) {
LOG.info("Open interrupted.", ie);
Thread.currentThread().interrupt();
}
Long offset = params.get(OffsetParam.NAME, OffsetParam.class);
Long len = params.get(LenParam.NAME, LenParam.class);
AUDIT_LOG.info("[{}] offset [{}] len [{}]",
new Object[] { path, offset, len });
InputStreamEntity entity = new InputStreamEntity(is, offset, len);
response =
Response.ok(entity).type(MediaType.APPLICATION_OCTET_STREAM).build(); Response.ok(entity).type(MediaType.APPLICATION_OCTET_STREAM).build();
break; break;
} }
case GETFILESTATUS: { case GETFILESTATUS: {
FSOperations.FSFileStatus command = FSOperations.FSFileStatus command = new FSOperations.FSFileStatus(path);
new FSOperations.FSFileStatus(path); Map json = fsExecute(user, command);
Map json = fsExecute(user, command); AUDIT_LOG.info("[{}]", path);
AUDIT_LOG.info("[{}]", path); response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
response = Response.ok(json).type(MediaType.APPLICATION_JSON).build(); break;
break; }
} case LISTSTATUS: {
case LISTSTATUS: { String filter = params.get(FilterParam.NAME, FilterParam.class);
String filter = params.get(FilterParam.NAME, FilterParam.class); FSOperations.FSListStatus command =
FSOperations.FSListStatus command = new FSOperations.FSListStatus( new FSOperations.FSListStatus(path, filter);
path, filter); Map json = fsExecute(user, command);
Map json = fsExecute(user, command); AUDIT_LOG.info("[{}] filter [{}]", path, (filter != null) ? filter : "-");
AUDIT_LOG.info("[{}] filter [{}]", path, response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
(filter != null) ? filter : "-"); break;
response = Response.ok(json).type(MediaType.APPLICATION_JSON).build(); }
break; case GETHOMEDIRECTORY: {
} enforceRootPath(op.value(), path);
case GETHOMEDIRECTORY: { FSOperations.FSHomeDir command = new FSOperations.FSHomeDir();
enforceRootPath(op.value(), path); JSONObject json = fsExecute(user, command);
FSOperations.FSHomeDir command = new FSOperations.FSHomeDir(); AUDIT_LOG.info("");
JSONObject json = fsExecute(user, command); response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
AUDIT_LOG.info(""); break;
response = Response.ok(json).type(MediaType.APPLICATION_JSON).build(); }
break; case INSTRUMENTATION: {
} enforceRootPath(op.value(), path);
case INSTRUMENTATION: { Groups groups = HttpFSServerWebApp.get().get(Groups.class);
enforceRootPath(op.value(), path); List<String> userGroups = groups.getGroups(user.getShortUserName());
Groups groups = HttpFSServerWebApp.get().get(Groups.class); if (!userGroups.contains(HttpFSServerWebApp.get().getAdminGroup())) {
List<String> userGroups = groups.getGroups(user.getShortUserName()); throw new AccessControlException(
if (!userGroups.contains(HttpFSServerWebApp.get().getAdminGroup())) {
throw new AccessControlException(
"User not in HttpFSServer admin group"); "User not in HttpFSServer admin group");
} }
Instrumentation instrumentation = Instrumentation instrumentation =
HttpFSServerWebApp.get().get(Instrumentation.class); HttpFSServerWebApp.get().get(Instrumentation.class);
Map snapshot = instrumentation.getSnapshot(); Map snapshot = instrumentation.getSnapshot();
response = Response.ok(snapshot).build(); response = Response.ok(snapshot).build();
break; break;
} }
case GETCONTENTSUMMARY: { case GETCONTENTSUMMARY: {
FSOperations.FSContentSummary command = FSOperations.FSContentSummary command =
new FSOperations.FSContentSummary(path); new FSOperations.FSContentSummary(path);
Map json = fsExecute(user, command); Map json = fsExecute(user, command);
AUDIT_LOG.info("[{}]", path); AUDIT_LOG.info("[{}]", path);
response = Response.ok(json).type(MediaType.APPLICATION_JSON).build(); response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
break; break;
} }
case GETFILECHECKSUM: { case GETFILECHECKSUM: {
FSOperations.FSFileChecksum command = FSOperations.FSFileChecksum command =
new FSOperations.FSFileChecksum(path); new FSOperations.FSFileChecksum(path);
Map json = fsExecute(user, command); Map json = fsExecute(user, command);
AUDIT_LOG.info("[{}]", path); AUDIT_LOG.info("[{}]", path);
response = Response.ok(json).type(MediaType.APPLICATION_JSON).build(); response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
break; break;
} }
case GETFILEBLOCKLOCATIONS: { case GETFILEBLOCKLOCATIONS: {
response = Response.status(Response.Status.BAD_REQUEST).build(); response = Response.status(Response.Status.BAD_REQUEST).build();
break; break;
} }
case GETACLSTATUS: { case GETACLSTATUS: {
FSOperations.FSAclStatus command = FSOperations.FSAclStatus command = new FSOperations.FSAclStatus(path);
new FSOperations.FSAclStatus(path); Map json = fsExecute(user, command);
Map json = fsExecute(user, command); AUDIT_LOG.info("ACL status for [{}]", path);
AUDIT_LOG.info("ACL status for [{}]", path); response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
response = Response.ok(json).type(MediaType.APPLICATION_JSON).build(); break;
break; }
} case GETXATTRS: {
case GETXATTRS: { List<String> xattrNames =
List<String> xattrNames = params.getValues(XAttrNameParam.NAME, params.getValues(XAttrNameParam.NAME, XAttrNameParam.class);
XAttrNameParam.class); XAttrCodec encoding =
XAttrCodec encoding = params.get(XAttrEncodingParam.NAME, params.get(XAttrEncodingParam.NAME, XAttrEncodingParam.class);
XAttrEncodingParam.class); FSOperations.FSGetXAttrs command =
FSOperations.FSGetXAttrs command = new FSOperations.FSGetXAttrs(path, new FSOperations.FSGetXAttrs(path, xattrNames, encoding);
xattrNames, encoding); @SuppressWarnings("rawtypes") Map json = fsExecute(user, command);
@SuppressWarnings("rawtypes") AUDIT_LOG.info("XAttrs for [{}]", path);
Map json = fsExecute(user, command); response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
AUDIT_LOG.info("XAttrs for [{}]", path); break;
response = Response.ok(json).type(MediaType.APPLICATION_JSON).build(); }
break; case LISTXATTRS: {
} FSOperations.FSListXAttrs command = new FSOperations.FSListXAttrs(path);
case LISTXATTRS: { @SuppressWarnings("rawtypes") Map json = fsExecute(user, command);
FSOperations.FSListXAttrs command = new FSOperations.FSListXAttrs(path); AUDIT_LOG.info("XAttr names for [{}]", path);
@SuppressWarnings("rawtypes") response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
Map json = fsExecute(user, command); break;
AUDIT_LOG.info("XAttr names for [{}]", path); }
response = Response.ok(json).type(MediaType.APPLICATION_JSON).build(); default: {
break; throw new IOException(
} MessageFormat.format("Invalid HTTP GET operation [{0}]", op.value()));
default: { }
throw new IOException(
MessageFormat.format("Invalid HTTP GET operation [{0}]",
op.value()));
}
} }
return response; return response;
} }