From 10d9819f99c5cf27dc72fbaae3fd639baf161308 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Thu, 3 Jan 2019 15:51:47 +0100 Subject: [PATCH] Implement Atomic Blob Writes for HDFS Repository (#37066) * Implement atomic writes the same way we do for the FsBlobContainer via rename which is atomic * Relates #37011 --- .../repositories/hdfs/HdfsBlobContainer.java | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java index 580d033354e..5a2e727d30b 100644 --- a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java +++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java @@ -21,11 +21,13 @@ package org.elasticsearch.repositories.hdfs; import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Options; import org.apache.hadoop.fs.Options.CreateOpts; import org.apache.hadoop.fs.Path; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobPath; +import org.elasticsearch.common.blobstore.fs.FsBlobContainer; import org.elasticsearch.common.blobstore.support.AbstractBlobContainer; import org.elasticsearch.common.blobstore.support.PlainBlobMetaData; import org.elasticsearch.repositories.hdfs.HdfsBlobStore.Operation; @@ -116,6 +118,29 @@ final class HdfsBlobContainer extends AbstractBlobContainer { }); } + @Override + public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException { + final String tempBlob = FsBlobContainer.tempBlobName(blobName); + store.execute((Operation) fileContext -> { + final Path tempBlobPath = new Path(path, tempBlob); + try (FSDataOutputStream stream = fileContext.create( + tempBlobPath, EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK), CreateOpts.bufferSize(bufferSize))) { + int bytesRead; + byte[] buffer = new byte[bufferSize]; + while ((bytesRead = inputStream.read(buffer)) != -1) { + stream.write(buffer, 0, bytesRead); + } + } + final Path blob = new Path(path, blobName); + try { + fileContext.rename(tempBlobPath, blob, failIfAlreadyExists ? Options.Rename.NONE : Options.Rename.OVERWRITE); + } catch (org.apache.hadoop.fs.FileAlreadyExistsException faee) { + throw new FileAlreadyExistsException(blob.toString(), null, faee.getMessage()); + } + return null; + }); + } + @Override public Map listBlobsByPrefix(@Nullable final String prefix) throws IOException { FileStatus[] files = store.execute(fileContext -> (fileContext.util().listStatus(path,