diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index ab0f9958037..8ab9e396b4d 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -179,8 +179,7 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E ThreadPool threadPool, Client client, SettingsModule settingsModule) { - IndexScopedSettings indexScopedSettings = settingsModule.getIndexScopedSettings(); - return Collections.singletonList(new ShardFollowTasksExecutor(client, threadPool, clusterService, indexScopedSettings)); + return Collections.singletonList(new ShardFollowTasksExecutor(client, threadPool, clusterService, settingsModule)); } public List> getActions() { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java index 26089ab4695..d6262a7711d 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java @@ -30,11 +30,17 @@ public final class CcrSettings { Setting.boolSetting("index.xpack.ccr.following_index", false, Property.IndexScope, Property.InternalIndex); /** - * Dynamic node setting for specifying the wait_for_timeout that the auto follow coordinator should be using. + * Dynamic node setting for specifying the wait_for_timeout that the auto follow coordinator and shard follow task should be using. */ - public static final Setting CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT = Setting.timeSetting( - "ccr.auto_follow.wait_for_metadata_timeout", TimeValue.timeValueSeconds(60), Property.NodeScope, Property.Dynamic); + public static final Setting CCR_WAIT_FOR_METADATA_TIMEOUT = Setting.timeSetting( + "ccr.wait_for_metadata_timeout", TimeValue.timeValueSeconds(60), Property.NodeScope, Property.Dynamic); + /** + * Dynamic node setting for specifying the wait_for_timeout that the auto follow coordinator should be using. + * TODO: Deprecate and remove this setting + */ + private static final Setting CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT = Setting.timeSetting( + "ccr.auto_follow.wait_for_metadata_timeout", CCR_WAIT_FOR_METADATA_TIMEOUT, Property.NodeScope, Property.Dynamic); /** * Max bytes a node can recover per second. @@ -62,7 +68,8 @@ public final class CcrSettings { CCR_FOLLOWING_INDEX_SETTING, RECOVERY_MAX_BYTES_PER_SECOND, INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING, - CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT); + CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT, + CCR_WAIT_FOR_METADATA_TIMEOUT); } private final CombinedRateLimiter ccrRateLimiter; diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java index e3b008efc56..19d2b9ce479 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java @@ -113,8 +113,8 @@ public class AutoFollowCoordinator implements ClusterStateListener { waitForMetadataTimeOut = newWaitForTimeOut; } }; - clusterService.getClusterSettings().addSettingsUpdateConsumer(CcrSettings.CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT, updater); - waitForMetadataTimeOut = CcrSettings.CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT.get(settings); + clusterService.getClusterSettings().addSettingsUpdateConsumer(CcrSettings.CCR_WAIT_FOR_METADATA_TIMEOUT, updater); + waitForMetadataTimeOut = CcrSettings.CCR_WAIT_FOR_METADATA_TIMEOUT.get(settings); } public synchronized AutoFollowStats getStats() { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java index 9274708f226..b56aa148efc 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java @@ -335,10 +335,6 @@ public class ShardChangesAction extends Action { final IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex()); final IndexShard indexShard = indexService.getShard(request.getShard().id()); final SeqNoStats seqNoStats = indexShard.seqNoStats(); - final IndexMetaData indexMetaData = clusterService.state().metaData().index(shardId.getIndex()); - final long mappingVersion = indexMetaData.getMappingVersion(); - final long settingsVersion = indexMetaData.getSettingsVersion(); - final Translog.Operation[] operations = getOperations( indexShard, seqNoStats.getGlobalCheckpoint(), @@ -346,8 +342,14 @@ public class ShardChangesAction extends Action { request.getMaxOperationCount(), request.getExpectedHistoryUUID(), request.getMaxBatchSize()); - // must capture after after snapshotting operations to ensure this MUS is at least the highest MUS of any of these operations. + // must capture after snapshotting operations to ensure this MUS is at least the highest MUS of any of these operations. final long maxSeqNoOfUpdatesOrDeletes = indexShard.getMaxSeqNoOfUpdatesOrDeletes(); + // must capture IndexMetaData after snapshotting operations to ensure the returned mapping version is at least as up-to-date + // as the mapping version that these operations used. Here we must not use IndexMetaData from ClusterService for we expose + // a new cluster state to ClusterApplier(s) before exposing it in the ClusterService. + final IndexMetaData indexMetaData = indexService.getMetaData(); + final long mappingVersion = indexMetaData.getMappingVersion(); + final long settingsVersion = indexMetaData.getSettingsVersion(); return getResponse( mappingVersion, settingsVersion, diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index a1a0e25e126..d40fdd551be 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -130,7 +130,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { } // updates follower mapping, this gets us the leader mapping version and makes sure that leader and follower mapping are identical - updateMapping(followerMappingVersion -> { + updateMapping(0L, followerMappingVersion -> { synchronized (ShardFollowNodeTask.this) { currentMappingVersion = followerMappingVersion; } @@ -370,7 +370,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { coordinateReads(); } - private synchronized void maybeUpdateMapping(Long minimumRequiredMappingVersion, Runnable task) { + private synchronized void maybeUpdateMapping(long minimumRequiredMappingVersion, Runnable task) { if (currentMappingVersion >= minimumRequiredMappingVersion) { LOGGER.trace("{} mapping version [{}] is higher or equal than minimum required mapping version [{}]", params.getFollowShardId(), currentMappingVersion, minimumRequiredMappingVersion); @@ -378,7 +378,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { } else { LOGGER.trace("{} updating mapping, mapping version [{}] is lower than minimum required mapping version [{}]", params.getFollowShardId(), currentMappingVersion, minimumRequiredMappingVersion); - updateMapping(mappingVersion -> { + updateMapping(minimumRequiredMappingVersion, mappingVersion -> { currentMappingVersion = mappingVersion; task.run(); }); @@ -400,12 +400,13 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { } } - private void updateMapping(LongConsumer handler) { - updateMapping(handler, new AtomicInteger(0)); + private void updateMapping(long minRequiredMappingVersion, LongConsumer handler) { + updateMapping(minRequiredMappingVersion, handler, new AtomicInteger(0)); } - private void updateMapping(LongConsumer handler, AtomicInteger retryCounter) { - innerUpdateMapping(handler, e -> handleFailure(e, retryCounter, () -> updateMapping(handler, retryCounter))); + private void updateMapping(long minRequiredMappingVersion, LongConsumer handler, AtomicInteger retryCounter) { + innerUpdateMapping(minRequiredMappingVersion, handler, + e -> handleFailure(e, retryCounter, () -> updateMapping(minRequiredMappingVersion, handler, retryCounter))); } private void updateSettings(final LongConsumer handler) { @@ -471,7 +472,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { } // These methods are protected for testing purposes: - protected abstract void innerUpdateMapping(LongConsumer handler, Consumer errorHandler); + protected abstract void innerUpdateMapping(long minRequiredMappingVersion, LongConsumer handler, Consumer errorHandler); protected abstract void innerUpdateSettings(LongConsumer handler, Consumer errorHandler); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index 97308126ffb..956171ba9b7 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -24,11 +24,13 @@ import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.settings.SettingsModule; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.index.Index; @@ -46,6 +48,7 @@ import org.elasticsearch.persistent.PersistentTasksExecutor; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ccr.Ccr; +import org.elasticsearch.xpack.ccr.CcrSettings; import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction; import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsRequest; import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse; @@ -69,16 +72,20 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor this.waitForMetadataTimeOut = newVal); } @Override @@ -112,33 +119,25 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor errorHandler) { - Index leaderIndex = params.getLeaderShardId().getIndex(); - Index followIndex = params.getFollowShardId().getIndex(); - - ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex.getName()); - CheckedConsumer onResponse = clusterStateResponse -> { - IndexMetaData indexMetaData = clusterStateResponse.getState().metaData().getIndexSafe(leaderIndex); - if (indexMetaData.getMappings().isEmpty()) { - assert indexMetaData.getMappingVersion() == 1; - handler.accept(indexMetaData.getMappingVersion()); - return; - } - - assert indexMetaData.getMappings().size() == 1 : "expected exactly one mapping, but got [" + - indexMetaData.getMappings().size() + "]"; - MappingMetaData mappingMetaData = indexMetaData.getMappings().iterator().next().value; - - PutMappingRequest putMappingRequest = CcrRequests.putMappingRequest(followIndex.getName(), mappingMetaData); - followerClient.admin().indices().putMapping(putMappingRequest, ActionListener.wrap( - putMappingResponse -> handler.accept(indexMetaData.getMappingVersion()), - errorHandler)); - }; - try { - remoteClient(params).admin().cluster().state(clusterStateRequest, ActionListener.wrap(onResponse, errorHandler)); - } catch (Exception e) { - errorHandler.accept(e); - } + protected void innerUpdateMapping(long minRequiredMappingVersion, LongConsumer handler, Consumer errorHandler) { + final Index followerIndex = params.getFollowShardId().getIndex(); + getIndexMetadata(minRequiredMappingVersion, 0L, params, ActionListener.wrap( + indexMetaData -> { + if (indexMetaData.getMappings().isEmpty()) { + assert indexMetaData.getMappingVersion() == 1; + handler.accept(indexMetaData.getMappingVersion()); + return; + } + assert indexMetaData.getMappings().size() == 1 : "expected exactly one mapping, but got [" + + indexMetaData.getMappings().size() + "]"; + MappingMetaData mappingMetaData = indexMetaData.getMappings().iterator().next().value; + PutMappingRequest putMappingRequest = CcrRequests.putMappingRequest(followerIndex.getName(), mappingMetaData); + followerClient.admin().indices().putMapping(putMappingRequest, ActionListener.wrap( + putMappingResponse -> handler.accept(indexMetaData.getMappingVersion()), + errorHandler)); + }, + errorHandler + )); } @Override @@ -257,6 +256,39 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor listener) { + final Index leaderIndex = params.getLeaderShardId().getIndex(); + final ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex.getName()); + if (minRequiredMetadataVersion > 0) { + clusterStateRequest.waitForMetaDataVersion(minRequiredMetadataVersion).waitForTimeout(waitForMetadataTimeOut); + } + try { + remoteClient(params).admin().cluster().state(clusterStateRequest, ActionListener.wrap( + r -> { + // if wait_for_metadata_version timeout, the response is empty + if (r.getState() == null) { + assert minRequiredMetadataVersion > 0; + getIndexMetadata(minRequiredMappingVersion, minRequiredMetadataVersion, params, listener); + return; + } + final MetaData metaData = r.getState().metaData(); + final IndexMetaData indexMetaData = metaData.getIndexSafe(leaderIndex); + if (indexMetaData.getMappingVersion() < minRequiredMappingVersion) { + // ask for the next version. + getIndexMetadata(minRequiredMappingVersion, metaData.version() + 1, params, listener); + } else { + assert metaData.version() >= minRequiredMetadataVersion : metaData.version() + " < " + minRequiredMetadataVersion; + listener.onResponse(indexMetaData); + } + }, + listener::onFailure + )); + } catch (Exception e) { + listener.onFailure(e); + } + } + interface FollowerStatsInfoHandler { void accept(String followerHistoryUUID, long globalCheckpoint, long maxSeqNo); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java index 7af3d690e3a..65fd80325e7 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java @@ -201,7 +201,7 @@ public abstract class CcrIntegTestCase extends ESTestCase { builder.put(XPackSettings.LOGSTASH_ENABLED.getKey(), false); builder.put(LicenseService.SELF_GENERATED_LICENSE_TYPE.getKey(), "trial"); // Let cluster state api return quickly in order to speed up auto follow tests: - builder.put(CcrSettings.CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT.getKey(), TimeValue.timeValueMillis(100)); + builder.put(CcrSettings.CCR_WAIT_FOR_METADATA_TIMEOUT.getKey(), TimeValue.timeValueMillis(100)); if (configureRemoteClusterViaNodeSettings() && leaderSeedAddress != null) { builder.put("cluster.remote.leader_cluster.seeds", leaderSeedAddress); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java index e77a672f1fd..ad8f545fa9d 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrSingleNodeTestCase.java @@ -46,7 +46,7 @@ public abstract class CcrSingleNodeTestCase extends ESSingleNodeTestCase { builder.put(XPackSettings.LOGSTASH_ENABLED.getKey(), false); builder.put(LicenseService.SELF_GENERATED_LICENSE_TYPE.getKey(), "trial"); // Let cluster state api return quickly in order to speed up auto follow tests: - builder.put(CcrSettings.CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT.getKey(), TimeValue.timeValueMillis(100)); + builder.put(CcrSettings.CCR_WAIT_FOR_METADATA_TIMEOUT.getKey(), TimeValue.timeValueMillis(100)); return builder.build(); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/FollowerFailOverIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/FollowerFailOverIT.java index d58a2d0a0f1..f03eeaaa036 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/FollowerFailOverIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/FollowerFailOverIT.java @@ -6,25 +6,34 @@ package org.elasticsearch.xpack.ccr; +import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.xpack.CcrIntegTestCase; +import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction; import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; +import org.elasticsearch.xpack.core.ccr.client.CcrClient; +import org.hamcrest.Matchers; import java.util.Locale; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -32,6 +41,7 @@ import java.util.concurrent.atomic.AtomicInteger; import static java.util.Collections.singletonMap; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; public class FollowerFailOverIT extends CcrIntegTestCase { @@ -220,4 +230,56 @@ public class FollowerFailOverIT extends CcrIntegTestCase { pauseFollow("follower-index"); } + public void testReadRequestsReturnsLatestMappingVersion() throws Exception { + InternalTestCluster leaderCluster = getLeaderCluster(); + Settings nodeAttributes = Settings.builder().put("node.attr.box", "large").build(); + String dataNode = leaderCluster.startDataOnlyNode(nodeAttributes); + assertAcked( + leaderClient().admin().indices().prepareCreate("leader-index") + .setSettings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true") + .put("index.routing.allocation.require.box", "large")) + .get() + ); + ClusterService clusterService = leaderCluster.clusterService(dataNode); + ShardId shardId = clusterService.state().routingTable().index("leader-index").shard(0).shardId(); + IndicesService indicesService = leaderCluster.getInstance(IndicesService.class, dataNode); + IndexShard indexShard = indicesService.getShardOrNull(shardId); + // Block the ClusterService from exposing the cluster state with the mapping change. This makes the ClusterService + // have an older mapping version than the actual mapping version that IndexService will use to index "doc1". + final CountDownLatch latch = new CountDownLatch(1); + clusterService.addLowPriorityApplier(event -> { + IndexMetaData imd = event.state().metaData().index("leader-index"); + if (imd != null && imd.mapping() != null && + XContentMapValues.extractValue("properties.balance.type", imd.mapping().sourceAsMap()) != null) { + try { + logger.info("--> block ClusterService from exposing new mapping version"); + latch.await(); + } catch (Exception e) { + throw new AssertionError(e); + } + } + }); + leaderCluster.client().admin().indices().preparePutMapping().setType("doc") + .setSource("balance", "type=long").setTimeout(TimeValue.ZERO).get(); + IndexResponse indexResp = leaderCluster.client(dataNode).prepareIndex("leader-index", "doc", "1") + .setSource("{\"balance\": 100}", XContentType.JSON).setTimeout(TimeValue.ZERO).get(); + assertThat(indexResp.getResult(), equalTo(DocWriteResponse.Result.CREATED)); + assertThat(indexShard.getGlobalCheckpoint(), equalTo(0L)); + getFollowerCluster().startDataOnlyNode(nodeAttributes); + followerClient().execute(PutFollowAction.INSTANCE, putFollow("leader-index", "follower-index")).get(); + ensureFollowerGreen("follower-index"); + // Make sure at least one read-request which requires mapping sync is completed. + assertBusy(() -> { + CcrClient ccrClient = new CcrClient(followerClient()); + FollowStatsAction.StatsResponses responses = ccrClient.followStats(new FollowStatsAction.StatsRequest()).actionGet(); + long bytesRead = responses.getStatsResponses().stream().mapToLong(r -> r.status().bytesRead()).sum(); + assertThat(bytesRead, Matchers.greaterThan(0L)); + }, 60, TimeUnit.SECONDS); + latch.countDown(); + assertIndexFullyReplicatedToFollower("leader-index", "follower-index"); + pauseFollow("follower-index"); + } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java index 2ac67a65fc1..4d4603d022f 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java @@ -1016,7 +1016,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { private ClusterService mockClusterService() { ClusterService clusterService = mock(ClusterService.class); ClusterSettings clusterSettings = - new ClusterSettings(Settings.EMPTY, Collections.singleton(CcrSettings.CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT)); + new ClusterSettings(Settings.EMPTY, Collections.singleton(CcrSettings.CCR_WAIT_FOR_METADATA_TIMEOUT)); when(clusterService.getClusterSettings()).thenReturn(clusterSettings); return clusterService; } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java index dde869f80be..4af9e7c23a2 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java @@ -111,7 +111,7 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase { private final Map fromToSlot = new HashMap<>(); @Override - protected void innerUpdateMapping(LongConsumer handler, Consumer errorHandler) { + protected void innerUpdateMapping(long minRequiredMappingVersion, LongConsumer handler, Consumer errorHandler) { handler.accept(mappingVersion); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java index 9929241fc23..a7d07b60667 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java @@ -988,7 +988,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase { 1L, "type", ShardFollowTask.NAME, "description", null, Collections.emptyMap(), followTask, scheduler, System::nanoTime) { @Override - protected void innerUpdateMapping(LongConsumer handler, Consumer errorHandler) { + protected void innerUpdateMapping(long minRequiredMappingVersion, LongConsumer handler, Consumer errorHandler) { Exception failure = mappingUpdateFailures.poll(); if (failure != null) { errorHandler.accept(failure); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java index 9dc7c6648ee..8a3e374a24c 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java @@ -396,7 +396,7 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest } @Override - protected void innerUpdateMapping(LongConsumer handler, Consumer errorHandler) { + protected void innerUpdateMapping(long minRequiredMappingVersion, LongConsumer handler, Consumer errorHandler) { // noop, as mapping updates are not tested handler.accept(1L); }