From 883940ad9243b03d923cb9c01588ffc510efceee Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Wed, 12 Dec 2018 12:47:24 +0100 Subject: [PATCH] [CCR] Change AutofollowCoordinator to use wait_for_metadata_version (#36264) Changed AutofollowCoordinator makes use of the wait_for_metadata_version feature in cluster state API and removed hard coded poll interval. Originates from #35895 Relates to #33007 --- .../java/org/elasticsearch/xpack/ccr/Ccr.java | 2 +- .../xpack/ccr/CcrLicenseChecker.java | 11 +- .../ccr/action/AutoFollowCoordinator.java | 37 ++-- .../TransportPutAutoFollowPatternAction.java | 5 +- .../action/AutoFollowCoordinatorTests.java | 183 +++++++++++++++--- 5 files changed, 183 insertions(+), 55 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index c90c4872717..b25bd71c67f 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -156,7 +156,7 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E return Arrays.asList( ccrLicenseChecker, - new AutoFollowCoordinator(client, threadPool, clusterService, ccrLicenseChecker) + new AutoFollowCoordinator(client, clusterService, ccrLicenseChecker) ); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java index 77ac94da4aa..b586c8f96f5 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java @@ -121,8 +121,9 @@ public final class CcrLicenseChecker { client.getRemoteClusterClient(clusterAlias), request, onFailure, - leaderClusterState -> { - IndexMetaData leaderIndexMetaData = leaderClusterState.getMetaData().index(leaderIndex); + remoteClusterStateResponse -> { + ClusterState remoteClusterState = remoteClusterStateResponse.getState(); + IndexMetaData leaderIndexMetaData = remoteClusterState.getMetaData().index(leaderIndex); if (leaderIndexMetaData == null) { onFailure.accept(new IndexNotFoundException(leaderIndex)); return; @@ -159,7 +160,7 @@ public final class CcrLicenseChecker { final String clusterAlias, final ClusterStateRequest request, final Consumer onFailure, - final Consumer leaderClusterStateConsumer) { + final Consumer leaderClusterStateConsumer) { try { Client remoteClient = systemClient(client.getRemoteClusterClient(clusterAlias)); checkRemoteClusterLicenseAndFetchClusterState( @@ -199,7 +200,7 @@ public final class CcrLicenseChecker { final Client remoteClient, final ClusterStateRequest request, final Consumer onFailure, - final Consumer leaderClusterStateConsumer, + final Consumer leaderClusterStateConsumer, final Function nonCompliantLicense, final Function unknownLicense) { // we have to check the license on the remote cluster @@ -211,7 +212,7 @@ public final class CcrLicenseChecker { public void onResponse(final RemoteClusterLicenseChecker.LicenseCheck licenseCheck) { if (licenseCheck.isSuccess()) { final ActionListener clusterStateListener = - ActionListener.wrap(s -> leaderClusterStateConsumer.accept(s.getState()), onFailure); + ActionListener.wrap(leaderClusterStateConsumer::accept, onFailure); // following an index in remote cluster, so use remote client to fetch leader index metadata remoteClient.admin().cluster().state(request, clusterStateListener); } else { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java index 6db0420ed4f..7900351105c 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java @@ -14,6 +14,7 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; +import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; @@ -26,13 +27,11 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.collect.CopyOnWriteHashMap; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.license.LicenseUtils; -import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ccr.CcrLicenseChecker; import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern; @@ -66,7 +65,6 @@ public class AutoFollowCoordinator implements ClusterStateListener { private static final int MAX_AUTO_FOLLOW_ERRORS = 256; private final Client client; - private final ThreadPool threadPool; private final ClusterService clusterService; private final CcrLicenseChecker ccrLicenseChecker; @@ -80,11 +78,9 @@ public class AutoFollowCoordinator implements ClusterStateListener { public AutoFollowCoordinator( Client client, - ThreadPool threadPool, ClusterService clusterService, CcrLicenseChecker ccrLicenseChecker) { this.client = client; - this.threadPool = threadPool; this.clusterService = clusterService; this.ccrLicenseChecker = Objects.requireNonNull(ccrLicenseChecker, "ccrLicenseChecker"); clusterService.addListener(this); @@ -150,22 +146,24 @@ public class AutoFollowCoordinator implements ClusterStateListener { Map newAutoFollowers = new HashMap<>(newRemoteClusters.size()); for (String remoteCluster : newRemoteClusters) { - AutoFollower autoFollower = new AutoFollower(remoteCluster, threadPool, this::updateStats, clusterService::state) { + AutoFollower autoFollower = new AutoFollower(remoteCluster, this::updateStats, clusterService::state) { @Override void getRemoteClusterState(final String remoteCluster, - final BiConsumer handler) { + final long metadataVersion, + final BiConsumer handler) { final ClusterStateRequest request = new ClusterStateRequest(); request.clear(); request.metaData(true); request.routingTable(true); + request.waitForMetaDataVersion(metadataVersion); // TODO: set non-compliant status on auto-follow coordination that can be viewed via a stats API ccrLicenseChecker.checkRemoteClusterLicenseAndFetchClusterState( client, remoteCluster, request, e -> handler.accept(null, e), - remoteClusterState -> handler.accept(remoteClusterState, null)); + remoteClusterStateResponse -> handler.accept(remoteClusterStateResponse, null)); } @Override @@ -239,19 +237,17 @@ public class AutoFollowCoordinator implements ClusterStateListener { abstract static class AutoFollower { private final String remoteCluster; - private final ThreadPool threadPool; private final Consumer> statsUpdater; private final Supplier followerClusterStateSupplier; + private volatile long metadataVersion = 0; private volatile CountDown autoFollowPatternsCountDown; private volatile AtomicArray autoFollowResults; AutoFollower(final String remoteCluster, - final ThreadPool threadPool, final Consumer> statsUpdater, final Supplier followerClusterStateSupplier) { this.remoteCluster = remoteCluster; - this.threadPool = threadPool; this.statsUpdater = statsUpdater; this.followerClusterStateSupplier = followerClusterStateSupplier; } @@ -276,9 +272,15 @@ public class AutoFollowCoordinator implements ClusterStateListener { this.autoFollowPatternsCountDown = new CountDown(patterns.size()); this.autoFollowResults = new AtomicArray<>(patterns.size()); - getRemoteClusterState(remoteCluster, (remoteClusterState, remoteError) -> { - if (remoteClusterState != null) { + getRemoteClusterState(remoteCluster, metadataVersion + 1, (remoteClusterStateResponse, remoteError) -> { + if (remoteClusterStateResponse != null) { assert remoteError == null; + if (remoteClusterStateResponse.isWaitForTimedOut()) { + start(); + return; + } + ClusterState remoteClusterState = remoteClusterStateResponse.getState(); + metadataVersion = remoteClusterState.metaData().version(); autoFollowIndices(autoFollowMetadata, clusterState, remoteClusterState, patterns); } else { assert remoteError != null; @@ -402,8 +404,7 @@ public class AutoFollowCoordinator implements ClusterStateListener { autoFollowResults.set(slot, result); if (autoFollowPatternsCountDown.countDown()) { statsUpdater.accept(autoFollowResults.asList()); - // TODO: Remove scheduling here with using cluster state API's waitForMetadataVersion: - threadPool.schedule(TimeValue.timeValueMillis(2500), ThreadPool.Names.GENERIC, this::start); + start(); } } @@ -525,13 +526,15 @@ public class AutoFollowCoordinator implements ClusterStateListener { } /** - * Fetch the cluster state from the leader with the specified cluster alias + * Fetch a remote cluster state from with the specified cluster alias * @param remoteCluster the name of the leader cluster + * @param metadataVersion the last seen metadata version * @param handler the callback to invoke */ abstract void getRemoteClusterState( String remoteCluster, - BiConsumer handler + long metadataVersion, + BiConsumer handler ); abstract void createAndFollow( diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java index a8194fc1f0f..8c722942d19 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ccr.action; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; +import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.TransportMasterNodeAction; @@ -80,7 +81,7 @@ public class TransportPutAutoFollowPatternAction extends .filter(e -> ShardFollowTask.HEADER_FILTERS.contains(e.getKey())) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - Consumer consumer = remoteClusterState -> { + Consumer consumer = remoteClusterState -> { String[] indices = request.getLeaderIndexPatterns().toArray(new String[0]); ccrLicenseChecker.hasPrivilegesToFollowIndices(remoteClient, indices, e -> { if (e == null) { @@ -94,7 +95,7 @@ public class TransportPutAutoFollowPatternAction extends @Override public ClusterState execute(ClusterState currentState) throws Exception { - return innerPut(request, filteredHeaders, currentState, remoteClusterState); + return innerPut(request, filteredHeaders, currentState, remoteClusterState.getState()); } }); } else { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java index c77402d5859..534397a0a9a 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.ccr.action; import org.elasticsearch.Version; +import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; @@ -22,7 +23,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ccr.CcrLicenseChecker; import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower; import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; @@ -38,6 +38,7 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; @@ -46,13 +47,12 @@ import java.util.function.Supplier; import static org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower.cleanFollowedRemoteIndices; import static org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower.recordLeaderIndexAsFollowFunction; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; -import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; -import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -60,7 +60,6 @@ public class AutoFollowCoordinatorTests extends ESTestCase { public void testAutoFollower() { Client client = mock(Client.class); - ThreadPool threadPool = mockThreadPool(); when(client.getRemoteClusterClient(anyString())).thenReturn(client); ClusterState remoteState = createRemoteClusterState("logs-20190101"); @@ -90,12 +89,13 @@ public class AutoFollowCoordinatorTests extends ESTestCase { assertThat(entries.get(0).getKey().getName(), equalTo("logs-20190101")); assertThat(entries.get(0).getValue(), nullValue()); }; - AutoFollower autoFollower = new AutoFollower("remote", threadPool, handler, localClusterStateSupplier(currentState)) { + AutoFollower autoFollower = new AutoFollower("remote", handler, localClusterStateSupplier(currentState)) { @Override void getRemoteClusterState(String remoteCluster, - BiConsumer handler) { + long metadataVersion, + BiConsumer handler) { assertThat(remoteCluster, equalTo("remote")); - handler.accept(remoteState, null); + handler.accept(new ClusterStateResponse(new ClusterName("name"), remoteState, 1L, false), null); } @Override @@ -131,7 +131,6 @@ public class AutoFollowCoordinatorTests extends ESTestCase { public void testAutoFollowerClusterStateApiFailure() { Client client = mock(Client.class); - ThreadPool threadPool = mockThreadPool(); when(client.getRemoteClusterClient(anyString())).thenReturn(client); AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"), @@ -155,10 +154,11 @@ public class AutoFollowCoordinatorTests extends ESTestCase { assertThat(results.get(0).clusterStateFetchException, sameInstance(failure)); assertThat(results.get(0).autoFollowExecutionResults.entrySet().size(), equalTo(0)); }; - AutoFollower autoFollower = new AutoFollower("remote", threadPool, handler, localClusterStateSupplier(clusterState)) { + AutoFollower autoFollower = new AutoFollower("remote", handler, localClusterStateSupplier(clusterState)) { @Override void getRemoteClusterState(String remoteCluster, - BiConsumer handler) { + long metadataVersion, + BiConsumer handler) { handler.accept(null, failure); } @@ -182,7 +182,6 @@ public class AutoFollowCoordinatorTests extends ESTestCase { public void testAutoFollowerUpdateClusterStateFailure() { Client client = mock(Client.class); - ThreadPool threadPool = mockThreadPool(); when(client.getRemoteClusterClient(anyString())).thenReturn(client); ClusterState remoteState = createRemoteClusterState("logs-20190101"); @@ -210,11 +209,12 @@ public class AutoFollowCoordinatorTests extends ESTestCase { assertThat(entries.get(0).getKey().getName(), equalTo("logs-20190101")); assertThat(entries.get(0).getValue(), sameInstance(failure)); }; - AutoFollower autoFollower = new AutoFollower("remote", threadPool, handler, localClusterStateSupplier(clusterState)) { + AutoFollower autoFollower = new AutoFollower("remote", handler, localClusterStateSupplier(clusterState)) { @Override void getRemoteClusterState(String remoteCluster, - BiConsumer handler) { - handler.accept(remoteState, null); + long metadataVersion, + BiConsumer handler) { + handler.accept(new ClusterStateResponse(new ClusterName("name"), remoteState, 1L, false), null); } @Override @@ -239,7 +239,6 @@ public class AutoFollowCoordinatorTests extends ESTestCase { public void testAutoFollowerCreateAndFollowApiCallFailure() { Client client = mock(Client.class); - ThreadPool threadPool = mockThreadPool(); when(client.getRemoteClusterClient(anyString())).thenReturn(client); ClusterState remoteState = createRemoteClusterState("logs-20190101"); @@ -267,11 +266,12 @@ public class AutoFollowCoordinatorTests extends ESTestCase { assertThat(entries.get(0).getKey().getName(), equalTo("logs-20190101")); assertThat(entries.get(0).getValue(), sameInstance(failure)); }; - AutoFollower autoFollower = new AutoFollower("remote", threadPool, handler, localClusterStateSupplier(clusterState)) { + AutoFollower autoFollower = new AutoFollower("remote", handler, localClusterStateSupplier(clusterState)) { @Override void getRemoteClusterState(String remoteCluster, - BiConsumer handler) { - handler.accept(remoteState, null); + long metadataVersion, + BiConsumer handler) { + handler.accept(new ClusterStateResponse(new ClusterName("name"), remoteState, 1L, false), null); } @Override @@ -530,7 +530,6 @@ public class AutoFollowCoordinatorTests extends ESTestCase { public void testStats() { AutoFollowCoordinator autoFollowCoordinator = new AutoFollowCoordinator( - null, null, mock(ClusterService.class), new CcrLicenseChecker(() -> true, () -> false) @@ -586,6 +585,122 @@ public class AutoFollowCoordinatorTests extends ESTestCase { assertThat(autoFollowStats.getRecentAutoFollowErrors().get("_alias2:index2").getCause().getMessage(), equalTo("error")); } + public void testWaitForMetadataVersion() { + Client client = mock(Client.class); + when(client.getRemoteClusterClient(anyString())).thenReturn(client); + + AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"), + null, null, null, null, null, null, null, null, null, null, null); + Map patterns = new HashMap<>(); + patterns.put("remote", autoFollowPattern); + Map> followedLeaderIndexUUIDS = new HashMap<>(); + followedLeaderIndexUUIDS.put("remote", new ArrayList<>()); + Map> autoFollowHeaders = new HashMap<>(); + autoFollowHeaders.put("remote", Collections.singletonMap("key", "val")); + AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(patterns, followedLeaderIndexUUIDS, autoFollowHeaders); + + final LinkedList leaderStates = new LinkedList<>(); + ClusterState[] states = new ClusterState[16]; + for (int i = 0; i < states.length; i++) { + states[i] = ClusterState.builder(new ClusterName("name")) + .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, autoFollowMetadata)) + .build(); + String indexName = "logs-" + i; + leaderStates.add(i == 0 ? createRemoteClusterState(indexName) : createRemoteClusterState(leaderStates.get(i - 1), indexName)); + } + + List allResults = new ArrayList<>(); + Consumer> handler = allResults::addAll; + AutoFollower autoFollower = new AutoFollower("remote", handler, localClusterStateSupplier(states)) { + + long previousRequestedMetadataVersion = 0; + + @Override + void getRemoteClusterState(String remoteCluster, + long metadataVersion, + BiConsumer handler) { + assertThat(remoteCluster, equalTo("remote")); + assertThat(metadataVersion, greaterThan(previousRequestedMetadataVersion)); + handler.accept(new ClusterStateResponse(new ClusterName("name"), leaderStates.poll(), 1L, false), null); + } + + @Override + void createAndFollow(Map headers, + PutFollowAction.Request followRequest, + Runnable successHandler, + Consumer failureHandler) { + successHandler.run(); + } + + @Override + void updateAutoFollowMetadata(Function updateFunction, + Consumer handler) { + handler.accept(null); + } + }; + autoFollower.start(); + assertThat(allResults.size(), equalTo(states.length)); + for (int i = 0; i < states.length; i++) { + assertThat(allResults.get(i).autoFollowExecutionResults.containsKey(new Index("logs-" + i, "_na_")), is(true)); + } + } + + public void testWaitForTimeOut() { + Client client = mock(Client.class); + when(client.getRemoteClusterClient(anyString())).thenReturn(client); + + AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"), + null, null, null, null, null, null, null, null, null, null, null); + Map patterns = new HashMap<>(); + patterns.put("remote", autoFollowPattern); + Map> followedLeaderIndexUUIDS = new HashMap<>(); + followedLeaderIndexUUIDS.put("remote", new ArrayList<>()); + Map> autoFollowHeaders = new HashMap<>(); + autoFollowHeaders.put("remote", Collections.singletonMap("key", "val")); + AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(patterns, followedLeaderIndexUUIDS, autoFollowHeaders); + + ClusterState[] states = new ClusterState[16]; + for (int i = 0; i < states.length; i++) { + states[i] = ClusterState.builder(new ClusterName("name")) + .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, autoFollowMetadata)) + .build(); + } + Consumer> handler = results -> { + fail("should not be invoked"); + }; + AtomicInteger counter = new AtomicInteger(); + AutoFollower autoFollower = new AutoFollower("remote", handler, localClusterStateSupplier(states)) { + + long previousRequestedMetadataVersion = 0; + + @Override + void getRemoteClusterState(String remoteCluster, + long metadataVersion, + BiConsumer handler) { + counter.incrementAndGet(); + assertThat(remoteCluster, equalTo("remote")); + assertThat(metadataVersion, greaterThan(previousRequestedMetadataVersion)); + handler.accept(new ClusterStateResponse(new ClusterName("name"), null, 1L, true), null); + } + + @Override + void createAndFollow(Map headers, + PutFollowAction.Request followRequest, + Runnable successHandler, + Consumer failureHandler) { + fail("should not be invoked"); + } + + @Override + void updateAutoFollowMetadata(Function updateFunction, + Consumer handler) { + fail("should not be invoked"); + } + }; + autoFollower.start(); + assertThat(counter.get(), equalTo(states.length)); + } + private static ClusterState createRemoteClusterState(String indexName) { IndexMetaData indexMetaData = IndexMetaData.builder(indexName) .settings(settings(Version.CURRENT).put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)) @@ -603,6 +718,25 @@ public class AutoFollowCoordinatorTests extends ESTestCase { return csBuilder.build(); } + private static ClusterState createRemoteClusterState(ClusterState previous, String indexName) { + IndexMetaData indexMetaData = IndexMetaData.builder(indexName) + .settings(settings(Version.CURRENT).put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)) + .numberOfShards(1) + .numberOfReplicas(0) + .build(); + ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("remote")) + .metaData(MetaData.builder(previous.metaData()) + .version(previous.metaData().version() + 1) + .put(indexMetaData, true)); + + ShardRouting shardRouting = + TestShardRouting.newShardRouting(indexName, 0, "1", true, ShardRoutingState.INITIALIZING).moveToStarted(); + IndexRoutingTable indexRoutingTable = IndexRoutingTable.builder(indexMetaData.getIndex()).addShard(shardRouting).build(); + csBuilder.routingTable(RoutingTable.builder().add(indexRoutingTable).build()).build(); + + return csBuilder.build(); + } + private static Supplier localClusterStateSupplier(ClusterState... states) { final AutoFollowMetadata emptyAutoFollowMetadata = new AutoFollowMetadata(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap()); @@ -620,15 +754,4 @@ public class AutoFollowCoordinatorTests extends ESTestCase { }; } - private static ThreadPool mockThreadPool() { - ThreadPool threadPool = mock(ThreadPool.class); - doAnswer(invocation -> { - Object[] args = invocation.getArguments(); - Runnable task = (Runnable) args[2]; - task.run(); - return null; - }).when(threadPool).schedule(any(), anyString(), any()); - return threadPool; - } - }