Use stronger write-once semantics for Azure repository (#30437)
There's no need for an extra blobExists() call when writing a blob to the Azure service. Azure provides an option (with stronger consistency guarantees) on the upload method that guarantees that the blob that's uploaded does not already exist. This saves one network roundtrip. Relates to #19749
This commit is contained in:
parent
4e9b554948
commit
467ea50c4e
|
@ -164,7 +164,12 @@ public class AzureStorageTestServer {
|
|||
if (destContainer == null) {
|
||||
return newContainerNotFoundError(requestId);
|
||||
}
|
||||
destContainer.objects.put(destBlobName, body);
|
||||
|
||||
byte[] existingBytes = destContainer.objects.putIfAbsent(destBlobName, body);
|
||||
if (existingBytes != null) {
|
||||
return newBlobAlreadyExistsError(requestId);
|
||||
}
|
||||
|
||||
return new Response(RestStatus.CREATED, emptyMap(), "text/plain", EMPTY_BYTE);
|
||||
})
|
||||
);
|
||||
|
@ -363,6 +368,10 @@ public class AzureStorageTestServer {
|
|||
return newError(requestId, RestStatus.NOT_FOUND, "BlobNotFound", "The specified blob does not exist");
|
||||
}
|
||||
|
||||
private static Response newBlobAlreadyExistsError(final long requestId) {
|
||||
return newError(requestId, RestStatus.CONFLICT, "BlobAlreadyExists", "The specified blob already exists");
|
||||
}
|
||||
|
||||
private static Response newInternalError(final long requestId) {
|
||||
return newError(requestId, RestStatus.INTERNAL_SERVER_ERROR, "InternalError", "The server encountered an internal error");
|
||||
}
|
||||
|
|
|
@ -88,10 +88,8 @@ public class AzureBlobContainer extends AbstractBlobContainer {
|
|||
|
||||
@Override
|
||||
public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException {
|
||||
if (blobExists(blobName)) {
|
||||
throw new FileAlreadyExistsException("blob [" + blobName + "] already exists, cannot overwrite");
|
||||
}
|
||||
logger.trace("writeBlob({}, stream, {})", buildKey(blobName), blobSize);
|
||||
|
||||
try {
|
||||
blobStore.writeBlob(buildKey(blobName), inputStream, blobSize);
|
||||
} catch (URISyntaxException|StorageException e) {
|
||||
|
|
|
@ -33,6 +33,7 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.URISyntaxException;
|
||||
import java.nio.file.FileAlreadyExistsException;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
|
||||
|
@ -114,7 +115,8 @@ public class AzureBlobStore extends AbstractComponent implements BlobStore {
|
|||
return this.client.listBlobsByPrefix(this.clientName, this.locMode, container, keyPath, prefix);
|
||||
}
|
||||
|
||||
public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws URISyntaxException, StorageException {
|
||||
public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws URISyntaxException, StorageException,
|
||||
FileAlreadyExistsException {
|
||||
this.client.writeBlob(this.clientName, this.locMode, container, blobName, inputStream, blobSize);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.elasticsearch.common.unit.ByteSizeValue;
|
|||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.URISyntaxException;
|
||||
import java.nio.file.FileAlreadyExistsException;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
|
@ -58,7 +59,7 @@ public interface AzureStorageService {
|
|||
throws URISyntaxException, StorageException;
|
||||
|
||||
void writeBlob(String account, LocationMode mode, String container, String blobName, InputStream inputStream, long blobSize) throws
|
||||
URISyntaxException, StorageException;
|
||||
URISyntaxException, StorageException, FileAlreadyExistsException;
|
||||
|
||||
static InputStream giveSocketPermissionsToStream(InputStream stream) {
|
||||
return new InputStream() {
|
||||
|
|
|
@ -19,11 +19,13 @@
|
|||
|
||||
package org.elasticsearch.repositories.azure;
|
||||
|
||||
import com.microsoft.azure.storage.AccessCondition;
|
||||
import com.microsoft.azure.storage.CloudStorageAccount;
|
||||
import com.microsoft.azure.storage.LocationMode;
|
||||
import com.microsoft.azure.storage.OperationContext;
|
||||
import com.microsoft.azure.storage.RetryExponentialRetry;
|
||||
import com.microsoft.azure.storage.RetryPolicy;
|
||||
import com.microsoft.azure.storage.StorageErrorCodeStrings;
|
||||
import com.microsoft.azure.storage.StorageException;
|
||||
import com.microsoft.azure.storage.blob.BlobInputStream;
|
||||
import com.microsoft.azure.storage.blob.BlobListingDetails;
|
||||
|
@ -44,8 +46,10 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.repositories.RepositoryException;
|
||||
|
||||
import java.io.InputStream;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.nio.file.FileAlreadyExistsException;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
|
@ -289,12 +293,21 @@ public class AzureStorageServiceImpl extends AbstractComponent implements AzureS
|
|||
|
||||
@Override
|
||||
public void writeBlob(String account, LocationMode mode, String container, String blobName, InputStream inputStream, long blobSize)
|
||||
throws URISyntaxException, StorageException {
|
||||
throws URISyntaxException, StorageException, FileAlreadyExistsException {
|
||||
logger.trace("writeBlob({}, stream, {})", blobName, blobSize);
|
||||
CloudBlobClient client = this.getSelectedClient(account, mode);
|
||||
CloudBlobContainer blobContainer = client.getContainerReference(container);
|
||||
CloudBlockBlob blob = blobContainer.getBlockBlobReference(blobName);
|
||||
SocketAccess.doPrivilegedVoidException(() -> blob.upload(inputStream, blobSize, null, null, generateOperationContext(account)));
|
||||
try {
|
||||
SocketAccess.doPrivilegedVoidException(() -> blob.upload(inputStream, blobSize, AccessCondition.generateIfNotExistsCondition(),
|
||||
null, generateOperationContext(account)));
|
||||
} catch (StorageException se) {
|
||||
if (se.getHttpStatusCode() == HttpURLConnection.HTTP_CONFLICT &&
|
||||
StorageErrorCodeStrings.BLOB_ALREADY_EXISTS.equals(se.getErrorCode())) {
|
||||
throw new FileAlreadyExistsException(blobName, null, se.getMessage());
|
||||
}
|
||||
throw se;
|
||||
}
|
||||
logger.trace("writeBlob({}, stream, {}) - done", blobName, blobSize);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -34,6 +34,7 @@ import java.io.IOException;
|
|||
import java.io.InputStream;
|
||||
import java.net.SocketPermission;
|
||||
import java.net.URISyntaxException;
|
||||
import java.nio.file.FileAlreadyExistsException;
|
||||
import java.nio.file.NoSuchFileException;
|
||||
import java.security.AccessController;
|
||||
import java.util.Locale;
|
||||
|
@ -108,7 +109,10 @@ public class AzureStorageServiceMock extends AbstractComponent implements AzureS
|
|||
|
||||
@Override
|
||||
public void writeBlob(String account, LocationMode mode, String container, String blobName, InputStream inputStream, long blobSize)
|
||||
throws URISyntaxException, StorageException {
|
||||
throws URISyntaxException, StorageException, FileAlreadyExistsException {
|
||||
if (blobs.containsKey(blobName)) {
|
||||
throw new FileAlreadyExistsException(blobName);
|
||||
}
|
||||
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
|
||||
blobs.put(blobName, outputStream);
|
||||
Streams.copy(inputStream, outputStream);
|
||||
|
|
Loading…
Reference in New Issue