diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java index 01117d2575b..d6005b6d830 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java @@ -121,7 +121,7 @@ public final class CcrLicenseChecker { client.getRemoteClusterClient(clusterAlias), request, onFailure, - remoteClusterStateResponse -> { + remoteClusterStateResponse -> { ClusterState remoteClusterState = remoteClusterStateResponse.getState(); IndexMetaData leaderIndexMetaData = remoteClusterState.getMetaData().index(leaderIndex); if (leaderIndexMetaData == null) { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java index 89c1144cbd6..0a7900d004b 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java @@ -33,6 +33,7 @@ import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.license.LicenseUtils; +import org.elasticsearch.xpack.ccr.Ccr; import org.elasticsearch.xpack.ccr.CcrLicenseChecker; import org.elasticsearch.xpack.ccr.CcrSettings; import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; @@ -365,8 +366,8 @@ public class AutoFollowCoordinator implements ClusterStateListener { Map headers = autoFollowMetadata.getHeaders().get(autoFollowPatternName); List followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get(autoFollowPatternName); - final List leaderIndicesToFollow = getLeaderIndicesToFollow(autoFollowPattern, remoteClusterState, - clusterState, followedIndices); + final List leaderIndicesToFollow = + getLeaderIndicesToFollow(autoFollowPattern, remoteClusterState, followedIndices); if (leaderIndicesToFollow.isEmpty()) { finalise(slot, new AutoFollowResult(autoFollowPatternName)); } else { @@ -379,7 +380,7 @@ public class AutoFollowCoordinator implements ClusterStateListener { Consumer resultHandler = result -> finalise(slot, result); checkAutoFollowPattern(autoFollowPatternName, remoteCluster, autoFollowPattern, leaderIndicesToFollow, headers, - patternsForTheSameRemoteCluster, remoteClusterState.metaData(), resultHandler); + patternsForTheSameRemoteCluster, remoteClusterState.metaData(), clusterState.metaData(), resultHandler); } i++; } @@ -393,6 +394,7 @@ public class AutoFollowCoordinator implements ClusterStateListener { Map headers, List> patternsForTheSameRemoteCluster, MetaData remoteMetadata, + MetaData localMetadata, Consumer resultHandler) { final CountDown leaderIndicesCountDown = new CountDown(leaderIndicesToFollow.size()); @@ -430,7 +432,16 @@ public class AutoFollowCoordinator implements ClusterStateListener { } }); continue; + } else if (leaderIndexAlreadyFollowed(autoFollowPattern, indexToFollow, localMetadata)) { + updateAutoFollowMetadata(recordLeaderIndexAsFollowFunction(autoFollowPattenName, indexToFollow), error -> { + results.set(slot, new Tuple<>(indexToFollow, error)); + if (leaderIndicesCountDown.countDown()) { + resultHandler.accept(new AutoFollowResult(autoFollowPattenName, results.asList())); + } + }); + continue; } + followLeaderIndex(autoFollowPattenName, remoteCluster, indexToFollow, autoFollowPattern, headers, error -> { results.set(slot, new Tuple<>(indexToFollow, error)); if (leaderIndicesCountDown.countDown()) { @@ -441,6 +452,25 @@ public class AutoFollowCoordinator implements ClusterStateListener { } } + private static boolean leaderIndexAlreadyFollowed(AutoFollowPattern autoFollowPattern, + Index leaderIndex, + MetaData localMetadata) { + String followIndexName = getFollowerIndexName(autoFollowPattern, leaderIndex.getName()); + IndexMetaData indexMetaData = localMetadata.index(followIndexName); + if (indexMetaData != null) { + // If an index with the same name exists, but it is not a follow index for this leader index then + // we should let the auto follower attempt to auto follow it, so it can fail later and + // it is then visible in the auto follow stats. For example a cluster can just happen to have + // an index with the same name as the new follower index. + Map customData = indexMetaData.getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY); + if (customData != null) { + String recordedLeaderIndexUUID = customData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY); + return leaderIndex.getUUID().equals(recordedLeaderIndexUUID); + } + } + return false; + } + private void followLeaderIndex(String autoFollowPattenName, String remoteCluster, Index indexToFollow, @@ -492,7 +522,6 @@ public class AutoFollowCoordinator implements ClusterStateListener { static List getLeaderIndicesToFollow(AutoFollowPattern autoFollowPattern, ClusterState remoteClusterState, - ClusterState followerClusterState, List followedIndexUUIDs) { List leaderIndicesToFollow = new ArrayList<>(); for (IndexMetaData leaderIndexMetaData : remoteClusterState.getMetaData()) { @@ -505,10 +534,7 @@ public class AutoFollowCoordinator implements ClusterStateListener { // this index will be auto followed. indexRoutingTable.allPrimaryShardsActive() && followedIndexUUIDs.contains(leaderIndexMetaData.getIndex().getUUID()) == false) { - // TODO: iterate over the indices in the followerClusterState and check whether a IndexMetaData - // has a leader index uuid custom metadata entry that matches with uuid of leaderIndexMetaData variable - // If so then handle it differently: not follow it, but just add an entry to - // AutoFollowMetadata#followedLeaderIndexUUIDs + leaderIndicesToFollow.add(leaderIndexMetaData.getIndex()); } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java index 1eb4a7dcced..1c6864088b5 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.ccr.Ccr; import org.elasticsearch.xpack.ccr.CcrLicenseChecker; import org.elasticsearch.xpack.ccr.CcrSettings; import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower; @@ -345,8 +346,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { .routingTable(routingTableBuilder.build()) .build(); - List result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, remoteState, clusterState, - Collections.emptyList()); + List result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, remoteState, Collections.emptyList()); result.sort(Comparator.comparing(Index::getName)); assertThat(result.size(), equalTo(5)); assertThat(result.get(0).getName(), equalTo("metrics-0")); @@ -356,7 +356,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { assertThat(result.get(4).getName(), equalTo("metrics-4")); List followedIndexUUIDs = Collections.singletonList(remoteState.metaData().index("metrics-2").getIndexUUID()); - result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, remoteState, clusterState, followedIndexUUIDs); + result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, remoteState, followedIndexUUIDs); result.sort(Comparator.comparing(Index::getName)); assertThat(result.size(), equalTo(4)); assertThat(result.get(0).getName(), equalTo("metrics-0")); @@ -390,8 +390,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { .routingTable(RoutingTable.builder(remoteState.routingTable()).add(indexRoutingTable).build()) .build(); - List result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, remoteState, clusterState, - Collections.emptyList()); + List result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, remoteState, Collections.emptyList()); assertThat(result.size(), equalTo(1)); assertThat(result.get(0).getName(), equalTo("index1")); @@ -404,7 +403,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { .routingTable(RoutingTable.builder(remoteState.routingTable()).add(indexRoutingTable).build()) .build(); - result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, remoteState, clusterState, Collections.emptyList()); + result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, remoteState, Collections.emptyList()); assertThat(result.size(), equalTo(2)); result.sort(Comparator.comparing(Index::getName)); assertThat(result.get(0).getName(), equalTo("index1")); @@ -864,6 +863,83 @@ public class AutoFollowCoordinatorTests extends ESTestCase { "because soft deletes are not enabled")); } + public void testAutoFollowerFollowerIndexAlreadyExists() { + Client client = mock(Client.class); + when(client.getRemoteClusterClient(anyString())).thenReturn(client); + + ClusterState remoteState = createRemoteClusterState("logs-20190101", true); + + AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"), + null, null, null, null, null, null, null, null, null, null, null); + Map patterns = new HashMap<>(); + patterns.put("remote", autoFollowPattern); + Map> followedLeaderIndexUUIDS = new HashMap<>(); + followedLeaderIndexUUIDS.put("remote", new ArrayList<>()); + Map> autoFollowHeaders = new HashMap<>(); + autoFollowHeaders.put("remote", Collections.singletonMap("key", "val")); + AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(patterns, followedLeaderIndexUUIDS, autoFollowHeaders); + + ClusterState currentState = ClusterState.builder(new ClusterName("name")) + .metaData(MetaData.builder() + .put(IndexMetaData.builder("logs-20190101") + .settings(settings(Version.CURRENT).put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)) + .putCustom(Ccr.CCR_CUSTOM_METADATA_KEY, Collections.singletonMap(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY, + remoteState.metaData().index("logs-20190101").getIndexUUID())) + .numberOfShards(1) + .numberOfReplicas(0)) + .putCustom(AutoFollowMetadata.TYPE, autoFollowMetadata)) + .build(); + + + final Object[] resultHolder = new Object[1]; + Consumer> handler = results -> { + resultHolder[0] = results; + }; + AutoFollower autoFollower = new AutoFollower("remote", handler, localClusterStateSupplier(currentState), () -> 1L) { + @Override + void getRemoteClusterState(String remoteCluster, + long metadataVersion, + BiConsumer handler) { + assertThat(remoteCluster, equalTo("remote")); + handler.accept(new ClusterStateResponse(new ClusterName("name"), remoteState, 1L, false), null); + } + + @Override + void createAndFollow(Map headers, + PutFollowAction.Request followRequest, + Runnable successHandler, + Consumer failureHandler) { + fail("this should not be invoked"); + } + + @Override + void updateAutoFollowMetadata(Function updateFunction, + Consumer handler) { + ClusterState resultCs = updateFunction.apply(currentState); + AutoFollowMetadata result = resultCs.metaData().custom(AutoFollowMetadata.TYPE); + assertThat(result.getFollowedLeaderIndexUUIDs().size(), equalTo(1)); + assertThat(result.getFollowedLeaderIndexUUIDs().get("remote").size(), equalTo(1)); + handler.accept(null); + } + + @Override + void cleanFollowedRemoteIndices(ClusterState remoteClusterState, List patterns) { + // Ignore, to avoid invoking updateAutoFollowMetadata(...) twice + } + }; + autoFollower.start(); + + @SuppressWarnings("unchecked") + List results = (List) resultHolder[0]; + assertThat(results, notNullValue()); + assertThat(results.size(), equalTo(1)); + assertThat(results.get(0).clusterStateFetchException, nullValue()); + List> entries = new ArrayList<>(results.get(0).autoFollowExecutionResults.entrySet()); + assertThat(entries.size(), equalTo(1)); + assertThat(entries.get(0).getKey().getName(), equalTo("logs-20190101")); + assertThat(entries.get(0).getValue(), nullValue()); + } + private static ClusterState createRemoteClusterState(String indexName, Boolean enableSoftDeletes) { Settings.Builder indexSettings; if (enableSoftDeletes != null) {