From 5caa10134545e5541328919b3f70ca86d0b9095d Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 15 Oct 2019 12:10:19 +0200 Subject: [PATCH] Fix Bug in Azure Repo Exception Handling (#47968) (#48030) We were incorrectly handling `IOExceptions` thrown by the `InputStream` side of the upload operation, resulting in a `ClassCastException` as we expected to never get `IOException` from the Azure SDK code but we do in practice. This PR also sets an assertion on `markSupported` for the streams used by the SDK as adding the test for this scenario revealed that the SDK client would retry uploads for non-mark-supporting streams on `IOException` in the `InputStream`. --- .../azure/AzureBlobContainer.java | 3 +- .../repositories/azure/AzureBlobStore.java | 11 +++-- .../azure/AzureStorageService.java | 9 +++-- .../repositories/azure/SocketAccess.java | 16 ++++---- .../azure/AzureBlobContainerRetriesTests.java | 40 +++++++++++++++++++ 5 files changed, 59 insertions(+), 20 deletions(-) diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java index aaf7dc6391b..15afaada84c 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java @@ -62,7 +62,7 @@ public class AzureBlobContainer extends AbstractBlobContainer { logger.trace("blobExists({})", blobName); try { return blobStore.blobExists(buildKey(blobName)); - } catch (URISyntaxException | StorageException e) { + } catch (URISyntaxException | StorageException | IOException e) { logger.warn("can not access [{}] in container {{}}: {}", blobName, blobStore, e.getMessage()); } return false; @@ -97,7 +97,6 @@ public class AzureBlobContainer extends AbstractBlobContainer { @Override public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException { logger.trace("writeBlob({}, stream, {})", buildKey(blobName), blobSize); - try { blobStore.writeBlob(buildKey(blobName), inputStream, blobSize, failIfAlreadyExists); } catch (URISyntaxException|StorageException e) { diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java index 968d8396f7e..714e29edea2 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java @@ -33,7 +33,6 @@ import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.io.InputStream; import java.net.URISyntaxException; -import java.nio.file.FileAlreadyExistsException; import java.util.Collections; import java.util.Map; import java.util.concurrent.Executor; @@ -88,11 +87,11 @@ public class AzureBlobStore implements BlobStore { public void close() { } - public boolean blobExists(String blob) throws URISyntaxException, StorageException { + public boolean blobExists(String blob) throws URISyntaxException, StorageException, IOException { return service.blobExists(clientName, container, blob); } - public void deleteBlob(String blob) throws URISyntaxException, StorageException { + public void deleteBlob(String blob) throws URISyntaxException, StorageException, IOException { service.deleteBlob(clientName, container, blob); } @@ -106,17 +105,17 @@ public class AzureBlobStore implements BlobStore { } public Map listBlobsByPrefix(String keyPath, String prefix) - throws URISyntaxException, StorageException { + throws URISyntaxException, StorageException, IOException { return service.listBlobsByPrefix(clientName, container, keyPath, prefix); } - public Map children(BlobPath path) throws URISyntaxException, StorageException { + public Map children(BlobPath path) throws URISyntaxException, StorageException, IOException { return Collections.unmodifiableMap(service.children(clientName, container, path).stream().collect( Collectors.toMap(Function.identity(), name -> new AzureBlobContainer(path.add(name), this, threadPool)))); } public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) - throws URISyntaxException, StorageException, FileAlreadyExistsException { + throws URISyntaxException, StorageException, IOException { service.writeBlob(this.clientName, container, blobName, inputStream, blobSize, failIfAlreadyExists); } } diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java index fbdac39b9f6..ca7cb3475b4 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java @@ -268,7 +268,7 @@ public class AzureStorageService { } public Map listBlobsByPrefix(String account, String container, String keyPath, String prefix) - throws URISyntaxException, StorageException { + throws URISyntaxException, StorageException, IOException { // NOTE: this should be here: if (prefix == null) prefix = ""; // however, this is really inefficient since deleteBlobsByPrefix enumerates everything and // then does a prefix match on the result; it should just call listBlobsByPrefix with the prefix! @@ -296,7 +296,7 @@ public class AzureStorageService { return blobsBuilder.immutableMap(); } - public Set children(String account, String container, BlobPath path) throws URISyntaxException, StorageException { + public Set children(String account, String container, BlobPath path) throws URISyntaxException, StorageException, IOException { final Set blobsBuilder = new HashSet<>(); final Tuple> client = client(account); final CloudBlobContainer blobContainer = client.v1().getContainerReference(container); @@ -320,8 +320,9 @@ public class AzureStorageService { } public void writeBlob(String account, String container, String blobName, InputStream inputStream, long blobSize, - boolean failIfAlreadyExists) - throws URISyntaxException, StorageException, FileAlreadyExistsException { + boolean failIfAlreadyExists) throws URISyntaxException, StorageException, IOException { + assert inputStream.markSupported() + : "Should not be used with non-mark supporting streams as their retry handling in the SDK is broken"; logger.trace(() -> new ParameterizedMessage("writeBlob({}, stream, {})", blobName, blobSize)); final Tuple> client = client(account); final CloudBlobContainer blobContainer = client.v1().getContainerReference(container); diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/SocketAccess.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/SocketAccess.java index 1400cc5b066..18acf088cdb 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/SocketAccess.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/SocketAccess.java @@ -20,6 +20,7 @@ package org.elasticsearch.repositories.azure; import com.microsoft.azure.storage.StorageException; +import org.apache.logging.log4j.core.util.Throwables; import org.elasticsearch.SpecialPermission; import java.io.IOException; @@ -44,7 +45,9 @@ public final class SocketAccess { try { return AccessController.doPrivileged(operation); } catch (PrivilegedActionException e) { - throw (IOException) e.getCause(); + Throwables.rethrow(e.getCause()); + assert false : "always throws"; + return null; } } @@ -53,7 +56,9 @@ public final class SocketAccess { try { return AccessController.doPrivileged(operation); } catch (PrivilegedActionException e) { - throw (StorageException) e.getCause(); + Throwables.rethrow(e.getCause()); + assert false : "always throws"; + return null; } } @@ -65,12 +70,7 @@ public final class SocketAccess { return null; }); } catch (PrivilegedActionException e) { - Throwable cause = e.getCause(); - if (cause instanceof StorageException) { - throw (StorageException) cause; - } else { - throw (URISyntaxException) cause; - } + Throwables.rethrow(e.getCause()); } } diff --git a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java index 0aa7a3b0922..daf4e9ad57b 100644 --- a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java +++ b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java @@ -49,6 +49,7 @@ import org.junit.After; import org.junit.Before; import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.net.InetAddress; @@ -63,6 +64,7 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -294,6 +296,44 @@ public class AzureBlobContainerRetriesTests extends ESTestCase { assertThat(blocks.isEmpty(), is(true)); } + public void testRetryUntilFail() throws IOException { + final AtomicBoolean requestReceived = new AtomicBoolean(false); + httpServer.createContext("/container/write_blob_max_retries", exchange -> { + try { + if (requestReceived.compareAndSet(false, true)) { + throw new AssertionError("Should not receive two requests"); + } else { + exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1); + } + } finally { + exchange.close(); + } + }); + + final BlobContainer blobContainer = createBlobContainer(randomIntBetween(2, 5)); + try (InputStream stream = new InputStream() { + + @Override + public int read() throws IOException { + throw new IOException("foo"); + } + + @Override + public boolean markSupported() { + return true; + } + + @Override + public void reset() { + throw new AssertionError("should not be called"); + } + }) { + final IOException ioe = expectThrows(IOException.class, () -> + blobContainer.writeBlob("write_blob_max_retries", stream, randomIntBetween(1, 128), randomBoolean())); + assertThat(ioe.getMessage(), is("foo")); + } + } + private static byte[] randomBlobContent() { return randomByteArrayOfLength(randomIntBetween(1, frequently() ? 512 : 1 << 20)); // rarely up to 1mb }