Add resumable uploads support to GCS repository integration tests (#46562)
This commit adds support for resumable uploads to the internal HTTP server used in GoogleCloudStorageBlobStoreRepositoryTests. This way we can also test the behavior of Google's client when the service returns server errors in response to resumable upload requests. The BlobStore implementation for GCS can upload a blob using one of two methods: resumable or multipart. In the current implementation, the client executes a resumable upload if the blob size is larger than LARGE_BLOB_THRESHOLD_BYTE_SIZE; otherwise it executes a multipart upload. This commit makes this logic overridable in tests, allowing tests to randomize which of the two methods is used. The commit adds support for single-request resumable uploads and chunked resumable uploads (the blob is uploaded in multiple 2MB chunks, each chunk being a resumable upload). For the latter case, this PR also adds a test, testSnapshotWithLargeSegmentFiles, which makes it more likely that a chunked resumable upload is executed.
This commit is contained in:
parent
d931665f91
commit
4db37801d0
|
@ -215,13 +215,18 @@ class GoogleCloudStorageBlobStore implements BlobStore {
|
|||
*/
|
||||
void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
|
||||
final BlobInfo blobInfo = BlobInfo.newBuilder(bucketName, blobName).build();
|
||||
if (blobSize > LARGE_BLOB_THRESHOLD_BYTE_SIZE) {
|
||||
if (blobSize > getLargeBlobThresholdInBytes()) {
|
||||
writeBlobResumable(blobInfo, inputStream, failIfAlreadyExists);
|
||||
} else {
|
||||
writeBlobMultipart(blobInfo, inputStream, blobSize, failIfAlreadyExists);
|
||||
}
|
||||
}
|
||||
|
||||
// Returns the blob size threshold, in bytes, that writeBlob compares against:
// blobs strictly larger than this value are uploaded with the resumable method,
// all other blobs with the multipart method.
// Non-static and package private so that tests can override it.
|
||||
long getLargeBlobThresholdInBytes() {
|
||||
return LARGE_BLOB_THRESHOLD_BYTE_SIZE;
|
||||
}
|
||||
|
||||
/**
|
||||
* Uploads a blob using the "resumable upload" method (multiple requests, which
|
||||
* can be independently retried in case of failure, see
|
||||
|
@ -292,7 +297,7 @@ class GoogleCloudStorageBlobStore implements BlobStore {
|
|||
*/
|
||||
private void writeBlobMultipart(BlobInfo blobInfo, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
|
||||
throws IOException {
|
||||
assert blobSize <= LARGE_BLOB_THRESHOLD_BYTE_SIZE : "large blob uploads should use the resumable upload method";
|
||||
assert blobSize <= getLargeBlobThresholdInBytes() : "large blob uploads should use the resumable upload method";
|
||||
final ByteArrayOutputStream baos = new ByteArrayOutputStream(Math.toIntExact(blobSize));
|
||||
Streams.copy(inputStream, baos);
|
||||
try {
|
||||
|
|
|
@ -20,28 +20,37 @@
|
|||
package org.elasticsearch.repositories.gcs;
|
||||
|
||||
import com.google.api.gax.retrying.RetrySettings;
|
||||
import com.google.cloud.BaseWriteChannel;
|
||||
import com.google.cloud.http.HttpTransportOptions;
|
||||
import com.google.cloud.storage.StorageOptions;
|
||||
import com.sun.net.httpserver.HttpExchange;
|
||||
import com.sun.net.httpserver.HttpHandler;
|
||||
import org.apache.http.HttpStatus;
|
||||
import org.apache.lucene.util.ArrayUtil;
|
||||
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.SuppressForbidden;
|
||||
import org.elasticsearch.common.UUIDs;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.io.Streams;
|
||||
import org.elasticsearch.common.regex.Regex;
|
||||
import org.elasticsearch.common.settings.MockSecureSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.repositories.Repository;
|
||||
import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.rest.RestUtils;
|
||||
import org.elasticsearch.test.BackgroundIndexer;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.threeten.bp.Duration;
|
||||
|
||||
import java.io.BufferedInputStream;
|
||||
|
@ -73,6 +82,9 @@ import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSetting
|
|||
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.TOKEN_URI_SETTING;
|
||||
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageRepository.BUCKET;
|
||||
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageRepository.CLIENT_NAME;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
@SuppressForbidden(reason = "this test uses a HttpServer to emulate a Google Cloud Storage endpoint")
|
||||
public class GoogleCloudStorageBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTestCase {
|
||||
|
@ -167,8 +179,43 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESMockAPIBasedRe
|
|||
}
|
||||
|
||||
/**
|
||||
* GoogleCloudStoragePlugin that allows to set low values for the client retry policy
|
||||
* Test the snapshot and restore of an index which has large segments files (2Mb+).
|
||||
*
|
||||
* The value of 2Mb is chosen according to the default chunk size configured in Google SDK client
|
||||
* (see {@link BaseWriteChannel} chunk size).
|
||||
*/
|
||||
public void testSnapshotWithLargeSegmentFiles() throws Exception {
|
||||
// Register the repository with explicit settings: bucket "bucket", client "test".
final String repository = createRepository("repository", Settings.builder()
|
||||
.put(BUCKET.getKey(), "bucket")
|
||||
.put(CLIENT_NAME.getKey(), "test")
|
||||
.build());
|
||||
|
||||
final String index = "index-no-merges";
|
||||
// One shard and no replicas: the force merge below is expected to succeed on exactly one shard.
createIndex(index, Settings.builder()
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
|
||||
.build());
|
||||
|
||||
final int nbDocs = 10_000;
|
||||
// Index documents in the background; presumably waitForDocs blocks until nbDocs documents are indexed.
try (BackgroundIndexer indexer = new BackgroundIndexer(index, "_doc", client(), nbDocs)) {
|
||||
waitForDocs(nbDocs, indexer);
|
||||
}
|
||||
|
||||
flushAndRefresh(index);
|
||||
// Force merge down to a single segment, presumably so that the shard ends up with large segment files.
ForceMergeResponse forceMerge = client().admin().indices().prepareForceMerge(index).setMaxNumSegments(1).get();
|
||||
assertThat(forceMerge.getSuccessfulShards(), equalTo(1));
|
||||
// Sanity check: every indexed document is searchable before the snapshot is taken.
assertHitCount(client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get(), nbDocs);
|
||||
|
||||
// Snapshot only this index and wait for completion.
assertSuccessfulSnapshot(client().admin().cluster().prepareCreateSnapshot(repository, "snapshot")
|
||||
.setWaitForCompletion(true).setIndices(index));
|
||||
|
||||
// Delete the index so that the restore below has to bring it back from the repository.
assertAcked(client().admin().indices().prepareDelete(index));
|
||||
|
||||
assertSuccessfulRestore(client().admin().cluster().prepareRestoreSnapshot(repository, "snapshot").setWaitForCompletion(true));
|
||||
ensureGreen(index);
|
||||
// The restored index must contain the same number of documents as before the snapshot.
assertHitCount(client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get(), nbDocs);
|
||||
}
|
||||
|
||||
public static class TestGoogleCloudStoragePlugin extends GoogleCloudStoragePlugin {
|
||||
|
||||
public TestGoogleCloudStoragePlugin(Settings settings) {
|
||||
|
@ -181,19 +228,39 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESMockAPIBasedRe
|
|||
@Override
|
||||
StorageOptions createStorageOptions(final GoogleCloudStorageClientSettings clientSettings,
|
||||
final HttpTransportOptions httpTransportOptions) {
|
||||
return super.createStorageOptions(clientSettings, httpTransportOptions)
|
||||
.toBuilder()
|
||||
StorageOptions options = super.createStorageOptions(clientSettings, httpTransportOptions);
|
||||
return options.toBuilder()
|
||||
.setRetrySettings(RetrySettings.newBuilder()
|
||||
.setMaxAttempts(10)
|
||||
.setTotalTimeout(options.getRetrySettings().getTotalTimeout())
|
||||
.setInitialRetryDelay(Duration.ofMillis(10L))
|
||||
.setRetryDelayMultiplier(2.0d)
|
||||
.setRetryDelayMultiplier(options.getRetrySettings().getRetryDelayMultiplier())
|
||||
.setMaxRetryDelay(Duration.ofSeconds(1L))
|
||||
.setTotalTimeout(Duration.ofSeconds(30L))
|
||||
.setMaxAttempts(0)
|
||||
.setJittered(false)
|
||||
.setInitialRpcTimeout(options.getRetrySettings().getInitialRpcTimeout())
|
||||
.setRpcTimeoutMultiplier(options.getRetrySettings().getRpcTimeoutMultiplier())
|
||||
.setMaxRpcTimeout(options.getRetrySettings().getMaxRpcTimeout())
|
||||
.build())
|
||||
.build();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
// Registers a single repository factory under GoogleCloudStorageRepository.TYPE.
// The repositories it creates build a blob store whose large-blob threshold is
// overridden to 1MB instead of the production default.
@Override
|
||||
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry registry, ThreadPool threadPool) {
|
||||
return Collections.singletonMap(GoogleCloudStorageRepository.TYPE,
|
||||
metadata -> new GoogleCloudStorageRepository(metadata, registry, this.storageService, threadPool) {
|
||||
@Override
|
||||
protected GoogleCloudStorageBlobStore createBlobStore() {
|
||||
// NOTE(review): bucket name "bucket" and client name "test" are hard-coded here rather than read
|
||||
// from the repository metadata; they match the settings used by the tests in this class.
return new GoogleCloudStorageBlobStore("bucket", "test", storageService) {
|
||||
@Override
|
||||
long getLargeBlobThresholdInBytes() {
|
||||
// Lower threshold: blobs larger than 1MB take the resumable upload path in writeBlob.
return ByteSizeUnit.MB.toBytes(1);
|
||||
}
|
||||
};
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private static byte[] createServiceAccount() {
|
||||
|
@ -223,13 +290,11 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESMockAPIBasedRe
|
|||
|
||||
/**
|
||||
* Minimal HTTP handler that acts as a Google Cloud Storage compliant server
|
||||
*
|
||||
* Note: it does not support resumable uploads
|
||||
*/
|
||||
@SuppressForbidden(reason = "this test uses a HttpServer to emulate a Google Cloud Storage endpoint")
|
||||
private static class InternalHttpHandler implements HttpHandler {
|
||||
|
||||
private final ConcurrentMap<String, BytesReference> blobs = new ConcurrentHashMap<>();
|
||||
private final ConcurrentMap<String, BytesArray> blobs = new ConcurrentHashMap<>();
|
||||
|
||||
@Override
|
||||
public void handle(final HttpExchange exchange) throws IOException {
|
||||
|
@ -240,13 +305,13 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESMockAPIBasedRe
|
|||
RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params);
|
||||
final String prefix = params.get("prefix");
|
||||
|
||||
final List<Map.Entry<String, BytesReference>> listOfBlobs = blobs.entrySet().stream()
|
||||
final List<Map.Entry<String, BytesArray>> listOfBlobs = blobs.entrySet().stream()
|
||||
.filter(blob -> prefix == null || blob.getKey().startsWith(prefix)).collect(Collectors.toList());
|
||||
|
||||
final StringBuilder list = new StringBuilder();
|
||||
list.append("{\"kind\":\"storage#objects\",\"items\":[");
|
||||
for (Iterator<Map.Entry<String, BytesReference>> it = listOfBlobs.iterator(); it.hasNext(); ) {
|
||||
Map.Entry<String, BytesReference> blob = it.next();
|
||||
for (Iterator<Map.Entry<String, BytesArray>> it = listOfBlobs.iterator(); it.hasNext(); ) {
|
||||
Map.Entry<String, BytesArray> blob = it.next();
|
||||
list.append("{\"kind\":\"storage#object\",");
|
||||
list.append("\"bucket\":\"bucket\",");
|
||||
list.append("\"name\":\"").append(blob.getKey()).append("\",");
|
||||
|
@ -272,19 +337,24 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESMockAPIBasedRe
|
|||
exchange.getResponseBody().write(response);
|
||||
|
||||
} else if (Regex.simpleMatch("GET /download/storage/v1/b/bucket/o/*", request)) {
|
||||
BytesReference blob = blobs.get(exchange.getRequestURI().getPath().replace("/download/storage/v1/b/bucket/o/", ""));
|
||||
BytesArray blob = blobs.get(exchange.getRequestURI().getPath().replace("/download/storage/v1/b/bucket/o/", ""));
|
||||
if (blob != null) {
|
||||
final String range = exchange.getRequestHeaders().getFirst("Range");
|
||||
Matcher matcher = Pattern.compile("bytes=([0-9]*)-([0-9]*)").matcher(range);
|
||||
assert matcher.find();
|
||||
|
||||
byte[] response = Integer.parseInt(matcher.group(1)) == 0 ? blob.array() : new byte[0];
|
||||
exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
|
||||
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), blob.length());
|
||||
exchange.getResponseBody().write(blob.toBytesRef().bytes);
|
||||
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
|
||||
exchange.getResponseBody().write(response);
|
||||
} else {
|
||||
exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1);
|
||||
}
|
||||
|
||||
} else if (Regex.simpleMatch("DELETE /storage/v1/b/bucket/o/*", request)) {
|
||||
int deletions = 0;
|
||||
for (Iterator<Map.Entry<String, BytesReference>> iterator = blobs.entrySet().iterator(); iterator.hasNext(); ) {
|
||||
Map.Entry<String, BytesReference> blob = iterator.next();
|
||||
for (Iterator<Map.Entry<String, BytesArray>> iterator = blobs.entrySet().iterator(); iterator.hasNext(); ) {
|
||||
Map.Entry<String, BytesArray> blob = iterator.next();
|
||||
if (blob.getKey().equals(exchange.getRequestURI().toString())) {
|
||||
iterator.remove();
|
||||
deletions++;
|
||||
|
@ -320,8 +390,8 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESMockAPIBasedRe
|
|||
exchange.getResponseBody().write(response);
|
||||
|
||||
} else if (Regex.simpleMatch("POST /upload/storage/v1/b/bucket/*uploadType=multipart*", request)) {
|
||||
byte[] response = new byte[0];
|
||||
try (BufferedInputStream in = new BufferedInputStream(new GZIPInputStream(exchange.getRequestBody()))) {
|
||||
byte[] response = new byte[0];
|
||||
String blob = null;
|
||||
int read;
|
||||
while ((read = in.read()) != -1) {
|
||||
|
@ -369,16 +439,74 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESMockAPIBasedRe
|
|||
byte[] tmp = binary.toByteArray();
|
||||
// removes the trailing end "\r\n--__END_OF_PART__--\r\n" which is 23 bytes long
|
||||
blobs.put(blob, new BytesArray(Arrays.copyOf(tmp, tmp.length - 23)));
|
||||
|
||||
exchange.getResponseHeaders().add("Content-Type", "application/json");
|
||||
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
|
||||
exchange.getResponseBody().write(response);
|
||||
|
||||
} finally {
|
||||
blob = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} else if (Regex.simpleMatch("POST /upload/storage/v1/b/bucket/*uploadType=resumable*", request)) {
|
||||
final Map<String, String> params = new HashMap<>();
|
||||
RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params);
|
||||
final String blobName = params.get("name");
|
||||
blobs.put(blobName, BytesArray.EMPTY);
|
||||
|
||||
byte[] response = Streams.readFully(exchange.getRequestBody()).utf8ToString().getBytes(UTF_8);
|
||||
exchange.getResponseHeaders().add("Content-Type", "application/json");
|
||||
exchange.getResponseHeaders().add("Location", httpServerUrl() + "/upload/storage/v1/b/bucket/o?"
|
||||
+ "uploadType=resumable"
|
||||
+ "&upload_id=" + UUIDs.randomBase64UUID()
|
||||
+ "&test_blob_name=" + blobName); // not a Google Storage parameter, but it allows to pass the blob name
|
||||
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
|
||||
exchange.getResponseBody().write(response);
|
||||
|
||||
} else if (Regex.simpleMatch("PUT /upload/storage/v1/b/bucket/o?*uploadType=resumable*", request)) {
|
||||
final Map<String, String> params = new HashMap<>();
|
||||
RestUtils.decodeQueryString(exchange.getRequestURI().getQuery(), 0, params);
|
||||
|
||||
final String blobName = params.get("test_blob_name");
|
||||
final String range = exchange.getRequestHeaders().getFirst("Content-Range");
|
||||
assert Strings.hasLength(range);
|
||||
|
||||
Matcher matcher = Pattern.compile("bytes ([^/]*)/([0-9\\*]*)").matcher(range);
|
||||
if (matcher.find()) {
|
||||
String bytes = matcher.group(1);
|
||||
String limit = matcher.group(2);
|
||||
byte[] blob = blobs.get(blobName).array();
|
||||
assert blob != null;
|
||||
// client is uploading a chunk
|
||||
matcher = Pattern.compile("([0-9]*)-([0-9]*)").matcher(bytes);
|
||||
assert matcher.find();
|
||||
|
||||
int end = Integer.parseInt(matcher.group(2));
|
||||
int start = Integer.parseInt(matcher.group(1));
|
||||
|
||||
final ByteArrayOutputStream out = new ByteArrayOutputStream();
|
||||
long count = Streams.copy(exchange.getRequestBody(), out);
|
||||
int length = Math.max(end + 1, "*".equals(limit) ? 0 : Integer.parseInt(limit));
|
||||
assert count <= length;
|
||||
if (length > blob.length) {
|
||||
blob = ArrayUtil.growExact(blob, length);
|
||||
}
|
||||
assert blob.length >= end;
|
||||
System.arraycopy(out.toByteArray(), 0, blob, start, Math.toIntExact(count));
|
||||
blobs.put(blobName, new BytesArray(blob));
|
||||
|
||||
if ("*".equals(limit)) {
|
||||
exchange.getResponseHeaders().add("Range", String.format(Locale.ROOT, "bytes=%d/%d", start, end));
|
||||
exchange.getResponseHeaders().add("Content-Length", "0");
|
||||
exchange.sendResponseHeaders(308 /* Resume Incomplete */, -1);
|
||||
} else {
|
||||
assert blob.length == Integer.parseInt(limit);
|
||||
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
exchange.sendResponseHeaders(RestStatus.INTERNAL_SERVER_ERROR.getStatus(), -1);
|
||||
}
|
||||
|
@ -415,9 +543,11 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESMockAPIBasedRe
|
|||
|
||||
@Override
|
||||
protected String requestUniqueId(HttpExchange exchange) {
|
||||
final String range = exchange.getRequestHeaders().getFirst("Content-Range");
|
||||
return exchange.getRemoteAddress().toString()
|
||||
+ " " + exchange.getRequestMethod()
|
||||
+ " " + exchange.getRequestURI();
|
||||
+ " " + exchange.getRequestURI()
|
||||
+ (range != null ? " " + range : "");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -73,13 +73,17 @@ public abstract class ESBlobStoreRepositoryIntegTestCase extends ESIntegTestCase
|
|||
}
|
||||
|
||||
/**
 * Creates a repository with the given name using the settings returned by {@code repositorySettings()}.
 * Delegates to the two-argument overload and returns its result.
 */
protected final String createRepository(final String name) {
|
||||
return createRepository(name, repositorySettings());
|
||||
}
|
||||
|
||||
protected final String createRepository(final String name, final Settings settings) {
|
||||
final boolean verify = randomBoolean();
|
||||
|
||||
logger.debug("--> creating repository [name: {}, verify: {}]", name, verify);
|
||||
logger.debug("--> creating repository [name: {}, verify: {}, settings: {}]", name, verify, settings);
|
||||
assertAcked(client().admin().cluster().preparePutRepository(name)
|
||||
.setType(repositoryType())
|
||||
.setVerify(verify)
|
||||
.setSettings(repositorySettings()));
|
||||
.setSettings(settings));
|
||||
|
||||
internalCluster().getDataOrMasterNodeInstances(RepositoriesService.class).forEach(repositories -> {
|
||||
assertThat(repositories.repository(name), notNullValue());
|
||||
|
@ -307,7 +311,7 @@ public abstract class ESBlobStoreRepositoryIntegTestCase extends ESIntegTestCase
|
|||
assertThat(response.getSnapshotInfo().successfulShards(), equalTo(response.getSnapshotInfo().totalShards()));
|
||||
}
|
||||
|
||||
private static void assertSuccessfulRestore(RestoreSnapshotRequestBuilder requestBuilder) {
|
||||
protected static void assertSuccessfulRestore(RestoreSnapshotRequestBuilder requestBuilder) {
|
||||
RestoreSnapshotResponse response = requestBuilder.get();
|
||||
assertSuccessfulRestore(response);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue