make sure BlobStore.close always triggers ACE on any access afterwards

Robert Muir 2015-12-22 00:21:03 -05:00
parent a04268e42e
commit 9573bb9f15
3 changed files with 73 additions and 76 deletions
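The change moves close-state tracking from HdfsRepository into HdfsBlobStore: close() flips a volatile flag, and every blob operation funnels through a single execute(Operation) method that checks the flag before touching the Hadoop FileContext, so any access after close() fails fast with an AlreadyClosedException (the "ACE" in the title). A minimal sketch of that guard pattern, using hypothetical names (GuardedStore, Operation) rather than the actual Elasticsearch classes:

import java.io.IOException;

import org.apache.lucene.store.AlreadyClosedException;

// Illustrative sketch only: GuardedStore and Operation are hypothetical names,
// not the Elasticsearch classes; the commit applies this pattern to HdfsBlobStore.
final class GuardedStore implements AutoCloseable {

    interface Operation<V> {
        V run() throws IOException;
    }

    private volatile boolean closed;

    // Every access funnels through execute(), so the closed check cannot be bypassed.
    <V> V execute(Operation<V> operation) throws IOException {
        if (closed) {
            throw new AlreadyClosedException("store is closed");
        }
        return operation.run();
    }

    @Override
    public void close() {
        closed = true; // from now on every execute() call throws AlreadyClosedException (ACE)
    }
}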

org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java

@@ -30,6 +30,7 @@ import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
import org.elasticsearch.repositories.hdfs.HdfsBlobStore.Operation;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -40,20 +41,21 @@ import java.util.LinkedHashMap;
import java.util.Map;
final class HdfsBlobContainer extends AbstractBlobContainer {
private final HdfsRepository repository;
private final HdfsBlobStore store;
private final Path path;
private final int bufferSize;
HdfsBlobContainer(BlobPath blobPath, HdfsRepository repository, Path path) {
HdfsBlobContainer(BlobPath blobPath, HdfsBlobStore store, Path path, int bufferSize) {
super(blobPath);
this.repository = repository;
this.store = store;
this.path = path;
this.bufferSize = bufferSize;
}
@Override
public boolean blobExists(String blobName) {
try {
return repository.execute(new HdfsRepository.Operation<Boolean>() {
return store.execute(new Operation<Boolean>() {
@Override
public Boolean run(FileContext fileContext) throws IOException {
return fileContext.util().exists(new Path(path, blobName));
@@ -67,7 +69,7 @@ final class HdfsBlobContainer extends AbstractBlobContainer {
@Override
public void deleteBlob(String blobName) throws IOException {
try {
repository.execute(new HdfsRepository.Operation<Boolean>() {
store.execute(new Operation<Boolean>() {
@Override
public Boolean run(FileContext fileContext) throws IOException {
return fileContext.delete(new Path(path, blobName), true);
@@ -80,7 +82,7 @@ final class HdfsBlobContainer extends AbstractBlobContainer {
@Override
public void move(String sourceBlobName, String targetBlobName) throws IOException {
repository.execute(new HdfsRepository.Operation<Void>() {
store.execute(new Operation<Void>() {
@Override
public Void run(FileContext fileContext) throws IOException {
fileContext.rename(new Path(path, sourceBlobName), new Path(path, targetBlobName));
@@ -92,17 +94,17 @@ final class HdfsBlobContainer extends AbstractBlobContainer {
@Override
public InputStream readBlob(String blobName) throws IOException {
// FSDataInputStream does buffering internally
return repository.execute(new HdfsRepository.Operation<InputStream>() {
return store.execute(new Operation<InputStream>() {
@Override
public InputStream run(FileContext fileContext) throws IOException {
return fileContext.open(new Path(path, blobName), repository.bufferSizeInBytes);
return fileContext.open(new Path(path, blobName), bufferSize);
}
});
}
@Override
public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException {
repository.execute(new HdfsRepository.Operation<Void>() {
store.execute(new Operation<Void>() {
@Override
public Void run(FileContext fileContext) throws IOException {
Path blob = new Path(path, blobName);
@@ -110,10 +112,10 @@ final class HdfsBlobContainer extends AbstractBlobContainer {
// NOTE: this behavior differs from FSBlobContainer, which passes TRUNCATE_EXISTING
// that should be fixed there, no need to bring truncation into this, give the user an error.
EnumSet<CreateFlag> flags = EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK);
CreateOpts[] opts = { CreateOpts.bufferSize(repository.bufferSizeInBytes) };
CreateOpts[] opts = { CreateOpts.bufferSize(bufferSize) };
try (FSDataOutputStream stream = fileContext.create(blob, flags, opts)) {
int bytesRead;
byte[] buffer = new byte[repository.bufferSizeInBytes];
byte[] buffer = new byte[bufferSize];
while ((bytesRead = inputStream.read(buffer)) != -1) {
stream.write(buffer, 0, bytesRead);
// For safety we also hsync each write as well, because of its docs:
@@ -130,7 +132,7 @@ final class HdfsBlobContainer extends AbstractBlobContainer {
@Override
public Map<String, BlobMetaData> listBlobsByPrefix(final @Nullable String blobNamePrefix) throws IOException {
FileStatus[] files = repository.execute(new HdfsRepository.Operation<FileStatus[]>() {
FileStatus[] files = store.execute(new Operation<FileStatus[]>() {
@Override
public FileStatus[] run(FileContext fileContext) throws IOException {
return (fileContext.util().listStatus(path, new PathFilter() {
@@ -150,7 +152,7 @@ final class HdfsBlobContainer extends AbstractBlobContainer {
@Override
public Map<String, BlobMetaData> listBlobs() throws IOException {
FileStatus[] files = repository.execute(new HdfsRepository.Operation<FileStatus[]>() {
FileStatus[] files = store.execute(new Operation<FileStatus[]>() {
@Override
public FileStatus[] run(FileContext fileContext) throws IOException {
return fileContext.util().listStatus(path);

org/elasticsearch/repositories/hdfs/HdfsBlobStore.java

@@ -21,22 +21,34 @@ package org.elasticsearch.repositories.hdfs;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import java.io.IOException;
import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
final class HdfsBlobStore implements BlobStore {
private final HdfsRepository repository;
private final Path root;
private final FileContext fileContext;
private final int bufferSize;
private volatile boolean closed;
HdfsBlobStore(HdfsRepository repository, Path root) throws IOException {
this.repository = repository;
this.root = root;
HdfsBlobStore(FileContext fileContext, String path, int bufferSize) throws IOException {
this.fileContext = fileContext;
this.bufferSize = bufferSize;
this.root = execute(new Operation<Path>() {
@Override
public Path run(FileContext fileContext) throws IOException {
return fileContext.makeQualified(new Path(path));
}
});
try {
mkdirs(root);
} catch (FileAlreadyExistsException ok) {
@@ -45,10 +57,10 @@ final class HdfsBlobStore implements BlobStore {
}
private void mkdirs(Path path) throws IOException {
repository.execute(new HdfsRepository.Operation<Void>() {
execute(new Operation<Void>() {
@Override
public Void run(FileContext fc) throws IOException {
fc.mkdir(path, null, true);
public Void run(FileContext fileContext) throws IOException {
fileContext.mkdir(path, null, true);
return null;
}
});
@@ -61,12 +73,12 @@ final class HdfsBlobStore implements BlobStore {
@Override
public BlobContainer blobContainer(BlobPath path) {
return new HdfsBlobContainer(path, repository, buildHdfsPath(path));
return new HdfsBlobContainer(path, this, buildHdfsPath(path), bufferSize);
}
@Override
public void delete(BlobPath path) throws IOException {
repository.execute(new HdfsRepository.Operation<Void>() {
execute(new Operation<Void>() {
@Override
public Void run(FileContext fc) throws IOException {
fc.delete(translateToHdfsPath(path), true);
@@ -94,9 +106,38 @@ final class HdfsBlobStore implements BlobStore {
}
return path;
}
interface Operation<V> {
V run(FileContext fileContext) throws IOException;
}
/**
* Executes the provided operation against this store
*/
<V> V execute(Operation<V> operation) throws IOException {
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
// unprivileged code such as scripts do not have SpecialPermission
sm.checkPermission(new SpecialPermission());
}
if (closed) {
throw new AlreadyClosedException("HdfsBlobStore is closed: " + root);
}
try {
return AccessController.doPrivileged(new PrivilegedExceptionAction<V>() {
@Override
public V run() throws IOException {
return operation.run(fileContext);
}
});
} catch (PrivilegedActionException pae) {
throw (IOException) pae.getException();
}
}
@Override
public void close() {
//
closed = true;
}
}

org/elasticsearch/repositories/hdfs/HdfsRepository.java

@@ -24,8 +24,6 @@ import java.net.URI;
import java.security.AccessController;
import java.security.Principal;
import java.security.PrivilegedAction;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.Collections;
import java.util.Locale;
import java.util.Map;
@@ -36,10 +34,7 @@ import javax.security.auth.Subject;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.AbstractFileSystem;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchGenerationException;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.common.Strings;
@@ -60,10 +55,8 @@ public final class HdfsRepository extends BlobStoreRepository {
private final RepositorySettings repositorySettings;
private final ByteSizeValue chunkSize;
private final boolean compress;
final int bufferSizeInBytes;
private HdfsBlobStore blobStore;
private volatile FileContext fileContext;
@Inject
public HdfsRepository(RepositoryName name, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository) throws IOException {
@@ -72,7 +65,6 @@ public final class HdfsRepository extends BlobStoreRepository {
this.chunkSize = repositorySettings.settings().getAsBytesSize("chunk_size", null);
this.compress = repositorySettings.settings().getAsBoolean("compress", false);
this.bufferSizeInBytes = (int) repositorySettings.settings().getAsBytesSize("buffer_size", new ByteSizeValue(100, ByteSizeUnit.KB)).bytes();
}
@Override
@@ -96,6 +88,8 @@ public final class HdfsRepository extends BlobStoreRepository {
if (pathSetting == null) {
throw new IllegalArgumentException("No 'path' defined for hdfs snapshot/restore");
}
int bufferSize = (int) repositorySettings.settings().getAsBytesSize("buffer_size", new ByteSizeValue(100, ByteSizeUnit.KB)).bytes();
try {
// initialize our filecontext
@@ -103,20 +97,14 @@ public final class HdfsRepository extends BlobStoreRepository {
if (sm != null) {
sm.checkPermission(new SpecialPermission());
}
fileContext = AccessController.doPrivileged(new PrivilegedAction<FileContext>() {
FileContext fileContext = AccessController.doPrivileged(new PrivilegedAction<FileContext>() {
@Override
public FileContext run() {
return createContext(uri, repositorySettings);
}
});
Path hdfsPath = execute(new Operation<Path>() {
@Override
public Path run(FileContext fileContext) throws IOException {
return fileContext.makeQualified(new Path(pathSetting));
}
});
logger.debug("Using file-system [{}] for URI [{}], path [{}]", fileContext.getDefaultFileSystem(), fileContext.getDefaultFileSystem().getUri(), hdfsPath);
blobStore = new HdfsBlobStore(this, hdfsPath);
blobStore = new HdfsBlobStore(fileContext, pathSetting, bufferSize);
logger.debug("Using file-system [{}] for URI [{}], path [{}]", fileContext.getDefaultFileSystem(), fileContext.getDefaultFileSystem().getUri(), pathSetting);
} catch (IOException e) {
throw new ElasticsearchGenerationException(String.format(Locale.ROOT, "Cannot create HDFS repository for uri [%s]", uri), e);
}
@@ -183,38 +171,4 @@ public final class HdfsRepository extends BlobStoreRepository {
protected ByteSizeValue chunkSize() {
return chunkSize;
}
@Override
protected void doClose() throws ElasticsearchException {
super.doClose();
fileContext = null;
}
interface Operation<V> {
V run(FileContext fileContext) throws IOException;
}
/**
* Executes the provided operation against this repository
*/
<V> V execute(Operation<V> operation) throws IOException {
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
// unprivileged code such as scripts do not have SpecialPermission
sm.checkPermission(new SpecialPermission());
}
if (fileContext == null) {
throw new AlreadyClosedException("repository is closed: " + repositoryName);
}
try {
return AccessController.doPrivileged(new PrivilegedExceptionAction<V>() {
@Override
public V run() throws IOException {
return operation.run(fileContext);
}
});
} catch (PrivilegedActionException pae) {
throw (IOException) pae.getException();
}
}
}
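
The commit adds no test of its own, but the promised behavior can be pictured with a short JUnit 4 sketch against the illustrative GuardedStore above (hypothetical test code, not from the repository):

import org.apache.lucene.store.AlreadyClosedException;
import org.junit.Test;

import static org.junit.Assert.fail;

// Hypothetical JUnit 4 sketch (not part of this commit) exercising the GuardedStore
// example above; it mirrors the guarantee the commit title makes for HdfsBlobStore.
public class GuardedStoreTests {

    @Test
    public void accessAfterCloseThrowsAlreadyClosedException() throws Exception {
        GuardedStore store = new GuardedStore();
        store.execute(() -> null);       // succeeds while the store is open
        store.close();
        try {
            store.execute(() -> null);   // any access after close() must fail
            fail("expected AlreadyClosedException");
        } catch (AlreadyClosedException expected) {
            // this is the ACE the commit guarantees
        }
    }
}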