Implement leader rate limiting for file restore (#37677)
This is related to #35975. This commit implements rate limiting on the leader side using the CombinedRateLimiter.
This commit is contained in:
parent
2ba9e361ab
commit
eb43ab6d60
|
@ -16,8 +16,10 @@ import org.elasticsearch.common.Nullable;
|
||||||
import org.elasticsearch.common.bytes.BytesReference;
|
import org.elasticsearch.common.bytes.BytesReference;
|
||||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||||
import org.elasticsearch.common.lease.Releasable;
|
import org.elasticsearch.common.lease.Releasable;
|
||||||
|
import org.elasticsearch.common.metrics.CounterMetric;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
|
import org.elasticsearch.common.util.CombinedRateLimiter;
|
||||||
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
|
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
|
||||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||||
import org.elasticsearch.common.util.concurrent.KeyedLock;
|
import org.elasticsearch.common.util.concurrent.KeyedLock;
|
||||||
|
@ -42,6 +44,7 @@ import java.util.Map;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.CopyOnWriteArrayList;
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
|
import java.util.function.LongConsumer;
|
||||||
|
|
||||||
public class CcrRestoreSourceService extends AbstractLifecycleComponent implements IndexEventListener {
|
public class CcrRestoreSourceService extends AbstractLifecycleComponent implements IndexEventListener {
|
||||||
|
|
||||||
|
@ -52,6 +55,7 @@ public class CcrRestoreSourceService extends AbstractLifecycleComponent implemen
|
||||||
private final CopyOnWriteArrayList<Consumer<String>> closeSessionListeners = new CopyOnWriteArrayList<>();
|
private final CopyOnWriteArrayList<Consumer<String>> closeSessionListeners = new CopyOnWriteArrayList<>();
|
||||||
private final ThreadPool threadPool;
|
private final ThreadPool threadPool;
|
||||||
private final CcrSettings ccrSettings;
|
private final CcrSettings ccrSettings;
|
||||||
|
private final CounterMetric throttleTime = new CounterMetric();
|
||||||
|
|
||||||
public CcrRestoreSourceService(ThreadPool threadPool, CcrSettings ccrSettings) {
|
public CcrRestoreSourceService(ThreadPool threadPool, CcrSettings ccrSettings) {
|
||||||
this.threadPool = threadPool;
|
this.threadPool = threadPool;
|
||||||
|
@ -136,7 +140,7 @@ public class CcrRestoreSourceService extends AbstractLifecycleComponent implemen
|
||||||
throw new IllegalArgumentException("session [" + sessionUUID + "] not found");
|
throw new IllegalArgumentException("session [" + sessionUUID + "] not found");
|
||||||
}
|
}
|
||||||
restore.idle = false;
|
restore.idle = false;
|
||||||
return new SessionReader(restore);
|
return new SessionReader(restore, ccrSettings, throttleTime::inc);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void internalCloseSession(String sessionUUID, boolean throwIfSessionMissing) {
|
private void internalCloseSession(String sessionUUID, boolean throwIfSessionMissing) {
|
||||||
|
@ -182,6 +186,10 @@ public class CcrRestoreSourceService extends AbstractLifecycleComponent implemen
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public long getThrottleTime() {
|
||||||
|
return this.throttleTime.count();
|
||||||
|
}
|
||||||
|
|
||||||
private static class RestoreSession extends AbstractRefCounted {
|
private static class RestoreSession extends AbstractRefCounted {
|
||||||
|
|
||||||
private final String sessionUUID;
|
private final String sessionUUID;
|
||||||
|
@ -254,9 +262,13 @@ public class CcrRestoreSourceService extends AbstractLifecycleComponent implemen
|
||||||
public static class SessionReader implements Closeable {
|
public static class SessionReader implements Closeable {
|
||||||
|
|
||||||
private final RestoreSession restoreSession;
|
private final RestoreSession restoreSession;
|
||||||
|
private final CcrSettings ccrSettings;
|
||||||
|
private final LongConsumer throttleListener;
|
||||||
|
|
||||||
private SessionReader(RestoreSession restoreSession) {
|
private SessionReader(RestoreSession restoreSession, CcrSettings ccrSettings, LongConsumer throttleListener) {
|
||||||
this.restoreSession = restoreSession;
|
this.restoreSession = restoreSession;
|
||||||
|
this.ccrSettings = ccrSettings;
|
||||||
|
this.throttleListener = throttleListener;
|
||||||
restoreSession.incRef();
|
restoreSession.incRef();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -270,6 +282,9 @@ public class CcrRestoreSourceService extends AbstractLifecycleComponent implemen
|
||||||
* @throws IOException if the read fails
|
* @throws IOException if the read fails
|
||||||
*/
|
*/
|
||||||
public long readFileBytes(String fileName, BytesReference reference) throws IOException {
|
public long readFileBytes(String fileName, BytesReference reference) throws IOException {
|
||||||
|
CombinedRateLimiter rateLimiter = ccrSettings.getRateLimiter();
|
||||||
|
long throttleTime = rateLimiter.maybePause(reference.length());
|
||||||
|
throttleListener.accept(throttleTime);
|
||||||
return restoreSession.readFileBytes(fileName, reference);
|
return restoreSession.readFileBytes(fileName, reference);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -238,9 +238,15 @@ public class CcrRepositoryIT extends CcrIntegTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testRateLimitingIsEmployed() throws Exception {
|
public void testRateLimitingIsEmployed() throws Exception {
|
||||||
|
boolean followerRateLimiting = randomBoolean();
|
||||||
|
|
||||||
ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest();
|
ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest();
|
||||||
settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.RECOVERY_MAX_BYTES_PER_SECOND.getKey(), "10K"));
|
settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.RECOVERY_MAX_BYTES_PER_SECOND.getKey(), "10K"));
|
||||||
|
if (followerRateLimiting) {
|
||||||
assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet());
|
assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet());
|
||||||
|
} else {
|
||||||
|
assertAcked(leaderClient().admin().cluster().updateSettings(settingsRequest).actionGet());
|
||||||
|
}
|
||||||
|
|
||||||
String leaderClusterRepoName = CcrRepository.NAME_PREFIX + "leader_cluster";
|
String leaderClusterRepoName = CcrRepository.NAME_PREFIX + "leader_cluster";
|
||||||
String leaderIndex = "index1";
|
String leaderIndex = "index1";
|
||||||
|
@ -256,11 +262,15 @@ public class CcrRepositoryIT extends CcrIntegTestCase {
|
||||||
final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class);
|
final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class);
|
||||||
|
|
||||||
List<CcrRepository> repositories = new ArrayList<>();
|
List<CcrRepository> repositories = new ArrayList<>();
|
||||||
|
List<CcrRestoreSourceService> restoreSources = new ArrayList<>();
|
||||||
|
|
||||||
for (RepositoriesService repositoriesService : getFollowerCluster().getDataOrMasterNodeInstances(RepositoriesService.class)) {
|
for (RepositoriesService repositoriesService : getFollowerCluster().getDataOrMasterNodeInstances(RepositoriesService.class)) {
|
||||||
Repository repository = repositoriesService.repository(leaderClusterRepoName);
|
Repository repository = repositoriesService.repository(leaderClusterRepoName);
|
||||||
repositories.add((CcrRepository) repository);
|
repositories.add((CcrRepository) repository);
|
||||||
}
|
}
|
||||||
|
for (CcrRestoreSourceService restoreSource : getLeaderCluster().getDataOrMasterNodeInstances(CcrRestoreSourceService.class)) {
|
||||||
|
restoreSources.add(restoreSource);
|
||||||
|
}
|
||||||
|
|
||||||
logger.info("--> indexing some data");
|
logger.info("--> indexing some data");
|
||||||
for (int i = 0; i < 100; i++) {
|
for (int i = 0; i < 100; i++) {
|
||||||
|
@ -282,12 +292,20 @@ public class CcrRepositoryIT extends CcrIntegTestCase {
|
||||||
restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future));
|
restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future));
|
||||||
future.actionGet();
|
future.actionGet();
|
||||||
|
|
||||||
|
if (followerRateLimiting) {
|
||||||
assertTrue(repositories.stream().anyMatch(cr -> cr.getRestoreThrottleTimeInNanos() > 0));
|
assertTrue(repositories.stream().anyMatch(cr -> cr.getRestoreThrottleTimeInNanos() > 0));
|
||||||
|
} else {
|
||||||
|
assertTrue(restoreSources.stream().anyMatch(cr -> cr.getThrottleTime() > 0));
|
||||||
|
}
|
||||||
|
|
||||||
settingsRequest = new ClusterUpdateSettingsRequest();
|
settingsRequest = new ClusterUpdateSettingsRequest();
|
||||||
ByteSizeValue defaultValue = CcrSettings.RECOVERY_MAX_BYTES_PER_SECOND.getDefault(Settings.EMPTY);
|
ByteSizeValue defaultValue = CcrSettings.RECOVERY_MAX_BYTES_PER_SECOND.getDefault(Settings.EMPTY);
|
||||||
settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.RECOVERY_MAX_BYTES_PER_SECOND.getKey(), defaultValue));
|
settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.RECOVERY_MAX_BYTES_PER_SECOND.getKey(), defaultValue));
|
||||||
|
if (followerRateLimiting) {
|
||||||
assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet());
|
assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet());
|
||||||
|
} else {
|
||||||
|
assertAcked(leaderClient().admin().cluster().updateSettings(settingsRequest).actionGet());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testFollowerMappingIsUpdated() throws IOException {
|
public void testFollowerMappingIsUpdated() throws IOException {
|
||||||
|
|
Loading…
Reference in New Issue