This commit changes searchable snapshots so that it now respects the repository's max_restore_bytes_per_sec setting when it downloads blobs. Backport of #55952 for 7.x
This commit is contained in:
parent
7aa0daaabd
commit
b9636713b1
|
@ -0,0 +1,99 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.common.blobstore.support;
|
||||
|
||||
import org.elasticsearch.common.blobstore.BlobContainer;
|
||||
import org.elasticsearch.common.blobstore.BlobMetadata;
|
||||
import org.elasticsearch.common.blobstore.BlobPath;
|
||||
import org.elasticsearch.common.blobstore.DeleteResult;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public abstract class FilterBlobContainer implements BlobContainer {
|
||||
|
||||
private final BlobContainer delegate;
|
||||
|
||||
public FilterBlobContainer(BlobContainer delegate) {
|
||||
this.delegate = Objects.requireNonNull(delegate);
|
||||
}
|
||||
|
||||
protected abstract BlobContainer wrapChild(BlobContainer child);
|
||||
|
||||
@Override
|
||||
public BlobPath path() {
|
||||
return delegate.path();
|
||||
}
|
||||
|
||||
@Override
|
||||
public InputStream readBlob(String blobName) throws IOException {
|
||||
return delegate.readBlob(blobName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public InputStream readBlob(String blobName, long position, long length) throws IOException {
|
||||
return delegate.readBlob(blobName, position, length);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long readBlobPreferredLength() {
|
||||
return delegate.readBlobPreferredLength();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
|
||||
delegate.writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
|
||||
delegate.writeBlobAtomic(blobName, inputStream, blobSize, failIfAlreadyExists);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DeleteResult delete() throws IOException {
|
||||
return delegate.delete();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOException {
|
||||
delegate.deleteBlobsIgnoringIfNotExists(blobNames);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, BlobMetadata> listBlobs() throws IOException {
|
||||
return delegate.listBlobs();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, BlobContainer> children() throws IOException {
|
||||
return delegate.children().entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> wrapChild(e.getValue())));
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public Map<String, BlobMetadata> listBlobsByPrefix(String blobNamePrefix) throws IOException {
|
||||
return delegate.listBlobsByPrefix(blobNamePrefix);
|
||||
}
|
||||
}
|
|
@ -1895,12 +1895,12 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
indexOutput.writeBytes(hash.bytes, hash.offset, hash.length);
|
||||
recoveryState.getIndex().addRecoveredBytesToFile(fileInfo.physicalName(), hash.length);
|
||||
} else {
|
||||
try (InputStream stream = maybeRateLimit(new SlicedInputStream(fileInfo.numberOfParts()) {
|
||||
try (InputStream stream = maybeRateLimitRestores(new SlicedInputStream(fileInfo.numberOfParts()) {
|
||||
@Override
|
||||
protected InputStream openSlice(long slice) throws IOException {
|
||||
return container.readBlob(fileInfo.partName(slice));
|
||||
}
|
||||
}, restoreRateLimiter, restoreRateLimitingTimeInNanos)) {
|
||||
})) {
|
||||
final byte[] buffer = new byte[BUFFER_SIZE];
|
||||
int length;
|
||||
while ((length = stream.read(buffer)) > 0) {
|
||||
|
@ -1942,6 +1942,14 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
return rateLimiter == null ? stream : new RateLimitingInputStream(stream, rateLimiter, metric::inc);
|
||||
}
|
||||
|
||||
public InputStream maybeRateLimitRestores(InputStream stream) {
|
||||
return maybeRateLimit(stream, restoreRateLimiter, restoreRateLimitingTimeInNanos);
|
||||
}
|
||||
|
||||
public InputStream maybeRateLimitSnapshots(InputStream stream) {
|
||||
return maybeRateLimit(stream, snapshotRateLimiter, snapshotRateLimitingTimeInNanos);
|
||||
}
|
||||
|
||||
@Override
|
||||
public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, IndexId indexId, ShardId shardId) {
|
||||
BlobStoreIndexShardSnapshot snapshot = loadShardSnapshot(shardContainer(indexId, shardId), snapshotId);
|
||||
|
@ -2118,8 +2126,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
final long partBytes = fileInfo.partBytes(i);
|
||||
|
||||
// Make reads abortable by mutating the snapshotStatus object
|
||||
final InputStream inputStream = new FilterInputStream(maybeRateLimit(
|
||||
new InputStreamIndexInput(indexInput, partBytes), snapshotRateLimiter, snapshotRateLimitingTimeInNanos)) {
|
||||
final InputStream inputStream = new FilterInputStream(maybeRateLimitSnapshots(
|
||||
new InputStreamIndexInput(indexInput, partBytes))) {
|
||||
@Override
|
||||
public int read() throws IOException {
|
||||
checkAborted();
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.elasticsearch.action.support.GroupedActionListener;
|
|||
import org.elasticsearch.common.CheckedRunnable;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.blobstore.BlobContainer;
|
||||
import org.elasticsearch.common.blobstore.support.FilterBlobContainer;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.lucene.store.ByteArrayIndexInput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -48,6 +49,7 @@ import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService;
|
|||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.Collection;
|
||||
|
@ -381,7 +383,7 @@ public class SearchableSnapshotDirectory extends BaseDirectory {
|
|||
|
||||
logger.trace("{} warming cache for [{}] part [{}/{}]", shardId, file.physicalName(), part + 1, numberOfParts);
|
||||
final long startTimeInNanos = statsCurrentTimeNanosSupplier.getAsLong();
|
||||
((CachedBlobContainerIndexInput) input).prefetchPart(part); // TODO does not include any rate limitation
|
||||
((CachedBlobContainerIndexInput) input).prefetchPart(part);
|
||||
|
||||
logger.trace(
|
||||
() -> new ParameterizedMessage(
|
||||
|
@ -445,7 +447,10 @@ public class SearchableSnapshotDirectory extends BaseDirectory {
|
|||
);
|
||||
|
||||
final LazyInitializable<BlobContainer, RuntimeException> lazyBlobContainer = new LazyInitializable<>(
|
||||
() -> blobStoreRepository.shardContainer(indexId, shardPath.getShardId().id())
|
||||
() -> new RateLimitingBlobContainer(
|
||||
blobStoreRepository,
|
||||
blobStoreRepository.shardContainer(indexId, shardPath.getShardId().id())
|
||||
)
|
||||
);
|
||||
final LazyInitializable<BlobStoreIndexShardSnapshot, RuntimeException> lazySnapshot = new LazyInitializable<>(
|
||||
() -> blobStoreRepository.loadShardSnapshot(lazyBlobContainer.getOrCompute(), snapshotId)
|
||||
|
@ -484,4 +489,33 @@ public class SearchableSnapshotDirectory extends BaseDirectory {
|
|||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* A {@link FilterBlobContainer} that uses {@link BlobStoreRepository#maybeRateLimitRestores(InputStream)} to limit the rate at which
|
||||
* blobs are read from the repository.
|
||||
*/
|
||||
private static class RateLimitingBlobContainer extends FilterBlobContainer {
|
||||
|
||||
private final BlobStoreRepository blobStoreRepository;
|
||||
|
||||
RateLimitingBlobContainer(BlobStoreRepository blobStoreRepository, BlobContainer blobContainer) {
|
||||
super(blobContainer);
|
||||
this.blobStoreRepository = blobStoreRepository;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected BlobContainer wrapChild(BlobContainer child) {
|
||||
return new RateLimitingBlobContainer(blobStoreRepository, child);
|
||||
}
|
||||
|
||||
@Override
|
||||
public InputStream readBlob(String blobName) throws IOException {
|
||||
return blobStoreRepository.maybeRateLimitRestores(super.readBlob(blobName));
|
||||
}
|
||||
|
||||
@Override
|
||||
public InputStream readBlob(String blobName, long position, long length) throws IOException {
|
||||
return blobStoreRepository.maybeRateLimitRestores(super.readBlob(blobName, position, length));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,9 +23,13 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
|
|||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.util.concurrent.AtomicArray;
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.IndexModule;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.indices.recovery.RecoveryState;
|
||||
import org.elasticsearch.repositories.RepositoriesService;
|
||||
import org.elasticsearch.repositories.fs.FsRepository;
|
||||
import org.elasticsearch.snapshots.SnapshotInfo;
|
||||
import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotAction;
|
||||
import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest;
|
||||
|
@ -46,11 +50,14 @@ import java.util.Set;
|
|||
import java.util.concurrent.BrokenBarrierException;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.CyclicBarrier;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
import java.util.stream.StreamSupport;
|
||||
|
||||
import static org.elasticsearch.index.IndexSettings.INDEX_SOFT_DELETES_SETTING;
|
||||
import static org.elasticsearch.index.query.QueryBuilders.matchQuery;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
|
||||
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants.SNAPSHOT_DIRECTORY_FACTORY_KEY;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
|
@ -292,6 +299,82 @@ public class SearchableSnapshotsIntegTests extends BaseSearchableSnapshotsIntegT
|
|||
ensureGreen(restoredIndexName);
|
||||
}
|
||||
|
||||
public void testMaxRestoreBytesPerSecIsUsed() throws Exception {
|
||||
final String repositoryName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
|
||||
final Settings.Builder repositorySettings = Settings.builder().put("location", randomRepoPath());
|
||||
final boolean useRateLimits = randomBoolean();
|
||||
if (useRateLimits) {
|
||||
repositorySettings.put("max_restore_bytes_per_sec", new ByteSizeValue(10, ByteSizeUnit.KB));
|
||||
} else {
|
||||
repositorySettings.put("max_restore_bytes_per_sec", ByteSizeValue.ZERO);
|
||||
}
|
||||
assertAcked(
|
||||
client().admin().cluster().preparePutRepository(repositoryName).setType(FsRepository.TYPE).setSettings(repositorySettings)
|
||||
);
|
||||
|
||||
final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
|
||||
assertAcked(
|
||||
prepareCreate(
|
||||
indexName,
|
||||
Settings.builder()
|
||||
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(1, 3))
|
||||
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
|
||||
.put(INDEX_SOFT_DELETES_SETTING.getKey(), true)
|
||||
)
|
||||
);
|
||||
final int nbDocs = between(10, 50);
|
||||
indexRandom(
|
||||
true,
|
||||
false,
|
||||
IntStream.range(0, nbDocs)
|
||||
.mapToObj(i -> client().prepareIndex(indexName, "_doc").setSource("foo", randomBoolean() ? "bar" : "baz"))
|
||||
.collect(Collectors.toList())
|
||||
);
|
||||
refresh(indexName);
|
||||
|
||||
final String restoredIndexName = randomBoolean() ? indexName : randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
|
||||
final String snapshotName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
|
||||
CreateSnapshotResponse createSnapshotResponse = client().admin()
|
||||
.cluster()
|
||||
.prepareCreateSnapshot(repositoryName, snapshotName)
|
||||
.setWaitForCompletion(true)
|
||||
.get();
|
||||
final SnapshotInfo snapshotInfo = createSnapshotResponse.getSnapshotInfo();
|
||||
assertThat(snapshotInfo.successfulShards(), greaterThan(0));
|
||||
assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards()));
|
||||
|
||||
assertAcked(client().admin().indices().prepareDelete(indexName));
|
||||
|
||||
logger.info("--> restoring index [{}] using rate limits [{}]", restoredIndexName, useRateLimits);
|
||||
final MountSearchableSnapshotRequest mount = new MountSearchableSnapshotRequest(
|
||||
restoredIndexName,
|
||||
repositoryName,
|
||||
snapshotName,
|
||||
indexName,
|
||||
Settings.builder().put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), Boolean.FALSE.toString()).build(),
|
||||
Strings.EMPTY_ARRAY,
|
||||
true
|
||||
);
|
||||
|
||||
final RestoreSnapshotResponse restore = client().execute(MountSearchableSnapshotAction.INSTANCE, mount).get();
|
||||
assertThat(restore.getRestoreInfo().failedShards(), equalTo(0));
|
||||
ensureGreen(restoredIndexName);
|
||||
|
||||
assertHitCount(client().prepareSearch(restoredIndexName).setSize(0).get(), nbDocs);
|
||||
|
||||
final Index restoredIndex = resolveIndex(restoredIndexName);
|
||||
for (String node : internalCluster().getNodeNames()) {
|
||||
final IndicesService service = internalCluster().getInstance(IndicesService.class, node);
|
||||
if (service != null && service.hasIndex(restoredIndex)) {
|
||||
final RepositoriesService repositoriesService = internalCluster().getInstance(RepositoriesService.class, node);
|
||||
assertThat(
|
||||
repositoriesService.repository(repositoryName).getRestoreThrottleTimeInNanos(),
|
||||
useRateLimits ? greaterThan(0L) : equalTo(0L)
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void assertRecovered(String indexName, TotalHits originalAllHits, TotalHits originalBarHits) throws Exception {
|
||||
final Thread[] threads = new Thread[between(1, 5)];
|
||||
final AtomicArray<TotalHits> allHits = new AtomicArray<>(threads.length);
|
||||
|
|
Loading…
Reference in New Issue