Implement follower rate limiting for file restore (#37449)
This is related to #35975. This commit implements rate limiting on the follower side using a new class `CombinedRateLimiter`.
This commit is contained in:
parent
381d035cd6
commit
b6f06a48c0
|
@ -0,0 +1,59 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.common.util;
|
||||
|
||||
import org.apache.lucene.store.RateLimiter;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
/**
|
||||
* A rate limiter designed for multiple concurrent users.
|
||||
*/
|
||||
public class CombinedRateLimiter {
|
||||
|
||||
// TODO: This rate limiter has some concurrency issues between the two maybePause operations
|
||||
|
||||
private final AtomicLong bytesSinceLastPause = new AtomicLong();
|
||||
private final RateLimiter.SimpleRateLimiter rateLimiter;
|
||||
private volatile boolean rateLimit;
|
||||
|
||||
public CombinedRateLimiter(ByteSizeValue maxBytesPerSec) {
|
||||
rateLimit = maxBytesPerSec.getBytes() > 0;
|
||||
rateLimiter = new RateLimiter.SimpleRateLimiter(maxBytesPerSec.getMbFrac());
|
||||
}
|
||||
|
||||
public long maybePause(int bytes) {
|
||||
if (rateLimit) {
|
||||
long bytesSincePause = bytesSinceLastPause.addAndGet(bytes);
|
||||
if (bytesSincePause > rateLimiter.getMinPauseCheckBytes()) {
|
||||
// Time to pause
|
||||
bytesSinceLastPause.addAndGet(-bytesSincePause);
|
||||
return Math.max(rateLimiter.pause(bytesSincePause), 0);
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
public void setMBPerSec(ByteSizeValue maxBytesPerSec) {
|
||||
rateLimit = maxBytesPerSec.getBytes() > 0;
|
||||
rateLimiter.setMBPerSec(maxBytesPerSec.getMbFrac());
|
||||
}
|
||||
}
|
|
@ -117,6 +117,7 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
|
|||
private final Settings settings;
|
||||
private final CcrLicenseChecker ccrLicenseChecker;
|
||||
private final SetOnce<CcrRestoreSourceService> restoreSourceService = new SetOnce<>();
|
||||
private final SetOnce<CcrSettings> ccrSettings = new SetOnce<>();
|
||||
private Client client;
|
||||
|
||||
/**
|
||||
|
@ -159,6 +160,8 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
|
|||
|
||||
CcrRestoreSourceService restoreSourceService = new CcrRestoreSourceService();
|
||||
this.restoreSourceService.set(restoreSourceService);
|
||||
CcrSettings ccrSettings = new CcrSettings(settings, clusterService.getClusterSettings());
|
||||
this.ccrSettings.set(ccrSettings);
|
||||
return Arrays.asList(
|
||||
ccrLicenseChecker,
|
||||
restoreSourceService,
|
||||
|
@ -291,7 +294,8 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
|
|||
|
||||
@Override
|
||||
public Map<String, Repository.Factory> getInternalRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) {
|
||||
Repository.Factory repositoryFactory = (metadata) -> new CcrRepository(metadata, client, ccrLicenseChecker, settings);
|
||||
Repository.Factory repositoryFactory =
|
||||
(metadata) -> new CcrRepository(metadata, client, ccrLicenseChecker, settings, ccrSettings.get());
|
||||
return Collections.singletonMap(CcrRepository.TYPE, repositoryFactory);
|
||||
}
|
||||
|
||||
|
|
|
@ -5,9 +5,14 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.ccr;
|
||||
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Setting.Property;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.CombinedRateLimiter;
|
||||
import org.elasticsearch.xpack.core.XPackSettings;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
@ -18,11 +23,6 @@ import java.util.List;
|
|||
*/
|
||||
public final class CcrSettings {
|
||||
|
||||
// prevent construction
|
||||
private CcrSettings() {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Index setting for a following index.
|
||||
*/
|
||||
|
@ -35,6 +35,14 @@ public final class CcrSettings {
|
|||
public static final Setting<TimeValue> CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT = Setting.timeSetting(
|
||||
"ccr.auto_follow.wait_for_metadata_timeout", TimeValue.timeValueSeconds(60), Property.NodeScope, Property.Dynamic);
|
||||
|
||||
|
||||
/**
|
||||
* Max bytes a node can recover per second.
|
||||
*/
|
||||
public static final Setting<ByteSizeValue> RECOVERY_MAX_BYTES_PER_SECOND =
|
||||
Setting.byteSizeSetting("ccr.indices.recovery.max_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB),
|
||||
Setting.Property.Dynamic, Setting.Property.NodeScope);
|
||||
|
||||
/**
|
||||
* The settings defined by CCR.
|
||||
*
|
||||
|
@ -44,7 +52,23 @@ public final class CcrSettings {
|
|||
return Arrays.asList(
|
||||
XPackSettings.CCR_ENABLED_SETTING,
|
||||
CCR_FOLLOWING_INDEX_SETTING,
|
||||
RECOVERY_MAX_BYTES_PER_SECOND,
|
||||
CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT);
|
||||
}
|
||||
|
||||
private final CombinedRateLimiter ccrRateLimiter;
|
||||
|
||||
public CcrSettings(Settings settings, ClusterSettings clusterSettings) {
|
||||
this.ccrRateLimiter = new CombinedRateLimiter(RECOVERY_MAX_BYTES_PER_SECOND.get(settings));
|
||||
clusterSettings.addSettingsUpdateConsumer(RECOVERY_MAX_BYTES_PER_SECOND, this::setMaxBytesPerSec);
|
||||
}
|
||||
|
||||
private void setMaxBytesPerSec(ByteSizeValue maxBytesPerSec) {
|
||||
ccrRateLimiter.setMBPerSec(maxBytesPerSec);
|
||||
}
|
||||
|
||||
public CombinedRateLimiter getRateLimiter() {
|
||||
return ccrRateLimiter;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -24,8 +24,10 @@ import org.elasticsearch.common.bytes.BytesReference;
|
|||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.metrics.CounterMetric;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.util.CombinedRateLimiter;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.index.engine.EngineException;
|
||||
|
@ -49,6 +51,7 @@ import org.elasticsearch.snapshots.SnapshotShardFailure;
|
|||
import org.elasticsearch.snapshots.SnapshotState;
|
||||
import org.elasticsearch.xpack.ccr.Ccr;
|
||||
import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
|
||||
import org.elasticsearch.xpack.ccr.CcrSettings;
|
||||
import org.elasticsearch.xpack.ccr.action.CcrRequests;
|
||||
import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionAction;
|
||||
import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionRequest;
|
||||
|
@ -66,6 +69,7 @@ import java.util.HashMap;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.function.LongConsumer;
|
||||
|
||||
/**
|
||||
* This repository relies on a remote cluster for Ccr restores. It is read-only so it can only be used to
|
||||
|
@ -79,12 +83,17 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
|
|||
private static final SnapshotId SNAPSHOT_ID = new SnapshotId(LATEST, LATEST);
|
||||
|
||||
private final RepositoryMetaData metadata;
|
||||
private final CcrSettings ccrSettings;
|
||||
private final String remoteClusterAlias;
|
||||
private final Client client;
|
||||
private final CcrLicenseChecker ccrLicenseChecker;
|
||||
|
||||
public CcrRepository(RepositoryMetaData metadata, Client client, CcrLicenseChecker ccrLicenseChecker, Settings settings) {
|
||||
private final CounterMetric throttledTime = new CounterMetric();
|
||||
|
||||
public CcrRepository(RepositoryMetaData metadata, Client client, CcrLicenseChecker ccrLicenseChecker, Settings settings,
|
||||
CcrSettings ccrSettings) {
|
||||
this.metadata = metadata;
|
||||
this.ccrSettings = ccrSettings;
|
||||
assert metadata.name().startsWith(NAME_PREFIX) : "CcrRepository metadata.name() must start with: " + NAME_PREFIX;
|
||||
this.remoteClusterAlias = Strings.split(metadata.name(), NAME_PREFIX)[1];
|
||||
this.ccrLicenseChecker = ccrLicenseChecker;
|
||||
|
@ -206,7 +215,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
|
|||
|
||||
@Override
|
||||
public long getRestoreThrottleTimeInNanos() {
|
||||
return 0;
|
||||
return throttledTime.count();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -257,7 +266,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
|
|||
// TODO: There should be some local timeout. And if the remote cluster returns an unknown session
|
||||
// response, we should be able to retry by creating a new session.
|
||||
String name = metadata.name();
|
||||
try (RestoreSession restoreSession = RestoreSession.openSession(name, remoteClient, leaderShardId, indexShard, recoveryState)) {
|
||||
try (RestoreSession restoreSession = openSession(name, remoteClient, leaderShardId, indexShard, recoveryState)) {
|
||||
restoreSession.restoreFiles();
|
||||
} catch (Exception e) {
|
||||
throw new IndexShardRestoreFailedException(indexShard.shardId(), "failed to restore snapshot [" + snapshotId + "]", e);
|
||||
|
@ -285,6 +294,15 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
|
|||
}
|
||||
}
|
||||
|
||||
private RestoreSession openSession(String repositoryName, Client remoteClient, ShardId leaderShardId, IndexShard indexShard,
|
||||
RecoveryState recoveryState) {
|
||||
String sessionUUID = UUIDs.randomBase64UUID();
|
||||
PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse response = remoteClient.execute(PutCcrRestoreSessionAction.INSTANCE,
|
||||
new PutCcrRestoreSessionRequest(sessionUUID, leaderShardId)).actionGet();
|
||||
return new RestoreSession(repositoryName, remoteClient, sessionUUID, response.getNode(), indexShard, recoveryState,
|
||||
response.getStoreFileMetaData(), ccrSettings.getRateLimiter(), throttledTime::inc);
|
||||
}
|
||||
|
||||
private static class RestoreSession extends FileRestoreContext implements Closeable {
|
||||
|
||||
private static final int BUFFER_SIZE = 1 << 16;
|
||||
|
@ -293,23 +311,19 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
|
|||
private final String sessionUUID;
|
||||
private final DiscoveryNode node;
|
||||
private final Store.MetadataSnapshot sourceMetaData;
|
||||
private final CombinedRateLimiter rateLimiter;
|
||||
private final LongConsumer throttleListener;
|
||||
|
||||
RestoreSession(String repositoryName, Client remoteClient, String sessionUUID, DiscoveryNode node, IndexShard indexShard,
|
||||
RecoveryState recoveryState, Store.MetadataSnapshot sourceMetaData) {
|
||||
RecoveryState recoveryState, Store.MetadataSnapshot sourceMetaData, CombinedRateLimiter rateLimiter,
|
||||
LongConsumer throttleListener) {
|
||||
super(repositoryName, indexShard, SNAPSHOT_ID, recoveryState, BUFFER_SIZE);
|
||||
this.remoteClient = remoteClient;
|
||||
this.sessionUUID = sessionUUID;
|
||||
this.node = node;
|
||||
this.sourceMetaData = sourceMetaData;
|
||||
}
|
||||
|
||||
static RestoreSession openSession(String repositoryName, Client remoteClient, ShardId leaderShardId, IndexShard indexShard,
|
||||
RecoveryState recoveryState) {
|
||||
String sessionUUID = UUIDs.randomBase64UUID();
|
||||
PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse response = remoteClient.execute(PutCcrRestoreSessionAction.INSTANCE,
|
||||
new PutCcrRestoreSessionRequest(sessionUUID, leaderShardId)).actionGet();
|
||||
return new RestoreSession(repositoryName, remoteClient, sessionUUID, response.getNode(), indexShard, recoveryState,
|
||||
response.getStoreFileMetaData());
|
||||
this.rateLimiter = rateLimiter;
|
||||
this.throttleListener = throttleListener;
|
||||
}
|
||||
|
||||
void restoreFiles() throws IOException {
|
||||
|
@ -324,7 +338,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
|
|||
|
||||
@Override
|
||||
protected InputStream fileInputStream(BlobStoreIndexShardSnapshot.FileInfo fileInfo) {
|
||||
return new RestoreFileInputStream(remoteClient, sessionUUID, node, fileInfo.metadata());
|
||||
return new RestoreFileInputStream(remoteClient, sessionUUID, node, fileInfo.metadata(), rateLimiter, throttleListener);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -341,14 +355,19 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
|
|||
private final String sessionUUID;
|
||||
private final DiscoveryNode node;
|
||||
private final StoreFileMetaData fileToRecover;
|
||||
private final CombinedRateLimiter rateLimiter;
|
||||
private final LongConsumer throttleListener;
|
||||
|
||||
private long pos = 0;
|
||||
|
||||
private RestoreFileInputStream(Client remoteClient, String sessionUUID, DiscoveryNode node, StoreFileMetaData fileToRecover) {
|
||||
private RestoreFileInputStream(Client remoteClient, String sessionUUID, DiscoveryNode node, StoreFileMetaData fileToRecover,
|
||||
CombinedRateLimiter rateLimiter, LongConsumer throttleListener) {
|
||||
this.remoteClient = remoteClient;
|
||||
this.sessionUUID = sessionUUID;
|
||||
this.node = node;
|
||||
this.fileToRecover = fileToRecover;
|
||||
this.rateLimiter = rateLimiter;
|
||||
this.throttleListener = throttleListener;
|
||||
}
|
||||
|
||||
|
||||
|
@ -365,6 +384,10 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
|
|||
}
|
||||
|
||||
int bytesRequested = (int) Math.min(remainingBytes, len);
|
||||
|
||||
long nanosPaused = rateLimiter.maybePause(bytesRequested);
|
||||
throttleListener.accept(nanosPaused);
|
||||
|
||||
String fileName = fileToRecover.name();
|
||||
GetCcrRestoreFileChunkRequest request = new GetCcrRestoreFileChunkRequest(node, sessionUUID, fileName, bytesRequested);
|
||||
GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse response =
|
||||
|
@ -388,5 +411,6 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
|
|||
|
||||
return bytesReceived;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,6 +22,7 @@ import org.elasticsearch.cluster.metadata.MappingMetaData;
|
|||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.common.xcontent.support.XContentMapValues;
|
||||
|
@ -39,6 +40,8 @@ import org.elasticsearch.xpack.ccr.repository.CcrRepository;
|
|||
import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -234,6 +237,60 @@ public class CcrRepositoryIT extends CcrIntegTestCase {
|
|||
thread.join();
|
||||
}
|
||||
|
||||
public void testRateLimitingIsEmployed() throws Exception {
|
||||
ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest();
|
||||
settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.RECOVERY_MAX_BYTES_PER_SECOND.getKey(), "10K"));
|
||||
assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet());
|
||||
|
||||
String leaderClusterRepoName = CcrRepository.NAME_PREFIX + "leader_cluster";
|
||||
String leaderIndex = "index1";
|
||||
String followerIndex = "index2";
|
||||
|
||||
final int numberOfPrimaryShards = randomIntBetween(1, 3);
|
||||
final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, between(0, 1),
|
||||
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
|
||||
assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON));
|
||||
ensureLeaderGreen(leaderIndex);
|
||||
|
||||
final RestoreService restoreService = getFollowerCluster().getCurrentMasterNodeInstance(RestoreService.class);
|
||||
final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class);
|
||||
|
||||
List<CcrRepository> repositories = new ArrayList<>();
|
||||
|
||||
for (RepositoriesService repositoriesService : getFollowerCluster().getDataOrMasterNodeInstances(RepositoriesService.class)) {
|
||||
Repository repository = repositoriesService.repository(leaderClusterRepoName);
|
||||
repositories.add((CcrRepository) repository);
|
||||
}
|
||||
|
||||
logger.info("--> indexing some data");
|
||||
for (int i = 0; i < 100; i++) {
|
||||
final String source = String.format(Locale.ROOT, "{\"f\":%d}", i);
|
||||
leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get();
|
||||
}
|
||||
|
||||
leaderClient().admin().indices().prepareFlush(leaderIndex).setForce(true).setWaitIfOngoing(true).get();
|
||||
|
||||
Settings.Builder settingsBuilder = Settings.builder()
|
||||
.put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, followerIndex)
|
||||
.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true);
|
||||
RestoreService.RestoreRequest restoreRequest = new RestoreService.RestoreRequest(leaderClusterRepoName,
|
||||
CcrRepository.LATEST, new String[]{leaderIndex}, indicesOptions,
|
||||
"^(.*)$", followerIndex, Settings.EMPTY, new TimeValue(1, TimeUnit.HOURS), false,
|
||||
false, true, settingsBuilder.build(), new String[0],
|
||||
"restore_snapshot[" + leaderClusterRepoName + ":" + leaderIndex + "]");
|
||||
|
||||
PlainActionFuture<RestoreInfo> future = PlainActionFuture.newFuture();
|
||||
restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future));
|
||||
future.actionGet();
|
||||
|
||||
assertTrue(repositories.stream().anyMatch(cr -> cr.getRestoreThrottleTimeInNanos() > 0));
|
||||
|
||||
settingsRequest = new ClusterUpdateSettingsRequest();
|
||||
ByteSizeValue defaultValue = CcrSettings.RECOVERY_MAX_BYTES_PER_SECOND.getDefault(Settings.EMPTY);
|
||||
settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.RECOVERY_MAX_BYTES_PER_SECOND.getKey(), defaultValue));
|
||||
assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet());
|
||||
}
|
||||
|
||||
public void testFollowerMappingIsUpdated() throws IOException {
|
||||
String leaderClusterRepoName = CcrRepository.NAME_PREFIX + "leader_cluster";
|
||||
String leaderIndex = "index1";
|
||||
|
|
Loading…
Reference in New Issue