From f753fa22656e525fd9723a1412505328a35254c0 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Tue, 19 Nov 2019 09:55:36 -0500 Subject: [PATCH] HttpHandlers should return correct list of objects (#49283) This commit fixes the server side logic of "List Objects" operations of Azure and S3 fixtures. Until today, the fixtures were returning a " flat" view of stored objects and were not correctly handling the delimiter parameter. This causes some objects listing to be wrongly interpreted by the snapshot deletion logic in Elasticsearch which relies on the ability to list child containers of BlobContainer (#42653) to correctly delete stale indices. As a consequence, the blobs were not correctly deleted from the emulated storage service and stayed in heap until they got garbage collected, causing CI failures like #48978. This commit fixes the server side logic of Azure and S3 fixture when listing objects so that it now return correct common blob prefixes as expected by the snapshot deletion process. It also adds an after-test check to ensure that tests leave the repository empty (besides the root index files). Closes #48978 --- .../azure/AzureBlobStoreRepositoryTests.java | 10 ++++- ...eCloudStorageBlobStoreRepositoryTests.java | 13 ++++++- .../s3/S3BlobStoreRepositoryTests.java | 10 ++++- .../java/fixture/azure/AzureHttpHandler.java | 33 +++++++++++++++-- .../gcs/GoogleCloudStorageHttpHandler.java | 22 ++++++----- .../main/java/fixture/s3/S3HttpHandler.java | 37 ++++++++++++++++--- .../ESBlobStoreRepositoryIntegTestCase.java | 5 ++- ...ESMockAPIBasedRepositoryIntegTestCase.java | 28 ++++++++++++-- 8 files changed, 131 insertions(+), 27 deletions(-) diff --git a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java index 5bc0d2684f1..e87699ff93c 100644 --- a/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java +++ b/plugins/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobStoreRepositoryTests.java @@ -64,7 +64,7 @@ public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryInteg @Override protected Map createHttpHandlers() { - return Collections.singletonMap("/container", new AzureHttpHandler("container")); + return Collections.singletonMap("/container", new AzureBlobStoreHttpHandler("container")); } @Override @@ -115,6 +115,14 @@ public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryInteg } } + @SuppressForbidden(reason = "this test uses a HttpHandler to emulate an Azure endpoint") + private static class AzureBlobStoreHttpHandler extends AzureHttpHandler implements BlobStoreHttpHandler { + + AzureBlobStoreHttpHandler(final String container) { + super(container); + } + } + /** * HTTP handler that injects random Azure service errors * diff --git a/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java b/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java index 9ff7192bda6..275b5c94052 100644 --- a/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java +++ b/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java @@ -24,6 +24,7 @@ import com.google.cloud.http.HttpTransportOptions; import com.google.cloud.storage.StorageOptions; import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpHandler; +import fixture.gcs.FakeOAuth2HttpHandler; import fixture.gcs.GoogleCloudStorageHttpHandler; import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.common.SuppressForbidden; @@ -77,8 +78,8 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESMockAPIBasedRe @Override protected Map createHttpHandlers() { final Map handlers = new HashMap<>(2); - handlers.put("/", new GoogleCloudStorageHttpHandler("bucket")); - handlers.put("/token", new fixture.gcs.FakeOAuth2HttpHandler()); + handlers.put("/", new GoogleCloudStorageBlobStoreHttpHandler("bucket")); + handlers.put("/token", new FakeOAuth2HttpHandler()); return Collections.unmodifiableMap(handlers); } @@ -186,6 +187,14 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESMockAPIBasedRe } } + @SuppressForbidden(reason = "this test uses a HttpHandler to emulate a Google Cloud Storage endpoint") + private static class GoogleCloudStorageBlobStoreHttpHandler extends GoogleCloudStorageHttpHandler implements BlobStoreHttpHandler { + + GoogleCloudStorageBlobStoreHttpHandler(final String bucket) { + super(bucket); + } + } + /** * HTTP handler that injects random Google Cloud Storage service errors * diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java index 5fb1ec8e3eb..16dd4cdc27e 100644 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java +++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java @@ -67,7 +67,7 @@ public class S3BlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTes @Override protected Map createHttpHandlers() { - return Collections.singletonMap("/bucket", new S3HttpHandler("bucket")); + return Collections.singletonMap("/bucket", new S3BlobStoreHttpHandler("bucket")); } @Override @@ -134,6 +134,14 @@ public class S3BlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTes } } + @SuppressForbidden(reason = "this test uses a HttpHandler to emulate an S3 endpoint") + private static class S3BlobStoreHttpHandler extends S3HttpHandler implements BlobStoreHttpHandler { + + S3BlobStoreHttpHandler(final String bucket) { + super(bucket); + } + } + /** * HTTP handler that injects random S3 service errors * diff --git a/test/fixtures/azure-fixture/src/main/java/fixture/azure/AzureHttpHandler.java b/test/fixtures/azure-fixture/src/main/java/fixture/azure/AzureHttpHandler.java index affd1181221..57c45ab40ec 100644 --- a/test/fixtures/azure-fixture/src/main/java/fixture/azure/AzureHttpHandler.java +++ b/test/fixtures/azure-fixture/src/main/java/fixture/azure/AzureHttpHandler.java @@ -36,9 +36,11 @@ import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -153,13 +155,32 @@ public class AzureHttpHandler implements HttpHandler { list.append(""); list.append(""); final String prefix = params.get("prefix"); + final Set blobPrefixes = new HashSet<>(); + final String delimiter = params.get("delimiter"); + if (delimiter != null) { + list.append("").append(delimiter).append(""); + } list.append(""); for (Map.Entry blob : blobs.entrySet()) { - if (prefix == null || blob.getKey().startsWith("/" + container + "/" + prefix)) { - list.append("").append(blob.getKey().replace("/" + container + "/", "")).append(""); - list.append("").append(blob.getValue().length()).append(""); - list.append("BlockBlob"); + if (prefix != null && blob.getKey().startsWith("/" + container + "/" + prefix) == false) { + continue; } + String blobPath = blob.getKey().replace("/" + container + "/", ""); + if (delimiter != null) { + int fromIndex = (prefix != null ? prefix.length() : 0); + int delimiterPosition = blobPath.indexOf(delimiter, fromIndex); + if (delimiterPosition > 0) { + blobPrefixes.add(blobPath.substring(0, delimiterPosition) + delimiter); + continue; + } + } + list.append("").append(blobPath).append(""); + list.append("").append(blob.getValue().length()).append(""); + list.append("BlockBlob"); + } + if (blobPrefixes.isEmpty() == false) { + blobPrefixes.forEach(p -> list.append("").append(p).append("")); + } list.append(""); list.append(""); @@ -177,6 +198,10 @@ public class AzureHttpHandler implements HttpHandler { } } + public Map blobs() { + return blobs; + } + public static void sendError(final HttpExchange exchange, final RestStatus status) throws IOException { final Headers headers = exchange.getResponseHeaders(); headers.add("Content-Type", "application/xml"); diff --git a/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java index dae02548e37..f3bd2ecfe30 100644 --- a/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java +++ b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java @@ -25,6 +25,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.network.InetAddresses; @@ -64,7 +65,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; @SuppressForbidden(reason = "Uses a HttpServer to emulate a Google Cloud Storage endpoint") public class GoogleCloudStorageHttpHandler implements HttpHandler { - private final ConcurrentMap blobs; + private final ConcurrentMap blobs; private final String bucket; public GoogleCloudStorageHttpHandler(final String bucket) { @@ -86,7 +87,7 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler { final Set prefixes = new HashSet<>(); final List listOfBlobs = new ArrayList<>(); - for (final Map.Entry blob : blobs.entrySet()) { + for (final Map.Entry blob : blobs.entrySet()) { final String blobName = blob.getKey(); if (prefix.isEmpty() || blobName.startsWith(prefix)) { int delimiterPos = (delimiter != null) ? blobName.substring(prefix.length()).indexOf(delimiter) : -1; @@ -122,7 +123,7 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler { } else if (Regex.simpleMatch("GET /download/storage/v1/b/" + bucket + "/o/*", request)) { // Download Object https://cloud.google.com/storage/docs/request-body - BytesArray blob = blobs.get(exchange.getRequestURI().getPath().replace("/download/storage/v1/b/" + bucket + "/o/", "")); + BytesReference blob = blobs.get(exchange.getRequestURI().getPath().replace("/download/storage/v1/b/" + bucket + "/o/", "")); if (blob != null) { final String range = exchange.getRequestHeaders().getFirst("Range"); Matcher matcher = Pattern.compile("bytes=([0-9]*)-([0-9]*)").matcher(range); @@ -130,7 +131,7 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler { throw new AssertionError("Range bytes header does not match expected format: " + range); } - byte[] response = Integer.parseInt(matcher.group(1)) == 0 ? blob.array() : new byte[0]; + byte[] response = Integer.parseInt(matcher.group(1)) == 0 ? BytesReference.toBytes(blob) : new byte[0]; exchange.getResponseHeaders().add("Content-Type", "application/octet-stream"); exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length); exchange.getResponseBody().write(response); @@ -141,8 +142,8 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler { } else if (Regex.simpleMatch("DELETE /storage/v1/b/" + bucket + "/o/*", request)) { // Delete Object https://cloud.google.com/storage/docs/json_api/v1/objects/delete int deletions = 0; - for (Iterator> iterator = blobs.entrySet().iterator(); iterator.hasNext(); ) { - Map.Entry blob = iterator.next(); + for (Iterator> iterator = blobs.entrySet().iterator(); iterator.hasNext(); ) { + Map.Entry blob = iterator.next(); if (blob.getKey().equals(exchange.getRequestURI().toString())) { iterator.remove(); deletions++; @@ -209,12 +210,11 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler { RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params); final String blobName = params.get("test_blob_name"); - byte[] blob = blobs.get(blobName).array(); - if (blob == null) { + if (blobs.containsKey(blobName) == false) { exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1); return; } - + byte[] blob = BytesReference.toBytes(blobs.get(blobName)); final String range = exchange.getRequestHeaders().getFirst("Content-Range"); final Integer limit = getContentRangeLimit(range); final int start = getContentRangeStart(range); @@ -250,6 +250,10 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler { } } + public Map blobs() { + return blobs; + } + private String httpServerUrl(final HttpExchange exchange) { final InetSocketAddress address = exchange.getLocalAddress(); return "http://" + InetAddresses.toUriString(address.getAddress()) + ":" + address.getPort(); diff --git a/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpHandler.java b/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpHandler.java index 468035b828b..f4a467d314a 100644 --- a/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpHandler.java +++ b/test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpHandler.java @@ -41,10 +41,12 @@ import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; import java.security.MessageDigest; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.Locale; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.regex.Matcher; @@ -158,13 +160,34 @@ public class S3HttpHandler implements HttpHandler { if (prefix != null) { list.append("").append(prefix).append(""); } + final Set commonPrefixes = new HashSet<>(); + final String delimiter = params.get("delimiter"); + if (delimiter != null) { + list.append("").append(delimiter).append(""); + } for (Map.Entry blob : blobs.entrySet()) { - if (prefix == null || blob.getKey().startsWith("/" + bucket + "/" + prefix)) { - list.append(""); - list.append("").append(blob.getKey().replace("/" + bucket + "/", "")).append(""); - list.append("").append(blob.getValue().length()).append(""); - list.append(""); + if (prefix != null && blob.getKey().startsWith("/" + bucket + "/" + prefix) == false) { + continue; } + String blobPath = blob.getKey().replace("/" + bucket + "/", ""); + if (delimiter != null) { + int fromIndex = (prefix != null ? prefix.length() : 0); + int delimiterPosition = blobPath.indexOf(delimiter, fromIndex); + if (delimiterPosition > 0) { + commonPrefixes.add(blobPath.substring(0, delimiterPosition) + delimiter); + continue; + } + } + list.append(""); + list.append("").append(blobPath).append(""); + list.append("").append(blob.getValue().length()).append(""); + list.append(""); + } + if (commonPrefixes.isEmpty() == false) { + list.append(""); + commonPrefixes.forEach(commonPrefix -> list.append("").append(commonPrefix).append("")); + list.append(""); + } list.append(""); @@ -241,6 +264,10 @@ public class S3HttpHandler implements HttpHandler { } } + public Map blobs() { + return blobs; + } + private static String multipartKey(final String uploadId, int partNumber) { return uploadId + "\n" + partNumber; } diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java index 619bdaf60ab..bde89293b3e 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java @@ -43,7 +43,6 @@ import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Set; -import java.util.concurrent.ExecutionException; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; @@ -272,9 +271,11 @@ public abstract class ESBlobStoreRepositoryIntegTestCase extends ESIntegTestCase assertFalse(BlobStoreTestUtil.blobExists(indicesBlobContainer.get(), indexId.getId())); // deleted index } } + + assertAcked(client().admin().cluster().prepareDeleteSnapshot(repoName, "test-snap2").get()); } - protected void addRandomDocuments(String name, int numDocs) throws ExecutionException, InterruptedException { + protected void addRandomDocuments(String name, int numDocs) throws InterruptedException { IndexRequestBuilder[] indexRequestBuilders = new IndexRequestBuilder[numDocs]; for (int i = 0; i < numDocs; i++) { indexRequestBuilders[i] = client().prepareIndex(name, name, Integer.toString(i)) diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESMockAPIBasedRepositoryIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESMockAPIBasedRepositoryIntegTestCase.java index dc99cab8a28..82f052b5744 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESMockAPIBasedRepositoryIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESMockAPIBasedRepositoryIntegTestCase.java @@ -28,6 +28,7 @@ import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.Strings; import org.elasticsearch.common.SuppressForbidden; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.network.InetAddresses; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.mocksocket.MockHttpServer; @@ -41,13 +42,16 @@ import java.io.IOException; import java.io.InputStream; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; /** * Integration tests for {@link BlobStoreRepository} implementations rely on mock APIs that emulate cloud-based services. @@ -55,6 +59,14 @@ import static org.hamcrest.Matchers.equalTo; @SuppressForbidden(reason = "this test uses a HttpServer to emulate a cloud-based storage service") public abstract class ESMockAPIBasedRepositoryIntegTestCase extends ESBlobStoreRepositoryIntegTestCase { + /** + * A {@link HttpHandler} that allows to list stored blobs + */ + @SuppressForbidden(reason = "Uses a HttpServer to emulate a cloud-based storage service") + protected interface BlobStoreHttpHandler extends HttpHandler { + Map blobs(); + } + private static final byte[] BUFFER = new byte[1024]; private static HttpServer httpServer; @@ -81,7 +93,14 @@ public abstract class ESMockAPIBasedRepositoryIntegTestCase extends ESBlobStoreR @After public void tearDownHttpServer() { if (handlers != null) { - handlers.keySet().forEach(context -> httpServer.removeContext(context)); + for(Map.Entry handler : handlers.entrySet()) { + httpServer.removeContext(handler.getKey()); + if (handler.getValue() instanceof BlobStoreHttpHandler) { + List blobs = ((BlobStoreHttpHandler) handler.getValue()).blobs().keySet().stream() + .filter(blob -> blob.contains("index") == false).collect(Collectors.toList()); + assertThat("Only index blobs should remain in repository but found " + blobs, blobs, hasSize(0)); + } + } } } @@ -110,14 +129,17 @@ public abstract class ESMockAPIBasedRepositoryIntegTestCase extends ESBlobStoreR assertThat(forceMerge.getSuccessfulShards(), equalTo(1)); assertHitCount(client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get(), nbDocs); - assertSuccessfulSnapshot(client().admin().cluster().prepareCreateSnapshot(repository, "snapshot") + final String snapshot = "snapshot"; + assertSuccessfulSnapshot(client().admin().cluster().prepareCreateSnapshot(repository, snapshot) .setWaitForCompletion(true).setIndices(index)); assertAcked(client().admin().indices().prepareDelete(index)); - assertSuccessfulRestore(client().admin().cluster().prepareRestoreSnapshot(repository, "snapshot").setWaitForCompletion(true)); + assertSuccessfulRestore(client().admin().cluster().prepareRestoreSnapshot(repository, snapshot).setWaitForCompletion(true)); ensureGreen(index); assertHitCount(client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get(), nbDocs); + + assertAcked(client().admin().cluster().prepareDeleteSnapshot(repository, snapshot).get()); } protected static String httpServerUrl() {