SNAPSHOT: Make Atomic Blob Writes Mandatory ()

* With  introducing atomic writes to HDFS repository we can enforce atomic write capabilities on this interface
* The overrides on the other three cloud implementations are ok because:
   * https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectPUT.html states that "Amazon S3 never adds partial objects; if you receive a success response, Amazon S3 added the entire object to the bucket."
   * https://cloud.google.com/storage/docs/consistency states that GCS has strong read-after-write consistency
   * https://docs.microsoft.com/en-us/rest/api/storageservices/put-block#remarks Azure has the concept of committing blobs, so there's no partial content here either
* Relates 
This commit is contained in:
Armin Braun 2019-01-07 12:11:19 +01:00 committed by GitHub
parent 7cc749dced
commit 617e294133
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 22 additions and 8 deletions
modules/repository-url/src/main/java/org/elasticsearch/common/blobstore/url
plugins
repository-azure/src/main/java/org/elasticsearch/repositories/azure
repository-gcs/src/main/java/org/elasticsearch/repositories/gcs
repository-s3/src/main/java/org/elasticsearch/repositories/s3
server/src/main/java/org/elasticsearch/common/blobstore

@ -112,6 +112,11 @@ public class URLBlobContainer extends AbstractBlobContainer {
throw new UnsupportedOperationException("URL repository doesn't support this operation"); throw new UnsupportedOperationException("URL repository doesn't support this operation");
} }
@Override
public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
throw new UnsupportedOperationException("URL repository doesn't support this operation");
}
@SuppressForbidden(reason = "We call connect in doPrivileged and provide SocketPermission") @SuppressForbidden(reason = "We call connect in doPrivileged and provide SocketPermission")
private static InputStream getInputStream(URL url) throws IOException { private static InputStream getInputStream(URL url) throws IOException {
try { try {

@ -96,6 +96,11 @@ public class AzureBlobContainer extends AbstractBlobContainer {
} }
} }
@Override
public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists);
}
@Override @Override
public void deleteBlob(String blobName) throws IOException { public void deleteBlob(String blobName) throws IOException {
logger.trace("deleteBlob({})", blobName); logger.trace("deleteBlob({})", blobName);

@ -68,6 +68,11 @@ class GoogleCloudStorageBlobContainer extends AbstractBlobContainer {
blobStore.writeBlob(buildKey(blobName), inputStream, blobSize, failIfAlreadyExists); blobStore.writeBlob(buildKey(blobName), inputStream, blobSize, failIfAlreadyExists);
} }
@Override
public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists);
}
@Override @Override
public void deleteBlob(String blobName) throws IOException { public void deleteBlob(String blobName) throws IOException {
blobStore.deleteBlob(buildKey(blobName)); blobStore.deleteBlob(buildKey(blobName));

@ -105,6 +105,11 @@ class S3BlobContainer extends AbstractBlobContainer {
}); });
} }
@Override
public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists);
}
@Override @Override
public void deleteBlob(String blobName) throws IOException { public void deleteBlob(String blobName) throws IOException {
if (blobExists(blobName) == false) { if (blobExists(blobName) == false) {

@ -78,9 +78,7 @@ public interface BlobContainer {
/** /**
* Reads blob content from the input stream and writes it to the container in a new blob with the given name, * Reads blob content from the input stream and writes it to the container in a new blob with the given name,
* using an atomic write operation if the implementation supports it. When the BlobContainer implementation * using an atomic write operation if the implementation supports it.
* does not provide a specific implementation of writeBlobAtomic(String, InputStream, long), then
* the {@link #writeBlob(String, InputStream, long, boolean)} method is used.
* *
* This method assumes the container does not already contain a blob of the same blobName. If a blob by the * This method assumes the container does not already contain a blob of the same blobName. If a blob by the
* same name already exists, the operation will fail and an {@link IOException} will be thrown. * same name already exists, the operation will fail and an {@link IOException} will be thrown.
@ -97,11 +95,7 @@ public interface BlobContainer {
* @throws FileAlreadyExistsException if failIfAlreadyExists is true and a blob by the same name already exists * @throws FileAlreadyExistsException if failIfAlreadyExists is true and a blob by the same name already exists
* @throws IOException if the input stream could not be read, or the target blob could not be written to. * @throws IOException if the input stream could not be read, or the target blob could not be written to.
*/ */
default void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize, boolean failIfAlreadyExists) void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException;
throws IOException {
writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists);
}
/** /**
* Deletes a blob with giving name, if the blob exists. If the blob does not exist, * Deletes a blob with giving name, if the blob exists. If the blob does not exist,
* this method throws a NoSuchFileException. * this method throws a NoSuchFileException.