Add block support to AzureBlobStoreRepositoryTests (#46664)
This commit adds support for the Put Block API to the internal HTTP server used in Azure repository integration tests. This makes it possible to test the behavior of the Azure SDK client when the Azure Storage service returns errors while uploading a blob in multiple blocks or while downloading a blob using ranged downloads.
This commit is contained in:
parent
fd42358a6d
commit
799f7def9f
|
@ -30,6 +30,7 @@ import com.microsoft.azure.storage.StorageException;
|
|||
import com.microsoft.azure.storage.blob.BlobInputStream;
|
||||
import com.microsoft.azure.storage.blob.BlobListingDetails;
|
||||
import com.microsoft.azure.storage.blob.BlobProperties;
|
||||
import com.microsoft.azure.storage.blob.BlobRequestOptions;
|
||||
import com.microsoft.azure.storage.blob.CloudBlob;
|
||||
import com.microsoft.azure.storage.blob.CloudBlobClient;
|
||||
import com.microsoft.azure.storage.blob.CloudBlobContainer;
|
||||
|
@ -329,7 +330,7 @@ public class AzureStorageService {
|
|||
final AccessCondition accessCondition =
|
||||
failIfAlreadyExists ? AccessCondition.generateIfNotExistsCondition() : AccessCondition.generateEmptyCondition();
|
||||
SocketAccess.doPrivilegedVoidException(() ->
|
||||
blob.upload(inputStream, blobSize, accessCondition, null, client.v2().get()));
|
||||
blob.upload(inputStream, blobSize, accessCondition, getBlobRequestOptionsForWriteBlob(), client.v2().get()));
|
||||
} catch (final StorageException se) {
|
||||
if (failIfAlreadyExists && se.getHttpStatusCode() == HttpURLConnection.HTTP_CONFLICT &&
|
||||
StorageErrorCodeStrings.BLOB_ALREADY_EXISTS.equals(se.getErrorCode())) {
|
||||
|
@ -340,6 +341,11 @@ public class AzureStorageService {
|
|||
logger.trace(() -> new ParameterizedMessage("writeBlob({}, stream, {}) - done", blobName, blobSize));
|
||||
}
|
||||
|
||||
// package private for testing
|
||||
BlobRequestOptions getBlobRequestOptionsForWriteBlob() {
|
||||
return null;
|
||||
}
|
||||
|
||||
static InputStream giveSocketPermissionsToStream(final InputStream stream) {
|
||||
return new InputStream() {
|
||||
@Override
|
||||
|
|
|
@ -21,27 +21,46 @@ package org.elasticsearch.repositories.azure;
|
|||
import com.microsoft.azure.storage.Constants;
|
||||
import com.microsoft.azure.storage.RetryExponentialRetry;
|
||||
import com.microsoft.azure.storage.RetryPolicyFactory;
|
||||
import com.microsoft.azure.storage.blob.BlobRequestOptions;
|
||||
import com.sun.net.httpserver.HttpExchange;
|
||||
import com.sun.net.httpserver.HttpHandler;
|
||||
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.common.SuppressForbidden;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.io.Streams;
|
||||
import org.elasticsearch.common.regex.Regex;
|
||||
import org.elasticsearch.common.settings.MockSecureSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.rest.RestUtils;
|
||||
import org.elasticsearch.test.BackgroundIndexer;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStreamReader;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Arrays;
|
||||
import java.util.Base64;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
@SuppressForbidden(reason = "this test uses a HttpServer to emulate an Azure endpoint")
|
||||
public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTestCase {
|
||||
|
@ -90,7 +109,43 @@ public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryInteg
|
|||
}
|
||||
|
||||
/**
|
||||
* AzureRepositoryPlugin that allows to set very low values for the Azure's client retry policy
|
||||
* Tests the snapshot and restore of an index that has large segment files.
|
||||
*/
|
||||
public void testSnapshotWithLargeSegmentFiles() throws Exception {
|
||||
final String repository = createRepository(randomName());
|
||||
final String index = "index-no-merges";
|
||||
createIndex(index, Settings.builder()
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
|
||||
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), TimeValue.MINUS_ONE)
|
||||
.build());
|
||||
|
||||
// the number of documents here dictates the size of the single segment
|
||||
// we want a large segment (1Mb+) so that Azure SDK client executes Put Block API calls
|
||||
// the size of each uploaded block is defined by Constants.DEFAULT_STREAM_WRITE_IN_BYTES (~4Mb)
|
||||
final long nbDocs = randomLongBetween(10_000L, 20_000L);
|
||||
try (BackgroundIndexer indexer = new BackgroundIndexer(index, "_doc", client(), (int) nbDocs)) {
|
||||
awaitBusy(() -> indexer.totalIndexedDocs() >= nbDocs);
|
||||
}
|
||||
|
||||
flushAndRefresh(index);
|
||||
ForceMergeResponse forceMerge = client().admin().indices().prepareForceMerge(index).setFlush(true).setMaxNumSegments(1).get();
|
||||
assertThat(forceMerge.getSuccessfulShards(), equalTo(1));
|
||||
assertHitCount(client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get(), nbDocs);
|
||||
|
||||
assertSuccessfulSnapshot(client().admin().cluster().prepareCreateSnapshot(repository, "snapshot")
|
||||
.setWaitForCompletion(true).setIndices(index));
|
||||
|
||||
assertAcked(client().admin().indices().prepareDelete(index));
|
||||
|
||||
assertSuccessfulRestore(client().admin().cluster().prepareRestoreSnapshot(repository, "snapshot").setWaitForCompletion(true));
|
||||
ensureGreen(index);
|
||||
assertHitCount(client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get(), nbDocs);
|
||||
}
|
||||
|
||||
/**
|
||||
* AzureRepositoryPlugin that allows setting low values for the Azure client's retry policy
|
||||
* and for BlobRequestOptions#getSingleBlobPutThresholdInBytes().
|
||||
*/
|
||||
public static class TestAzureRepositoryPlugin extends AzureRepositoryPlugin {
|
||||
|
||||
|
@ -105,6 +160,13 @@ public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryInteg
|
|||
RetryPolicyFactory createRetryPolicy(final AzureStorageSettings azureStorageSettings) {
|
||||
return new RetryExponentialRetry(1, 100, 500, azureStorageSettings.getMaxRetries());
|
||||
}
|
||||
|
||||
@Override
|
||||
BlobRequestOptions getBlobRequestOptionsForWriteBlob() {
|
||||
BlobRequestOptions options = new BlobRequestOptions();
|
||||
options.setSingleBlobPutThresholdInBytes(Math.toIntExact(ByteSizeUnit.MB.toBytes(1)));
|
||||
return options;
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
@ -121,12 +183,36 @@ public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryInteg
|
|||
public void handle(final HttpExchange exchange) throws IOException {
|
||||
final String request = exchange.getRequestMethod() + " " + exchange.getRequestURI().toString();
|
||||
try {
|
||||
if (Regex.simpleMatch("PUT /container/*", request)) {
|
||||
blobs.put(exchange.getRequestURI().toString(), Streams.readFully(exchange.getRequestBody()));
|
||||
if (Regex.simpleMatch("PUT /container/*blockid=*", request)) {
|
||||
final Map<String, String> params = new HashMap<>();
|
||||
RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params);
|
||||
|
||||
final String blockId = params.get("blockid");
|
||||
blobs.put(blockId, Streams.readFully(exchange.getRequestBody()));
|
||||
exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1);
|
||||
|
||||
} else if (Regex.simpleMatch("PUT /container/*comp=blocklist*", request)) {
|
||||
final String blockList = Streams.copyToString(new InputStreamReader(exchange.getRequestBody(), StandardCharsets.UTF_8));
|
||||
final List<String> blockIds = Arrays.stream(blockList.split("<Latest>"))
|
||||
.filter(line -> line.contains("</Latest>"))
|
||||
.map(line -> line.substring(0, line.indexOf("</Latest>")))
|
||||
.collect(Collectors.toList());
|
||||
|
||||
final ByteArrayOutputStream blob = new ByteArrayOutputStream();
|
||||
for (String blockId : blockIds) {
|
||||
BytesReference block = blobs.remove(blockId);
|
||||
assert block != null;
|
||||
block.writeTo(blob);
|
||||
}
|
||||
blobs.put(exchange.getRequestURI().getPath(), new BytesArray(blob.toByteArray()));
|
||||
exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1);
|
||||
|
||||
} else if (Regex.simpleMatch("PUT /container/*", request)) {
|
||||
blobs.put(exchange.getRequestURI().getPath(), Streams.readFully(exchange.getRequestBody()));
|
||||
exchange.sendResponseHeaders(RestStatus.CREATED.getStatus(), -1);
|
||||
|
||||
} else if (Regex.simpleMatch("HEAD /container/*", request)) {
|
||||
BytesReference blob = blobs.get(exchange.getRequestURI().toString());
|
||||
final BytesReference blob = blobs.get(exchange.getRequestURI().getPath());
|
||||
if (blob == null) {
|
||||
exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1);
|
||||
return;
|
||||
|
@ -136,20 +222,28 @@ public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryInteg
|
|||
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1);
|
||||
|
||||
} else if (Regex.simpleMatch("GET /container/*", request)) {
|
||||
final BytesReference blob = blobs.get(exchange.getRequestURI().toString());
|
||||
final BytesReference blob = blobs.get(exchange.getRequestURI().getPath());
|
||||
if (blob == null) {
|
||||
exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1);
|
||||
return;
|
||||
}
|
||||
|
||||
final String range = exchange.getRequestHeaders().getFirst(Constants.HeaderConstants.STORAGE_RANGE_HEADER);
|
||||
final Matcher matcher = Pattern.compile("^bytes=([0-9]+)-([0-9]+)$").matcher(range);
|
||||
assertTrue(matcher.matches());
|
||||
|
||||
final int start = Integer.parseInt(matcher.group(1));
|
||||
final int length = Integer.parseInt(matcher.group(2)) - start + 1;
|
||||
|
||||
exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
|
||||
exchange.getResponseHeaders().add("x-ms-blob-content-length", String.valueOf(blob.length()));
|
||||
exchange.getResponseHeaders().add("x-ms-blob-content-length", String.valueOf(length));
|
||||
exchange.getResponseHeaders().add("x-ms-blob-type", "blockblob");
|
||||
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), blob.length());
|
||||
blob.writeTo(exchange.getResponseBody());
|
||||
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), length);
|
||||
exchange.getResponseBody().write(blob.toBytesRef().bytes, start, length);
|
||||
|
||||
} else if (Regex.simpleMatch("DELETE /container/*", request)) {
|
||||
Streams.readFully(exchange.getRequestBody());
|
||||
blobs.entrySet().removeIf(blob -> blob.getKey().startsWith(exchange.getRequestURI().toString()));
|
||||
blobs.entrySet().removeIf(blob -> blob.getKey().startsWith(exchange.getRequestURI().getPath()));
|
||||
exchange.sendResponseHeaders(RestStatus.ACCEPTED.getStatus(), -1);
|
||||
|
||||
} else if (Regex.simpleMatch("GET /container?restype=container&comp=list*", request)) {
|
||||
|
@ -200,8 +294,11 @@ public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryInteg
|
|||
|
||||
@Override
|
||||
protected String requestUniqueId(final HttpExchange exchange) {
|
||||
// Azure SDK client provides a unique ID per request
|
||||
return exchange.getRequestHeaders().getFirst(Constants.HeaderConstants.CLIENT_REQUEST_ID_HEADER);
|
||||
final String requestId = exchange.getRequestHeaders().getFirst(Constants.HeaderConstants.CLIENT_REQUEST_ID_HEADER);
|
||||
final String range = exchange.getRequestHeaders().getFirst(Constants.HeaderConstants.STORAGE_RANGE_HEADER);
|
||||
return exchange.getRequestMethod()
|
||||
+ " " + requestId
|
||||
+ (range != null ? " " + range : "");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue