Faster Azure Blob InputStream (#61812) (#62387)

Building our own that should perform better than the one in the SDK.
Also, as a result saving a HEAD call for each ranged read on Azure.
This commit is contained in:
Armin Braun 2020-09-15 18:27:22 +02:00 committed by GitHub
parent 5dc0de04fb
commit 98f525f8a7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 112 additions and 32 deletions

View File

@ -26,7 +26,6 @@ import com.microsoft.azure.storage.RequestCompletedEvent;
import com.microsoft.azure.storage.StorageErrorCodeStrings;
import com.microsoft.azure.storage.StorageEvent;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.BlobInputStream;
import com.microsoft.azure.storage.blob.BlobListingDetails;
import com.microsoft.azure.storage.blob.BlobProperties;
import com.microsoft.azure.storage.blob.CloudBlob;
@ -50,10 +49,12 @@ import org.elasticsearch.common.blobstore.DeleteResult;
import org.elasticsearch.common.blobstore.support.PlainBlobMetadata;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.repositories.azure.AzureRepository.Repository;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
@ -248,9 +249,22 @@ public class AzureBlobStore implements BlobStore {
final OperationContext context = hookMetricCollector(client.v2().get(), getMetricsCollector);
final CloudBlockBlob blockBlobReference = client.v1().getContainerReference(container).getBlockBlobReference(blob);
logger.trace(() -> new ParameterizedMessage("reading container [{}], blob [{}]", container, blob));
final BlobInputStream is = SocketAccess.doPrivilegedException(() ->
blockBlobReference.openInputStream(position, length, null, null, context));
return giveSocketPermissionsToStream(is);
final long limit;
if (length == null) {
// Loading the blob attributes so we can get its length
SocketAccess.doPrivilegedVoidException(() -> blockBlobReference.downloadAttributes(null, null, context));
limit = blockBlobReference.getProperties().getLength() - position;
}
else {
limit = length;
}
final BlobInputStream blobInputStream = new BlobInputStream(limit, blockBlobReference, position, context);
if (length != null) {
// pre-filling the buffer in case of ranged reads so this method throws a 404 storage exception right away in case the blob
// does not exist
blobInputStream.fill();
}
return blobInputStream;
}
public Map<String, BlobMetadata> listBlobsByPrefix(String keyPath, String prefix)
@ -350,25 +364,6 @@ public class AzureBlobStore implements BlobStore {
return context;
}
static InputStream giveSocketPermissionsToStream(final InputStream stream) {
return new InputStream() {
@Override
public int read() throws IOException {
return SocketAccess.doPrivilegedIOException(stream::read);
}
@Override
public int read(byte[] b) throws IOException {
return SocketAccess.doPrivilegedIOException(() -> stream.read(b));
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
return SocketAccess.doPrivilegedIOException(() -> stream.read(b, off, len));
}
};
}
@Override
public Map<String, Long> stats() {
return stats.toMap();
@ -397,4 +392,97 @@ public class AzureBlobStore implements BlobStore {
"PutBlockList", putBlockListOperations.get());
}
}
/**
* Building our own input stream instead of using the SDK's {@link com.microsoft.azure.storage.blob.BlobInputStream}
* because that stream is highly inefficient in both memory and CPU use.
*/
private static class BlobInputStream extends InputStream {
/**
* Maximum number of bytes to fetch per read request and thus to buffer on heap at a time.
* Set to 4M because that's what {@link com.microsoft.azure.storage.blob.BlobInputStream} uses.
*/
private static final int MAX_READ_CHUNK_SIZE = ByteSizeUnit.MB.toIntBytes(4);
/**
* Using a {@link ByteArrayOutputStream} as a buffer instead of a byte array since the byte array APIs on the SDK are less
* efficient.
*/
private final ByteArrayOutputStream buffer;
private final long limit;
private final CloudBlockBlob blockBlobReference;
private final long start;
private final OperationContext context;
// current read position on the byte array backing #buffer
private int pos;
// current position up to which the contents of the blob where buffered
private long offset;
BlobInputStream(long limit, CloudBlockBlob blockBlobReference, long start, OperationContext context) {
this.limit = limit;
this.blockBlobReference = blockBlobReference;
this.start = start;
this.context = context;
buffer = new ByteArrayOutputStream(Math.min(MAX_READ_CHUNK_SIZE, Math.toIntExact(Math.min(limit, Integer.MAX_VALUE)))) {
@Override
public byte[] toByteArray() {
return buf;
}
};
pos = 0;
offset = 0;
}
@Override
public int read() throws IOException {
try {
fill();
} catch (StorageException | URISyntaxException ex) {
throw new IOException(ex);
}
if (pos == buffer.size()) {
return -1;
}
return buffer.toByteArray()[pos++];
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
try {
fill();
} catch (StorageException | URISyntaxException ex) {
throw new IOException(ex);
}
final int buffered = buffer.size();
int remaining = buffered - pos;
if (len > 0 && remaining == 0) {
return -1;
}
final int toRead = Math.min(remaining, len);
System.arraycopy(buffer.toByteArray(), pos, b, off, toRead);
pos += toRead;
return toRead;
}
void fill() throws StorageException, URISyntaxException {
if (pos == buffer.size()) {
final long toFill = Math.min(limit - this.offset, MAX_READ_CHUNK_SIZE);
if (toFill <= 0L) {
return;
}
buffer.reset();
SocketAccess.doPrivilegedVoidException(() -> blockBlobReference.downloadRange(
start + this.offset, toFill, buffer, null, null, context));
this.offset += buffer.size();
pos = 0;
}
}
}
}

View File

@ -217,20 +217,13 @@ public class AzureBlobContainerRetriesTests extends ESTestCase {
public void testReadRangeBlobWithRetries() throws Exception {
final int maxRetries = randomIntBetween(1, 5);
final CountDown countDownHead = new CountDown(maxRetries);
final CountDown countDownGet = new CountDown(maxRetries);
final byte[] bytes = randomBlobContent();
httpServer.createContext("/container/read_range_blob_max_retries", exchange -> {
try {
Streams.readFully(exchange.getRequestBody());
if ("HEAD".equals(exchange.getRequestMethod())) {
if (countDownHead.countDown()) {
exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
exchange.getResponseHeaders().add("x-ms-blob-content-length", String.valueOf(bytes.length));
exchange.getResponseHeaders().add("x-ms-blob-type", "blockblob");
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1);
return;
}
throw new AssertionError("Should not HEAD blob for ranged reads");
} else if ("GET".equals(exchange.getRequestMethod())) {
if (countDownGet.countDown()) {
final int rangeStart = getRangeStart(exchange);
@ -262,7 +255,6 @@ public class AzureBlobContainerRetriesTests extends ESTestCase {
try (InputStream inputStream = blobContainer.readBlob("read_range_blob_max_retries", position, length)) {
final byte[] bytesRead = BytesReference.toBytes(Streams.readFully(inputStream));
assertArrayEquals(Arrays.copyOfRange(bytes, position, Math.min(bytes.length, position + length)), bytesRead);
assertThat(countDownHead.isCountedDown(), is(true));
assertThat(countDownGet.isCountedDown(), is(true));
}
}