[CCR] Clean followed leader index UUIDs in auto follow metadata (#36408)

The auto follow coordinator keeps track of the UUIDs of indices that it has followed. The index UUID strings need to be cleaned up in the case that these indices are removed in the remote cluster.

Relates to #33007
This commit is contained in:
Martijn van Groningen 2018-12-12 09:55:37 +01:00 committed by GitHub
parent c4f4378006
commit 4a825e2e86
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 286 additions and 89 deletions

View File

@ -5,6 +5,7 @@
*/ */
package org.elasticsearch.xpack.ccr.action; package org.elasticsearch.xpack.ccr.action;
import com.carrotsearch.hppc.predicates.ObjectPredicate;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.message.ParameterizedMessage;
@ -42,6 +43,7 @@ import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -151,7 +153,7 @@ public class AutoFollowCoordinator implements ClusterStateListener {
AutoFollower autoFollower = new AutoFollower(remoteCluster, threadPool, this::updateStats, clusterService::state) { AutoFollower autoFollower = new AutoFollower(remoteCluster, threadPool, this::updateStats, clusterService::state) {
@Override @Override
void getLeaderClusterState(final String remoteCluster, void getRemoteClusterState(final String remoteCluster,
final BiConsumer<ClusterState, Exception> handler) { final BiConsumer<ClusterState, Exception> handler) {
final ClusterStateRequest request = new ClusterStateRequest(); final ClusterStateRequest request = new ClusterStateRequest();
request.clear(); request.clear();
@ -163,7 +165,7 @@ public class AutoFollowCoordinator implements ClusterStateListener {
remoteCluster, remoteCluster,
request, request,
e -> handler.accept(null, e), e -> handler.accept(null, e),
leaderClusterState -> handler.accept(leaderClusterState, null)); remoteClusterState -> handler.accept(remoteClusterState, null));
} }
@Override @Override
@ -203,7 +205,7 @@ public class AutoFollowCoordinator implements ClusterStateListener {
}; };
newAutoFollowers.put(remoteCluster, autoFollower); newAutoFollowers.put(remoteCluster, autoFollower);
autoFollower.autoFollowIndices(); autoFollower.start();
} }
List<String> removedRemoteClusters = new ArrayList<>(); List<String> removedRemoteClusters = new ArrayList<>();
@ -254,9 +256,9 @@ public class AutoFollowCoordinator implements ClusterStateListener {
this.followerClusterStateSupplier = followerClusterStateSupplier; this.followerClusterStateSupplier = followerClusterStateSupplier;
} }
void autoFollowIndices() { void start() {
final ClusterState followerClusterState = followerClusterStateSupplier.get(); final ClusterState clusterState = followerClusterStateSupplier.get();
final AutoFollowMetadata autoFollowMetadata = followerClusterState.metaData().custom(AutoFollowMetadata.TYPE); final AutoFollowMetadata autoFollowMetadata = clusterState.metaData().custom(AutoFollowMetadata.TYPE);
if (autoFollowMetadata == null) { if (autoFollowMetadata == null) {
LOGGER.info("AutoFollower for cluster [{}] has stopped, because there is no autofollow metadata", remoteCluster); LOGGER.info("AutoFollower for cluster [{}] has stopped, because there is no autofollow metadata", remoteCluster);
return; return;
@ -274,51 +276,58 @@ public class AutoFollowCoordinator implements ClusterStateListener {
this.autoFollowPatternsCountDown = new CountDown(patterns.size()); this.autoFollowPatternsCountDown = new CountDown(patterns.size());
this.autoFollowResults = new AtomicArray<>(patterns.size()); this.autoFollowResults = new AtomicArray<>(patterns.size());
getLeaderClusterState(remoteCluster, (leaderClusterState, e) -> { getRemoteClusterState(remoteCluster, (remoteClusterState, remoteError) -> {
if (leaderClusterState != null) { if (remoteClusterState != null) {
assert e == null; assert remoteError == null;
autoFollowIndices(autoFollowMetadata, clusterState, remoteClusterState, patterns);
int i = 0;
for (String autoFollowPatternName : patterns) {
final int slot = i;
AutoFollowPattern autoFollowPattern = autoFollowMetadata.getPatterns().get(autoFollowPatternName);
Map<String, String> headers = autoFollowMetadata.getHeaders().get(autoFollowPatternName);
List<String> followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get(autoFollowPatternName);
final List<Index> leaderIndicesToFollow = getLeaderIndicesToFollow(autoFollowPattern, leaderClusterState,
followerClusterState, followedIndices);
if (leaderIndicesToFollow.isEmpty()) {
finalise(slot, new AutoFollowResult(autoFollowPatternName));
} else {
List<Tuple<String, AutoFollowPattern>> patternsForTheSameLeaderCluster = autoFollowMetadata.getPatterns()
.entrySet().stream()
.filter(item -> autoFollowPatternName.equals(item.getKey()) == false)
.filter(item -> remoteCluster.equals(item.getValue().getRemoteCluster()))
.map(item -> new Tuple<>(item.getKey(), item.getValue()))
.collect(Collectors.toList());
Consumer<AutoFollowResult> resultHandler = result -> finalise(slot, result);
checkAutoFollowPattern(autoFollowPatternName, remoteCluster, autoFollowPattern, leaderIndicesToFollow, headers,
patternsForTheSameLeaderCluster, resultHandler);
}
i++;
}
} else { } else {
List<AutoFollowResult> results = new ArrayList<>(patterns.size()); assert remoteError != null;
for (String autoFollowPatternName : patterns) { for (int i = 0; i < patterns.size(); i++) {
results.add(new AutoFollowResult(autoFollowPatternName, e)); String autoFollowPatternName = patterns.get(i);
finalise(i, new AutoFollowResult(autoFollowPatternName, remoteError));
} }
statsUpdater.accept(results);
} }
}); });
} }
private void autoFollowIndices(final AutoFollowMetadata autoFollowMetadata,
final ClusterState clusterState,
final ClusterState remoteClusterState,
final List<String> patterns) {
int i = 0;
for (String autoFollowPatternName : patterns) {
final int slot = i;
AutoFollowPattern autoFollowPattern = autoFollowMetadata.getPatterns().get(autoFollowPatternName);
Map<String, String> headers = autoFollowMetadata.getHeaders().get(autoFollowPatternName);
List<String> followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get(autoFollowPatternName);
final List<Index> leaderIndicesToFollow = getLeaderIndicesToFollow(autoFollowPattern, remoteClusterState,
clusterState, followedIndices);
if (leaderIndicesToFollow.isEmpty()) {
finalise(slot, new AutoFollowResult(autoFollowPatternName));
} else {
List<Tuple<String, AutoFollowPattern>> patternsForTheSameRemoteCluster = autoFollowMetadata.getPatterns()
.entrySet().stream()
.filter(item -> autoFollowPatternName.equals(item.getKey()) == false)
.filter(item -> remoteCluster.equals(item.getValue().getRemoteCluster()))
.map(item -> new Tuple<>(item.getKey(), item.getValue()))
.collect(Collectors.toList());
Consumer<AutoFollowResult> resultHandler = result -> finalise(slot, result);
checkAutoFollowPattern(autoFollowPatternName, remoteCluster, autoFollowPattern, leaderIndicesToFollow, headers,
patternsForTheSameRemoteCluster, resultHandler);
}
i++;
}
cleanFollowedRemoteIndices(remoteClusterState, patterns);
}
private void checkAutoFollowPattern(String autoFollowPattenName, private void checkAutoFollowPattern(String autoFollowPattenName,
String leaderCluster, String remoteCluster,
AutoFollowPattern autoFollowPattern, AutoFollowPattern autoFollowPattern,
List<Index> leaderIndicesToFollow, List<Index> leaderIndicesToFollow,
Map<String, String> headers, Map<String, String> headers,
List<Tuple<String, AutoFollowPattern>> patternsForTheSameLeaderCluster, List<Tuple<String, AutoFollowPattern>> patternsForTheSameRemoteCluster,
Consumer<AutoFollowResult> resultHandler) { Consumer<AutoFollowResult> resultHandler) {
final CountDown leaderIndicesCountDown = new CountDown(leaderIndicesToFollow.size()); final CountDown leaderIndicesCountDown = new CountDown(leaderIndicesToFollow.size());
@ -327,7 +336,7 @@ public class AutoFollowCoordinator implements ClusterStateListener {
final Index indexToFollow = leaderIndicesToFollow.get(i); final Index indexToFollow = leaderIndicesToFollow.get(i);
final int slot = i; final int slot = i;
List<String> otherMatchingPatterns = patternsForTheSameLeaderCluster.stream() List<String> otherMatchingPatterns = patternsForTheSameRemoteCluster.stream()
.filter(otherPattern -> otherPattern.v2().match(indexToFollow.getName())) .filter(otherPattern -> otherPattern.v2().match(indexToFollow.getName()))
.map(Tuple::v1) .map(Tuple::v1)
.collect(Collectors.toList()); .collect(Collectors.toList());
@ -338,14 +347,13 @@ public class AutoFollowCoordinator implements ClusterStateListener {
resultHandler.accept(new AutoFollowResult(autoFollowPattenName, results.asList())); resultHandler.accept(new AutoFollowResult(autoFollowPattenName, results.asList()));
} }
} else { } else {
followLeaderIndex(autoFollowPattenName, leaderCluster, indexToFollow, autoFollowPattern, headers, error -> { followLeaderIndex(autoFollowPattenName, remoteCluster, indexToFollow, autoFollowPattern, headers, error -> {
results.set(slot, new Tuple<>(indexToFollow, error)); results.set(slot, new Tuple<>(indexToFollow, error));
if (leaderIndicesCountDown.countDown()) { if (leaderIndicesCountDown.countDown()) {
resultHandler.accept(new AutoFollowResult(autoFollowPattenName, results.asList())); resultHandler.accept(new AutoFollowResult(autoFollowPattenName, results.asList()));
} }
}); });
} }
} }
} }
@ -395,18 +403,18 @@ public class AutoFollowCoordinator implements ClusterStateListener {
if (autoFollowPatternsCountDown.countDown()) { if (autoFollowPatternsCountDown.countDown()) {
statsUpdater.accept(autoFollowResults.asList()); statsUpdater.accept(autoFollowResults.asList());
// TODO: Remove scheduling here with using cluster state API's waitForMetadataVersion: // TODO: Remove scheduling here with using cluster state API's waitForMetadataVersion:
threadPool.schedule(TimeValue.timeValueMillis(2500), ThreadPool.Names.GENERIC, this::autoFollowIndices); threadPool.schedule(TimeValue.timeValueMillis(2500), ThreadPool.Names.GENERIC, this::start);
} }
} }
static List<Index> getLeaderIndicesToFollow(AutoFollowPattern autoFollowPattern, static List<Index> getLeaderIndicesToFollow(AutoFollowPattern autoFollowPattern,
ClusterState leaderClusterState, ClusterState remoteClusterState,
ClusterState followerClusterState, ClusterState followerClusterState,
List<String> followedIndexUUIDs) { List<String> followedIndexUUIDs) {
List<Index> leaderIndicesToFollow = new ArrayList<>(); List<Index> leaderIndicesToFollow = new ArrayList<>();
for (IndexMetaData leaderIndexMetaData : leaderClusterState.getMetaData()) { for (IndexMetaData leaderIndexMetaData : remoteClusterState.getMetaData()) {
if (autoFollowPattern.match(leaderIndexMetaData.getIndex().getName())) { if (autoFollowPattern.match(leaderIndexMetaData.getIndex().getName())) {
IndexRoutingTable indexRoutingTable = leaderClusterState.routingTable().index(leaderIndexMetaData.getIndex()); IndexRoutingTable indexRoutingTable = remoteClusterState.routingTable().index(leaderIndexMetaData.getIndex());
if (indexRoutingTable != null && if (indexRoutingTable != null &&
// Leader indices can be in the cluster state, but not all primary shards may be ready yet. // Leader indices can be in the cluster state, but not all primary shards may be ready yet.
// This checks ensures all primary shards have started, so that index following does not fail. // This checks ensures all primary shards have started, so that index following does not fail.
@ -465,12 +473,63 @@ public class AutoFollowCoordinator implements ClusterStateListener {
}; };
} }
void cleanFollowedRemoteIndices(final ClusterState remoteClusterState, final List<String> patterns) {
updateAutoFollowMetadata(cleanFollowedRemoteIndices(remoteClusterState.metaData(), patterns), e -> {
if (e != null) {
LOGGER.warn("Error occured while cleaning followed leader indices", e);
}
});
}
static Function<ClusterState, ClusterState> cleanFollowedRemoteIndices(
final MetaData remoteMetadata, final List<String> autoFollowPatternNames) {
return currentState -> {
AutoFollowMetadata currentAutoFollowMetadata = currentState.metaData().custom(AutoFollowMetadata.TYPE);
Map<String, List<String>> autoFollowPatternNameToFollowedIndexUUIDs =
new HashMap<>(currentAutoFollowMetadata.getFollowedLeaderIndexUUIDs());
Set<String> remoteIndexUUIDS = new HashSet<>();
remoteMetadata.getIndices().values()
.forEach((ObjectPredicate<IndexMetaData>) value -> remoteIndexUUIDS.add(value.getIndexUUID()));
boolean requiresCSUpdate = false;
for (String autoFollowPatternName : autoFollowPatternNames) {
if (autoFollowPatternNameToFollowedIndexUUIDs.containsKey(autoFollowPatternName) == false) {
// A delete auto follow pattern request can have removed the auto follow pattern while we want to update
// the auto follow metadata with the fact that an index was successfully auto followed. If this
// happens, we can just skip this step.
continue;
}
List<String> followedIndexUUIDs =
new ArrayList<>(autoFollowPatternNameToFollowedIndexUUIDs.get(autoFollowPatternName));
// Remove leader indices that no longer exist in the remote cluster:
boolean entriesRemoved = followedIndexUUIDs.removeIf(
followedLeaderIndexUUID -> remoteIndexUUIDS.contains(followedLeaderIndexUUID) == false);
if (entriesRemoved) {
requiresCSUpdate = true;
}
autoFollowPatternNameToFollowedIndexUUIDs.put(autoFollowPatternName, followedIndexUUIDs);
}
if (requiresCSUpdate) {
final AutoFollowMetadata newAutoFollowMetadata = new AutoFollowMetadata(currentAutoFollowMetadata.getPatterns(),
autoFollowPatternNameToFollowedIndexUUIDs, currentAutoFollowMetadata.getHeaders());
return ClusterState.builder(currentState)
.metaData(MetaData.builder(currentState.getMetaData())
.putCustom(AutoFollowMetadata.TYPE, newAutoFollowMetadata).build())
.build();
} else {
return currentState;
}
};
}
/** /**
* Fetch the cluster state from the leader with the specified cluster alias * Fetch the cluster state from the leader with the specified cluster alias
* @param remoteCluster the name of the leader cluster * @param remoteCluster the name of the leader cluster
* @param handler the callback to invoke * @param handler the callback to invoke
*/ */
abstract void getLeaderClusterState( abstract void getRemoteClusterState(
String remoteCluster, String remoteCluster,
BiConsumer<ClusterState, Exception> handler BiConsumer<ClusterState, Exception> handler
); );

View File

@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ccr;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData;
@ -18,6 +19,7 @@ import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.CcrIntegTestCase; import org.elasticsearch.xpack.CcrIntegTestCase;
import org.elasticsearch.xpack.ccr.action.ShardFollowTask; import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
import org.elasticsearch.xpack.core.ccr.AutoFollowStats; import org.elasticsearch.xpack.core.ccr.AutoFollowStats;
import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction; import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction;
import org.elasticsearch.xpack.core.ccr.action.DeleteAutoFollowPatternAction; import org.elasticsearch.xpack.core.ccr.action.DeleteAutoFollowPatternAction;
@ -25,7 +27,9 @@ import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.List;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.notNullValue;
@ -76,6 +80,46 @@ public class AutoFollowIT extends CcrIntegTestCase {
assertFalse(followerClient().admin().indices().exists(request).actionGet().isExists()); assertFalse(followerClient().admin().indices().exists(request).actionGet().isExists());
} }
public void testCleanFollowedLeaderIndexUUIDs() throws Exception {
Settings leaderIndexSettings = Settings.builder()
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0)
.build();
putAutoFollowPatterns("my-pattern", new String[] {"logs-*"});
createLeaderIndex("logs-201901", leaderIndexSettings);
assertBusy(() -> {
AutoFollowStats autoFollowStats = getAutoFollowStats();
assertThat(autoFollowStats.getNumberOfSuccessfulFollowIndices(), equalTo(1L));
IndicesExistsRequest request = new IndicesExistsRequest("copy-logs-201901");
assertTrue(followerClient().admin().indices().exists(request).actionGet().isExists());
MetaData metaData = getFollowerCluster().clusterService().state().metaData();
String leaderIndexUUID = metaData.index("copy-logs-201901")
.getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY)
.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY);
AutoFollowMetadata autoFollowMetadata = metaData.custom(AutoFollowMetadata.TYPE);
assertThat(autoFollowMetadata, notNullValue());
List<String> followedLeaderIndixUUIDs = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get("my-pattern");
assertThat(followedLeaderIndixUUIDs.size(), equalTo(1));
assertThat(followedLeaderIndixUUIDs.get(0), equalTo(leaderIndexUUID));
});
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest("logs-201901");
assertAcked(leaderClient().admin().indices().delete(deleteIndexRequest).actionGet());
assertBusy(() -> {
AutoFollowMetadata autoFollowMetadata = getFollowerCluster().clusterService().state()
.metaData()
.custom(AutoFollowMetadata.TYPE);
assertThat(autoFollowMetadata, notNullValue());
List<String> followedLeaderIndixUUIDs = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get("my-pattern");
assertThat(followedLeaderIndixUUIDs.size(), equalTo(0));
});
}
public void testAutoFollowManyIndices() throws Exception { public void testAutoFollowManyIndices() throws Exception {
Settings leaderIndexSettings = Settings.builder() Settings leaderIndexSettings = Settings.builder()
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)

View File

@ -43,6 +43,7 @@ import java.util.function.Consumer;
import java.util.function.Function; import java.util.function.Function;
import java.util.function.Supplier; import java.util.function.Supplier;
import static org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower.cleanFollowedRemoteIndices;
import static org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower.recordLeaderIndexAsFollowFunction; import static org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower.recordLeaderIndexAsFollowFunction;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
@ -62,7 +63,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
ThreadPool threadPool = mockThreadPool(); ThreadPool threadPool = mockThreadPool();
when(client.getRemoteClusterClient(anyString())).thenReturn(client); when(client.getRemoteClusterClient(anyString())).thenReturn(client);
ClusterState leaderState = createRemoteClusterState("logs-20190101"); ClusterState remoteState = createRemoteClusterState("logs-20190101");
AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"), AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"),
null, null, null, null, null, null, null, null, null, null, null); null, null, null, null, null, null, null, null, null, null, null);
@ -89,12 +90,12 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
assertThat(entries.get(0).getKey().getName(), equalTo("logs-20190101")); assertThat(entries.get(0).getKey().getName(), equalTo("logs-20190101"));
assertThat(entries.get(0).getValue(), nullValue()); assertThat(entries.get(0).getValue(), nullValue());
}; };
AutoFollower autoFollower = new AutoFollower("remote", threadPool, handler, followerClusterStateSupplier(currentState)) { AutoFollower autoFollower = new AutoFollower("remote", threadPool, handler, localClusterStateSupplier(currentState)) {
@Override @Override
void getLeaderClusterState(String remoteCluster, void getRemoteClusterState(String remoteCluster,
BiConsumer<ClusterState, Exception> handler) { BiConsumer<ClusterState, Exception> handler) {
assertThat(remoteCluster, equalTo("remote")); assertThat(remoteCluster, equalTo("remote"));
handler.accept(leaderState, null); handler.accept(remoteState, null);
} }
@Override @Override
@ -118,8 +119,13 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
assertThat(result.getFollowedLeaderIndexUUIDs().get("remote").size(), equalTo(1)); assertThat(result.getFollowedLeaderIndexUUIDs().get("remote").size(), equalTo(1));
handler.accept(null); handler.accept(null);
} }
@Override
void cleanFollowedRemoteIndices(ClusterState remoteClusterState, List<String> patterns) {
// Ignore, to avoid invoking updateAutoFollowMetadata(...) twice
}
}; };
autoFollower.autoFollowIndices(); autoFollower.start();
assertThat(invoked[0], is(true)); assertThat(invoked[0], is(true));
} }
@ -136,7 +142,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
followedLeaderIndexUUIDS.put("remote", new ArrayList<>()); followedLeaderIndexUUIDS.put("remote", new ArrayList<>());
Map<String, Map<String, String>> headers = new HashMap<>(); Map<String, Map<String, String>> headers = new HashMap<>();
AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(patterns, followedLeaderIndexUUIDS, headers); AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(patterns, followedLeaderIndexUUIDS, headers);
ClusterState followerState = ClusterState.builder(new ClusterName("remote")) ClusterState clusterState = ClusterState.builder(new ClusterName("remote"))
.metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, autoFollowMetadata)) .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, autoFollowMetadata))
.build(); .build();
@ -149,9 +155,9 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
assertThat(results.get(0).clusterStateFetchException, sameInstance(failure)); assertThat(results.get(0).clusterStateFetchException, sameInstance(failure));
assertThat(results.get(0).autoFollowExecutionResults.entrySet().size(), equalTo(0)); assertThat(results.get(0).autoFollowExecutionResults.entrySet().size(), equalTo(0));
}; };
AutoFollower autoFollower = new AutoFollower("remote", threadPool, handler, followerClusterStateSupplier(followerState)) { AutoFollower autoFollower = new AutoFollower("remote", threadPool, handler, localClusterStateSupplier(clusterState)) {
@Override @Override
void getLeaderClusterState(String remoteCluster, void getRemoteClusterState(String remoteCluster,
BiConsumer<ClusterState, Exception> handler) { BiConsumer<ClusterState, Exception> handler) {
handler.accept(null, failure); handler.accept(null, failure);
} }
@ -170,7 +176,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
fail("should not get here"); fail("should not get here");
} }
}; };
autoFollower.autoFollowIndices(); autoFollower.start();
assertThat(invoked[0], is(true)); assertThat(invoked[0], is(true));
} }
@ -178,7 +184,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
Client client = mock(Client.class); Client client = mock(Client.class);
ThreadPool threadPool = mockThreadPool(); ThreadPool threadPool = mockThreadPool();
when(client.getRemoteClusterClient(anyString())).thenReturn(client); when(client.getRemoteClusterClient(anyString())).thenReturn(client);
ClusterState leaderState = createRemoteClusterState("logs-20190101"); ClusterState remoteState = createRemoteClusterState("logs-20190101");
AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"), AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"),
null, null, null, null, null, null, null, null, null, null, null); null, null, null, null, null, null, null, null, null, null, null);
@ -188,7 +194,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
followedLeaderIndexUUIDS.put("remote", new ArrayList<>()); followedLeaderIndexUUIDS.put("remote", new ArrayList<>());
Map<String, Map<String, String>> headers = new HashMap<>(); Map<String, Map<String, String>> headers = new HashMap<>();
AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(patterns, followedLeaderIndexUUIDS, headers); AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(patterns, followedLeaderIndexUUIDS, headers);
ClusterState followerState = ClusterState.builder(new ClusterName("remote")) ClusterState clusterState = ClusterState.builder(new ClusterName("remote"))
.metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, autoFollowMetadata)) .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, autoFollowMetadata))
.build(); .build();
@ -204,11 +210,11 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
assertThat(entries.get(0).getKey().getName(), equalTo("logs-20190101")); assertThat(entries.get(0).getKey().getName(), equalTo("logs-20190101"));
assertThat(entries.get(0).getValue(), sameInstance(failure)); assertThat(entries.get(0).getValue(), sameInstance(failure));
}; };
AutoFollower autoFollower = new AutoFollower("remote", threadPool, handler, followerClusterStateSupplier(followerState)) { AutoFollower autoFollower = new AutoFollower("remote", threadPool, handler, localClusterStateSupplier(clusterState)) {
@Override @Override
void getLeaderClusterState(String remoteCluster, void getRemoteClusterState(String remoteCluster,
BiConsumer<ClusterState, Exception> handler) { BiConsumer<ClusterState, Exception> handler) {
handler.accept(leaderState, null); handler.accept(remoteState, null);
} }
@Override @Override
@ -227,7 +233,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
handler.accept(failure); handler.accept(failure);
} }
}; };
autoFollower.autoFollowIndices(); autoFollower.start();
assertThat(invoked[0], is(true)); assertThat(invoked[0], is(true));
} }
@ -235,7 +241,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
Client client = mock(Client.class); Client client = mock(Client.class);
ThreadPool threadPool = mockThreadPool(); ThreadPool threadPool = mockThreadPool();
when(client.getRemoteClusterClient(anyString())).thenReturn(client); when(client.getRemoteClusterClient(anyString())).thenReturn(client);
ClusterState leaderState = createRemoteClusterState("logs-20190101"); ClusterState remoteState = createRemoteClusterState("logs-20190101");
AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"), AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"),
null, null, null, null, null, null, null, null, null, null, null); null, null, null, null, null, null, null, null, null, null, null);
@ -245,7 +251,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
followedLeaderIndexUUIDS.put("remote", new ArrayList<>()); followedLeaderIndexUUIDS.put("remote", new ArrayList<>());
Map<String, Map<String, String>> headers = new HashMap<>(); Map<String, Map<String, String>> headers = new HashMap<>();
AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(patterns, followedLeaderIndexUUIDS, headers); AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(patterns, followedLeaderIndexUUIDS, headers);
ClusterState followerState = ClusterState.builder(new ClusterName("remote")) ClusterState clusterState = ClusterState.builder(new ClusterName("remote"))
.metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, autoFollowMetadata)) .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, autoFollowMetadata))
.build(); .build();
@ -261,11 +267,11 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
assertThat(entries.get(0).getKey().getName(), equalTo("logs-20190101")); assertThat(entries.get(0).getKey().getName(), equalTo("logs-20190101"));
assertThat(entries.get(0).getValue(), sameInstance(failure)); assertThat(entries.get(0).getValue(), sameInstance(failure));
}; };
AutoFollower autoFollower = new AutoFollower("remote", threadPool, handler, followerClusterStateSupplier(followerState)) { AutoFollower autoFollower = new AutoFollower("remote", threadPool, handler, localClusterStateSupplier(clusterState)) {
@Override @Override
void getLeaderClusterState(String remoteCluster, void getRemoteClusterState(String remoteCluster,
BiConsumer<ClusterState, Exception> handler) { BiConsumer<ClusterState, Exception> handler) {
handler.accept(leaderState, null); handler.accept(remoteState, null);
} }
@Override @Override
@ -284,8 +290,13 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
Consumer<Exception> handler) { Consumer<Exception> handler) {
fail("should not get here"); fail("should not get here");
} }
@Override
void cleanFollowedRemoteIndices(ClusterState remoteClusterState, List<String> patterns) {
// Ignore, to avoid invoking updateAutoFollowMetadata(...)
}
}; };
autoFollower.autoFollowIndices(); autoFollower.start();
assertThat(invoked[0], is(true)); assertThat(invoked[0], is(true));
} }
@ -293,7 +304,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("metrics-*"), AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("metrics-*"),
null, null, null, null, null, null, null, null, null, null, null); null, null, null, null, null, null, null, null, null, null, null);
Map<String, Map<String, String>> headers = new HashMap<>(); Map<String, Map<String, String>> headers = new HashMap<>();
ClusterState followerState = ClusterState.builder(new ClusterName("remote")) ClusterState clusterState = ClusterState.builder(new ClusterName("remote"))
.metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE,
new AutoFollowMetadata(Collections.singletonMap("remote", autoFollowPattern), Collections.emptyMap(), headers))) new AutoFollowMetadata(Collections.singletonMap("remote", autoFollowPattern), Collections.emptyMap(), headers)))
.build(); .build();
@ -328,12 +339,12 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
IndexRoutingTable indexRoutingTable = IndexRoutingTable.builder(imdBuilder.get("logs-0").getIndex()).addShard(shardRouting).build(); IndexRoutingTable indexRoutingTable = IndexRoutingTable.builder(imdBuilder.get("logs-0").getIndex()).addShard(shardRouting).build();
routingTableBuilder.add(indexRoutingTable); routingTableBuilder.add(indexRoutingTable);
ClusterState leaderState = ClusterState.builder(new ClusterName("remote")) ClusterState remoteState = ClusterState.builder(new ClusterName("remote"))
.metaData(imdBuilder) .metaData(imdBuilder)
.routingTable(routingTableBuilder.build()) .routingTable(routingTableBuilder.build())
.build(); .build();
List<Index> result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, leaderState, followerState, List<Index> result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, remoteState, clusterState,
Collections.emptyList()); Collections.emptyList());
result.sort(Comparator.comparing(Index::getName)); result.sort(Comparator.comparing(Index::getName));
assertThat(result.size(), equalTo(3)); assertThat(result.size(), equalTo(3));
@ -341,8 +352,8 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
assertThat(result.get(1).getName(), equalTo("metrics-2")); assertThat(result.get(1).getName(), equalTo("metrics-2"));
assertThat(result.get(2).getName(), equalTo("metrics-4")); assertThat(result.get(2).getName(), equalTo("metrics-4"));
List<String> followedIndexUUIDs = Collections.singletonList(leaderState.metaData().index("metrics-2").getIndexUUID()); List<String> followedIndexUUIDs = Collections.singletonList(remoteState.metaData().index("metrics-2").getIndexUUID());
result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, leaderState, followerState, followedIndexUUIDs); result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, remoteState, clusterState, followedIndexUUIDs);
result.sort(Comparator.comparing(Index::getName)); result.sort(Comparator.comparing(Index::getName));
assertThat(result.size(), equalTo(2)); assertThat(result.size(), equalTo(2));
assertThat(result.get(0).getName(), equalTo("metrics-0")); assertThat(result.get(0).getName(), equalTo("metrics-0"));
@ -353,14 +364,14 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("*"), AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("*"),
null, null, null, null, null, null, null, null, null, null, null); null, null, null, null, null, null, null, null, null, null, null);
Map<String, Map<String, String>> headers = new HashMap<>(); Map<String, Map<String, String>> headers = new HashMap<>();
ClusterState followerState = ClusterState.builder(new ClusterName("remote")) ClusterState clusterState = ClusterState.builder(new ClusterName("remote"))
.metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE,
new AutoFollowMetadata(Collections.singletonMap("remote", autoFollowPattern), Collections.emptyMap(), headers))) new AutoFollowMetadata(Collections.singletonMap("remote", autoFollowPattern), Collections.emptyMap(), headers)))
.build(); .build();
// 1 shard started and another not started: // 1 shard started and another not started:
ClusterState leaderState = createRemoteClusterState("index1"); ClusterState remoteState = createRemoteClusterState("index1");
MetaData.Builder mBuilder= MetaData.builder(leaderState.metaData()); MetaData.Builder mBuilder= MetaData.builder(remoteState.metaData());
mBuilder.put(IndexMetaData.builder("index2") mBuilder.put(IndexMetaData.builder("index2")
.settings(settings(Version.CURRENT).put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)) .settings(settings(Version.CURRENT).put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true))
.numberOfShards(1) .numberOfShards(1)
@ -369,26 +380,26 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
TestShardRouting.newShardRouting("index2", 0, "1", true, ShardRoutingState.INITIALIZING); TestShardRouting.newShardRouting("index2", 0, "1", true, ShardRoutingState.INITIALIZING);
IndexRoutingTable indexRoutingTable = IndexRoutingTable.builder(mBuilder.get("index2").getIndex() IndexRoutingTable indexRoutingTable = IndexRoutingTable.builder(mBuilder.get("index2").getIndex()
).addShard(shardRouting).build(); ).addShard(shardRouting).build();
leaderState = ClusterState.builder(leaderState.getClusterName()) remoteState = ClusterState.builder(remoteState.getClusterName())
.metaData(mBuilder) .metaData(mBuilder)
.routingTable(RoutingTable.builder(leaderState.routingTable()).add(indexRoutingTable).build()) .routingTable(RoutingTable.builder(remoteState.routingTable()).add(indexRoutingTable).build())
.build(); .build();
List<Index> result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, leaderState, followerState, List<Index> result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, remoteState, clusterState,
Collections.emptyList()); Collections.emptyList());
assertThat(result.size(), equalTo(1)); assertThat(result.size(), equalTo(1));
assertThat(result.get(0).getName(), equalTo("index1")); assertThat(result.get(0).getName(), equalTo("index1"));
// Start second shard: // Start second shard:
shardRouting = shardRouting.moveToStarted(); shardRouting = shardRouting.moveToStarted();
indexRoutingTable = IndexRoutingTable.builder(leaderState.metaData().indices().get("index2").getIndex()) indexRoutingTable = IndexRoutingTable.builder(remoteState.metaData().indices().get("index2").getIndex())
.addShard(shardRouting).build(); .addShard(shardRouting).build();
leaderState = ClusterState.builder(leaderState.getClusterName()) remoteState = ClusterState.builder(remoteState.getClusterName())
.metaData(leaderState.metaData()) .metaData(remoteState.metaData())
.routingTable(RoutingTable.builder(leaderState.routingTable()).add(indexRoutingTable).build()) .routingTable(RoutingTable.builder(remoteState.routingTable()).add(indexRoutingTable).build())
.build(); .build();
result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, leaderState, followerState, Collections.emptyList()); result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, remoteState, clusterState, Collections.emptyList());
assertThat(result.size(), equalTo(2)); assertThat(result.size(), equalTo(2));
result.sort(Comparator.comparing(Index::getName)); result.sort(Comparator.comparing(Index::getName));
assertThat(result.get(0).getName(), equalTo("index1")); assertThat(result.get(0).getName(), equalTo("index1"));
@ -422,6 +433,87 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
assertThat(result, sameInstance(clusterState)); assertThat(result, sameInstance(clusterState));
} }
public void testCleanFollowedLeaderIndices() {
AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(Collections.emptyMap(),
Collections.singletonMap("pattern1", Arrays.asList("index1", "index2", "index3")), Collections.emptyMap());
ClusterState clusterState = new ClusterState.Builder(new ClusterName("name"))
.metaData(new MetaData.Builder().putCustom(AutoFollowMetadata.TYPE, autoFollowMetadata))
.build();
MetaData remoteMetadata = new MetaData.Builder()
.put(IndexMetaData.builder("index1")
.settings(settings(Version.CURRENT)
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.put(IndexMetaData.SETTING_INDEX_UUID, "index1"))
.numberOfShards(1)
.numberOfReplicas(0))
.put(IndexMetaData.builder("index3")
.settings(settings(Version.CURRENT)
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.put(IndexMetaData.SETTING_INDEX_UUID, "index3"))
.numberOfShards(1)
.numberOfReplicas(0))
.build();
Function<ClusterState, ClusterState> function = cleanFollowedRemoteIndices(remoteMetadata, Collections.singletonList("pattern1"));
AutoFollowMetadata result = function.apply(clusterState).metaData().custom(AutoFollowMetadata.TYPE);
assertThat(result.getFollowedLeaderIndexUUIDs().get("pattern1").size(), equalTo(2));
assertThat(result.getFollowedLeaderIndexUUIDs().get("pattern1").get(0), equalTo("index1"));
assertThat(result.getFollowedLeaderIndexUUIDs().get("pattern1").get(1), equalTo("index3"));
}
public void testCleanFollowedLeaderIndicesNoChanges() {
AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(Collections.emptyMap(),
Collections.singletonMap("pattern1", Arrays.asList("index1", "index2", "index3")), Collections.emptyMap());
ClusterState clusterState = new ClusterState.Builder(new ClusterName("name"))
.metaData(new MetaData.Builder().putCustom(AutoFollowMetadata.TYPE, autoFollowMetadata))
.build();
MetaData remoteMetadata = new MetaData.Builder()
.put(IndexMetaData.builder("index1")
.settings(settings(Version.CURRENT)
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.put(IndexMetaData.SETTING_INDEX_UUID, "index1"))
.numberOfShards(1)
.numberOfReplicas(0))
.put(IndexMetaData.builder("index2")
.settings(settings(Version.CURRENT)
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.put(IndexMetaData.SETTING_INDEX_UUID, "index2"))
.numberOfShards(1)
.numberOfReplicas(0))
.put(IndexMetaData.builder("index3")
.settings(settings(Version.CURRENT)
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.put(IndexMetaData.SETTING_INDEX_UUID, "index3"))
.numberOfShards(1)
.numberOfReplicas(0))
.build();
Function<ClusterState, ClusterState> function = cleanFollowedRemoteIndices(remoteMetadata, Collections.singletonList("pattern1"));
ClusterState result = function.apply(clusterState);
assertThat(result, sameInstance(clusterState));
}
public void testCleanFollowedLeaderIndicesNoEntry() {
AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(Collections.emptyMap(),
Collections.singletonMap("pattern2", Arrays.asList("index1", "index2", "index3")), Collections.emptyMap());
ClusterState clusterState = new ClusterState.Builder(new ClusterName("name"))
.metaData(new MetaData.Builder().putCustom(AutoFollowMetadata.TYPE, autoFollowMetadata))
.build();
MetaData remoteMetadata = new MetaData.Builder()
.put(IndexMetaData.builder("index1")
.settings(settings(Version.CURRENT).put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true))
.numberOfShards(1)
.numberOfReplicas(0))
.build();
Function<ClusterState, ClusterState> function = cleanFollowedRemoteIndices(remoteMetadata, Collections.singletonList("pattern1"));
ClusterState result = function.apply(clusterState);
assertThat(result, sameInstance(clusterState));
}
public void testGetFollowerIndexName() { public void testGetFollowerIndexName() {
AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("metrics-*"), null, null, AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("metrics-*"), null, null,
null, null, null, null, null, null, null, null, null); null, null, null, null, null, null, null, null, null);
@ -511,7 +603,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
return csBuilder.build(); return csBuilder.build();
} }
private static Supplier<ClusterState> followerClusterStateSupplier(ClusterState... states) { private static Supplier<ClusterState> localClusterStateSupplier(ClusterState... states) {
final AutoFollowMetadata emptyAutoFollowMetadata = final AutoFollowMetadata emptyAutoFollowMetadata =
new AutoFollowMetadata(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap()); new AutoFollowMetadata(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap());
final ClusterState lastState = ClusterState.builder(new ClusterName("remote")) final ClusterState lastState = ClusterState.builder(new ClusterName("remote"))

View File

@ -165,12 +165,14 @@ public class AutoFollowMetadata extends AbstractNamedDiffable<MetaData.Custom> i
if (this == o) return true; if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false; if (o == null || getClass() != o.getClass()) return false;
AutoFollowMetadata that = (AutoFollowMetadata) o; AutoFollowMetadata that = (AutoFollowMetadata) o;
return Objects.equals(patterns, that.patterns); return Objects.equals(patterns, that.patterns) &&
Objects.equals(followedLeaderIndexUUIDs, that.followedLeaderIndexUUIDs) &&
Objects.equals(headers, that.headers);
} }
@Override @Override
public int hashCode() { public int hashCode() {
return Objects.hash(patterns); return Objects.hash(patterns, followedLeaderIndexUUIDs, headers);
} }
public static class AutoFollowPattern implements Writeable, ToXContentObject { public static class AutoFollowPattern implements Writeable, ToXContentObject {