HDFS-10481. HTTPFS server should correctly impersonate as end user to open file. Contributed by Xiao Chen.

This commit is contained in:
Andrew Wang 2016-06-03 17:21:17 -07:00
parent 99a771cd7a
commit 47e0321ee9
1 changed files with 114 additions and 104 deletions

View File

@ -79,6 +79,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.security.AccessControlException;
import java.security.PrivilegedExceptionAction;
import java.text.MessageFormat;
import java.util.EnumSet;
import java.util.List;
@ -94,6 +95,7 @@ import java.util.Map;
@InterfaceAudience.Private
public class HttpFSServer {
private static Logger AUDIT_LOG = LoggerFactory.getLogger("httpfsaudit");
private static final Logger LOG = LoggerFactory.getLogger(HttpFSServer.class);
/**
* Executes a {@link FileSystemAccess.FileSystemExecutor} using a filesystem for the effective
@ -205,115 +207,123 @@ public class HttpFSServer {
MDC.put(HttpFSFileSystem.OP_PARAM, op.value().name());
MDC.put("hostname", request.getRemoteAddr());
switch (op.value()) {
case OPEN: {
//Invoking the command directly using an unmanaged FileSystem that is
// released by the FileSystemReleaseFilter
FSOperations.FSOpen command = new FSOperations.FSOpen(path);
FileSystem fs = createFileSystem(user);
InputStream is = command.execute(fs);
Long offset = params.get(OffsetParam.NAME, OffsetParam.class);
Long len = params.get(LenParam.NAME, LenParam.class);
AUDIT_LOG.info("[{}] offset [{}] len [{}]",
new Object[]{path, offset, len});
InputStreamEntity entity = new InputStreamEntity(is, offset, len);
response =
case OPEN: {
//Invoking the command directly using an unmanaged FileSystem that is
// released by the FileSystemReleaseFilter
final FSOperations.FSOpen command = new FSOperations.FSOpen(path);
final FileSystem fs = createFileSystem(user);
InputStream is = null;
UserGroupInformation ugi = UserGroupInformation
.createProxyUser(user.getShortUserName(),
UserGroupInformation.getLoginUser());
try {
is = ugi.doAs(new PrivilegedExceptionAction<InputStream>() {
@Override
public InputStream run() throws Exception {
return command.execute(fs);
}
});
} catch (InterruptedException ie) {
LOG.info("Open interrupted.", ie);
Thread.currentThread().interrupt();
}
Long offset = params.get(OffsetParam.NAME, OffsetParam.class);
Long len = params.get(LenParam.NAME, LenParam.class);
AUDIT_LOG.info("[{}] offset [{}] len [{}]",
new Object[] { path, offset, len });
InputStreamEntity entity = new InputStreamEntity(is, offset, len);
response =
Response.ok(entity).type(MediaType.APPLICATION_OCTET_STREAM).build();
break;
}
case GETFILESTATUS: {
FSOperations.FSFileStatus command =
new FSOperations.FSFileStatus(path);
Map json = fsExecute(user, command);
AUDIT_LOG.info("[{}]", path);
response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
break;
}
case LISTSTATUS: {
String filter = params.get(FilterParam.NAME, FilterParam.class);
FSOperations.FSListStatus command = new FSOperations.FSListStatus(
path, filter);
Map json = fsExecute(user, command);
AUDIT_LOG.info("[{}] filter [{}]", path,
(filter != null) ? filter : "-");
response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
break;
}
case GETHOMEDIRECTORY: {
enforceRootPath(op.value(), path);
FSOperations.FSHomeDir command = new FSOperations.FSHomeDir();
JSONObject json = fsExecute(user, command);
AUDIT_LOG.info("");
response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
break;
}
case INSTRUMENTATION: {
enforceRootPath(op.value(), path);
Groups groups = HttpFSServerWebApp.get().get(Groups.class);
List<String> userGroups = groups.getGroups(user.getShortUserName());
if (!userGroups.contains(HttpFSServerWebApp.get().getAdminGroup())) {
throw new AccessControlException(
break;
}
case GETFILESTATUS: {
FSOperations.FSFileStatus command = new FSOperations.FSFileStatus(path);
Map json = fsExecute(user, command);
AUDIT_LOG.info("[{}]", path);
response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
break;
}
case LISTSTATUS: {
String filter = params.get(FilterParam.NAME, FilterParam.class);
FSOperations.FSListStatus command =
new FSOperations.FSListStatus(path, filter);
Map json = fsExecute(user, command);
AUDIT_LOG.info("[{}] filter [{}]", path, (filter != null) ? filter : "-");
response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
break;
}
case GETHOMEDIRECTORY: {
enforceRootPath(op.value(), path);
FSOperations.FSHomeDir command = new FSOperations.FSHomeDir();
JSONObject json = fsExecute(user, command);
AUDIT_LOG.info("");
response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
break;
}
case INSTRUMENTATION: {
enforceRootPath(op.value(), path);
Groups groups = HttpFSServerWebApp.get().get(Groups.class);
List<String> userGroups = groups.getGroups(user.getShortUserName());
if (!userGroups.contains(HttpFSServerWebApp.get().getAdminGroup())) {
throw new AccessControlException(
"User not in HttpFSServer admin group");
}
Instrumentation instrumentation =
}
Instrumentation instrumentation =
HttpFSServerWebApp.get().get(Instrumentation.class);
Map snapshot = instrumentation.getSnapshot();
response = Response.ok(snapshot).build();
break;
}
case GETCONTENTSUMMARY: {
FSOperations.FSContentSummary command =
Map snapshot = instrumentation.getSnapshot();
response = Response.ok(snapshot).build();
break;
}
case GETCONTENTSUMMARY: {
FSOperations.FSContentSummary command =
new FSOperations.FSContentSummary(path);
Map json = fsExecute(user, command);
AUDIT_LOG.info("[{}]", path);
response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
break;
}
case GETFILECHECKSUM: {
FSOperations.FSFileChecksum command =
Map json = fsExecute(user, command);
AUDIT_LOG.info("[{}]", path);
response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
break;
}
case GETFILECHECKSUM: {
FSOperations.FSFileChecksum command =
new FSOperations.FSFileChecksum(path);
Map json = fsExecute(user, command);
AUDIT_LOG.info("[{}]", path);
response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
break;
}
case GETFILEBLOCKLOCATIONS: {
response = Response.status(Response.Status.BAD_REQUEST).build();
break;
}
case GETACLSTATUS: {
FSOperations.FSAclStatus command =
new FSOperations.FSAclStatus(path);
Map json = fsExecute(user, command);
AUDIT_LOG.info("ACL status for [{}]", path);
response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
break;
}
case GETXATTRS: {
List<String> xattrNames = params.getValues(XAttrNameParam.NAME,
XAttrNameParam.class);
XAttrCodec encoding = params.get(XAttrEncodingParam.NAME,
XAttrEncodingParam.class);
FSOperations.FSGetXAttrs command = new FSOperations.FSGetXAttrs(path,
xattrNames, encoding);
@SuppressWarnings("rawtypes")
Map json = fsExecute(user, command);
AUDIT_LOG.info("XAttrs for [{}]", path);
response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
break;
}
case LISTXATTRS: {
FSOperations.FSListXAttrs command = new FSOperations.FSListXAttrs(path);
@SuppressWarnings("rawtypes")
Map json = fsExecute(user, command);
AUDIT_LOG.info("XAttr names for [{}]", path);
response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
break;
}
default: {
throw new IOException(
MessageFormat.format("Invalid HTTP GET operation [{0}]",
op.value()));
}
Map json = fsExecute(user, command);
AUDIT_LOG.info("[{}]", path);
response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
break;
}
case GETFILEBLOCKLOCATIONS: {
response = Response.status(Response.Status.BAD_REQUEST).build();
break;
}
case GETACLSTATUS: {
FSOperations.FSAclStatus command = new FSOperations.FSAclStatus(path);
Map json = fsExecute(user, command);
AUDIT_LOG.info("ACL status for [{}]", path);
response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
break;
}
case GETXATTRS: {
List<String> xattrNames =
params.getValues(XAttrNameParam.NAME, XAttrNameParam.class);
XAttrCodec encoding =
params.get(XAttrEncodingParam.NAME, XAttrEncodingParam.class);
FSOperations.FSGetXAttrs command =
new FSOperations.FSGetXAttrs(path, xattrNames, encoding);
@SuppressWarnings("rawtypes") Map json = fsExecute(user, command);
AUDIT_LOG.info("XAttrs for [{}]", path);
response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
break;
}
case LISTXATTRS: {
FSOperations.FSListXAttrs command = new FSOperations.FSListXAttrs(path);
@SuppressWarnings("rawtypes") Map json = fsExecute(user, command);
AUDIT_LOG.info("XAttr names for [{}]", path);
response = Response.ok(json).type(MediaType.APPLICATION_JSON).build();
break;
}
default: {
throw new IOException(
MessageFormat.format("Invalid HTTP GET operation [{0}]", op.value()));
}
}
return response;
}