From 8f86469d3fbf540dc1e13a6d572f8d19806d9231 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Wed, 9 Oct 2019 19:16:23 +0200 Subject: [PATCH] Do not auto-follow closed indices (#47721) (#47800) Backport of (#47721) for 7.x. Similarly to #47582, Auto-follow patterns creates following indices as long as the remote index matches the pattern and the remote primary shards are all started. But since 7.2 closed indices are also replicated, and it does not play well with CCR auto-follow patterns as they create following indices for closed leader indices too. This commit changes the getLeaderIndicesToFollow() so that closed indices are excluded from auto-follow patterns. --- .../ccr/action/AutoFollowCoordinator.java | 3 + .../action/AutoFollowCoordinatorTests.java | 120 +++++++++++++++++- 2 files changed, 119 insertions(+), 4 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java index 13dc84b8582..cca60579ae6 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java @@ -596,6 +596,9 @@ public class AutoFollowCoordinator extends AbstractLifecycleComponent implements List followedIndexUUIDs) { List leaderIndicesToFollow = new ArrayList<>(); for (IndexMetaData leaderIndexMetaData : remoteClusterState.getMetaData()) { + if (leaderIndexMetaData.getState() != IndexMetaData.State.OPEN) { + continue; + } if (autoFollowPattern.match(leaderIndexMetaData.getIndex().getName())) { IndexRoutingTable indexRoutingTable = remoteClusterState.routingTable().index(leaderIndexMetaData.getIndex()); if (indexRoutingTable != null && diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java index 75606699b11..e3edf0489d7 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java @@ -5,8 +5,10 @@ */ package org.elasticsearch.xpack.ccr.action; +import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import org.elasticsearch.Version; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; @@ -18,11 +20,13 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; @@ -44,10 +48,12 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; @@ -57,6 +63,7 @@ import static org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollo import static org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower.recordLeaderIndexAsFollowFunction; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; @@ -416,6 +423,26 @@ public class AutoFollowCoordinatorTests extends ESTestCase { assertThat(result.get(1).getName(), equalTo("index2")); } + public void testGetLeaderIndicesToFollowWithClosedIndices() { + final AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("*"), + null, null, null, null, null, null, null, null, null, null, null); + + // index is opened + ClusterState remoteState = ClusterStateCreationUtils.stateWithActivePrimary("test-index", true, randomIntBetween(1, 3), 0); + List result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, remoteState, Collections.emptyList()); + assertThat(result.size(), equalTo(1)); + assertThat(result, hasItem(remoteState.metaData().index("test-index").getIndex())); + + // index is closed + remoteState = ClusterState.builder(remoteState) + .metaData(MetaData.builder(remoteState.metaData()) + .put(IndexMetaData.builder(remoteState.metaData().index("test-index")).state(IndexMetaData.State.CLOSE).build(), true) + .build()) + .build(); + result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, remoteState, Collections.emptyList()); + assertThat(result.size(), equalTo(0)); + } + public void testRecordLeaderIndexAsFollowFunction() { AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(Collections.emptyMap(), Collections.singletonMap("pattern1", Collections.emptyList()), Collections.emptyMap()); @@ -763,7 +790,9 @@ public class AutoFollowCoordinatorTests extends ESTestCase { autoFollower.start(); assertThat(allResults.size(), equalTo(states.length)); for (int i = 0; i < states.length; i++) { - assertThat(allResults.get(i).autoFollowExecutionResults.containsKey(new Index("logs-" + i, "_na_")), is(true)); + final String indexName = "logs-" + i; + assertThat(allResults.get(i).autoFollowExecutionResults.keySet().stream() + .anyMatch(index -> index.getName().equals(indexName)), is(true)); } } @@ -1049,6 +1078,87 @@ public class AutoFollowCoordinatorTests extends ESTestCase { } } + public void testClosedIndicesAreNotAutoFollowed() { + final Client client = mock(Client.class); + when(client.getRemoteClusterClient(anyString())).thenReturn(client); + + final String pattern = "pattern1"; + final ClusterState localState = ClusterState.builder(new ClusterName("local")) + .metaData(MetaData.builder() + .putCustom(AutoFollowMetadata.TYPE, + new AutoFollowMetadata(Collections.singletonMap(pattern, + new AutoFollowPattern("remote", Collections.singletonList("docs-*"), null, null, null, null, null, null, null, null, + null, null, null)), + Collections.singletonMap(pattern, Collections.emptyList()), + Collections.singletonMap(pattern, Collections.emptyMap())))) + .build(); + + ClusterState remoteState = null; + final int nbLeaderIndices = randomIntBetween(1, 15); + for (int i = 0; i < nbLeaderIndices; i++) { + String indexName = "docs-" + i; + if (remoteState == null) { + remoteState = createRemoteClusterState(indexName, true); + } else { + remoteState = createRemoteClusterState(remoteState, indexName); + } + if (randomBoolean()) { + // randomly close the index + remoteState = ClusterState.builder(remoteState.getClusterName()) + .routingTable(remoteState.routingTable()) + .metaData(MetaData.builder(remoteState.metaData()) + .put(IndexMetaData.builder(remoteState.metaData().index(indexName)).state(IndexMetaData.State.CLOSE).build(), true) + .build()) + .build(); + } + } + + final ClusterState finalRemoteState = remoteState; + final AtomicReference lastModifiedClusterState = new AtomicReference<>(localState); + final List results = new ArrayList<>(); + final Set followedIndices = ConcurrentCollections.newConcurrentSet(); + final AutoFollower autoFollower = + new AutoFollower("remote", results::addAll, localClusterStateSupplier(localState), () -> 1L, Runnable::run) { + @Override + void getRemoteClusterState(String remoteCluster, + long metadataVersion, + BiConsumer handler) { + assertThat(remoteCluster, equalTo("remote")); + handler.accept(new ClusterStateResponse(new ClusterName("remote"), finalRemoteState, false), null); + } + + @Override + void createAndFollow(Map headers, + PutFollowAction.Request followRequest, + Runnable successHandler, + Consumer failureHandler) { + followedIndices.add(followRequest.getLeaderIndex()); + successHandler.run(); + } + + @Override + void updateAutoFollowMetadata(Function updateFunction, Consumer handler) { + lastModifiedClusterState.updateAndGet(updateFunction::apply); + handler.accept(null); + } + + @Override + void cleanFollowedRemoteIndices(ClusterState remoteClusterState, List patterns) { + // Ignore, to avoid invoking updateAutoFollowMetadata(...) twice + } + }; + autoFollower.start(); + + assertThat(results, notNullValue()); + assertThat(results.size(), equalTo(1)); + + for (ObjectObjectCursor index : remoteState.metaData().indices()) { + boolean expect = index.value.getState() == IndexMetaData.State.OPEN; + assertThat(results.get(0).autoFollowExecutionResults.containsKey(index.value.getIndex()), is(expect)); + assertThat(followedIndices.contains(index.key), is(expect)); + } + } + private static ClusterState createRemoteClusterState(String indexName, Boolean enableSoftDeletes) { Settings.Builder indexSettings; if (enableSoftDeletes != null) { @@ -1075,11 +1185,13 @@ public class AutoFollowCoordinatorTests extends ESTestCase { private static ClusterState createRemoteClusterState(ClusterState previous, String indexName) { IndexMetaData indexMetaData = IndexMetaData.builder(indexName) - .settings(settings(Version.CURRENT).put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)) + .settings(settings(Version.CURRENT) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random()))) .numberOfShards(1) .numberOfReplicas(0) .build(); - ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("remote")) + ClusterState.Builder csBuilder = ClusterState.builder(previous.getClusterName()) .metaData(MetaData.builder(previous.metaData()) .version(previous.metaData().version() + 1) .put(indexMetaData, true)); @@ -1087,7 +1199,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { ShardRouting shardRouting = TestShardRouting.newShardRouting(indexName, 0, "1", true, ShardRoutingState.INITIALIZING).moveToStarted(); IndexRoutingTable indexRoutingTable = IndexRoutingTable.builder(indexMetaData.getIndex()).addShard(shardRouting).build(); - csBuilder.routingTable(RoutingTable.builder().add(indexRoutingTable).build()).build(); + csBuilder.routingTable(RoutingTable.builder(previous.routingTable()).add(indexRoutingTable).build()).build(); return csBuilder.build(); }