We were incorrectly handling `IOExceptions` thrown by the `InputStream` side of the upload operation, resulting in a `ClassCastException` as we expected to never get `IOException` from the Azure SDK code but we do in practice. This PR also sets an assertion on `markSupported` for the streams used by the SDK as adding the test for this scenario revealed that the SDK client would retry uploads for non-mark-supporting streams on `IOException` in the `InputStream`.
This commit is contained in:
parent
b858e19bcc
commit
5caa101345
|
@ -62,7 +62,7 @@ public class AzureBlobContainer extends AbstractBlobContainer {
|
|||
logger.trace("blobExists({})", blobName);
|
||||
try {
|
||||
return blobStore.blobExists(buildKey(blobName));
|
||||
} catch (URISyntaxException | StorageException e) {
|
||||
} catch (URISyntaxException | StorageException | IOException e) {
|
||||
logger.warn("can not access [{}] in container {{}}: {}", blobName, blobStore, e.getMessage());
|
||||
}
|
||||
return false;
|
||||
|
@ -97,7 +97,6 @@ public class AzureBlobContainer extends AbstractBlobContainer {
|
|||
@Override
|
||||
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
|
||||
logger.trace("writeBlob({}, stream, {})", buildKey(blobName), blobSize);
|
||||
|
||||
try {
|
||||
blobStore.writeBlob(buildKey(blobName), inputStream, blobSize, failIfAlreadyExists);
|
||||
} catch (URISyntaxException|StorageException e) {
|
||||
|
|
|
@ -33,7 +33,6 @@ import org.elasticsearch.threadpool.ThreadPool;
|
|||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.URISyntaxException;
|
||||
import java.nio.file.FileAlreadyExistsException;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Executor;
|
||||
|
@ -88,11 +87,11 @@ public class AzureBlobStore implements BlobStore {
|
|||
public void close() {
|
||||
}
|
||||
|
||||
public boolean blobExists(String blob) throws URISyntaxException, StorageException {
|
||||
public boolean blobExists(String blob) throws URISyntaxException, StorageException, IOException {
|
||||
return service.blobExists(clientName, container, blob);
|
||||
}
|
||||
|
||||
public void deleteBlob(String blob) throws URISyntaxException, StorageException {
|
||||
public void deleteBlob(String blob) throws URISyntaxException, StorageException, IOException {
|
||||
service.deleteBlob(clientName, container, blob);
|
||||
}
|
||||
|
||||
|
@ -106,17 +105,17 @@ public class AzureBlobStore implements BlobStore {
|
|||
}
|
||||
|
||||
public Map<String, BlobMetaData> listBlobsByPrefix(String keyPath, String prefix)
|
||||
throws URISyntaxException, StorageException {
|
||||
throws URISyntaxException, StorageException, IOException {
|
||||
return service.listBlobsByPrefix(clientName, container, keyPath, prefix);
|
||||
}
|
||||
|
||||
public Map<String, BlobContainer> children(BlobPath path) throws URISyntaxException, StorageException {
|
||||
public Map<String, BlobContainer> children(BlobPath path) throws URISyntaxException, StorageException, IOException {
|
||||
return Collections.unmodifiableMap(service.children(clientName, container, path).stream().collect(
|
||||
Collectors.toMap(Function.identity(), name -> new AzureBlobContainer(path.add(name), this, threadPool))));
|
||||
}
|
||||
|
||||
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
|
||||
throws URISyntaxException, StorageException, FileAlreadyExistsException {
|
||||
throws URISyntaxException, StorageException, IOException {
|
||||
service.writeBlob(this.clientName, container, blobName, inputStream, blobSize, failIfAlreadyExists);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -268,7 +268,7 @@ public class AzureStorageService {
|
|||
}
|
||||
|
||||
public Map<String, BlobMetaData> listBlobsByPrefix(String account, String container, String keyPath, String prefix)
|
||||
throws URISyntaxException, StorageException {
|
||||
throws URISyntaxException, StorageException, IOException {
|
||||
// NOTE: this should be here: if (prefix == null) prefix = "";
|
||||
// however, this is really inefficient since deleteBlobsByPrefix enumerates everything and
|
||||
// then does a prefix match on the result; it should just call listBlobsByPrefix with the prefix!
|
||||
|
@ -296,7 +296,7 @@ public class AzureStorageService {
|
|||
return blobsBuilder.immutableMap();
|
||||
}
|
||||
|
||||
public Set<String> children(String account, String container, BlobPath path) throws URISyntaxException, StorageException {
|
||||
public Set<String> children(String account, String container, BlobPath path) throws URISyntaxException, StorageException, IOException {
|
||||
final Set<String> blobsBuilder = new HashSet<>();
|
||||
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
|
||||
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
|
||||
|
@ -320,8 +320,9 @@ public class AzureStorageService {
|
|||
}
|
||||
|
||||
public void writeBlob(String account, String container, String blobName, InputStream inputStream, long blobSize,
|
||||
boolean failIfAlreadyExists)
|
||||
throws URISyntaxException, StorageException, FileAlreadyExistsException {
|
||||
boolean failIfAlreadyExists) throws URISyntaxException, StorageException, IOException {
|
||||
assert inputStream.markSupported()
|
||||
: "Should not be used with non-mark supporting streams as their retry handling in the SDK is broken";
|
||||
logger.trace(() -> new ParameterizedMessage("writeBlob({}, stream, {})", blobName, blobSize));
|
||||
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
|
||||
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
package org.elasticsearch.repositories.azure;
|
||||
|
||||
import com.microsoft.azure.storage.StorageException;
|
||||
import org.apache.logging.log4j.core.util.Throwables;
|
||||
import org.elasticsearch.SpecialPermission;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -44,7 +45,9 @@ public final class SocketAccess {
|
|||
try {
|
||||
return AccessController.doPrivileged(operation);
|
||||
} catch (PrivilegedActionException e) {
|
||||
throw (IOException) e.getCause();
|
||||
Throwables.rethrow(e.getCause());
|
||||
assert false : "always throws";
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -53,7 +56,9 @@ public final class SocketAccess {
|
|||
try {
|
||||
return AccessController.doPrivileged(operation);
|
||||
} catch (PrivilegedActionException e) {
|
||||
throw (StorageException) e.getCause();
|
||||
Throwables.rethrow(e.getCause());
|
||||
assert false : "always throws";
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -65,12 +70,7 @@ public final class SocketAccess {
|
|||
return null;
|
||||
});
|
||||
} catch (PrivilegedActionException e) {
|
||||
Throwable cause = e.getCause();
|
||||
if (cause instanceof StorageException) {
|
||||
throw (StorageException) cause;
|
||||
} else {
|
||||
throw (URISyntaxException) cause;
|
||||
}
|
||||
Throwables.rethrow(e.getCause());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -49,6 +49,7 @@ import org.junit.After;
|
|||
import org.junit.Before;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.net.InetAddress;
|
||||
|
@ -63,6 +64,7 @@ import java.util.Map;
|
|||
import java.util.Objects;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
@ -294,6 +296,44 @@ public class AzureBlobContainerRetriesTests extends ESTestCase {
|
|||
assertThat(blocks.isEmpty(), is(true));
|
||||
}
|
||||
|
||||
public void testRetryUntilFail() throws IOException {
|
||||
final AtomicBoolean requestReceived = new AtomicBoolean(false);
|
||||
httpServer.createContext("/container/write_blob_max_retries", exchange -> {
|
||||
try {
|
||||
if (requestReceived.compareAndSet(false, true)) {
|
||||
throw new AssertionError("Should not receive two requests");
|
||||
} else {
|
||||
exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1);
|
||||
}
|
||||
} finally {
|
||||
exchange.close();
|
||||
}
|
||||
});
|
||||
|
||||
final BlobContainer blobContainer = createBlobContainer(randomIntBetween(2, 5));
|
||||
try (InputStream stream = new InputStream() {
|
||||
|
||||
@Override
|
||||
public int read() throws IOException {
|
||||
throw new IOException("foo");
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean markSupported() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reset() {
|
||||
throw new AssertionError("should not be called");
|
||||
}
|
||||
}) {
|
||||
final IOException ioe = expectThrows(IOException.class, () ->
|
||||
blobContainer.writeBlob("write_blob_max_retries", stream, randomIntBetween(1, 128), randomBoolean()));
|
||||
assertThat(ioe.getMessage(), is("foo"));
|
||||
}
|
||||
}
|
||||
|
||||
private static byte[] randomBlobContent() {
|
||||
return randomByteArrayOfLength(randomIntBetween(1, frequently() ? 512 : 1 << 20)); // rarely up to 1mb
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue