Add permission checks before reading from HDFS stream (#26716)
Add checks for special permissions before reading hdfs stream data. Also adds test from readonly repository fix. MiniHDFS will now start with an existing repository with a single snapshot contained within. Readonly Repository is created in tests and attempts to list the snapshots within this repo.
This commit is contained in:
parent
fda8f8b827
commit
c760eec054
|
@ -23,6 +23,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
|
|||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.Options.CreateOpts;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.elasticsearch.SpecialPermission;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.blobstore.BlobMetaData;
|
||||
import org.elasticsearch.common.blobstore.BlobPath;
|
||||
|
@ -45,12 +46,14 @@ import java.util.Map;
|
|||
|
||||
final class HdfsBlobContainer extends AbstractBlobContainer {
|
||||
private final HdfsBlobStore store;
|
||||
private final HdfsSecurityContext securityContext;
|
||||
private final Path path;
|
||||
private final int bufferSize;
|
||||
|
||||
HdfsBlobContainer(BlobPath blobPath, HdfsBlobStore store, Path path, int bufferSize) {
|
||||
HdfsBlobContainer(BlobPath blobPath, HdfsBlobStore store, Path path, int bufferSize, HdfsSecurityContext hdfsSecurityContext) {
|
||||
super(blobPath);
|
||||
this.store = store;
|
||||
this.securityContext = hdfsSecurityContext;
|
||||
this.path = path;
|
||||
this.bufferSize = bufferSize;
|
||||
}
|
||||
|
@ -90,7 +93,9 @@ final class HdfsBlobContainer extends AbstractBlobContainer {
|
|||
// FSDataInputStream can open connections on read() or skip() so we wrap in
|
||||
// HDFSPrivilegedInputSteam which will ensure that underlying methods will
|
||||
// be called with the proper privileges.
|
||||
return store.execute(fileContext -> new HDFSPrivilegedInputSteam(fileContext.open(new Path(path, blobName), bufferSize)));
|
||||
return store.execute(fileContext ->
|
||||
new HDFSPrivilegedInputSteam(fileContext.open(new Path(path, blobName), bufferSize), securityContext)
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -144,8 +149,11 @@ final class HdfsBlobContainer extends AbstractBlobContainer {
|
|||
*/
|
||||
private static class HDFSPrivilegedInputSteam extends FilterInputStream {
|
||||
|
||||
HDFSPrivilegedInputSteam(InputStream in) {
|
||||
private final HdfsSecurityContext securityContext;
|
||||
|
||||
HDFSPrivilegedInputSteam(InputStream in, HdfsSecurityContext hdfsSecurityContext) {
|
||||
super(in);
|
||||
this.securityContext = hdfsSecurityContext;
|
||||
}
|
||||
|
||||
public int read() throws IOException {
|
||||
|
@ -175,9 +183,10 @@ final class HdfsBlobContainer extends AbstractBlobContainer {
|
|||
});
|
||||
}
|
||||
|
||||
private static <T> T doPrivilegedOrThrow(PrivilegedExceptionAction<T> action) throws IOException {
|
||||
private <T> T doPrivilegedOrThrow(PrivilegedExceptionAction<T> action) throws IOException {
|
||||
SpecialPermission.check();
|
||||
try {
|
||||
return AccessController.doPrivileged(action);
|
||||
return AccessController.doPrivileged(action, null, securityContext.getRestrictedExecutionPermissions());
|
||||
} catch (PrivilegedActionException e) {
|
||||
throw (IOException) e.getCause();
|
||||
}
|
||||
|
|
|
@ -75,7 +75,7 @@ final class HdfsBlobStore implements BlobStore {
|
|||
|
||||
@Override
|
||||
public BlobContainer blobContainer(BlobPath path) {
|
||||
return new HdfsBlobContainer(path, this, buildHdfsPath(path), bufferSize);
|
||||
return new HdfsBlobContainer(path, this, buildHdfsPath(path), bufferSize, securityContext);
|
||||
}
|
||||
|
||||
private Path buildHdfsPath(BlobPath blobPath) {
|
||||
|
|
|
@ -132,7 +132,7 @@ public final class HdfsRepository extends BlobStoreRepository {
|
|||
hadoopConfiguration.setBoolean("fs.hdfs.impl.disable.cache", true);
|
||||
|
||||
// Create the filecontext with our user information
|
||||
// This will correctly configure the filecontext to have our UGI as it's internal user.
|
||||
// This will correctly configure the filecontext to have our UGI as its internal user.
|
||||
return ugi.doAs((PrivilegedAction<FileContext>) () -> {
|
||||
try {
|
||||
AbstractFileSystem fs = AbstractFileSystem.get(uri, hadoopConfiguration);
|
||||
|
|
|
@ -56,7 +56,9 @@ class HdfsSecurityContext {
|
|||
// 1) hadoop dynamic proxy is messy with access rules
|
||||
new ReflectPermission("suppressAccessChecks"),
|
||||
// 2) allow hadoop to add credentials to our Subject
|
||||
new AuthPermission("modifyPrivateCredentials")
|
||||
new AuthPermission("modifyPrivateCredentials"),
|
||||
// 3) RPC Engine requires this for re-establishing pooled connections over the lifetime of the client
|
||||
new PrivateCredentialPermission("org.apache.hadoop.security.Credentials * \"*\"", "read")
|
||||
};
|
||||
|
||||
// If Security is enabled, we need all the following elevated permissions:
|
||||
|
|
|
@ -0,0 +1,29 @@
|
|||
# Integration tests for HDFS Repository plugin
|
||||
#
|
||||
# Tests retrieving information about snapshot
|
||||
#
|
||||
---
|
||||
"Get a snapshot - readonly":
|
||||
# Create repository
|
||||
- do:
|
||||
snapshot.create_repository:
|
||||
repository: test_snapshot_repository_ro
|
||||
body:
|
||||
type: hdfs
|
||||
settings:
|
||||
uri: "hdfs://localhost:9999"
|
||||
path: "/user/elasticsearch/existing/readonly-repository"
|
||||
readonly: true
|
||||
|
||||
# List snapshot info
|
||||
- do:
|
||||
snapshot.get:
|
||||
repository: test_snapshot_repository_ro
|
||||
snapshot: "_all"
|
||||
|
||||
- length: { snapshots: 1 }
|
||||
|
||||
# Remove our repository
|
||||
- do:
|
||||
snapshot.delete_repository:
|
||||
repository: test_snapshot_repository_ro
|
|
@ -0,0 +1,31 @@
|
|||
# Integration tests for HDFS Repository plugin
|
||||
#
|
||||
# Tests retrieving information about snapshot
|
||||
#
|
||||
---
|
||||
"Get a snapshot - readonly":
|
||||
# Create repository
|
||||
- do:
|
||||
snapshot.create_repository:
|
||||
repository: test_snapshot_repository_ro
|
||||
body:
|
||||
type: hdfs
|
||||
settings:
|
||||
uri: "hdfs://localhost:9998"
|
||||
path: "/user/elasticsearch/existing/readonly-repository"
|
||||
security:
|
||||
principal: "elasticsearch@BUILD.ELASTIC.CO"
|
||||
readonly: true
|
||||
|
||||
# List snapshot info
|
||||
- do:
|
||||
snapshot.get:
|
||||
repository: test_snapshot_repository_ro
|
||||
snapshot: "_all"
|
||||
|
||||
- length: { snapshots: 1 }
|
||||
|
||||
# Remove our repository
|
||||
- do:
|
||||
snapshot.delete_repository:
|
||||
repository: test_snapshot_repository_ro
|
|
@ -19,7 +19,9 @@
|
|||
|
||||
package hdfs;
|
||||
|
||||
import java.io.File;
|
||||
import java.lang.management.ManagementFactory;
|
||||
import java.net.URL;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
|
@ -29,9 +31,11 @@ import java.util.ArrayList;
|
|||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.fs.permission.AclEntry;
|
||||
import org.apache.hadoop.fs.permission.AclEntryType;
|
||||
import org.apache.hadoop.fs.permission.FsAction;
|
||||
|
@ -100,15 +104,35 @@ public class MiniHDFS {
|
|||
}
|
||||
MiniDFSCluster dfs = builder.build();
|
||||
|
||||
// Set the elasticsearch user directory up
|
||||
if (UserGroupInformation.isSecurityEnabled()) {
|
||||
FileSystem fs = dfs.getFileSystem();
|
||||
org.apache.hadoop.fs.Path esUserPath = new org.apache.hadoop.fs.Path("/user/elasticsearch");
|
||||
// Configure contents of the filesystem
|
||||
org.apache.hadoop.fs.Path esUserPath = new org.apache.hadoop.fs.Path("/user/elasticsearch");
|
||||
try (FileSystem fs = dfs.getFileSystem()) {
|
||||
|
||||
// Set the elasticsearch user directory up
|
||||
fs.mkdirs(esUserPath);
|
||||
List<AclEntry> acls = new ArrayList<>();
|
||||
acls.add(new AclEntry.Builder().setType(AclEntryType.USER).setName("elasticsearch").setPermission(FsAction.ALL).build());
|
||||
fs.modifyAclEntries(esUserPath, acls);
|
||||
fs.close();
|
||||
if (UserGroupInformation.isSecurityEnabled()) {
|
||||
List<AclEntry> acls = new ArrayList<>();
|
||||
acls.add(new AclEntry.Builder().setType(AclEntryType.USER).setName("elasticsearch").setPermission(FsAction.ALL).build());
|
||||
fs.modifyAclEntries(esUserPath, acls);
|
||||
}
|
||||
|
||||
// Install a pre-existing repository into HDFS
|
||||
String directoryName = "readonly-repository";
|
||||
String archiveName = directoryName + ".tar.gz";
|
||||
URL readOnlyRepositoryArchiveURL = MiniHDFS.class.getClassLoader().getResource(archiveName);
|
||||
if (readOnlyRepositoryArchiveURL != null) {
|
||||
Path tempDirectory = Files.createTempDirectory(MiniHDFS.class.getName());
|
||||
File readOnlyRepositoryArchive = tempDirectory.resolve(archiveName).toFile();
|
||||
FileUtils.copyURLToFile(readOnlyRepositoryArchiveURL, readOnlyRepositoryArchive);
|
||||
FileUtil.unTar(readOnlyRepositoryArchive, tempDirectory.toFile());
|
||||
|
||||
fs.copyFromLocalFile(true, true,
|
||||
new org.apache.hadoop.fs.Path(tempDirectory.resolve(directoryName).toAbsolutePath().toUri()),
|
||||
esUserPath.suffix("/existing/" + directoryName)
|
||||
);
|
||||
|
||||
FileUtils.deleteDirectory(tempDirectory.toFile());
|
||||
}
|
||||
}
|
||||
|
||||
// write our PID file
|
||||
|
|
Binary file not shown.
Loading…
Reference in New Issue