HttpHandlers should return correct list of objects (#49283)
This commit fixes the server side logic of "List Objects" operations of Azure and S3 fixtures. Until today, the fixtures were returning a " flat" view of stored objects and were not correctly handling the delimiter parameter. This causes some objects listing to be wrongly interpreted by the snapshot deletion logic in Elasticsearch which relies on the ability to list child containers of BlobContainer (#42653) to correctly delete stale indices. As a consequence, the blobs were not correctly deleted from the emulated storage service and stayed in heap until they got garbage collected, causing CI failures like #48978. This commit fixes the server side logic of Azure and S3 fixture when listing objects so that it now return correct common blob prefixes as expected by the snapshot deletion process. It also adds an after-test check to ensure that tests leave the repository empty (besides the root index files). Closes #48978
This commit is contained in:
parent
4d6e037e90
commit
f753fa2265
|
@ -64,7 +64,7 @@ public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryInteg
|
|||
|
||||
@Override
|
||||
protected Map<String, HttpHandler> createHttpHandlers() {
|
||||
return Collections.singletonMap("/container", new AzureHttpHandler("container"));
|
||||
return Collections.singletonMap("/container", new AzureBlobStoreHttpHandler("container"));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -115,6 +115,14 @@ public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryInteg
|
|||
}
|
||||
}
|
||||
|
||||
@SuppressForbidden(reason = "this test uses a HttpHandler to emulate an Azure endpoint")
|
||||
private static class AzureBlobStoreHttpHandler extends AzureHttpHandler implements BlobStoreHttpHandler {
|
||||
|
||||
AzureBlobStoreHttpHandler(final String container) {
|
||||
super(container);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* HTTP handler that injects random Azure service errors
|
||||
*
|
||||
|
|
|
@ -24,6 +24,7 @@ import com.google.cloud.http.HttpTransportOptions;
|
|||
import com.google.cloud.storage.StorageOptions;
|
||||
import com.sun.net.httpserver.HttpExchange;
|
||||
import com.sun.net.httpserver.HttpHandler;
|
||||
import fixture.gcs.FakeOAuth2HttpHandler;
|
||||
import fixture.gcs.GoogleCloudStorageHttpHandler;
|
||||
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
|
||||
import org.elasticsearch.common.SuppressForbidden;
|
||||
|
@ -77,8 +78,8 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESMockAPIBasedRe
|
|||
@Override
|
||||
protected Map<String, HttpHandler> createHttpHandlers() {
|
||||
final Map<String, HttpHandler> handlers = new HashMap<>(2);
|
||||
handlers.put("/", new GoogleCloudStorageHttpHandler("bucket"));
|
||||
handlers.put("/token", new fixture.gcs.FakeOAuth2HttpHandler());
|
||||
handlers.put("/", new GoogleCloudStorageBlobStoreHttpHandler("bucket"));
|
||||
handlers.put("/token", new FakeOAuth2HttpHandler());
|
||||
return Collections.unmodifiableMap(handlers);
|
||||
}
|
||||
|
||||
|
@ -186,6 +187,14 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESMockAPIBasedRe
|
|||
}
|
||||
}
|
||||
|
||||
@SuppressForbidden(reason = "this test uses a HttpHandler to emulate a Google Cloud Storage endpoint")
|
||||
private static class GoogleCloudStorageBlobStoreHttpHandler extends GoogleCloudStorageHttpHandler implements BlobStoreHttpHandler {
|
||||
|
||||
GoogleCloudStorageBlobStoreHttpHandler(final String bucket) {
|
||||
super(bucket);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* HTTP handler that injects random Google Cloud Storage service errors
|
||||
*
|
||||
|
|
|
@ -67,7 +67,7 @@ public class S3BlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTes
|
|||
|
||||
@Override
|
||||
protected Map<String, HttpHandler> createHttpHandlers() {
|
||||
return Collections.singletonMap("/bucket", new S3HttpHandler("bucket"));
|
||||
return Collections.singletonMap("/bucket", new S3BlobStoreHttpHandler("bucket"));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -134,6 +134,14 @@ public class S3BlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTes
|
|||
}
|
||||
}
|
||||
|
||||
@SuppressForbidden(reason = "this test uses a HttpHandler to emulate an S3 endpoint")
|
||||
private static class S3BlobStoreHttpHandler extends S3HttpHandler implements BlobStoreHttpHandler {
|
||||
|
||||
S3BlobStoreHttpHandler(final String bucket) {
|
||||
super(bucket);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* HTTP handler that injects random S3 service errors
|
||||
*
|
||||
|
|
|
@ -36,9 +36,11 @@ import java.io.InputStreamReader;
|
|||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
@ -153,13 +155,32 @@ public class AzureHttpHandler implements HttpHandler {
|
|||
list.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
|
||||
list.append("<EnumerationResults>");
|
||||
final String prefix = params.get("prefix");
|
||||
final Set<String> blobPrefixes = new HashSet<>();
|
||||
final String delimiter = params.get("delimiter");
|
||||
if (delimiter != null) {
|
||||
list.append("<Delimiter>").append(delimiter).append("</Delimiter>");
|
||||
}
|
||||
list.append("<Blobs>");
|
||||
for (Map.Entry<String, BytesReference> blob : blobs.entrySet()) {
|
||||
if (prefix == null || blob.getKey().startsWith("/" + container + "/" + prefix)) {
|
||||
list.append("<Blob><Name>").append(blob.getKey().replace("/" + container + "/", "")).append("</Name>");
|
||||
list.append("<Properties><Content-Length>").append(blob.getValue().length()).append("</Content-Length>");
|
||||
list.append("<BlobType>BlockBlob</BlobType></Properties></Blob>");
|
||||
if (prefix != null && blob.getKey().startsWith("/" + container + "/" + prefix) == false) {
|
||||
continue;
|
||||
}
|
||||
String blobPath = blob.getKey().replace("/" + container + "/", "");
|
||||
if (delimiter != null) {
|
||||
int fromIndex = (prefix != null ? prefix.length() : 0);
|
||||
int delimiterPosition = blobPath.indexOf(delimiter, fromIndex);
|
||||
if (delimiterPosition > 0) {
|
||||
blobPrefixes.add(blobPath.substring(0, delimiterPosition) + delimiter);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
list.append("<Blob><Name>").append(blobPath).append("</Name>");
|
||||
list.append("<Properties><Content-Length>").append(blob.getValue().length()).append("</Content-Length>");
|
||||
list.append("<BlobType>BlockBlob</BlobType></Properties></Blob>");
|
||||
}
|
||||
if (blobPrefixes.isEmpty() == false) {
|
||||
blobPrefixes.forEach(p -> list.append("<BlobPrefix><Name>").append(p).append("</Name></BlobPrefix>"));
|
||||
|
||||
}
|
||||
list.append("</Blobs>");
|
||||
list.append("</EnumerationResults>");
|
||||
|
@ -177,6 +198,10 @@ public class AzureHttpHandler implements HttpHandler {
|
|||
}
|
||||
}
|
||||
|
||||
public Map<String, BytesReference> blobs() {
|
||||
return blobs;
|
||||
}
|
||||
|
||||
public static void sendError(final HttpExchange exchange, final RestStatus status) throws IOException {
|
||||
final Headers headers = exchange.getResponseHeaders();
|
||||
headers.add("Content-Type", "application/xml");
|
||||
|
|
|
@ -25,6 +25,7 @@ import org.elasticsearch.common.Strings;
|
|||
import org.elasticsearch.common.SuppressForbidden;
|
||||
import org.elasticsearch.common.UUIDs;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.io.Streams;
|
||||
import org.elasticsearch.common.network.InetAddresses;
|
||||
|
@ -64,7 +65,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
|
|||
@SuppressForbidden(reason = "Uses a HttpServer to emulate a Google Cloud Storage endpoint")
|
||||
public class GoogleCloudStorageHttpHandler implements HttpHandler {
|
||||
|
||||
private final ConcurrentMap<String, BytesArray> blobs;
|
||||
private final ConcurrentMap<String, BytesReference> blobs;
|
||||
private final String bucket;
|
||||
|
||||
public GoogleCloudStorageHttpHandler(final String bucket) {
|
||||
|
@ -86,7 +87,7 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler {
|
|||
final Set<String> prefixes = new HashSet<>();
|
||||
final List<String> listOfBlobs = new ArrayList<>();
|
||||
|
||||
for (final Map.Entry<String, BytesArray> blob : blobs.entrySet()) {
|
||||
for (final Map.Entry<String, BytesReference> blob : blobs.entrySet()) {
|
||||
final String blobName = blob.getKey();
|
||||
if (prefix.isEmpty() || blobName.startsWith(prefix)) {
|
||||
int delimiterPos = (delimiter != null) ? blobName.substring(prefix.length()).indexOf(delimiter) : -1;
|
||||
|
@ -122,7 +123,7 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler {
|
|||
|
||||
} else if (Regex.simpleMatch("GET /download/storage/v1/b/" + bucket + "/o/*", request)) {
|
||||
// Download Object https://cloud.google.com/storage/docs/request-body
|
||||
BytesArray blob = blobs.get(exchange.getRequestURI().getPath().replace("/download/storage/v1/b/" + bucket + "/o/", ""));
|
||||
BytesReference blob = blobs.get(exchange.getRequestURI().getPath().replace("/download/storage/v1/b/" + bucket + "/o/", ""));
|
||||
if (blob != null) {
|
||||
final String range = exchange.getRequestHeaders().getFirst("Range");
|
||||
Matcher matcher = Pattern.compile("bytes=([0-9]*)-([0-9]*)").matcher(range);
|
||||
|
@ -130,7 +131,7 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler {
|
|||
throw new AssertionError("Range bytes header does not match expected format: " + range);
|
||||
}
|
||||
|
||||
byte[] response = Integer.parseInt(matcher.group(1)) == 0 ? blob.array() : new byte[0];
|
||||
byte[] response = Integer.parseInt(matcher.group(1)) == 0 ? BytesReference.toBytes(blob) : new byte[0];
|
||||
exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
|
||||
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
|
||||
exchange.getResponseBody().write(response);
|
||||
|
@ -141,8 +142,8 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler {
|
|||
} else if (Regex.simpleMatch("DELETE /storage/v1/b/" + bucket + "/o/*", request)) {
|
||||
// Delete Object https://cloud.google.com/storage/docs/json_api/v1/objects/delete
|
||||
int deletions = 0;
|
||||
for (Iterator<Map.Entry<String, BytesArray>> iterator = blobs.entrySet().iterator(); iterator.hasNext(); ) {
|
||||
Map.Entry<String, BytesArray> blob = iterator.next();
|
||||
for (Iterator<Map.Entry<String, BytesReference>> iterator = blobs.entrySet().iterator(); iterator.hasNext(); ) {
|
||||
Map.Entry<String, BytesReference> blob = iterator.next();
|
||||
if (blob.getKey().equals(exchange.getRequestURI().toString())) {
|
||||
iterator.remove();
|
||||
deletions++;
|
||||
|
@ -209,12 +210,11 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler {
|
|||
RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params);
|
||||
|
||||
final String blobName = params.get("test_blob_name");
|
||||
byte[] blob = blobs.get(blobName).array();
|
||||
if (blob == null) {
|
||||
if (blobs.containsKey(blobName) == false) {
|
||||
exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1);
|
||||
return;
|
||||
}
|
||||
|
||||
byte[] blob = BytesReference.toBytes(blobs.get(blobName));
|
||||
final String range = exchange.getRequestHeaders().getFirst("Content-Range");
|
||||
final Integer limit = getContentRangeLimit(range);
|
||||
final int start = getContentRangeStart(range);
|
||||
|
@ -250,6 +250,10 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler {
|
|||
}
|
||||
}
|
||||
|
||||
public Map<String, BytesReference> blobs() {
|
||||
return blobs;
|
||||
}
|
||||
|
||||
private String httpServerUrl(final HttpExchange exchange) {
|
||||
final InetSocketAddress address = exchange.getLocalAddress();
|
||||
return "http://" + InetAddresses.toUriString(address.getAddress()) + ":" + address.getPort();
|
||||
|
|
|
@ -41,10 +41,12 @@ import java.io.InputStreamReader;
|
|||
import java.nio.charset.StandardCharsets;
|
||||
import java.security.MessageDigest;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.regex.Matcher;
|
||||
|
@ -158,13 +160,34 @@ public class S3HttpHandler implements HttpHandler {
|
|||
if (prefix != null) {
|
||||
list.append("<Prefix>").append(prefix).append("</Prefix>");
|
||||
}
|
||||
final Set<String> commonPrefixes = new HashSet<>();
|
||||
final String delimiter = params.get("delimiter");
|
||||
if (delimiter != null) {
|
||||
list.append("<Delimiter>").append(delimiter).append("</Delimiter>");
|
||||
}
|
||||
for (Map.Entry<String, BytesReference> blob : blobs.entrySet()) {
|
||||
if (prefix == null || blob.getKey().startsWith("/" + bucket + "/" + prefix)) {
|
||||
list.append("<Contents>");
|
||||
list.append("<Key>").append(blob.getKey().replace("/" + bucket + "/", "")).append("</Key>");
|
||||
list.append("<Size>").append(blob.getValue().length()).append("</Size>");
|
||||
list.append("</Contents>");
|
||||
if (prefix != null && blob.getKey().startsWith("/" + bucket + "/" + prefix) == false) {
|
||||
continue;
|
||||
}
|
||||
String blobPath = blob.getKey().replace("/" + bucket + "/", "");
|
||||
if (delimiter != null) {
|
||||
int fromIndex = (prefix != null ? prefix.length() : 0);
|
||||
int delimiterPosition = blobPath.indexOf(delimiter, fromIndex);
|
||||
if (delimiterPosition > 0) {
|
||||
commonPrefixes.add(blobPath.substring(0, delimiterPosition) + delimiter);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
list.append("<Contents>");
|
||||
list.append("<Key>").append(blobPath).append("</Key>");
|
||||
list.append("<Size>").append(blob.getValue().length()).append("</Size>");
|
||||
list.append("</Contents>");
|
||||
}
|
||||
if (commonPrefixes.isEmpty() == false) {
|
||||
list.append("<CommonPrefixes>");
|
||||
commonPrefixes.forEach(commonPrefix -> list.append("<Prefix>").append(commonPrefix).append("</Prefix>"));
|
||||
list.append("</CommonPrefixes>");
|
||||
|
||||
}
|
||||
list.append("</ListBucketResult>");
|
||||
|
||||
|
@ -241,6 +264,10 @@ public class S3HttpHandler implements HttpHandler {
|
|||
}
|
||||
}
|
||||
|
||||
public Map<String, BytesReference> blobs() {
|
||||
return blobs;
|
||||
}
|
||||
|
||||
private static String multipartKey(final String uploadId, int partNumber) {
|
||||
return uploadId + "\n" + partNumber;
|
||||
}
|
||||
|
|
|
@ -43,7 +43,6 @@ import java.util.HashSet;
|
|||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
|
||||
|
@ -272,9 +271,11 @@ public abstract class ESBlobStoreRepositoryIntegTestCase extends ESIntegTestCase
|
|||
assertFalse(BlobStoreTestUtil.blobExists(indicesBlobContainer.get(), indexId.getId())); // deleted index
|
||||
}
|
||||
}
|
||||
|
||||
assertAcked(client().admin().cluster().prepareDeleteSnapshot(repoName, "test-snap2").get());
|
||||
}
|
||||
|
||||
protected void addRandomDocuments(String name, int numDocs) throws ExecutionException, InterruptedException {
|
||||
protected void addRandomDocuments(String name, int numDocs) throws InterruptedException {
|
||||
IndexRequestBuilder[] indexRequestBuilders = new IndexRequestBuilder[numDocs];
|
||||
for (int i = 0; i < numDocs; i++) {
|
||||
indexRequestBuilders[i] = client().prepareIndex(name, name, Integer.toString(i))
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
|
|||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.SuppressForbidden;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.network.InetAddresses;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.mocksocket.MockHttpServer;
|
||||
|
@ -41,13 +42,16 @@ import java.io.IOException;
|
|||
import java.io.InputStream;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
|
||||
/**
|
||||
* Integration tests for {@link BlobStoreRepository} implementations rely on mock APIs that emulate cloud-based services.
|
||||
|
@ -55,6 +59,14 @@ import static org.hamcrest.Matchers.equalTo;
|
|||
@SuppressForbidden(reason = "this test uses a HttpServer to emulate a cloud-based storage service")
|
||||
public abstract class ESMockAPIBasedRepositoryIntegTestCase extends ESBlobStoreRepositoryIntegTestCase {
|
||||
|
||||
/**
|
||||
* A {@link HttpHandler} that allows to list stored blobs
|
||||
*/
|
||||
@SuppressForbidden(reason = "Uses a HttpServer to emulate a cloud-based storage service")
|
||||
protected interface BlobStoreHttpHandler extends HttpHandler {
|
||||
Map<String, BytesReference> blobs();
|
||||
}
|
||||
|
||||
private static final byte[] BUFFER = new byte[1024];
|
||||
|
||||
private static HttpServer httpServer;
|
||||
|
@ -81,7 +93,14 @@ public abstract class ESMockAPIBasedRepositoryIntegTestCase extends ESBlobStoreR
|
|||
@After
|
||||
public void tearDownHttpServer() {
|
||||
if (handlers != null) {
|
||||
handlers.keySet().forEach(context -> httpServer.removeContext(context));
|
||||
for(Map.Entry<String, HttpHandler> handler : handlers.entrySet()) {
|
||||
httpServer.removeContext(handler.getKey());
|
||||
if (handler.getValue() instanceof BlobStoreHttpHandler) {
|
||||
List<String> blobs = ((BlobStoreHttpHandler) handler.getValue()).blobs().keySet().stream()
|
||||
.filter(blob -> blob.contains("index") == false).collect(Collectors.toList());
|
||||
assertThat("Only index blobs should remain in repository but found " + blobs, blobs, hasSize(0));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -110,14 +129,17 @@ public abstract class ESMockAPIBasedRepositoryIntegTestCase extends ESBlobStoreR
|
|||
assertThat(forceMerge.getSuccessfulShards(), equalTo(1));
|
||||
assertHitCount(client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get(), nbDocs);
|
||||
|
||||
assertSuccessfulSnapshot(client().admin().cluster().prepareCreateSnapshot(repository, "snapshot")
|
||||
final String snapshot = "snapshot";
|
||||
assertSuccessfulSnapshot(client().admin().cluster().prepareCreateSnapshot(repository, snapshot)
|
||||
.setWaitForCompletion(true).setIndices(index));
|
||||
|
||||
assertAcked(client().admin().indices().prepareDelete(index));
|
||||
|
||||
assertSuccessfulRestore(client().admin().cluster().prepareRestoreSnapshot(repository, "snapshot").setWaitForCompletion(true));
|
||||
assertSuccessfulRestore(client().admin().cluster().prepareRestoreSnapshot(repository, snapshot).setWaitForCompletion(true));
|
||||
ensureGreen(index);
|
||||
assertHitCount(client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get(), nbDocs);
|
||||
|
||||
assertAcked(client().admin().cluster().prepareDeleteSnapshot(repository, snapshot).get());
|
||||
}
|
||||
|
||||
protected static String httpServerUrl() {
|
||||
|
|
Loading…
Reference in New Issue