From eb43ab6d6081ca4f239a0e4c2672658b1f9d9a26 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 22 Jan 2019 10:57:37 -0700 Subject: [PATCH] Implement leader rate limiting for file restore (#37677) This is related to #35975. This commit implements rate limiting on the leader side using the CombinedRateLimiter. --- .../repository/CcrRestoreSourceService.java | 19 +++++++++++++-- .../xpack/ccr/CcrRepositoryIT.java | 24 ++++++++++++++++--- 2 files changed, 38 insertions(+), 5 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java index 1c7f9f95adb..a72b2f21d71 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceService.java @@ -16,8 +16,10 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.CombinedRateLimiter; import org.elasticsearch.common.util.concurrent.AbstractRefCounted; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.KeyedLock; @@ -42,6 +44,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.function.Consumer; +import java.util.function.LongConsumer; public class CcrRestoreSourceService extends AbstractLifecycleComponent implements IndexEventListener { @@ -52,6 +55,7 @@ public class CcrRestoreSourceService extends AbstractLifecycleComponent implemen private final CopyOnWriteArrayList> closeSessionListeners = new CopyOnWriteArrayList<>(); private final ThreadPool threadPool; private final CcrSettings ccrSettings; + private final CounterMetric throttleTime = new CounterMetric(); public CcrRestoreSourceService(ThreadPool threadPool, CcrSettings ccrSettings) { this.threadPool = threadPool; @@ -136,7 +140,7 @@ public class CcrRestoreSourceService extends AbstractLifecycleComponent implemen throw new IllegalArgumentException("session [" + sessionUUID + "] not found"); } restore.idle = false; - return new SessionReader(restore); + return new SessionReader(restore, ccrSettings, throttleTime::inc); } private void internalCloseSession(String sessionUUID, boolean throwIfSessionMissing) { @@ -182,6 +186,10 @@ public class CcrRestoreSourceService extends AbstractLifecycleComponent implemen } } + public long getThrottleTime() { + return this.throttleTime.count(); + } + private static class RestoreSession extends AbstractRefCounted { private final String sessionUUID; @@ -254,9 +262,13 @@ public class CcrRestoreSourceService extends AbstractLifecycleComponent implemen public static class SessionReader implements Closeable { private final RestoreSession restoreSession; + private final CcrSettings ccrSettings; + private final LongConsumer throttleListener; - private SessionReader(RestoreSession restoreSession) { + private SessionReader(RestoreSession restoreSession, CcrSettings ccrSettings, LongConsumer throttleListener) { this.restoreSession = restoreSession; + this.ccrSettings = ccrSettings; + this.throttleListener = throttleListener; restoreSession.incRef(); } @@ -270,6 +282,9 @@ public class CcrRestoreSourceService extends AbstractLifecycleComponent implemen * @throws IOException if the read fails */ public long readFileBytes(String fileName, BytesReference reference) throws IOException { + CombinedRateLimiter rateLimiter = ccrSettings.getRateLimiter(); + long throttleTime = rateLimiter.maybePause(reference.length()); + throttleListener.accept(throttleTime); return restoreSession.readFileBytes(fileName, reference); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java index 47cc1c528fa..0a3669734dc 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java @@ -238,9 +238,15 @@ public class CcrRepositoryIT extends CcrIntegTestCase { } public void testRateLimitingIsEmployed() throws Exception { + boolean followerRateLimiting = randomBoolean(); + ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest(); settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.RECOVERY_MAX_BYTES_PER_SECOND.getKey(), "10K")); - assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet()); + if (followerRateLimiting) { + assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet()); + } else { + assertAcked(leaderClient().admin().cluster().updateSettings(settingsRequest).actionGet()); + } String leaderClusterRepoName = CcrRepository.NAME_PREFIX + "leader_cluster"; String leaderIndex = "index1"; @@ -256,11 +262,15 @@ public class CcrRepositoryIT extends CcrIntegTestCase { final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class); List repositories = new ArrayList<>(); + List restoreSources = new ArrayList<>(); for (RepositoriesService repositoriesService : getFollowerCluster().getDataOrMasterNodeInstances(RepositoriesService.class)) { Repository repository = repositoriesService.repository(leaderClusterRepoName); repositories.add((CcrRepository) repository); } + for (CcrRestoreSourceService restoreSource : getLeaderCluster().getDataOrMasterNodeInstances(CcrRestoreSourceService.class)) { + restoreSources.add(restoreSource); + } logger.info("--> indexing some data"); for (int i = 0; i < 100; i++) { @@ -282,12 +292,20 @@ public class CcrRepositoryIT extends CcrIntegTestCase { restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future)); future.actionGet(); - assertTrue(repositories.stream().anyMatch(cr -> cr.getRestoreThrottleTimeInNanos() > 0)); + if (followerRateLimiting) { + assertTrue(repositories.stream().anyMatch(cr -> cr.getRestoreThrottleTimeInNanos() > 0)); + } else { + assertTrue(restoreSources.stream().anyMatch(cr -> cr.getThrottleTime() > 0)); + } settingsRequest = new ClusterUpdateSettingsRequest(); ByteSizeValue defaultValue = CcrSettings.RECOVERY_MAX_BYTES_PER_SECOND.getDefault(Settings.EMPTY); settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.RECOVERY_MAX_BYTES_PER_SECOND.getKey(), defaultValue)); - assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet()); + if (followerRateLimiting) { + assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet()); + } else { + assertAcked(leaderClient().admin().cluster().updateSettings(settingsRequest).actionGet()); + } } public void testFollowerMappingIsUpdated() throws IOException {