Add doPrivilege blocks for socket connect ops in repository-hdfs (#22793)
This is related to #22116. The repository-hdfs plugin opens socket connections. As SocketPermission is transitioned out of core, hdfs will require connect permission. This pull request wraps operations that require this permission in doPrivileged blocks.
This commit is contained in:
parent
aad51d44ab
commit
eb4562d7a5
|
@ -19,12 +19,12 @@
|
|||
package org.elasticsearch.repositories.hdfs;
|
||||
|
||||
import org.apache.hadoop.fs.CreateFlag;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileContext;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.Options.CreateOpts;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.PathFilter;
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.blobstore.BlobMetaData;
|
||||
import org.elasticsearch.common.blobstore.BlobPath;
|
||||
|
@ -32,10 +32,15 @@ import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
|
|||
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
|
||||
import org.elasticsearch.repositories.hdfs.HdfsBlobStore.Operation;
|
||||
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.FilterInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.nio.file.FileAlreadyExistsException;
|
||||
import java.nio.file.NoSuchFileException;
|
||||
import java.security.AccessController;
|
||||
import java.security.PrivilegedActionException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.LinkedHashMap;
|
||||
|
@ -56,12 +61,7 @@ final class HdfsBlobContainer extends AbstractBlobContainer {
|
|||
@Override
|
||||
public boolean blobExists(String blobName) {
|
||||
try {
|
||||
return store.execute(new Operation<Boolean>() {
|
||||
@Override
|
||||
public Boolean run(FileContext fileContext) throws IOException {
|
||||
return fileContext.util().exists(new Path(path, blobName));
|
||||
}
|
||||
});
|
||||
return store.execute(fileContext -> fileContext.util().exists(new Path(path, blobName)));
|
||||
} catch (Exception e) {
|
||||
return false;
|
||||
}
|
||||
|
@ -73,22 +73,14 @@ final class HdfsBlobContainer extends AbstractBlobContainer {
|
|||
throw new NoSuchFileException("Blob [" + blobName + "] does not exist");
|
||||
}
|
||||
|
||||
store.execute(new Operation<Boolean>() {
|
||||
@Override
|
||||
public Boolean run(FileContext fileContext) throws IOException {
|
||||
return fileContext.delete(new Path(path, blobName), true);
|
||||
}
|
||||
});
|
||||
store.execute(fileContext -> fileContext.delete(new Path(path, blobName), true));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void move(String sourceBlobName, String targetBlobName) throws IOException {
|
||||
store.execute(new Operation<Void>() {
|
||||
@Override
|
||||
public Void run(FileContext fileContext) throws IOException {
|
||||
fileContext.rename(new Path(path, sourceBlobName), new Path(path, targetBlobName));
|
||||
return null;
|
||||
}
|
||||
store.execute((Operation<Void>) fileContext -> {
|
||||
fileContext.rename(new Path(path, sourceBlobName), new Path(path, targetBlobName));
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -98,12 +90,10 @@ final class HdfsBlobContainer extends AbstractBlobContainer {
|
|||
throw new NoSuchFileException("Blob [" + blobName + "] does not exist");
|
||||
}
|
||||
// FSDataInputStream does buffering internally
|
||||
return store.execute(new Operation<InputStream>() {
|
||||
@Override
|
||||
public InputStream run(FileContext fileContext) throws IOException {
|
||||
return fileContext.open(new Path(path, blobName), bufferSize);
|
||||
}
|
||||
});
|
||||
// FSDataInputStream can open connections on read() or skip() so we wrap in
|
||||
// HDFSPrivilegedInputSteam which will ensure that underlying methods will
|
||||
// be called with the proper privileges.
|
||||
return store.execute(fileContext -> new HDFSPrivilegedInputSteam(fileContext.open(new Path(path, blobName), bufferSize)));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -111,45 +101,33 @@ final class HdfsBlobContainer extends AbstractBlobContainer {
|
|||
if (blobExists(blobName)) {
|
||||
throw new FileAlreadyExistsException("blob [" + blobName + "] already exists, cannot overwrite");
|
||||
}
|
||||
store.execute(new Operation<Void>() {
|
||||
@Override
|
||||
public Void run(FileContext fileContext) throws IOException {
|
||||
Path blob = new Path(path, blobName);
|
||||
// we pass CREATE, which means it fails if a blob already exists.
|
||||
// NOTE: this behavior differs from FSBlobContainer, which passes TRUNCATE_EXISTING
|
||||
// that should be fixed there, no need to bring truncation into this, give the user an error.
|
||||
EnumSet<CreateFlag> flags = EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK);
|
||||
CreateOpts[] opts = { CreateOpts.bufferSize(bufferSize) };
|
||||
try (FSDataOutputStream stream = fileContext.create(blob, flags, opts)) {
|
||||
int bytesRead;
|
||||
byte[] buffer = new byte[bufferSize];
|
||||
while ((bytesRead = inputStream.read(buffer)) != -1) {
|
||||
stream.write(buffer, 0, bytesRead);
|
||||
// For safety we also hsync each write as well, because of its docs:
|
||||
// SYNC_BLOCK - to force closed blocks to the disk device
|
||||
// "In addition Syncable.hsync() should be called after each write,
|
||||
// if true synchronous behavior is required"
|
||||
stream.hsync();
|
||||
}
|
||||
store.execute((Operation<Void>) fileContext -> {
|
||||
Path blob = new Path(path, blobName);
|
||||
// we pass CREATE, which means it fails if a blob already exists.
|
||||
// NOTE: this behavior differs from FSBlobContainer, which passes TRUNCATE_EXISTING
|
||||
// that should be fixed there, no need to bring truncation into this, give the user an error.
|
||||
EnumSet<CreateFlag> flags = EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK);
|
||||
CreateOpts[] opts = {CreateOpts.bufferSize(bufferSize)};
|
||||
try (FSDataOutputStream stream = fileContext.create(blob, flags, opts)) {
|
||||
int bytesRead;
|
||||
byte[] buffer = new byte[bufferSize];
|
||||
while ((bytesRead = inputStream.read(buffer)) != -1) {
|
||||
stream.write(buffer, 0, bytesRead);
|
||||
// For safety we also hsync each write as well, because of its docs:
|
||||
// SYNC_BLOCK - to force closed blocks to the disk device
|
||||
// "In addition Syncable.hsync() should be called after each write,
|
||||
// if true synchronous behavior is required"
|
||||
stream.hsync();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, BlobMetaData> listBlobsByPrefix(@Nullable final String prefix) throws IOException {
|
||||
FileStatus[] files = store.execute(new Operation<FileStatus[]>() {
|
||||
@Override
|
||||
public FileStatus[] run(FileContext fileContext) throws IOException {
|
||||
return (fileContext.util().listStatus(path, new PathFilter() {
|
||||
@Override
|
||||
public boolean accept(Path path) {
|
||||
return prefix == null || path.getName().startsWith(prefix);
|
||||
}
|
||||
}));
|
||||
}
|
||||
});
|
||||
FileStatus[] files = store.execute(fileContext -> (fileContext.util().listStatus(path,
|
||||
path -> prefix == null || path.getName().startsWith(prefix))));
|
||||
Map<String, BlobMetaData> map = new LinkedHashMap<String, BlobMetaData>();
|
||||
for (FileStatus file : files) {
|
||||
map.put(file.getPath().getName(), new PlainBlobMetaData(file.getPath().getName(), file.getLen()));
|
||||
|
@ -161,4 +139,51 @@ final class HdfsBlobContainer extends AbstractBlobContainer {
|
|||
public Map<String, BlobMetaData> listBlobs() throws IOException {
|
||||
return listBlobsByPrefix(null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Exists to wrap underlying InputStream methods that might make socket connections in
|
||||
* doPrivileged blocks. This is due to the way that hdfs client libraries might open
|
||||
* socket connections when you are reading from an InputStream.
|
||||
*/
|
||||
private static class HDFSPrivilegedInputSteam extends FilterInputStream {
|
||||
|
||||
HDFSPrivilegedInputSteam(InputStream in) {
|
||||
super(in);
|
||||
}
|
||||
|
||||
public int read() throws IOException {
|
||||
return doPrivilegedOrThrow(in::read);
|
||||
}
|
||||
|
||||
public int read(byte b[]) throws IOException {
|
||||
return doPrivilegedOrThrow(() -> in.read(b));
|
||||
}
|
||||
|
||||
public int read(byte b[], int off, int len) throws IOException {
|
||||
return doPrivilegedOrThrow(() -> in.read(b, off, len));
|
||||
}
|
||||
|
||||
public long skip(long n) throws IOException {
|
||||
return doPrivilegedOrThrow(() -> in.skip(n));
|
||||
}
|
||||
|
||||
public int available() throws IOException {
|
||||
return doPrivilegedOrThrow(() -> in.available());
|
||||
}
|
||||
|
||||
public synchronized void reset() throws IOException {
|
||||
doPrivilegedOrThrow(() -> {
|
||||
in.reset();
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
private static <T> T doPrivilegedOrThrow(PrivilegedExceptionAction<T> action) throws IOException {
|
||||
try {
|
||||
return AccessController.doPrivileged(action);
|
||||
} catch (PrivilegedActionException e) {
|
||||
throw (IOException) e.getCause();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.elasticsearch.common.blobstore.BlobStore;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.ReflectPermission;
|
||||
import java.net.SocketPermission;
|
||||
import java.security.AccessController;
|
||||
import java.security.PrivilegedActionException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
|
@ -46,12 +47,7 @@ final class HdfsBlobStore implements BlobStore {
|
|||
HdfsBlobStore(FileContext fileContext, String path, int bufferSize) throws IOException {
|
||||
this.fileContext = fileContext;
|
||||
this.bufferSize = bufferSize;
|
||||
this.root = execute(new Operation<Path>() {
|
||||
@Override
|
||||
public Path run(FileContext fileContext) throws IOException {
|
||||
return fileContext.makeQualified(new Path(path));
|
||||
}
|
||||
});
|
||||
this.root = execute(fileContext1 -> fileContext1.makeQualified(new Path(path)));
|
||||
try {
|
||||
mkdirs(root);
|
||||
} catch (FileAlreadyExistsException ok) {
|
||||
|
@ -60,23 +56,17 @@ final class HdfsBlobStore implements BlobStore {
|
|||
}
|
||||
|
||||
private void mkdirs(Path path) throws IOException {
|
||||
execute(new Operation<Void>() {
|
||||
@Override
|
||||
public Void run(FileContext fileContext) throws IOException {
|
||||
fileContext.mkdir(path, null, true);
|
||||
return null;
|
||||
}
|
||||
execute((Operation<Void>) fileContext -> {
|
||||
fileContext.mkdir(path, null, true);
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void delete(BlobPath path) throws IOException {
|
||||
execute(new Operation<Void>() {
|
||||
@Override
|
||||
public Void run(FileContext fc) throws IOException {
|
||||
fc.delete(translateToHdfsPath(path), true);
|
||||
return null;
|
||||
}
|
||||
execute((Operation<Void>) fc -> {
|
||||
fc.delete(translateToHdfsPath(path), true);
|
||||
return null;
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -128,7 +118,7 @@ final class HdfsBlobStore implements BlobStore {
|
|||
try {
|
||||
return AccessController.doPrivileged((PrivilegedExceptionAction<V>)
|
||||
() -> operation.run(fileContext), null, new ReflectPermission("suppressAccessChecks"),
|
||||
new AuthPermission("modifyPrivateCredentials"));
|
||||
new AuthPermission("modifyPrivateCredentials"), new SocketPermission("*", "connect"));
|
||||
} catch (PrivilegedActionException pae) {
|
||||
throw (IOException) pae.getException();
|
||||
}
|
||||
|
|
|
@ -134,15 +134,12 @@ public final class HdfsRepository extends BlobStoreRepository {
|
|||
cfg.setBoolean("fs.hdfs.impl.disable.cache", true);
|
||||
|
||||
// create the filecontext with our user
|
||||
return Subject.doAs(subject, new PrivilegedAction<FileContext>() {
|
||||
@Override
|
||||
public FileContext run() {
|
||||
try {
|
||||
AbstractFileSystem fs = AbstractFileSystem.get(uri, cfg);
|
||||
return FileContext.getFileContext(fs, cfg);
|
||||
} catch (UnsupportedFileSystemException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
return Subject.doAs(subject, (PrivilegedAction<FileContext>) () -> {
|
||||
try {
|
||||
AbstractFileSystem fs = AbstractFileSystem.get(uri, cfg);
|
||||
return FileContext.getFileContext(fs, cfg);
|
||||
} catch (UnsupportedFileSystemException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
|
@ -32,28 +32,25 @@ import java.io.IOException;
|
|||
import java.lang.reflect.Constructor;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.security.AccessController;
|
||||
import java.security.Principal;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.security.PrivilegedActionException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.Collections;
|
||||
|
||||
public class HdfsBlobStoreContainerTests extends ESBlobStoreContainerTestCase {
|
||||
|
||||
@Override
|
||||
protected BlobStore newBlobStore() throws IOException {
|
||||
return AccessController.doPrivileged(
|
||||
new PrivilegedAction<HdfsBlobStore>() {
|
||||
@Override
|
||||
public HdfsBlobStore run() {
|
||||
try {
|
||||
FileContext fileContext = createContext(new URI("hdfs:///"));
|
||||
return new HdfsBlobStore(fileContext, "temp", 1024);
|
||||
} catch (IOException | URISyntaxException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
});
|
||||
FileContext fileContext;
|
||||
try {
|
||||
fileContext = AccessController.doPrivileged((PrivilegedExceptionAction<FileContext>)
|
||||
() -> createContext(new URI("hdfs:///")));
|
||||
} catch (PrivilegedActionException e) {
|
||||
throw new RuntimeException(e.getCause());
|
||||
}
|
||||
return new HdfsBlobStore(fileContext, "temp", 1024);
|
||||
}
|
||||
|
||||
@SuppressForbidden(reason = "lesser of two evils (the other being a bunch of JNI/classloader nightmares)")
|
||||
|
@ -90,15 +87,12 @@ public class HdfsBlobStoreContainerTests extends ESBlobStoreContainerTestCase {
|
|||
cfg.set("fs.AbstractFileSystem." + uri.getScheme() + ".impl", TestingFs.class.getName());
|
||||
|
||||
// create the FileContext with our user
|
||||
return Subject.doAs(subject, new PrivilegedAction<FileContext>() {
|
||||
@Override
|
||||
public FileContext run() {
|
||||
try {
|
||||
TestingFs fs = (TestingFs) AbstractFileSystem.get(uri, cfg);
|
||||
return FileContext.getFileContext(fs, cfg);
|
||||
} catch (UnsupportedFileSystemException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
return Subject.doAs(subject, (PrivilegedAction<FileContext>) () -> {
|
||||
try {
|
||||
TestingFs fs = (TestingFs) AbstractFileSystem.get(uri, cfg);
|
||||
return FileContext.getFileContext(fs, cfg);
|
||||
} catch (UnsupportedFileSystemException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue