From 4a15e2b29e9fdca39bb5e224415f653eeae5fdf4 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 5 Feb 2019 13:34:00 -0600 Subject: [PATCH] Make Ccr recovery file chunk size configurable (#38370) This commit adds a byte setting `ccr.indices.recovery.chunk_size`. This setting configs the size of file chunk requested while recovering from remote. --- .../elasticsearch/xpack/ccr/CcrSettings.java | 20 +++++++++++++++++++ .../xpack/ccr/repository/CcrRepository.java | 4 +--- .../xpack/ccr/CcrRepositoryIT.java | 11 +++++++++- .../CcrRestoreSourceServiceTests.java | 3 ++- 4 files changed, 33 insertions(+), 5 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java index 625429dc0ab..0e147f66d6e 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java @@ -49,6 +49,14 @@ public final class CcrSettings { Setting.byteSizeSetting("ccr.indices.recovery.max_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB), Setting.Property.Dynamic, Setting.Property.NodeScope); + /** + * File chunk size to send during recovery + */ + public static final Setting RECOVERY_CHUNK_SIZE = + Setting.byteSizeSetting("ccr.indices.recovery.chunk_size", new ByteSizeValue(1, ByteSizeUnit.MB), + new ByteSizeValue(1, ByteSizeUnit.KB), new ByteSizeValue(1, ByteSizeUnit.GB), Setting.Property.Dynamic, + Setting.Property.NodeScope); + /** * The leader must open resources for a ccr recovery. If there is no activity for this interval of time, * the leader will close the restore session. @@ -77,22 +85,30 @@ public final class CcrSettings { INDICES_RECOVERY_ACTION_TIMEOUT_SETTING, INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING, CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT, + RECOVERY_CHUNK_SIZE, CCR_WAIT_FOR_METADATA_TIMEOUT); } private final CombinedRateLimiter ccrRateLimiter; private volatile TimeValue recoveryActivityTimeout; private volatile TimeValue recoveryActionTimeout; + private volatile ByteSizeValue chunkSize; public CcrSettings(Settings settings, ClusterSettings clusterSettings) { this.recoveryActivityTimeout = INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING.get(settings); this.recoveryActionTimeout = INDICES_RECOVERY_ACTION_TIMEOUT_SETTING.get(settings); this.ccrRateLimiter = new CombinedRateLimiter(RECOVERY_MAX_BYTES_PER_SECOND.get(settings)); + this.chunkSize = RECOVERY_MAX_BYTES_PER_SECOND.get(settings); clusterSettings.addSettingsUpdateConsumer(RECOVERY_MAX_BYTES_PER_SECOND, this::setMaxBytesPerSec); + clusterSettings.addSettingsUpdateConsumer(RECOVERY_CHUNK_SIZE, this::setChunkSize); clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING, this::setRecoveryActivityTimeout); clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_ACTION_TIMEOUT_SETTING, this::setRecoveryActionTimeout); } + private void setChunkSize(ByteSizeValue chunkSize) { + this.chunkSize = chunkSize; + } + private void setMaxBytesPerSec(ByteSizeValue maxBytesPerSec) { ccrRateLimiter.setMBPerSec(maxBytesPerSec); } @@ -105,6 +121,10 @@ public final class CcrSettings { this.recoveryActionTimeout = recoveryActionTimeout; } + public ByteSizeValue getChunkSize() { + return chunkSize; + } + public CombinedRateLimiter getRateLimiter() { return ccrRateLimiter; } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index baad95d5a94..7ceaeb903ec 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -329,8 +329,6 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit private static class RestoreSession extends FileRestoreContext implements Closeable { - private static final int BUFFER_SIZE = 1 << 16; - private final Client remoteClient; private final String sessionUUID; private final DiscoveryNode node; @@ -342,7 +340,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit RestoreSession(String repositoryName, Client remoteClient, String sessionUUID, DiscoveryNode node, IndexShard indexShard, RecoveryState recoveryState, Store.MetadataSnapshot sourceMetaData, long mappingVersion, CcrSettings ccrSettings, LongConsumer throttleListener) { - super(repositoryName, indexShard, SNAPSHOT_ID, recoveryState, BUFFER_SIZE); + super(repositoryName, indexShard, SNAPSHOT_ID, recoveryState, Math.toIntExact(ccrSettings.getChunkSize().getBytes())); this.remoteClient = remoteClient; this.sessionUUID = sessionUUID; this.node = node; diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java index d4d6d13f7a2..9f061b9c330 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java @@ -169,8 +169,12 @@ public class CcrRepositoryIT extends CcrIntegTestCase { assertNotEquals(leaderMetadata.getIndexUUID(), followerMetadata.getIndexUUID()); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/38100") public void testDocsAreRecovered() throws Exception { + ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest(); + String chunkSize = randomFrom("4KB", "128KB", "1MB"); + settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.RECOVERY_CHUNK_SIZE.getKey(), chunkSize)); + assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet()); + String leaderClusterRepoName = CcrRepository.NAME_PREFIX + "leader_cluster"; String leaderIndex = "index1"; String followerIndex = "index2"; @@ -243,6 +247,11 @@ public class CcrRepositoryIT extends CcrIntegTestCase { isRunning.set(false); thread.join(); + + settingsRequest = new ClusterUpdateSettingsRequest(); + ByteSizeValue defaultValue = CcrSettings.RECOVERY_CHUNK_SIZE.getDefault(Settings.EMPTY); + settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.RECOVERY_CHUNK_SIZE.getKey(), defaultValue)); + assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet()); } public void testRateLimitingIsEmployed() throws Exception { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java index 3035b96b5bc..1c3c0da3d3c 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java @@ -40,7 +40,8 @@ public class CcrRestoreSourceServiceTests extends IndexShardTestCase { Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), "node").build(); taskQueue = new DeterministicTaskQueue(settings, random()); Set> registeredSettings = Sets.newHashSet(CcrSettings.INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING, - CcrSettings.RECOVERY_MAX_BYTES_PER_SECOND, CcrSettings.INDICES_RECOVERY_ACTION_TIMEOUT_SETTING); + CcrSettings.RECOVERY_MAX_BYTES_PER_SECOND, CcrSettings.INDICES_RECOVERY_ACTION_TIMEOUT_SETTING, + CcrSettings.RECOVERY_CHUNK_SIZE); ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, registeredSettings); restoreSourceService = new CcrRestoreSourceService(taskQueue.getThreadPool(), new CcrSettings(Settings.EMPTY, clusterSettings)); }