diff --git a/server/src/main/java/org/elasticsearch/common/util/CombinedRateLimiter.java b/server/src/main/java/org/elasticsearch/common/util/CombinedRateLimiter.java new file mode 100644 index 00000000000..23324cbe00b --- /dev/null +++ b/server/src/main/java/org/elasticsearch/common/util/CombinedRateLimiter.java @@ -0,0 +1,59 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.util; + +import org.apache.lucene.store.RateLimiter; +import org.elasticsearch.common.unit.ByteSizeValue; + +import java.util.concurrent.atomic.AtomicLong; + +/** + * A rate limiter designed for multiple concurrent users. + */ +public class CombinedRateLimiter { + + // TODO: This rate limiter has some concurrency issues between the two maybePause operations + + private final AtomicLong bytesSinceLastPause = new AtomicLong(); + private final RateLimiter.SimpleRateLimiter rateLimiter; + private volatile boolean rateLimit; + + public CombinedRateLimiter(ByteSizeValue maxBytesPerSec) { + rateLimit = maxBytesPerSec.getBytes() > 0; + rateLimiter = new RateLimiter.SimpleRateLimiter(maxBytesPerSec.getMbFrac()); + } + + public long maybePause(int bytes) { + if (rateLimit) { + long bytesSincePause = bytesSinceLastPause.addAndGet(bytes); + if (bytesSincePause > rateLimiter.getMinPauseCheckBytes()) { + // Time to pause + bytesSinceLastPause.addAndGet(-bytesSincePause); + return Math.max(rateLimiter.pause(bytesSincePause), 0); + } + } + return 0; + } + + public void setMBPerSec(ByteSizeValue maxBytesPerSec) { + rateLimit = maxBytesPerSec.getBytes() > 0; + rateLimiter.setMBPerSec(maxBytesPerSec.getMbFrac()); + } +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index 8bbacac3d80..4a7f9600ffa 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -117,6 +117,7 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E private final Settings settings; private final CcrLicenseChecker ccrLicenseChecker; private final SetOnce restoreSourceService = new SetOnce<>(); + private final SetOnce ccrSettings = new SetOnce<>(); private Client client; /** @@ -159,6 +160,8 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E CcrRestoreSourceService restoreSourceService = new CcrRestoreSourceService(); this.restoreSourceService.set(restoreSourceService); + CcrSettings ccrSettings = new CcrSettings(settings, clusterService.getClusterSettings()); + this.ccrSettings.set(ccrSettings); return Arrays.asList( ccrLicenseChecker, restoreSourceService, @@ -291,7 +294,8 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E @Override public Map getInternalRepositories(Environment env, NamedXContentRegistry namedXContentRegistry) { - Repository.Factory repositoryFactory = (metadata) -> new CcrRepository(metadata, client, ccrLicenseChecker, settings); + Repository.Factory repositoryFactory = + (metadata) -> new CcrRepository(metadata, client, ccrLicenseChecker, settings, ccrSettings.get()); return Collections.singletonMap(CcrRepository.TYPE, repositoryFactory); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java index d7495dec8c2..fe0eb7853e3 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java @@ -5,9 +5,14 @@ */ package org.elasticsearch.xpack.ccr; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.CombinedRateLimiter; import org.elasticsearch.xpack.core.XPackSettings; import java.util.Arrays; @@ -18,11 +23,6 @@ import java.util.List; */ public final class CcrSettings { - // prevent construction - private CcrSettings() { - - } - /** * Index setting for a following index. */ @@ -35,6 +35,14 @@ public final class CcrSettings { public static final Setting CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT = Setting.timeSetting( "ccr.auto_follow.wait_for_metadata_timeout", TimeValue.timeValueSeconds(60), Property.NodeScope, Property.Dynamic); + + /** + * Max bytes a node can recover per second. + */ + public static final Setting RECOVERY_MAX_BYTES_PER_SECOND = + Setting.byteSizeSetting("ccr.indices.recovery.max_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB), + Setting.Property.Dynamic, Setting.Property.NodeScope); + /** * The settings defined by CCR. * @@ -44,7 +52,23 @@ public final class CcrSettings { return Arrays.asList( XPackSettings.CCR_ENABLED_SETTING, CCR_FOLLOWING_INDEX_SETTING, + RECOVERY_MAX_BYTES_PER_SECOND, CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT); } + private final CombinedRateLimiter ccrRateLimiter; + + public CcrSettings(Settings settings, ClusterSettings clusterSettings) { + this.ccrRateLimiter = new CombinedRateLimiter(RECOVERY_MAX_BYTES_PER_SECOND.get(settings)); + clusterSettings.addSettingsUpdateConsumer(RECOVERY_MAX_BYTES_PER_SECOND, this::setMaxBytesPerSec); + } + + private void setMaxBytesPerSec(ByteSizeValue maxBytesPerSec) { + ccrRateLimiter.setMBPerSec(maxBytesPerSec); + } + + public CombinedRateLimiter getRateLimiter() { + return ccrRateLimiter; + } + } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index 5c3e0edda61..33a8c64c961 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -24,8 +24,10 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.util.CombinedRateLimiter; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.EngineException; @@ -49,6 +51,7 @@ import org.elasticsearch.snapshots.SnapshotShardFailure; import org.elasticsearch.snapshots.SnapshotState; import org.elasticsearch.xpack.ccr.Ccr; import org.elasticsearch.xpack.ccr.CcrLicenseChecker; +import org.elasticsearch.xpack.ccr.CcrSettings; import org.elasticsearch.xpack.ccr.action.CcrRequests; import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionAction; import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionRequest; @@ -66,6 +69,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.LongConsumer; /** * This repository relies on a remote cluster for Ccr restores. It is read-only so it can only be used to @@ -79,12 +83,17 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit private static final SnapshotId SNAPSHOT_ID = new SnapshotId(LATEST, LATEST); private final RepositoryMetaData metadata; + private final CcrSettings ccrSettings; private final String remoteClusterAlias; private final Client client; private final CcrLicenseChecker ccrLicenseChecker; - public CcrRepository(RepositoryMetaData metadata, Client client, CcrLicenseChecker ccrLicenseChecker, Settings settings) { + private final CounterMetric throttledTime = new CounterMetric(); + + public CcrRepository(RepositoryMetaData metadata, Client client, CcrLicenseChecker ccrLicenseChecker, Settings settings, + CcrSettings ccrSettings) { this.metadata = metadata; + this.ccrSettings = ccrSettings; assert metadata.name().startsWith(NAME_PREFIX) : "CcrRepository metadata.name() must start with: " + NAME_PREFIX; this.remoteClusterAlias = Strings.split(metadata.name(), NAME_PREFIX)[1]; this.ccrLicenseChecker = ccrLicenseChecker; @@ -206,7 +215,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit @Override public long getRestoreThrottleTimeInNanos() { - return 0; + return throttledTime.count(); } @Override @@ -257,7 +266,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit // TODO: There should be some local timeout. And if the remote cluster returns an unknown session // response, we should be able to retry by creating a new session. String name = metadata.name(); - try (RestoreSession restoreSession = RestoreSession.openSession(name, remoteClient, leaderShardId, indexShard, recoveryState)) { + try (RestoreSession restoreSession = openSession(name, remoteClient, leaderShardId, indexShard, recoveryState)) { restoreSession.restoreFiles(); } catch (Exception e) { throw new IndexShardRestoreFailedException(indexShard.shardId(), "failed to restore snapshot [" + snapshotId + "]", e); @@ -285,6 +294,15 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit } } + private RestoreSession openSession(String repositoryName, Client remoteClient, ShardId leaderShardId, IndexShard indexShard, + RecoveryState recoveryState) { + String sessionUUID = UUIDs.randomBase64UUID(); + PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse response = remoteClient.execute(PutCcrRestoreSessionAction.INSTANCE, + new PutCcrRestoreSessionRequest(sessionUUID, leaderShardId)).actionGet(); + return new RestoreSession(repositoryName, remoteClient, sessionUUID, response.getNode(), indexShard, recoveryState, + response.getStoreFileMetaData(), ccrSettings.getRateLimiter(), throttledTime::inc); + } + private static class RestoreSession extends FileRestoreContext implements Closeable { private static final int BUFFER_SIZE = 1 << 16; @@ -293,23 +311,19 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit private final String sessionUUID; private final DiscoveryNode node; private final Store.MetadataSnapshot sourceMetaData; + private final CombinedRateLimiter rateLimiter; + private final LongConsumer throttleListener; RestoreSession(String repositoryName, Client remoteClient, String sessionUUID, DiscoveryNode node, IndexShard indexShard, - RecoveryState recoveryState, Store.MetadataSnapshot sourceMetaData) { + RecoveryState recoveryState, Store.MetadataSnapshot sourceMetaData, CombinedRateLimiter rateLimiter, + LongConsumer throttleListener) { super(repositoryName, indexShard, SNAPSHOT_ID, recoveryState, BUFFER_SIZE); this.remoteClient = remoteClient; this.sessionUUID = sessionUUID; this.node = node; this.sourceMetaData = sourceMetaData; - } - - static RestoreSession openSession(String repositoryName, Client remoteClient, ShardId leaderShardId, IndexShard indexShard, - RecoveryState recoveryState) { - String sessionUUID = UUIDs.randomBase64UUID(); - PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse response = remoteClient.execute(PutCcrRestoreSessionAction.INSTANCE, - new PutCcrRestoreSessionRequest(sessionUUID, leaderShardId)).actionGet(); - return new RestoreSession(repositoryName, remoteClient, sessionUUID, response.getNode(), indexShard, recoveryState, - response.getStoreFileMetaData()); + this.rateLimiter = rateLimiter; + this.throttleListener = throttleListener; } void restoreFiles() throws IOException { @@ -324,7 +338,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit @Override protected InputStream fileInputStream(BlobStoreIndexShardSnapshot.FileInfo fileInfo) { - return new RestoreFileInputStream(remoteClient, sessionUUID, node, fileInfo.metadata()); + return new RestoreFileInputStream(remoteClient, sessionUUID, node, fileInfo.metadata(), rateLimiter, throttleListener); } @Override @@ -341,14 +355,19 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit private final String sessionUUID; private final DiscoveryNode node; private final StoreFileMetaData fileToRecover; + private final CombinedRateLimiter rateLimiter; + private final LongConsumer throttleListener; private long pos = 0; - private RestoreFileInputStream(Client remoteClient, String sessionUUID, DiscoveryNode node, StoreFileMetaData fileToRecover) { + private RestoreFileInputStream(Client remoteClient, String sessionUUID, DiscoveryNode node, StoreFileMetaData fileToRecover, + CombinedRateLimiter rateLimiter, LongConsumer throttleListener) { this.remoteClient = remoteClient; this.sessionUUID = sessionUUID; this.node = node; this.fileToRecover = fileToRecover; + this.rateLimiter = rateLimiter; + this.throttleListener = throttleListener; } @@ -365,6 +384,10 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit } int bytesRequested = (int) Math.min(remainingBytes, len); + + long nanosPaused = rateLimiter.maybePause(bytesRequested); + throttleListener.accept(nanosPaused); + String fileName = fileToRecover.name(); GetCcrRestoreFileChunkRequest request = new GetCcrRestoreFileChunkRequest(node, sessionUUID, fileName, bytesRequested); GetCcrRestoreFileChunkAction.GetCcrRestoreFileChunkResponse response = @@ -388,5 +411,6 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit return bytesReceived; } + } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java index 825520d2f15..a635487084b 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java @@ -22,6 +22,7 @@ import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.support.XContentMapValues; @@ -39,6 +40,8 @@ import org.elasticsearch.xpack.ccr.repository.CcrRepository; import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.Locale; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -234,6 +237,60 @@ public class CcrRepositoryIT extends CcrIntegTestCase { thread.join(); } + public void testRateLimitingIsEmployed() throws Exception { + ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest(); + settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.RECOVERY_MAX_BYTES_PER_SECOND.getKey(), "10K")); + assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet()); + + String leaderClusterRepoName = CcrRepository.NAME_PREFIX + "leader_cluster"; + String leaderIndex = "index1"; + String followerIndex = "index2"; + + final int numberOfPrimaryShards = randomIntBetween(1, 3); + final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, between(0, 1), + singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON)); + ensureLeaderGreen(leaderIndex); + + final RestoreService restoreService = getFollowerCluster().getCurrentMasterNodeInstance(RestoreService.class); + final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class); + + List repositories = new ArrayList<>(); + + for (RepositoriesService repositoriesService : getFollowerCluster().getDataOrMasterNodeInstances(RepositoriesService.class)) { + Repository repository = repositoriesService.repository(leaderClusterRepoName); + repositories.add((CcrRepository) repository); + } + + logger.info("--> indexing some data"); + for (int i = 0; i < 100; i++) { + final String source = String.format(Locale.ROOT, "{\"f\":%d}", i); + leaderClient().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get(); + } + + leaderClient().admin().indices().prepareFlush(leaderIndex).setForce(true).setWaitIfOngoing(true).get(); + + Settings.Builder settingsBuilder = Settings.builder() + .put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, followerIndex) + .put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true); + RestoreService.RestoreRequest restoreRequest = new RestoreService.RestoreRequest(leaderClusterRepoName, + CcrRepository.LATEST, new String[]{leaderIndex}, indicesOptions, + "^(.*)$", followerIndex, Settings.EMPTY, new TimeValue(1, TimeUnit.HOURS), false, + false, true, settingsBuilder.build(), new String[0], + "restore_snapshot[" + leaderClusterRepoName + ":" + leaderIndex + "]"); + + PlainActionFuture future = PlainActionFuture.newFuture(); + restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future)); + future.actionGet(); + + assertTrue(repositories.stream().anyMatch(cr -> cr.getRestoreThrottleTimeInNanos() > 0)); + + settingsRequest = new ClusterUpdateSettingsRequest(); + ByteSizeValue defaultValue = CcrSettings.RECOVERY_MAX_BYTES_PER_SECOND.getDefault(Settings.EMPTY); + settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.RECOVERY_MAX_BYTES_PER_SECOND.getKey(), defaultValue)); + assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet()); + } + public void testFollowerMappingIsUpdated() throws IOException { String leaderClusterRepoName = CcrRepository.NAME_PREFIX + "leader_cluster"; String leaderIndex = "index1";