From 799f7def9fc5e96f1b4f56befbc747d41a055d53 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Wed, 18 Sep 2019 09:42:16 +0200 Subject: [PATCH] Add block support to AzureBlobStoreRepositoryTests (#46664) This commit adds support for Put Block API to the internal HTTP server used in Azure repository integration tests. This allows to test the behavior of the Azure SDK client when the Azure Storage service returns errors when uploading Blob in multiple blocks or when downloading a blob using ranged downloads. --- .../azure/AzureStorageService.java | 8 +- .../azure/AzureBlobStoreRepositoryTests.java | 119 ++++++++++++++++-- 2 files changed, 115 insertions(+), 12 deletions(-) diff --git a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java index c9f820ff09f..fbdac39b9f6 100644 --- a/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java +++ b/plugins/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java @@ -30,6 +30,7 @@ import com.microsoft.azure.storage.StorageException; import com.microsoft.azure.storage.blob.BlobInputStream; import com.microsoft.azure.storage.blob.BlobListingDetails; import com.microsoft.azure.storage.blob.BlobProperties; +import com.microsoft.azure.storage.blob.BlobRequestOptions; import com.microsoft.azure.storage.blob.CloudBlob; import com.microsoft.azure.storage.blob.CloudBlobClient; import com.microsoft.azure.storage.blob.CloudBlobContainer; @@ -329,7 +330,7 @@ public class AzureStorageService { final AccessCondition accessCondition = failIfAlreadyExists ? AccessCondition.generateIfNotExistsCondition() : AccessCondition.generateEmptyCondition(); SocketAccess.doPrivilegedVoidException(() -> - blob.upload(inputStream, blobSize, accessCondition, null, client.v2().get())); + blob.upload(inputStream, blobSize, accessCondition, getBlobRequestOptionsForWriteBlob(), client.v2().get())); } catch (final StorageException se) { if (failIfAlreadyExists && se.getHttpStatusCode() == HttpURLConnection.HTTP_CONFLICT && StorageErrorCodeStrings.BLOB_ALREADY_EXISTS.equals(se.getErrorCode())) { @@ -340,6 +341,11 @@ public class AzureStorageService { logger.trace(() -> new ParameterizedMessage("writeBlob({}, stream, {}) - done", blobName, blobSize)); } + // package private for testing + BlobRequestOptions getBlobRequestOptionsForWriteBlob() { + return null; + } + static InputStream giveSocketPermissionsToStream(final InputStream stream) { return new InputStream() { @Override diff --git a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java index 12d212cc1bb..6960eced0a7 100644 --- a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java +++ b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java @@ -21,27 +21,46 @@ package org.elasticsearch.repositories.azure; import com.microsoft.azure.storage.Constants; import com.microsoft.azure.storage.RetryExponentialRetry; import com.microsoft.azure.storage.RetryPolicyFactory; +import com.microsoft.azure.storage.blob.BlobRequestOptions; import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpHandler; +import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.SuppressForbidden; +import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.MockSecureSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestUtils; +import org.elasticsearch.test.BackgroundIndexer; +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.Base64; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; +import static org.hamcrest.Matchers.equalTo; @SuppressForbidden(reason = "this test uses a HttpServer to emulate an Azure endpoint") public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTestCase { @@ -90,7 +109,43 @@ public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryInteg } /** - * AzureRepositoryPlugin that allows to set very low values for the Azure's client retry policy + * Test the snapshot and restore of an index which has large segments files. + */ + public void testSnapshotWithLargeSegmentFiles() throws Exception { + final String repository = createRepository(randomName()); + final String index = "index-no-merges"; + createIndex(index, Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), TimeValue.MINUS_ONE) + .build()); + + // the number of documents here dictates the size of the single segment + // we want a large segment (1Mb+) so that Azure SDK client executes Put Block API calls + // the size of each uploaded block is defined by Constants.DEFAULT_STREAM_WRITE_IN_BYTES (~4Mb) + final long nbDocs = randomLongBetween(10_000L, 20_000L); + try (BackgroundIndexer indexer = new BackgroundIndexer(index, "_doc", client(), (int) nbDocs)) { + awaitBusy(() -> indexer.totalIndexedDocs() >= nbDocs); + } + + flushAndRefresh(index); + ForceMergeResponse forceMerge = client().admin().indices().prepareForceMerge(index).setFlush(true).setMaxNumSegments(1).get(); + assertThat(forceMerge.getSuccessfulShards(), equalTo(1)); + assertHitCount(client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get(), nbDocs); + + assertSuccessfulSnapshot(client().admin().cluster().prepareCreateSnapshot(repository, "snapshot") + .setWaitForCompletion(true).setIndices(index)); + + assertAcked(client().admin().indices().prepareDelete(index)); + + assertSuccessfulRestore(client().admin().cluster().prepareRestoreSnapshot(repository, "snapshot").setWaitForCompletion(true)); + ensureGreen(index); + assertHitCount(client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get(), nbDocs); + } + + /** + * AzureRepositoryPlugin that allows to set low values for the Azure's client retry policy + * and for BlobRequestOptions#getSingleBlobPutThresholdInBytes(). */ public static class TestAzureRepositoryPlugin extends AzureRepositoryPlugin { @@ -105,6 +160,13 @@ public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryInteg RetryPolicyFactory createRetryPolicy(final AzureStorageSettings azureStorageSettings) { return new RetryExponentialRetry(1, 100, 500, azureStorageSettings.getMaxRetries()); } + + @Override + BlobRequestOptions getBlobRequestOptionsForWriteBlob() { + BlobRequestOptions options = new BlobRequestOptions(); + options.setSingleBlobPutThresholdInBytes(Math.toIntExact(ByteSizeUnit.MB.toBytes(1))); + return options; + } }; } } @@ -121,12 +183,36 @@ public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryInteg public void handle(final HttpExchange exchange) throws IOException { final String request = exchange.getRequestMethod() + " " + exchange.getRequestURI().toString(); try { - if (Regex.simpleMatch("PUT /container/*", request)) { - blobs.put(exchange.getRequestURI().toString(), Streams.readFully(exchange.getRequestBody())); + if (Regex.simpleMatch("PUT /container/*blockid=*", request)) { + final Map params = new HashMap<>(); + RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params); + + final String blockId = params.get("blockid"); + blobs.put(blockId, Streams.readFully(exchange.getRequestBody())); + exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1); + + } else if (Regex.simpleMatch("PUT /container/*comp=blocklist*", request)) { + final String blockList = Streams.copyToString(new InputStreamReader(exchange.getRequestBody(), StandardCharsets.UTF_8)); + final List blockIds = Arrays.stream(blockList.split("")) + .filter(line -> line.contains("")) + .map(line -> line.substring(0, line.indexOf(""))) + .collect(Collectors.toList()); + + final ByteArrayOutputStream blob = new ByteArrayOutputStream(); + for (String blockId : blockIds) { + BytesReference block = blobs.remove(blockId); + assert block != null; + block.writeTo(blob); + } + blobs.put(exchange.getRequestURI().getPath(), new BytesArray(blob.toByteArray())); + exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1); + + } else if (Regex.simpleMatch("PUT /container/*", request)) { + blobs.put(exchange.getRequestURI().getPath(), Streams.readFully(exchange.getRequestBody())); exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1); } else if (Regex.simpleMatch("HEAD /container/*", request)) { - BytesReference blob = blobs.get(exchange.getRequestURI().toString()); + final BytesReference blob = blobs.get(exchange.getRequestURI().getPath()); if (blob == null) { exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1); return; @@ -136,20 +222,28 @@ public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryInteg exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1); } else if (Regex.simpleMatch("GET /container/*", request)) { - final BytesReference blob = blobs.get(exchange.getRequestURI().toString()); + final BytesReference blob = blobs.get(exchange.getRequestURI().getPath()); if (blob == null) { exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1); return; } + + final String range = exchange.getRequestHeaders().getFirst(Constants.HeaderConstants.STORAGE_RANGE_HEADER); + final Matcher matcher = Pattern.compile("^bytes=([0-9]+)-([0-9]+)$").matcher(range); + assertTrue(matcher.matches()); + + final int start = Integer.parseInt(matcher.group(1)); + final int length = Integer.parseInt(matcher.group(2)) - start + 1; + exchange.getResponseHeaders().add("Content-Type", "application/octet-stream"); - exchange.getResponseHeaders().add("x-ms-blob-content-length", String.valueOf(blob.length())); + exchange.getResponseHeaders().add("x-ms-blob-content-length", String.valueOf(length)); exchange.getResponseHeaders().add("x-ms-blob-type", "blockblob"); - exchange.sendResponseHeaders(RestStatus.OK.getStatus(), blob.length()); - blob.writeTo(exchange.getResponseBody()); + exchange.sendResponseHeaders(RestStatus.OK.getStatus(), length); + exchange.getResponseBody().write(blob.toBytesRef().bytes, start, length); } else if (Regex.simpleMatch("DELETE /container/*", request)) { Streams.readFully(exchange.getRequestBody()); - blobs.entrySet().removeIf(blob -> blob.getKey().startsWith(exchange.getRequestURI().toString())); + blobs.entrySet().removeIf(blob -> blob.getKey().startsWith(exchange.getRequestURI().getPath())); exchange.sendResponseHeaders(RestStatus.ACCEPTED.getStatus(), -1); } else if (Regex.simpleMatch("GET /container?restype=container&comp=list*", request)) { @@ -200,8 +294,11 @@ public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryInteg @Override protected String requestUniqueId(final HttpExchange exchange) { - // Azure SDK client provides a unique ID per request - return exchange.getRequestHeaders().getFirst(Constants.HeaderConstants.CLIENT_REQUEST_ID_HEADER); + final String requestId = exchange.getRequestHeaders().getFirst(Constants.HeaderConstants.CLIENT_REQUEST_ID_HEADER); + final String range = exchange.getRequestHeaders().getFirst(Constants.HeaderConstants.STORAGE_RANGE_HEADER); + return exchange.getRequestMethod() + + " " + requestId + + (range != null ? " " + range : ""); } } }