From 98f525f8a7c6d3e5b1db386439a68b9baf3669a8 Mon Sep 17 00:00:00 2001
From: Armin Braun
Date: Tue, 15 Sep 2020 18:27:22 +0200
Subject: [PATCH] Faster Azure Blob InputStream (#61812) (#62387)

Building our own that should perform better than the one in the SDK.
Also, as a result saving a HEAD call for each ranged read on Azure.
---
 .../repositories/azure/AzureBlobStore.java    | 134 +++++++++++++++---
 .../azure/AzureBlobContainerRetriesTests.java |  10 +-
 2 files changed, 112 insertions(+), 32 deletions(-)

diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java
index 2fed23b26a3..39c1fb7d914 100644
--- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java
+++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java
@@ -26,7 +26,6 @@ import com.microsoft.azure.storage.RequestCompletedEvent;
 import com.microsoft.azure.storage.StorageErrorCodeStrings;
 import com.microsoft.azure.storage.StorageEvent;
 import com.microsoft.azure.storage.StorageException;
-import com.microsoft.azure.storage.blob.BlobInputStream;
 import com.microsoft.azure.storage.blob.BlobListingDetails;
 import com.microsoft.azure.storage.blob.BlobProperties;
 import com.microsoft.azure.storage.blob.CloudBlob;
@@ -50,10 +49,12 @@ import org.elasticsearch.common.blobstore.DeleteResult;
 import org.elasticsearch.common.blobstore.support.PlainBlobMetadata;
 import org.elasticsearch.common.collect.MapBuilder;
 import org.elasticsearch.common.collect.Tuple;
+import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.repositories.azure.AzureRepository.Repository;
 import org.elasticsearch.threadpool.ThreadPool;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.HttpURLConnection;
@@ -248,9 +249,22 @@ public class AzureBlobStore implements BlobStore {
         final OperationContext context = hookMetricCollector(client.v2().get(), getMetricsCollector);
         final CloudBlockBlob blockBlobReference = client.v1().getContainerReference(container).getBlockBlobReference(blob);
         logger.trace(() -> new ParameterizedMessage("reading container [{}], blob [{}]", container, blob));
-        final BlobInputStream is = SocketAccess.doPrivilegedException(() ->
-            blockBlobReference.openInputStream(position, length, null, null, context));
-        return giveSocketPermissionsToStream(is);
+        final long limit;
+        if (length == null) {
+            // Loading the blob attributes so we can get its length
+            SocketAccess.doPrivilegedVoidException(() -> blockBlobReference.downloadAttributes(null, null, context));
+            limit = blockBlobReference.getProperties().getLength() - position;
+        }
+        else {
+            limit = length;
+        }
+        final BlobInputStream blobInputStream = new BlobInputStream(limit, blockBlobReference, position, context);
+        if (length != null) {
+            // pre-filling the buffer in case of ranged reads so this method throws a 404 storage exception right away in case the blob
+            // does not exist
+            blobInputStream.fill();
+        }
+        return blobInputStream;
     }
 
     public Map<String, BlobMetadata> listBlobsByPrefix(String keyPath, String prefix)
@@ -350,25 +364,6 @@ public class AzureBlobStore implements BlobStore {
         return context;
     }
-    static InputStream giveSocketPermissionsToStream(final InputStream stream) {
-        return new InputStream() {
-            @Override
-            public int read() throws IOException {
-                return SocketAccess.doPrivilegedIOException(stream::read);
-            }
-
-            @Override
-            public int read(byte[] b) throws IOException {
-                return SocketAccess.doPrivilegedIOException(() -> stream.read(b));
-            }
-
-            @Override
-            public int read(byte[] b, int off, int len) throws IOException {
-                return SocketAccess.doPrivilegedIOException(() -> stream.read(b, off, len));
-            }
-        };
-    }
-
     @Override
     public Map<String, Long> stats() {
         return stats.toMap();
     }
@@ -397,4 +392,97 @@ public class AzureBlobStore implements BlobStore {
             "PutBlockList", putBlockListOperations.get());
         }
     }
+
+    /**
+     * Building our own input stream instead of using the SDK's {@link com.microsoft.azure.storage.blob.BlobInputStream}
+     * because that stream is highly inefficient in both memory and CPU use.
+     */
+    private static class BlobInputStream extends InputStream {
+
+        /**
+         * Maximum number of bytes to fetch per read request and thus to buffer on heap at a time.
+         * Set to 4M because that's what {@link com.microsoft.azure.storage.blob.BlobInputStream} uses.
+         */
+        private static final int MAX_READ_CHUNK_SIZE = ByteSizeUnit.MB.toIntBytes(4);
+
+        /**
+         * Using a {@link ByteArrayOutputStream} as a buffer instead of a byte array since the byte array APIs on the SDK are less
+         * efficient.
+         */
+        private final ByteArrayOutputStream buffer;
+
+        private final long limit;
+
+        private final CloudBlockBlob blockBlobReference;
+
+        private final long start;
+
+        private final OperationContext context;
+
+        // current read position on the byte array backing #buffer
+        private int pos;
+
+        // current position up to which the contents of the blob were buffered
+        private long offset;
+
+        BlobInputStream(long limit, CloudBlockBlob blockBlobReference, long start, OperationContext context) {
+            this.limit = limit;
+            this.blockBlobReference = blockBlobReference;
+            this.start = start;
+            this.context = context;
+            buffer = new ByteArrayOutputStream(Math.min(MAX_READ_CHUNK_SIZE, Math.toIntExact(Math.min(limit, Integer.MAX_VALUE)))) {
+                @Override
+                public byte[] toByteArray() {
+                    return buf;
+                }
+            };
+            pos = 0;
+            offset = 0;
+        }
+
+        @Override
+        public int read() throws IOException {
+            try {
+                fill();
+            } catch (StorageException | URISyntaxException ex) {
+                throw new IOException(ex);
+            }
+            if (pos == buffer.size()) {
+                return -1;
+            }
+            return buffer.toByteArray()[pos++] & 0xFF;
+        }
+
+        @Override
+        public int read(byte[] b, int off, int len) throws IOException {
+            try {
+                fill();
+            } catch (StorageException | URISyntaxException ex) {
+                throw new IOException(ex);
+            }
+            final int buffered = buffer.size();
+            int remaining = buffered - pos;
+            if (len > 0 && remaining == 0) {
+                return -1;
+            }
+            final int toRead = Math.min(remaining, len);
+            System.arraycopy(buffer.toByteArray(), pos, b, off, toRead);
+            pos += toRead;
+            return toRead;
+        }
+
+        void fill() throws StorageException, URISyntaxException {
+            if (pos == buffer.size()) {
+                final long toFill = Math.min(limit - this.offset, MAX_READ_CHUNK_SIZE);
+                if (toFill <= 0L) {
+                    return;
+                }
+                buffer.reset();
+                SocketAccess.doPrivilegedVoidException(() -> blockBlobReference.downloadRange(
+                    start + this.offset, toFill, buffer, null, null, context));
+                this.offset += buffer.size();
+                pos = 0;
+            }
+        }
+    }
 }
diff --git a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java
index 0dd5944010d..af623217c22 100644
--- a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java
+++ b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerRetriesTests.java
@@ -217,20 +217,13 @@ public class AzureBlobContainerRetriesTests extends ESTestCase {
 
     public void testReadRangeBlobWithRetries() throws Exception {
         final int maxRetries = randomIntBetween(1, 5);
-        final CountDown countDownHead = new CountDown(maxRetries);
         final CountDown countDownGet = new CountDown(maxRetries);
         final byte[] bytes = randomBlobContent();
         httpServer.createContext("/container/read_range_blob_max_retries", exchange -> {
             try {
                 Streams.readFully(exchange.getRequestBody());
                 if ("HEAD".equals(exchange.getRequestMethod())) {
-                    if (countDownHead.countDown()) {
-                        exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
-                        exchange.getResponseHeaders().add("x-ms-blob-content-length", String.valueOf(bytes.length));
-                        exchange.getResponseHeaders().add("x-ms-blob-type", "blockblob");
-                        exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1);
-                        return;
-                    }
+                    throw new AssertionError("Should not HEAD blob for ranged reads");
                 } else if ("GET".equals(exchange.getRequestMethod())) {
                     if (countDownGet.countDown()) {
                         final int rangeStart = getRangeStart(exchange);
@@ -262,7 +255,6 @@ public class AzureBlobContainerRetriesTests extends ESTestCase {
         try (InputStream inputStream = blobContainer.readBlob("read_range_blob_max_retries", position, length)) {
             final byte[] bytesRead = BytesReference.toBytes(Streams.readFully(inputStream));
             assertArrayEquals(Arrays.copyOfRange(bytes, position, Math.min(bytes.length, position + length)), bytesRead);
-            assertThat(countDownHead.isCountedDown(), is(true));
             assertThat(countDownGet.isCountedDown(), is(true));
         }
     }
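Illustrative sketch (not part of the commit): the buffering scheme the new stream implements, isolated from the Azure SDK so it compiles standalone. RangeReader is a hypothetical stand-in for CloudBlockBlob#downloadRange; the 4 MB chunk size, the ByteArrayOutputStream#toByteArray override, and the fill()/pos/offset bookkeeping mirror the patch.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

final class ChunkedRangeInputStream extends InputStream {

    /** Hypothetical stand-in for a ranged blob download such as CloudBlockBlob#downloadRange. */
    @FunctionalInterface
    interface RangeReader {
        void downloadRange(long start, long length, ByteArrayOutputStream out) throws IOException;
    }

    private static final int MAX_READ_CHUNK_SIZE = 4 * 1024 * 1024; // 4 MB, as in the patch

    private final RangeReader reader;
    private final long start;  // absolute blob offset where the requested range begins
    private final long limit;  // total number of bytes this stream may return
    private final ByteArrayOutputStream buffer;

    private int pos;      // read position within the currently buffered chunk
    private long offset;  // bytes of the range downloaded so far

    ChunkedRangeInputStream(RangeReader reader, long start, long limit) {
        this.reader = reader;
        this.start = start;
        this.limit = limit;
        // Same trick as the patch: expose the internal array so reads avoid a copy per call.
        this.buffer = new ByteArrayOutputStream(Math.toIntExact(Math.min(limit, MAX_READ_CHUNK_SIZE))) {
            @Override
            public byte[] toByteArray() {
                return buf;
            }
        };
    }

    @Override
    public int read() throws IOException {
        fill();
        if (pos == buffer.size()) {
            return -1; // range exhausted
        }
        return buffer.toByteArray()[pos++] & 0xFF; // mask: read() must return 0-255 or -1
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        fill();
        final int remaining = buffer.size() - pos;
        if (len > 0 && remaining == 0) {
            return -1;
        }
        final int toRead = Math.min(remaining, len);
        System.arraycopy(buffer.toByteArray(), pos, b, off, toRead);
        pos += toRead;
        return toRead;
    }

    // Download the next chunk once the current one is fully consumed, so at most
    // MAX_READ_CHUNK_SIZE bytes are ever buffered on heap at a time.
    private void fill() throws IOException {
        if (pos == buffer.size()) {
            final long toFill = Math.min(limit - offset, MAX_READ_CHUNK_SIZE);
            if (toFill <= 0L) {
                return; // nothing left in the requested range
            }
            buffer.reset();
            reader.downloadRange(start + offset, toFill, buffer);
            offset += buffer.size();
            pos = 0;
        }
    }
}

Hooked up to, say, an HTTP client issuing GETs with a Range header, this keeps at most one chunk on heap regardless of blob size and needs no up-front HEAD request when the range length is already known, which is the memory and round-trip win the commit message describes.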