From 9573bb9f151303ffeffbc3bc94d1db35ffaa8809 Mon Sep 17 00:00:00 2001
From: Robert Muir
Date: Tue, 22 Dec 2015 00:21:03 -0500
Subject: [PATCH] make sure BlobStore.close always triggers ACE on any access
 afterwards

---
 .../repositories/hdfs/HdfsBlobContainer.java  | 30 ++++-----
 .../repositories/hdfs/HdfsBlobStore.java      | 63 +++++++++++++++----
 .../repositories/hdfs/HdfsRepository.java     | 56 ++---------------
 3 files changed, 73 insertions(+), 76 deletions(-)

diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java
index 7e9740aac7a..45eea7f00ef 100644
--- a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java
+++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java
@@ -30,6 +30,7 @@ import org.elasticsearch.common.blobstore.BlobMetaData;
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
 import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
+import org.elasticsearch.repositories.hdfs.HdfsBlobStore.Operation;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -40,20 +41,21 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 
 final class HdfsBlobContainer extends AbstractBlobContainer {
-
-    private final HdfsRepository repository;
+    private final HdfsBlobStore store;
     private final Path path;
+    private final int bufferSize;
 
-    HdfsBlobContainer(BlobPath blobPath, HdfsRepository repository, Path path) {
+    HdfsBlobContainer(BlobPath blobPath, HdfsBlobStore store, Path path, int bufferSize) {
         super(blobPath);
-        this.repository = repository;
+        this.store = store;
         this.path = path;
+        this.bufferSize = bufferSize;
     }
 
     @Override
     public boolean blobExists(String blobName) {
         try {
-            return repository.execute(new HdfsRepository.Operation<Boolean>() {
+            return store.execute(new Operation<Boolean>() {
                 @Override
                 public Boolean run(FileContext fileContext) throws IOException {
                     return fileContext.util().exists(new Path(path, blobName));
@@ -67,7 +69,7 @@ final class HdfsBlobContainer extends AbstractBlobContainer {
     @Override
     public void deleteBlob(String blobName) throws IOException {
         try {
-            repository.execute(new HdfsRepository.Operation<Boolean>() {
+            store.execute(new Operation<Boolean>() {
                 @Override
                 public Boolean run(FileContext fileContext) throws IOException {
                     return fileContext.delete(new Path(path, blobName), true);
@@ -80,7 +82,7 @@
 
     @Override
     public void move(String sourceBlobName, String targetBlobName) throws IOException {
-        repository.execute(new HdfsRepository.Operation<Void>() {
+        store.execute(new Operation<Void>() {
             @Override
             public Void run(FileContext fileContext) throws IOException {
                 fileContext.rename(new Path(path, sourceBlobName), new Path(path, targetBlobName));
@@ -92,17 +94,17 @@
     @Override
     public InputStream readBlob(String blobName) throws IOException {
         // FSDataInputStream does buffering internally
-        return repository.execute(new HdfsRepository.Operation<InputStream>() {
+        return store.execute(new Operation<InputStream>() {
             @Override
             public InputStream run(FileContext fileContext) throws IOException {
-                return fileContext.open(new Path(path, blobName), repository.bufferSizeInBytes);
+                return fileContext.open(new Path(path, blobName), bufferSize);
            }
         });
     }
 
     @Override
     public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException {
-        repository.execute(new HdfsRepository.Operation<Void>() {
+        store.execute(new Operation<Void>() {
             @Override
             public Void run(FileContext fileContext) throws IOException {
                 Path blob = new Path(path, blobName);
@@ -110,10 +112,10 @@
                 // NOTE: this behavior differs from FSBlobContainer, which passes TRUNCATE_EXISTING
                 // that should be fixed there, no need to bring truncation into this, give the user an error.
                 EnumSet<CreateFlag> flags = EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK);
-                CreateOpts[] opts = { CreateOpts.bufferSize(repository.bufferSizeInBytes) };
+                CreateOpts[] opts = { CreateOpts.bufferSize(bufferSize) };
                 try (FSDataOutputStream stream = fileContext.create(blob, flags, opts)) {
                     int bytesRead;
-                    byte[] buffer = new byte[repository.bufferSizeInBytes];
+                    byte[] buffer = new byte[bufferSize];
                     while ((bytesRead = inputStream.read(buffer)) != -1) {
                         stream.write(buffer, 0, bytesRead);
                         // For safety we also hsync each write as well, because of its docs:
@@ -130,7 +132,7 @@
 
     @Override
     public Map<String, BlobMetaData> listBlobsByPrefix(final @Nullable String blobNamePrefix) throws IOException {
-        FileStatus[] files = repository.execute(new HdfsRepository.Operation<FileStatus[]>() {
+        FileStatus[] files = store.execute(new Operation<FileStatus[]>() {
             @Override
             public FileStatus[] run(FileContext fileContext) throws IOException {
                 return (fileContext.util().listStatus(path, new PathFilter() {
@@ -150,7 +152,7 @@
 
     @Override
     public Map<String, BlobMetaData> listBlobs() throws IOException {
-        FileStatus[] files = repository.execute(new HdfsRepository.Operation<FileStatus[]>() {
+        FileStatus[] files = store.execute(new Operation<FileStatus[]>() {
             @Override
             public FileStatus[] run(FileContext fileContext) throws IOException {
                 return fileContext.util().listStatus(path);
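
Every method in HdfsBlobContainer now routes its filesystem access through store.execute(Operation), so the container never touches the FileContext directly and the store stays the single place that can refuse access. A minimal, self-contained sketch of that execute-around pattern (DemoStore and FakeFileContext are hypothetical stand-ins for HdfsBlobStore and Hadoop's FileContext, not the real classes):

    import java.io.IOException;

    public class ExecuteAroundDemo {
        // single-method callback, mirroring HdfsBlobStore.Operation<V>
        interface Operation<V> {
            V run(FakeFileContext fileContext) throws IOException;
        }

        // stand-in for Hadoop's FileContext; not a real API
        static final class FakeFileContext {
            boolean exists(String path) {
                return path.startsWith("/repo/");
            }
        }

        static final class DemoStore {
            private final FakeFileContext fileContext = new FakeFileContext();
            private volatile boolean closed;

            <V> V execute(Operation<V> operation) throws IOException {
                if (closed) {
                    throw new IllegalStateException("store is closed");
                }
                return operation.run(fileContext);
            }

            void close() {
                closed = true;
            }
        }

        public static void main(String[] args) throws IOException {
            DemoStore store = new DemoStore();
            // a container-style caller only ever sees the callback
            boolean exists = store.execute(new Operation<Boolean>() {
                @Override
                public Boolean run(FakeFileContext fileContext) throws IOException {
                    return fileContext.exists("/repo/blob1");
                }
            });
            System.out.println(exists); // true
            store.close();
            try {
                store.execute(fc -> fc.exists("/repo/blob1"));
            } catch (IllegalStateException e) {
                System.out.println("rejected: " + e.getMessage());
            }
        }
    }

Because Operation<V> has a single abstract method, a Java 8 caller can also pass a lambda, as the last call above does; the anonymous-class form in the patch simply matches the codebase's style at the time.
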
diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobStore.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobStore.java
index 1d182925f7e..a9d1f3e2d68 100644
--- a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobStore.java
+++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobStore.java
@@ -21,22 +21,34 @@ package org.elasticsearch.repositories.hdfs;
 
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
+import org.apache.lucene.store.AlreadyClosedException;
 import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.SpecialPermission;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.BlobStore;
 
 import java.io.IOException;
+import java.security.AccessController;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
 
 final class HdfsBlobStore implements BlobStore {
-    private final HdfsRepository repository;
     private final Path root;
+    private final FileContext fileContext;
+    private final int bufferSize;
+    private volatile boolean closed;
 
-    HdfsBlobStore(HdfsRepository repository, Path root) throws IOException {
-        this.repository = repository;
-        this.root = root;
-
+    HdfsBlobStore(FileContext fileContext, String path, int bufferSize) throws IOException {
+        this.fileContext = fileContext;
+        this.bufferSize = bufferSize;
+        this.root = execute(new Operation<Path>() {
+            @Override
+            public Path run(FileContext fileContext) throws IOException {
+                return fileContext.makeQualified(new Path(path));
+            }
+        });
         try {
             mkdirs(root);
         } catch (FileAlreadyExistsException ok) {
@@ -45,10 +57,10 @@ final class HdfsBlobStore implements BlobStore {
     }
 
     private void mkdirs(Path path) throws IOException {
-        repository.execute(new HdfsRepository.Operation<Void>() {
+        execute(new Operation<Void>() {
             @Override
-            public Void run(FileContext fc) throws IOException {
-                fc.mkdir(path, null, true);
+            public Void run(FileContext fileContext) throws IOException {
+                fileContext.mkdir(path, null, true);
                 return null;
             }
         });
@@ -61,12 +73,12 @@
 
     @Override
     public BlobContainer blobContainer(BlobPath path) {
-        return new HdfsBlobContainer(path, repository, buildHdfsPath(path));
+        return new HdfsBlobContainer(path, this, buildHdfsPath(path), bufferSize);
     }
 
     @Override
     public void delete(BlobPath path) throws IOException {
-        repository.execute(new HdfsRepository.Operation<Void>() {
+        execute(new Operation<Void>() {
             @Override
             public Void run(FileContext fc) throws IOException {
                 fc.delete(translateToHdfsPath(path), true);
@@ -94,9 +106,38 @@
         }
         return path;
     }
+
+
+    interface Operation<V> {
+        V run(FileContext fileContext) throws IOException;
+    }
+
+    /**
+     * Executes the provided operation against this store
+     */
+    <V> V execute(Operation<V> operation) throws IOException {
+        SecurityManager sm = System.getSecurityManager();
+        if (sm != null) {
+            // unprivileged code such as scripts do not have SpecialPermission
+            sm.checkPermission(new SpecialPermission());
+        }
+        if (closed) {
+            throw new AlreadyClosedException("HdfsBlobStore is closed: " + root);
+        }
+        try {
+            return AccessController.doPrivileged(new PrivilegedExceptionAction<V>() {
+                @Override
+                public V run() throws IOException {
+                    return operation.run(fileContext);
+                }
+            });
+        } catch (PrivilegedActionException pae) {
+            throw (IOException) pae.getException();
+        }
+    }
 
     @Override
     public void close() {
-        //
+        closed = true;
     }
 }
\ No newline at end of file
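
The new execute() does three things in order: a SpecialPermission check so unprivileged callers such as scripts cannot reach HDFS, a fail-fast read of the volatile closed flag, and only then the privileged filesystem call. The cast in the catch block is sound because the action's run() declares only IOException, so that is the only checked exception a PrivilegedActionException can be wrapping here. A standalone sketch of just the doPrivileged/unwrap idiom (the file-reading body and the path are arbitrary stand-ins for operation.run(fileContext), not anything from the patch):

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.security.AccessController;
    import java.security.PrivilegedActionException;
    import java.security.PrivilegedExceptionAction;

    public class PrivilegedUnwrapDemo {

        static byte[] readPrivileged(String path) throws IOException {
            try {
                return AccessController.doPrivileged(new PrivilegedExceptionAction<byte[]>() {
                    @Override
                    public byte[] run() throws IOException {
                        // runs with this class's own privileges, not the caller's
                        return Files.readAllBytes(Paths.get(path));
                    }
                });
            } catch (PrivilegedActionException pae) {
                // run() declares only IOException, so this cast cannot fail
                throw (IOException) pae.getException();
            }
        }

        public static void main(String[] args) throws IOException {
            System.out.println(readPrivileged("/etc/hosts").length + " bytes");
        }
    }

Note that the closed flag is only ever flipped from false to true, so the plain volatile read is enough; there is no race in which a store could be observed closed and later appear open again.
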
diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java
index cec4e6b08a1..e0fe49498d6 100644
--- a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java
+++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java
@@ -24,8 +24,6 @@ import java.net.URI;
 import java.security.AccessController;
 import java.security.Principal;
 import java.security.PrivilegedAction;
-import java.security.PrivilegedActionException;
-import java.security.PrivilegedExceptionAction;
 import java.util.Collections;
 import java.util.Locale;
 import java.util.Map;
@@ -36,10 +34,7 @@ import javax.security.auth.Subject;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.AbstractFileSystem;
 import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
-import org.apache.lucene.store.AlreadyClosedException;
-import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchGenerationException;
 import org.elasticsearch.SpecialPermission;
 import org.elasticsearch.common.Strings;
@@ -60,10 +55,8 @@ public final class HdfsRepository extends BlobStoreRepository {
     private final RepositorySettings repositorySettings;
     private final ByteSizeValue chunkSize;
     private final boolean compress;
-    final int bufferSizeInBytes;
 
     private HdfsBlobStore blobStore;
-    private volatile FileContext fileContext;
 
     @Inject
     public HdfsRepository(RepositoryName name, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository) throws IOException {
@@ -72,7 +65,6 @@ public final class HdfsRepository extends BlobStoreRepository {
 
         this.chunkSize = repositorySettings.settings().getAsBytesSize("chunk_size", null);
         this.compress = repositorySettings.settings().getAsBoolean("compress", false);
-        this.bufferSizeInBytes = (int) repositorySettings.settings().getAsBytesSize("buffer_size", new ByteSizeValue(100, ByteSizeUnit.KB)).bytes();
     }
 
     @Override
@@ -96,6 +88,8 @@
         if (pathSetting == null) {
             throw new IllegalArgumentException("No 'path' defined for hdfs snapshot/restore");
         }
+
+        int bufferSize = (int) repositorySettings.settings().getAsBytesSize("buffer_size", new ByteSizeValue(100, ByteSizeUnit.KB)).bytes();
 
         try {
             // initialize our filecontext
@@ -103,20 +97,14 @@ SecurityManager sm = System.getSecurityManager();
             if (sm != null) {
                 sm.checkPermission(new SpecialPermission());
             }
-            fileContext = AccessController.doPrivileged(new PrivilegedAction<FileContext>() {
+            FileContext fileContext = AccessController.doPrivileged(new PrivilegedAction<FileContext>() {
                 @Override
                 public FileContext run() {
                     return createContext(uri, repositorySettings);
                 }
             });
-            Path hdfsPath = execute(new Operation<Path>() {
-                @Override
-                public Path run(FileContext fileContext) throws IOException {
-                    return fileContext.makeQualified(new Path(pathSetting));
-                }
-            });
-            logger.debug("Using file-system [{}] for URI [{}], path [{}]", fileContext.getDefaultFileSystem(), fileContext.getDefaultFileSystem().getUri(), hdfsPath);
-            blobStore = new HdfsBlobStore(this, hdfsPath);
+            blobStore = new HdfsBlobStore(fileContext, pathSetting, bufferSize);
+            logger.debug("Using file-system [{}] for URI [{}], path [{}]", fileContext.getDefaultFileSystem(), fileContext.getDefaultFileSystem().getUri(), pathSetting);
         } catch (IOException e) {
             throw new ElasticsearchGenerationException(String.format(Locale.ROOT, "Cannot create HDFS repository for uri [%s]", uri), e);
         }
@@ -183,38 +171,4 @@
     protected ByteSizeValue chunkSize() {
         return chunkSize;
     }
-
-    @Override
-    protected void doClose() throws ElasticsearchException {
-        super.doClose();
-        fileContext = null;
-    }
-
-    interface Operation<V> {
-        V run(FileContext fileContext) throws IOException;
-    }
-
-    /**
-     * Executes the provided operation against this repository
-     */
-    <V> V execute(Operation<V> operation) throws IOException {
-        SecurityManager sm = System.getSecurityManager();
-        if (sm != null) {
-            // unprivileged code such as scripts do not have SpecialPermission
-            sm.checkPermission(new SpecialPermission());
-        }
-        if (fileContext == null) {
-            throw new AlreadyClosedException("repository is closed: " + repositoryName);
-        }
-        try {
-            return AccessController.doPrivileged(new PrivilegedExceptionAction<V>() {
-                @Override
-                public V run() throws IOException {
-                    return operation.run(fileContext);
-                }
-            });
-        } catch (PrivilegedActionException pae) {
-            throw (IOException) pae.getException();
-        }
-    }
 }
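
With doClose() removed from HdfsRepository, the only lifecycle hook left is BlobStore.close(), and the volatile flag is what makes the commit title hold across threads: a close() performed on one thread must be observed by every later execute() on any other thread. A minimal sketch of that visibility contract (DemoStore is a hypothetical stand-in, and IllegalStateException stands in for Lucene's AlreadyClosedException):

    public class CloseVisibilityDemo {

        static final class DemoStore {
            private volatile boolean closed;  // volatile write in close(), volatile read in execute()

            void execute() {
                if (closed) {
                    throw new IllegalStateException("store is closed");
                }
                // ... filesystem work would happen here ...
            }

            void close() {
                closed = true;
            }
        }

        public static void main(String[] args) throws InterruptedException {
            DemoStore store = new DemoStore();
            Thread worker = new Thread(() -> {
                // keep using the store until the main thread's close() becomes visible
                while (true) {
                    try {
                        store.execute();
                    } catch (IllegalStateException e) {
                        System.out.println("worker observed: " + e.getMessage());
                        return;
                    }
                }
            });
            worker.start();
            Thread.sleep(10); // let the worker issue a few successful calls
            store.close();    // volatile write: visible to the worker's next read
            worker.join();
        }
    }

Had the field been a plain boolean, the worker's loop could legally spin forever on a stale cached value; volatile is the cheapest construct that rules that out.
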