diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java index 688a7561092..6db0420ed4f 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.ccr.action; +import com.carrotsearch.hppc.predicates.ObjectPredicate; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; @@ -42,6 +43,7 @@ import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -151,7 +153,7 @@ public class AutoFollowCoordinator implements ClusterStateListener { AutoFollower autoFollower = new AutoFollower(remoteCluster, threadPool, this::updateStats, clusterService::state) { @Override - void getLeaderClusterState(final String remoteCluster, + void getRemoteClusterState(final String remoteCluster, final BiConsumer handler) { final ClusterStateRequest request = new ClusterStateRequest(); request.clear(); @@ -163,7 +165,7 @@ public class AutoFollowCoordinator implements ClusterStateListener { remoteCluster, request, e -> handler.accept(null, e), - leaderClusterState -> handler.accept(leaderClusterState, null)); + remoteClusterState -> handler.accept(remoteClusterState, null)); } @Override @@ -203,7 +205,7 @@ public class AutoFollowCoordinator implements ClusterStateListener { }; newAutoFollowers.put(remoteCluster, autoFollower); - autoFollower.autoFollowIndices(); + autoFollower.start(); } List removedRemoteClusters = new ArrayList<>(); @@ -254,9 +256,9 @@ public class AutoFollowCoordinator implements ClusterStateListener { this.followerClusterStateSupplier = followerClusterStateSupplier; } - void autoFollowIndices() { - final ClusterState followerClusterState = followerClusterStateSupplier.get(); - final AutoFollowMetadata autoFollowMetadata = followerClusterState.metaData().custom(AutoFollowMetadata.TYPE); + void start() { + final ClusterState clusterState = followerClusterStateSupplier.get(); + final AutoFollowMetadata autoFollowMetadata = clusterState.metaData().custom(AutoFollowMetadata.TYPE); if (autoFollowMetadata == null) { LOGGER.info("AutoFollower for cluster [{}] has stopped, because there is no autofollow metadata", remoteCluster); return; @@ -274,51 +276,58 @@ public class AutoFollowCoordinator implements ClusterStateListener { this.autoFollowPatternsCountDown = new CountDown(patterns.size()); this.autoFollowResults = new AtomicArray<>(patterns.size()); - getLeaderClusterState(remoteCluster, (leaderClusterState, e) -> { - if (leaderClusterState != null) { - assert e == null; - - int i = 0; - for (String autoFollowPatternName : patterns) { - final int slot = i; - AutoFollowPattern autoFollowPattern = autoFollowMetadata.getPatterns().get(autoFollowPatternName); - Map headers = autoFollowMetadata.getHeaders().get(autoFollowPatternName); - List followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get(autoFollowPatternName); - - final List leaderIndicesToFollow = getLeaderIndicesToFollow(autoFollowPattern, leaderClusterState, - followerClusterState, followedIndices); - if (leaderIndicesToFollow.isEmpty()) { - finalise(slot, new AutoFollowResult(autoFollowPatternName)); - } else { - List> patternsForTheSameLeaderCluster = autoFollowMetadata.getPatterns() - .entrySet().stream() - .filter(item -> autoFollowPatternName.equals(item.getKey()) == false) - .filter(item -> remoteCluster.equals(item.getValue().getRemoteCluster())) - .map(item -> new Tuple<>(item.getKey(), item.getValue())) - .collect(Collectors.toList()); - - Consumer resultHandler = result -> finalise(slot, result); - checkAutoFollowPattern(autoFollowPatternName, remoteCluster, autoFollowPattern, leaderIndicesToFollow, headers, - patternsForTheSameLeaderCluster, resultHandler); - } - i++; - } + getRemoteClusterState(remoteCluster, (remoteClusterState, remoteError) -> { + if (remoteClusterState != null) { + assert remoteError == null; + autoFollowIndices(autoFollowMetadata, clusterState, remoteClusterState, patterns); } else { - List results = new ArrayList<>(patterns.size()); - for (String autoFollowPatternName : patterns) { - results.add(new AutoFollowResult(autoFollowPatternName, e)); + assert remoteError != null; + for (int i = 0; i < patterns.size(); i++) { + String autoFollowPatternName = patterns.get(i); + finalise(i, new AutoFollowResult(autoFollowPatternName, remoteError)); } - statsUpdater.accept(results); } }); } + private void autoFollowIndices(final AutoFollowMetadata autoFollowMetadata, + final ClusterState clusterState, + final ClusterState remoteClusterState, + final List patterns) { + int i = 0; + for (String autoFollowPatternName : patterns) { + final int slot = i; + AutoFollowPattern autoFollowPattern = autoFollowMetadata.getPatterns().get(autoFollowPatternName); + Map headers = autoFollowMetadata.getHeaders().get(autoFollowPatternName); + List followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get(autoFollowPatternName); + + final List leaderIndicesToFollow = getLeaderIndicesToFollow(autoFollowPattern, remoteClusterState, + clusterState, followedIndices); + if (leaderIndicesToFollow.isEmpty()) { + finalise(slot, new AutoFollowResult(autoFollowPatternName)); + } else { + List> patternsForTheSameRemoteCluster = autoFollowMetadata.getPatterns() + .entrySet().stream() + .filter(item -> autoFollowPatternName.equals(item.getKey()) == false) + .filter(item -> remoteCluster.equals(item.getValue().getRemoteCluster())) + .map(item -> new Tuple<>(item.getKey(), item.getValue())) + .collect(Collectors.toList()); + + Consumer resultHandler = result -> finalise(slot, result); + checkAutoFollowPattern(autoFollowPatternName, remoteCluster, autoFollowPattern, leaderIndicesToFollow, headers, + patternsForTheSameRemoteCluster, resultHandler); + } + i++; + } + cleanFollowedRemoteIndices(remoteClusterState, patterns); + } + private void checkAutoFollowPattern(String autoFollowPattenName, - String leaderCluster, + String remoteCluster, AutoFollowPattern autoFollowPattern, List leaderIndicesToFollow, Map headers, - List> patternsForTheSameLeaderCluster, + List> patternsForTheSameRemoteCluster, Consumer resultHandler) { final CountDown leaderIndicesCountDown = new CountDown(leaderIndicesToFollow.size()); @@ -327,7 +336,7 @@ public class AutoFollowCoordinator implements ClusterStateListener { final Index indexToFollow = leaderIndicesToFollow.get(i); final int slot = i; - List otherMatchingPatterns = patternsForTheSameLeaderCluster.stream() + List otherMatchingPatterns = patternsForTheSameRemoteCluster.stream() .filter(otherPattern -> otherPattern.v2().match(indexToFollow.getName())) .map(Tuple::v1) .collect(Collectors.toList()); @@ -338,14 +347,13 @@ public class AutoFollowCoordinator implements ClusterStateListener { resultHandler.accept(new AutoFollowResult(autoFollowPattenName, results.asList())); } } else { - followLeaderIndex(autoFollowPattenName, leaderCluster, indexToFollow, autoFollowPattern, headers, error -> { + followLeaderIndex(autoFollowPattenName, remoteCluster, indexToFollow, autoFollowPattern, headers, error -> { results.set(slot, new Tuple<>(indexToFollow, error)); if (leaderIndicesCountDown.countDown()) { resultHandler.accept(new AutoFollowResult(autoFollowPattenName, results.asList())); } }); } - } } @@ -395,18 +403,18 @@ public class AutoFollowCoordinator implements ClusterStateListener { if (autoFollowPatternsCountDown.countDown()) { statsUpdater.accept(autoFollowResults.asList()); // TODO: Remove scheduling here with using cluster state API's waitForMetadataVersion: - threadPool.schedule(TimeValue.timeValueMillis(2500), ThreadPool.Names.GENERIC, this::autoFollowIndices); + threadPool.schedule(TimeValue.timeValueMillis(2500), ThreadPool.Names.GENERIC, this::start); } } static List getLeaderIndicesToFollow(AutoFollowPattern autoFollowPattern, - ClusterState leaderClusterState, + ClusterState remoteClusterState, ClusterState followerClusterState, List followedIndexUUIDs) { List leaderIndicesToFollow = new ArrayList<>(); - for (IndexMetaData leaderIndexMetaData : leaderClusterState.getMetaData()) { + for (IndexMetaData leaderIndexMetaData : remoteClusterState.getMetaData()) { if (autoFollowPattern.match(leaderIndexMetaData.getIndex().getName())) { - IndexRoutingTable indexRoutingTable = leaderClusterState.routingTable().index(leaderIndexMetaData.getIndex()); + IndexRoutingTable indexRoutingTable = remoteClusterState.routingTable().index(leaderIndexMetaData.getIndex()); if (indexRoutingTable != null && // Leader indices can be in the cluster state, but not all primary shards may be ready yet. // This checks ensures all primary shards have started, so that index following does not fail. @@ -465,12 +473,63 @@ public class AutoFollowCoordinator implements ClusterStateListener { }; } + void cleanFollowedRemoteIndices(final ClusterState remoteClusterState, final List patterns) { + updateAutoFollowMetadata(cleanFollowedRemoteIndices(remoteClusterState.metaData(), patterns), e -> { + if (e != null) { + LOGGER.warn("Error occured while cleaning followed leader indices", e); + } + }); + } + + static Function cleanFollowedRemoteIndices( + final MetaData remoteMetadata, final List autoFollowPatternNames) { + return currentState -> { + AutoFollowMetadata currentAutoFollowMetadata = currentState.metaData().custom(AutoFollowMetadata.TYPE); + Map> autoFollowPatternNameToFollowedIndexUUIDs = + new HashMap<>(currentAutoFollowMetadata.getFollowedLeaderIndexUUIDs()); + Set remoteIndexUUIDS = new HashSet<>(); + remoteMetadata.getIndices().values() + .forEach((ObjectPredicate) value -> remoteIndexUUIDS.add(value.getIndexUUID())); + + boolean requiresCSUpdate = false; + for (String autoFollowPatternName : autoFollowPatternNames) { + if (autoFollowPatternNameToFollowedIndexUUIDs.containsKey(autoFollowPatternName) == false) { + // A delete auto follow pattern request can have removed the auto follow pattern while we want to update + // the auto follow metadata with the fact that an index was successfully auto followed. If this + // happens, we can just skip this step. + continue; + } + + List followedIndexUUIDs = + new ArrayList<>(autoFollowPatternNameToFollowedIndexUUIDs.get(autoFollowPatternName)); + // Remove leader indices that no longer exist in the remote cluster: + boolean entriesRemoved = followedIndexUUIDs.removeIf( + followedLeaderIndexUUID -> remoteIndexUUIDS.contains(followedLeaderIndexUUID) == false); + if (entriesRemoved) { + requiresCSUpdate = true; + } + autoFollowPatternNameToFollowedIndexUUIDs.put(autoFollowPatternName, followedIndexUUIDs); + } + + if (requiresCSUpdate) { + final AutoFollowMetadata newAutoFollowMetadata = new AutoFollowMetadata(currentAutoFollowMetadata.getPatterns(), + autoFollowPatternNameToFollowedIndexUUIDs, currentAutoFollowMetadata.getHeaders()); + return ClusterState.builder(currentState) + .metaData(MetaData.builder(currentState.getMetaData()) + .putCustom(AutoFollowMetadata.TYPE, newAutoFollowMetadata).build()) + .build(); + } else { + return currentState; + } + }; + } + /** * Fetch the cluster state from the leader with the specified cluster alias * @param remoteCluster the name of the leader cluster * @param handler the callback to invoke */ - abstract void getLeaderClusterState( + abstract void getRemoteClusterState( String remoteCluster, BiConsumer handler ); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java index 9f22dc320cb..8932a6e6905 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ccr; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; @@ -18,6 +19,7 @@ import org.elasticsearch.index.IndexSettings; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.CcrIntegTestCase; import org.elasticsearch.xpack.ccr.action.ShardFollowTask; +import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; import org.elasticsearch.xpack.core.ccr.AutoFollowStats; import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction; import org.elasticsearch.xpack.core.ccr.action.DeleteAutoFollowPatternAction; @@ -25,7 +27,9 @@ import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction; import java.util.Arrays; import java.util.Collections; +import java.util.List; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.notNullValue; @@ -76,6 +80,46 @@ public class AutoFollowIT extends CcrIntegTestCase { assertFalse(followerClient().admin().indices().exists(request).actionGet().isExists()); } + public void testCleanFollowedLeaderIndexUUIDs() throws Exception { + Settings leaderIndexSettings = Settings.builder() + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0) + .build(); + + putAutoFollowPatterns("my-pattern", new String[] {"logs-*"}); + createLeaderIndex("logs-201901", leaderIndexSettings); + assertBusy(() -> { + AutoFollowStats autoFollowStats = getAutoFollowStats(); + assertThat(autoFollowStats.getNumberOfSuccessfulFollowIndices(), equalTo(1L)); + + IndicesExistsRequest request = new IndicesExistsRequest("copy-logs-201901"); + assertTrue(followerClient().admin().indices().exists(request).actionGet().isExists()); + + MetaData metaData = getFollowerCluster().clusterService().state().metaData(); + String leaderIndexUUID = metaData.index("copy-logs-201901") + .getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY) + .get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY); + AutoFollowMetadata autoFollowMetadata = metaData.custom(AutoFollowMetadata.TYPE); + assertThat(autoFollowMetadata, notNullValue()); + List followedLeaderIndixUUIDs = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get("my-pattern"); + assertThat(followedLeaderIndixUUIDs.size(), equalTo(1)); + assertThat(followedLeaderIndixUUIDs.get(0), equalTo(leaderIndexUUID)); + }); + + DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest("logs-201901"); + assertAcked(leaderClient().admin().indices().delete(deleteIndexRequest).actionGet()); + + assertBusy(() -> { + AutoFollowMetadata autoFollowMetadata = getFollowerCluster().clusterService().state() + .metaData() + .custom(AutoFollowMetadata.TYPE); + assertThat(autoFollowMetadata, notNullValue()); + List followedLeaderIndixUUIDs = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get("my-pattern"); + assertThat(followedLeaderIndixUUIDs.size(), equalTo(0)); + }); + } + public void testAutoFollowManyIndices() throws Exception { Settings leaderIndexSettings = Settings.builder() .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java index 7b296524676..c77402d5859 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java @@ -43,6 +43,7 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; +import static org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower.cleanFollowedRemoteIndices; import static org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower.recordLeaderIndexAsFollowFunction; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; @@ -62,7 +63,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { ThreadPool threadPool = mockThreadPool(); when(client.getRemoteClusterClient(anyString())).thenReturn(client); - ClusterState leaderState = createRemoteClusterState("logs-20190101"); + ClusterState remoteState = createRemoteClusterState("logs-20190101"); AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null, null, null, null); @@ -89,12 +90,12 @@ public class AutoFollowCoordinatorTests extends ESTestCase { assertThat(entries.get(0).getKey().getName(), equalTo("logs-20190101")); assertThat(entries.get(0).getValue(), nullValue()); }; - AutoFollower autoFollower = new AutoFollower("remote", threadPool, handler, followerClusterStateSupplier(currentState)) { + AutoFollower autoFollower = new AutoFollower("remote", threadPool, handler, localClusterStateSupplier(currentState)) { @Override - void getLeaderClusterState(String remoteCluster, + void getRemoteClusterState(String remoteCluster, BiConsumer handler) { assertThat(remoteCluster, equalTo("remote")); - handler.accept(leaderState, null); + handler.accept(remoteState, null); } @Override @@ -118,8 +119,13 @@ public class AutoFollowCoordinatorTests extends ESTestCase { assertThat(result.getFollowedLeaderIndexUUIDs().get("remote").size(), equalTo(1)); handler.accept(null); } + + @Override + void cleanFollowedRemoteIndices(ClusterState remoteClusterState, List patterns) { + // Ignore, to avoid invoking updateAutoFollowMetadata(...) twice + } }; - autoFollower.autoFollowIndices(); + autoFollower.start(); assertThat(invoked[0], is(true)); } @@ -136,7 +142,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { followedLeaderIndexUUIDS.put("remote", new ArrayList<>()); Map> headers = new HashMap<>(); AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(patterns, followedLeaderIndexUUIDS, headers); - ClusterState followerState = ClusterState.builder(new ClusterName("remote")) + ClusterState clusterState = ClusterState.builder(new ClusterName("remote")) .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, autoFollowMetadata)) .build(); @@ -149,9 +155,9 @@ public class AutoFollowCoordinatorTests extends ESTestCase { assertThat(results.get(0).clusterStateFetchException, sameInstance(failure)); assertThat(results.get(0).autoFollowExecutionResults.entrySet().size(), equalTo(0)); }; - AutoFollower autoFollower = new AutoFollower("remote", threadPool, handler, followerClusterStateSupplier(followerState)) { + AutoFollower autoFollower = new AutoFollower("remote", threadPool, handler, localClusterStateSupplier(clusterState)) { @Override - void getLeaderClusterState(String remoteCluster, + void getRemoteClusterState(String remoteCluster, BiConsumer handler) { handler.accept(null, failure); } @@ -170,7 +176,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { fail("should not get here"); } }; - autoFollower.autoFollowIndices(); + autoFollower.start(); assertThat(invoked[0], is(true)); } @@ -178,7 +184,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { Client client = mock(Client.class); ThreadPool threadPool = mockThreadPool(); when(client.getRemoteClusterClient(anyString())).thenReturn(client); - ClusterState leaderState = createRemoteClusterState("logs-20190101"); + ClusterState remoteState = createRemoteClusterState("logs-20190101"); AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null, null, null, null); @@ -188,7 +194,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { followedLeaderIndexUUIDS.put("remote", new ArrayList<>()); Map> headers = new HashMap<>(); AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(patterns, followedLeaderIndexUUIDS, headers); - ClusterState followerState = ClusterState.builder(new ClusterName("remote")) + ClusterState clusterState = ClusterState.builder(new ClusterName("remote")) .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, autoFollowMetadata)) .build(); @@ -204,11 +210,11 @@ public class AutoFollowCoordinatorTests extends ESTestCase { assertThat(entries.get(0).getKey().getName(), equalTo("logs-20190101")); assertThat(entries.get(0).getValue(), sameInstance(failure)); }; - AutoFollower autoFollower = new AutoFollower("remote", threadPool, handler, followerClusterStateSupplier(followerState)) { + AutoFollower autoFollower = new AutoFollower("remote", threadPool, handler, localClusterStateSupplier(clusterState)) { @Override - void getLeaderClusterState(String remoteCluster, + void getRemoteClusterState(String remoteCluster, BiConsumer handler) { - handler.accept(leaderState, null); + handler.accept(remoteState, null); } @Override @@ -227,7 +233,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { handler.accept(failure); } }; - autoFollower.autoFollowIndices(); + autoFollower.start(); assertThat(invoked[0], is(true)); } @@ -235,7 +241,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { Client client = mock(Client.class); ThreadPool threadPool = mockThreadPool(); when(client.getRemoteClusterClient(anyString())).thenReturn(client); - ClusterState leaderState = createRemoteClusterState("logs-20190101"); + ClusterState remoteState = createRemoteClusterState("logs-20190101"); AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"), null, null, null, null, null, null, null, null, null, null, null); @@ -245,7 +251,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { followedLeaderIndexUUIDS.put("remote", new ArrayList<>()); Map> headers = new HashMap<>(); AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(patterns, followedLeaderIndexUUIDS, headers); - ClusterState followerState = ClusterState.builder(new ClusterName("remote")) + ClusterState clusterState = ClusterState.builder(new ClusterName("remote")) .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, autoFollowMetadata)) .build(); @@ -261,11 +267,11 @@ public class AutoFollowCoordinatorTests extends ESTestCase { assertThat(entries.get(0).getKey().getName(), equalTo("logs-20190101")); assertThat(entries.get(0).getValue(), sameInstance(failure)); }; - AutoFollower autoFollower = new AutoFollower("remote", threadPool, handler, followerClusterStateSupplier(followerState)) { + AutoFollower autoFollower = new AutoFollower("remote", threadPool, handler, localClusterStateSupplier(clusterState)) { @Override - void getLeaderClusterState(String remoteCluster, + void getRemoteClusterState(String remoteCluster, BiConsumer handler) { - handler.accept(leaderState, null); + handler.accept(remoteState, null); } @Override @@ -284,8 +290,13 @@ public class AutoFollowCoordinatorTests extends ESTestCase { Consumer handler) { fail("should not get here"); } + + @Override + void cleanFollowedRemoteIndices(ClusterState remoteClusterState, List patterns) { + // Ignore, to avoid invoking updateAutoFollowMetadata(...) + } }; - autoFollower.autoFollowIndices(); + autoFollower.start(); assertThat(invoked[0], is(true)); } @@ -293,7 +304,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("metrics-*"), null, null, null, null, null, null, null, null, null, null, null); Map> headers = new HashMap<>(); - ClusterState followerState = ClusterState.builder(new ClusterName("remote")) + ClusterState clusterState = ClusterState.builder(new ClusterName("remote")) .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, new AutoFollowMetadata(Collections.singletonMap("remote", autoFollowPattern), Collections.emptyMap(), headers))) .build(); @@ -328,12 +339,12 @@ public class AutoFollowCoordinatorTests extends ESTestCase { IndexRoutingTable indexRoutingTable = IndexRoutingTable.builder(imdBuilder.get("logs-0").getIndex()).addShard(shardRouting).build(); routingTableBuilder.add(indexRoutingTable); - ClusterState leaderState = ClusterState.builder(new ClusterName("remote")) + ClusterState remoteState = ClusterState.builder(new ClusterName("remote")) .metaData(imdBuilder) .routingTable(routingTableBuilder.build()) .build(); - List result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, leaderState, followerState, + List result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, remoteState, clusterState, Collections.emptyList()); result.sort(Comparator.comparing(Index::getName)); assertThat(result.size(), equalTo(3)); @@ -341,8 +352,8 @@ public class AutoFollowCoordinatorTests extends ESTestCase { assertThat(result.get(1).getName(), equalTo("metrics-2")); assertThat(result.get(2).getName(), equalTo("metrics-4")); - List followedIndexUUIDs = Collections.singletonList(leaderState.metaData().index("metrics-2").getIndexUUID()); - result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, leaderState, followerState, followedIndexUUIDs); + List followedIndexUUIDs = Collections.singletonList(remoteState.metaData().index("metrics-2").getIndexUUID()); + result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, remoteState, clusterState, followedIndexUUIDs); result.sort(Comparator.comparing(Index::getName)); assertThat(result.size(), equalTo(2)); assertThat(result.get(0).getName(), equalTo("metrics-0")); @@ -353,14 +364,14 @@ public class AutoFollowCoordinatorTests extends ESTestCase { AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("*"), null, null, null, null, null, null, null, null, null, null, null); Map> headers = new HashMap<>(); - ClusterState followerState = ClusterState.builder(new ClusterName("remote")) + ClusterState clusterState = ClusterState.builder(new ClusterName("remote")) .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, new AutoFollowMetadata(Collections.singletonMap("remote", autoFollowPattern), Collections.emptyMap(), headers))) .build(); // 1 shard started and another not started: - ClusterState leaderState = createRemoteClusterState("index1"); - MetaData.Builder mBuilder= MetaData.builder(leaderState.metaData()); + ClusterState remoteState = createRemoteClusterState("index1"); + MetaData.Builder mBuilder= MetaData.builder(remoteState.metaData()); mBuilder.put(IndexMetaData.builder("index2") .settings(settings(Version.CURRENT).put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)) .numberOfShards(1) @@ -369,26 +380,26 @@ public class AutoFollowCoordinatorTests extends ESTestCase { TestShardRouting.newShardRouting("index2", 0, "1", true, ShardRoutingState.INITIALIZING); IndexRoutingTable indexRoutingTable = IndexRoutingTable.builder(mBuilder.get("index2").getIndex() ).addShard(shardRouting).build(); - leaderState = ClusterState.builder(leaderState.getClusterName()) + remoteState = ClusterState.builder(remoteState.getClusterName()) .metaData(mBuilder) - .routingTable(RoutingTable.builder(leaderState.routingTable()).add(indexRoutingTable).build()) + .routingTable(RoutingTable.builder(remoteState.routingTable()).add(indexRoutingTable).build()) .build(); - List result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, leaderState, followerState, + List result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, remoteState, clusterState, Collections.emptyList()); assertThat(result.size(), equalTo(1)); assertThat(result.get(0).getName(), equalTo("index1")); // Start second shard: shardRouting = shardRouting.moveToStarted(); - indexRoutingTable = IndexRoutingTable.builder(leaderState.metaData().indices().get("index2").getIndex()) + indexRoutingTable = IndexRoutingTable.builder(remoteState.metaData().indices().get("index2").getIndex()) .addShard(shardRouting).build(); - leaderState = ClusterState.builder(leaderState.getClusterName()) - .metaData(leaderState.metaData()) - .routingTable(RoutingTable.builder(leaderState.routingTable()).add(indexRoutingTable).build()) + remoteState = ClusterState.builder(remoteState.getClusterName()) + .metaData(remoteState.metaData()) + .routingTable(RoutingTable.builder(remoteState.routingTable()).add(indexRoutingTable).build()) .build(); - result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, leaderState, followerState, Collections.emptyList()); + result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, remoteState, clusterState, Collections.emptyList()); assertThat(result.size(), equalTo(2)); result.sort(Comparator.comparing(Index::getName)); assertThat(result.get(0).getName(), equalTo("index1")); @@ -422,6 +433,87 @@ public class AutoFollowCoordinatorTests extends ESTestCase { assertThat(result, sameInstance(clusterState)); } + public void testCleanFollowedLeaderIndices() { + AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(Collections.emptyMap(), + Collections.singletonMap("pattern1", Arrays.asList("index1", "index2", "index3")), Collections.emptyMap()); + ClusterState clusterState = new ClusterState.Builder(new ClusterName("name")) + .metaData(new MetaData.Builder().putCustom(AutoFollowMetadata.TYPE, autoFollowMetadata)) + .build(); + + MetaData remoteMetadata = new MetaData.Builder() + .put(IndexMetaData.builder("index1") + .settings(settings(Version.CURRENT) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexMetaData.SETTING_INDEX_UUID, "index1")) + .numberOfShards(1) + .numberOfReplicas(0)) + .put(IndexMetaData.builder("index3") + .settings(settings(Version.CURRENT) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexMetaData.SETTING_INDEX_UUID, "index3")) + .numberOfShards(1) + .numberOfReplicas(0)) + .build(); + + Function function = cleanFollowedRemoteIndices(remoteMetadata, Collections.singletonList("pattern1")); + AutoFollowMetadata result = function.apply(clusterState).metaData().custom(AutoFollowMetadata.TYPE); + assertThat(result.getFollowedLeaderIndexUUIDs().get("pattern1").size(), equalTo(2)); + assertThat(result.getFollowedLeaderIndexUUIDs().get("pattern1").get(0), equalTo("index1")); + assertThat(result.getFollowedLeaderIndexUUIDs().get("pattern1").get(1), equalTo("index3")); + } + + public void testCleanFollowedLeaderIndicesNoChanges() { + AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(Collections.emptyMap(), + Collections.singletonMap("pattern1", Arrays.asList("index1", "index2", "index3")), Collections.emptyMap()); + ClusterState clusterState = new ClusterState.Builder(new ClusterName("name")) + .metaData(new MetaData.Builder().putCustom(AutoFollowMetadata.TYPE, autoFollowMetadata)) + .build(); + + MetaData remoteMetadata = new MetaData.Builder() + .put(IndexMetaData.builder("index1") + .settings(settings(Version.CURRENT) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexMetaData.SETTING_INDEX_UUID, "index1")) + .numberOfShards(1) + .numberOfReplicas(0)) + .put(IndexMetaData.builder("index2") + .settings(settings(Version.CURRENT) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexMetaData.SETTING_INDEX_UUID, "index2")) + .numberOfShards(1) + .numberOfReplicas(0)) + .put(IndexMetaData.builder("index3") + .settings(settings(Version.CURRENT) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexMetaData.SETTING_INDEX_UUID, "index3")) + .numberOfShards(1) + .numberOfReplicas(0)) + .build(); + + Function function = cleanFollowedRemoteIndices(remoteMetadata, Collections.singletonList("pattern1")); + ClusterState result = function.apply(clusterState); + assertThat(result, sameInstance(clusterState)); + } + + public void testCleanFollowedLeaderIndicesNoEntry() { + AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(Collections.emptyMap(), + Collections.singletonMap("pattern2", Arrays.asList("index1", "index2", "index3")), Collections.emptyMap()); + ClusterState clusterState = new ClusterState.Builder(new ClusterName("name")) + .metaData(new MetaData.Builder().putCustom(AutoFollowMetadata.TYPE, autoFollowMetadata)) + .build(); + + MetaData remoteMetadata = new MetaData.Builder() + .put(IndexMetaData.builder("index1") + .settings(settings(Version.CURRENT).put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)) + .numberOfShards(1) + .numberOfReplicas(0)) + .build(); + + Function function = cleanFollowedRemoteIndices(remoteMetadata, Collections.singletonList("pattern1")); + ClusterState result = function.apply(clusterState); + assertThat(result, sameInstance(clusterState)); + } + public void testGetFollowerIndexName() { AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("metrics-*"), null, null, null, null, null, null, null, null, null, null, null); @@ -511,7 +603,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { return csBuilder.build(); } - private static Supplier followerClusterStateSupplier(ClusterState... states) { + private static Supplier localClusterStateSupplier(ClusterState... states) { final AutoFollowMetadata emptyAutoFollowMetadata = new AutoFollowMetadata(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap()); final ClusterState lastState = ClusterState.builder(new ClusterName("remote")) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadata.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadata.java index 379dbe7a421..6b651444f2d 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadata.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadata.java @@ -165,12 +165,14 @@ public class AutoFollowMetadata extends AbstractNamedDiffable i if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; AutoFollowMetadata that = (AutoFollowMetadata) o; - return Objects.equals(patterns, that.patterns); + return Objects.equals(patterns, that.patterns) && + Objects.equals(followedLeaderIndexUUIDs, that.followedLeaderIndexUUIDs) && + Objects.equals(headers, that.headers); } @Override public int hashCode() { - return Objects.hash(patterns); + return Objects.hash(patterns, followedLeaderIndexUUIDs, headers); } public static class AutoFollowPattern implements Writeable, ToXContentObject {