Move testSnapshotWithLargeSegmentFiles to ESMockAPIBasedRepositoryIntegTestCase (#46802)

This commit moves the common test testSnapshotWithLargeSegmentFiles 
to the ESMockAPIBasedRepositoryIntegTestCase base class.
This commit is contained in:
Tanguy Leroux 2019-09-18 15:30:51 +02:00
parent cebe8da617
commit 3ae51f25dd
6 changed files with 51 additions and 136 deletions

View File

@ -24,8 +24,6 @@ import com.microsoft.azure.storage.RetryPolicyFactory;
import com.microsoft.azure.storage.blob.BlobRequestOptions; import com.microsoft.azure.storage.blob.BlobRequestOptions;
import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler; import com.sun.net.httpserver.HttpHandler;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
@ -34,13 +32,10 @@ import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.MockSecureSettings; import org.elasticsearch.common.settings.MockSecureSettings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase; import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.RestUtils; import org.elasticsearch.rest.RestUtils;
import org.elasticsearch.test.BackgroundIndexer;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
@ -58,10 +53,6 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.equalTo;
@SuppressForbidden(reason = "this test uses a HttpServer to emulate an Azure endpoint") @SuppressForbidden(reason = "this test uses a HttpServer to emulate an Azure endpoint")
public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTestCase { public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTestCase {
@ -73,6 +64,7 @@ public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryInteg
@Override @Override
protected Settings repositorySettings() { protected Settings repositorySettings() {
return Settings.builder() return Settings.builder()
.put(super.repositorySettings())
.put(AzureRepository.Repository.CONTAINER_SETTING.getKey(), "container") .put(AzureRepository.Repository.CONTAINER_SETTING.getKey(), "container")
.put(AzureStorageSettings.ACCOUNT_SETTING.getKey(), "test") .put(AzureStorageSettings.ACCOUNT_SETTING.getKey(), "test")
.build(); .build();
@ -108,41 +100,6 @@ public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryInteg
.build(); .build();
} }
/**
* Test the snapshot and restore of an index which has large segments files.
*/
public void testSnapshotWithLargeSegmentFiles() throws Exception {
final String repository = createRepository(randomName());
final String index = "index-no-merges";
createIndex(index, Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), TimeValue.MINUS_ONE)
.build());
// the number of documents here dictates the size of the single segment
// we want a large segment (1Mb+) so that Azure SDK client executes Put Block API calls
// the size of each uploaded block is defined by Constants.DEFAULT_STREAM_WRITE_IN_BYTES (~4Mb)
final long nbDocs = randomLongBetween(10_000L, 20_000L);
try (BackgroundIndexer indexer = new BackgroundIndexer(index, "_doc", client(), (int) nbDocs)) {
awaitBusy(() -> indexer.totalIndexedDocs() >= nbDocs);
}
flushAndRefresh(index);
ForceMergeResponse forceMerge = client().admin().indices().prepareForceMerge(index).setFlush(true).setMaxNumSegments(1).get();
assertThat(forceMerge.getSuccessfulShards(), equalTo(1));
assertHitCount(client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get(), nbDocs);
assertSuccessfulSnapshot(client().admin().cluster().prepareCreateSnapshot(repository, "snapshot")
.setWaitForCompletion(true).setIndices(index));
assertAcked(client().admin().indices().prepareDelete(index));
assertSuccessfulRestore(client().admin().cluster().prepareRestoreSnapshot(repository, "snapshot").setWaitForCompletion(true));
ensureGreen(index);
assertHitCount(client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get(), nbDocs);
}
/** /**
* AzureRepositoryPlugin that allows to set low values for the Azure's client retry policy * AzureRepositoryPlugin that allows to set low values for the Azure's client retry policy
* and for BlobRequestOptions#getSingleBlobPutThresholdInBytes(). * and for BlobRequestOptions#getSingleBlobPutThresholdInBytes().

View File

@ -20,15 +20,12 @@
package org.elasticsearch.repositories.gcs; package org.elasticsearch.repositories.gcs;
import com.google.api.gax.retrying.RetrySettings; import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.BaseWriteChannel;
import com.google.cloud.http.HttpTransportOptions; import com.google.cloud.http.HttpTransportOptions;
import com.google.cloud.storage.StorageOptions; import com.google.cloud.storage.StorageOptions;
import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler; import com.sun.net.httpserver.HttpHandler;
import org.apache.http.HttpStatus; import org.apache.http.HttpStatus;
import org.apache.lucene.util.ArrayUtil; import org.apache.lucene.util.ArrayUtil;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.SuppressForbidden;
@ -49,7 +46,6 @@ import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase; import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.RestUtils; import org.elasticsearch.rest.RestUtils;
import org.elasticsearch.test.BackgroundIndexer;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.threeten.bp.Duration; import org.threeten.bp.Duration;
@ -82,9 +78,6 @@ import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSetting
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.TOKEN_URI_SETTING; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.TOKEN_URI_SETTING;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageRepository.BUCKET; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageRepository.BUCKET;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageRepository.CLIENT_NAME; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageRepository.CLIENT_NAME;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.equalTo;
@SuppressForbidden(reason = "this test uses a HttpServer to emulate a Google Cloud Storage endpoint") @SuppressForbidden(reason = "this test uses a HttpServer to emulate a Google Cloud Storage endpoint")
public class GoogleCloudStorageBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTestCase { public class GoogleCloudStorageBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTestCase {
@ -178,44 +171,6 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESMockAPIBasedRe
assertEquals("failed to parse value [101mb] for setting [chunk_size], must be <= [100mb]", e.getMessage()); assertEquals("failed to parse value [101mb] for setting [chunk_size], must be <= [100mb]", e.getMessage());
} }
/**
* Test the snapshot and restore of an index which has large segments files (2Mb+).
*
* The value of 2Mb is chosen according to the default chunk size configured in Google SDK client
* (see {@link BaseWriteChannel} chunk size).
*/
public void testSnapshotWithLargeSegmentFiles() throws Exception {
final String repository = createRepository("repository", Settings.builder()
.put(BUCKET.getKey(), "bucket")
.put(CLIENT_NAME.getKey(), "test")
.build());
final String index = "index-no-merges";
createIndex(index, Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.build());
final int nbDocs = 10_000;
try (BackgroundIndexer indexer = new BackgroundIndexer(index, "_doc", client(), nbDocs)) {
waitForDocs(nbDocs, indexer);
}
flushAndRefresh(index);
ForceMergeResponse forceMerge = client().admin().indices().prepareForceMerge(index).setMaxNumSegments(1).get();
assertThat(forceMerge.getSuccessfulShards(), equalTo(1));
assertHitCount(client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get(), nbDocs);
assertSuccessfulSnapshot(client().admin().cluster().prepareCreateSnapshot(repository, "snapshot")
.setWaitForCompletion(true).setIndices(index));
assertAcked(client().admin().indices().prepareDelete(index));
assertSuccessfulRestore(client().admin().cluster().prepareRestoreSnapshot(repository, "snapshot").setWaitForCompletion(true));
ensureGreen(index);
assertHitCount(client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get(), nbDocs);
}
public static class TestGoogleCloudStoragePlugin extends GoogleCloudStoragePlugin { public static class TestGoogleCloudStoragePlugin extends GoogleCloudStoragePlugin {
public TestGoogleCloudStoragePlugin(Settings settings) { public TestGoogleCloudStoragePlugin(Settings settings) {

View File

@ -23,8 +23,6 @@ import com.amazonaws.services.s3.internal.MD5DigestCalculatingInputStream;
import com.amazonaws.util.Base16; import com.amazonaws.util.Base16;
import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler; import com.sun.net.httpserver.HttpHandler;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.UUIDs;
@ -45,7 +43,6 @@ import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTes
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.RestUtils; import org.elasticsearch.rest.RestUtils;
import org.elasticsearch.snapshots.mockstore.BlobStoreWrapper; import org.elasticsearch.snapshots.mockstore.BlobStoreWrapper;
import org.elasticsearch.test.BackgroundIndexer;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
@ -63,9 +60,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import static java.nio.charset.StandardCharsets.UTF_8; import static java.nio.charset.StandardCharsets.UTF_8;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.nullValue;
@SuppressForbidden(reason = "this test uses a HttpServer to emulate an S3 endpoint") @SuppressForbidden(reason = "this test uses a HttpServer to emulate an S3 endpoint")
@ -79,6 +73,7 @@ public class S3BlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTes
@Override @Override
protected Settings repositorySettings() { protected Settings repositorySettings() {
return Settings.builder() return Settings.builder()
.put(super.repositorySettings())
.put(S3Repository.BUCKET_SETTING.getKey(), "bucket") .put(S3Repository.BUCKET_SETTING.getKey(), "bucket")
.put(S3Repository.CLIENT_NAME.getKey(), "test") .put(S3Repository.CLIENT_NAME.getKey(), "test")
.build(); .build();
@ -116,34 +111,6 @@ public class S3BlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTes
.build(); .build();
} }
public void testSnapshotWithLargeSegmentFiles() throws Exception {
final String repository = createRepository(randomName());
final String index = "index-no-merges";
createIndex(index, Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.build());
final long nbDocs = randomLongBetween(10_000L, 20_000L);
try (BackgroundIndexer indexer = new BackgroundIndexer(index, "_doc", client(), (int) nbDocs)) {
waitForDocs(nbDocs, indexer);
}
flushAndRefresh(index);
ForceMergeResponse forceMerge = client().admin().indices().prepareForceMerge(index).setFlush(true).setMaxNumSegments(1).get();
assertThat(forceMerge.getSuccessfulShards(), equalTo(1));
assertHitCount(client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get(), nbDocs);
assertSuccessfulSnapshot(client().admin().cluster().prepareCreateSnapshot(repository, "snapshot")
.setWaitForCompletion(true).setIndices(index));
assertAcked(client().admin().indices().prepareDelete(index));
assertSuccessfulRestore(client().admin().cluster().prepareRestoreSnapshot(repository, "snapshot").setWaitForCompletion(true));
ensureGreen(index);
assertHitCount(client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get(), nbDocs);
}
/** /**
* S3RepositoryPlugin that allows to disable chunked encoding and to set a low threshold between single upload and multipart upload. * S3RepositoryPlugin that allows to disable chunked encoding and to set a low threshold between single upload and multipart upload.
*/ */

View File

@ -21,6 +21,7 @@ package org.elasticsearch.repositories.fs;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase; import org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase;
@ -44,10 +45,14 @@ public class FsBlobStoreRepositoryIT extends ESBlobStoreRepositoryIntegTestCase
@Override @Override
protected Settings repositorySettings() { protected Settings repositorySettings() {
return Settings.builder() final Settings.Builder settings = Settings.builder();
.put(super.repositorySettings()) settings.put(super.repositorySettings());
.put("location", randomRepoPath()) settings.put("location", randomRepoPath());
.build(); if (randomBoolean()) {
long size = 1 << randomInt(10);
settings.put("chunk_size", new ByteSizeValue(size, ByteSizeUnit.KB));
}
return settings.build();
} }
public void testMissingDirectoriesNotCreatedInReadonlyRepository() throws IOException, ExecutionException, InterruptedException { public void testMissingDirectoriesNotCreatedInReadonlyRepository() throws IOException, ExecutionException, InterruptedException {

View File

@ -28,8 +28,6 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobStore; import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.repositories.RepositoryData;
@ -63,13 +61,7 @@ public abstract class ESBlobStoreRepositoryIntegTestCase extends ESIntegTestCase
protected abstract String repositoryType(); protected abstract String repositoryType();
protected Settings repositorySettings() { protected Settings repositorySettings() {
final Settings.Builder settings = Settings.builder(); return Settings.builder().put("compress", randomBoolean()).build();
settings.put("compress", randomBoolean());
if (randomBoolean()) {
long size = 1 << randomInt(10);
settings.put("chunk_size", new ByteSizeValue(size, ByteSizeUnit.KB));
}
return settings.build();
} }
protected final String createRepository(final String name) { protected final String createRepository(final String name) {

View File

@ -22,11 +22,15 @@ import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler; import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer; import com.sun.net.httpserver.HttpServer;
import org.apache.http.HttpStatus; import org.apache.http.HttpStatus;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.network.InetAddresses; import org.elasticsearch.common.network.InetAddresses;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.mocksocket.MockHttpServer; import org.elasticsearch.mocksocket.MockHttpServer;
import org.elasticsearch.test.BackgroundIndexer;
import org.junit.After; import org.junit.After;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Before; import org.junit.Before;
@ -39,6 +43,10 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.equalTo;
/** /**
* Integration tests for {@link BlobStoreRepository} implementations rely on mock APIs that emulate cloud-based services. * Integration tests for {@link BlobStoreRepository} implementations rely on mock APIs that emulate cloud-based services.
*/ */
@ -83,6 +91,37 @@ public abstract class ESMockAPIBasedRepositoryIntegTestCase extends ESBlobStoreR
protected abstract HttpHandler createErroneousHttpHandler(HttpHandler delegate); protected abstract HttpHandler createErroneousHttpHandler(HttpHandler delegate);
/**
* Test the snapshot and restore of an index which has large segments files.
*/
public final void testSnapshotWithLargeSegmentFiles() throws Exception {
final String repository = createRepository(randomName());
final String index = "index-no-merges";
createIndex(index, Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.build());
final long nbDocs = randomLongBetween(10_000L, 20_000L);
try (BackgroundIndexer indexer = new BackgroundIndexer(index, "_doc", client(), (int) nbDocs)) {
waitForDocs(nbDocs, indexer);
}
flushAndRefresh(index);
ForceMergeResponse forceMerge = client().admin().indices().prepareForceMerge(index).setFlush(true).setMaxNumSegments(1).get();
assertThat(forceMerge.getSuccessfulShards(), equalTo(1));
assertHitCount(client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get(), nbDocs);
assertSuccessfulSnapshot(client().admin().cluster().prepareCreateSnapshot(repository, "snapshot")
.setWaitForCompletion(true).setIndices(index));
assertAcked(client().admin().indices().prepareDelete(index));
assertSuccessfulRestore(client().admin().cluster().prepareRestoreSnapshot(repository, "snapshot").setWaitForCompletion(true));
ensureGreen(index);
assertHitCount(client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get(), nbDocs);
}
protected static String httpServerUrl() { protected static String httpServerUrl() {
InetSocketAddress address = httpServer.getAddress(); InetSocketAddress address = httpServer.getAddress();
return "http://" + InetAddresses.toUriString(address.getAddress()) + ":" + address.getPort(); return "http://" + InetAddresses.toUriString(address.getAddress()) + ":" + address.getPort();