[CCR] AutoFollowCoordinator and follower index already created (#36540)
The AutoFollowCoordinator should be resilient to the fact that the follower index has already been created and in that case it should only update the auto follow metadata with the fact that the follower index was created. Relates to #33007
This commit is contained in:
parent
44fe265d82
commit
561b704129
|
@ -33,6 +33,7 @@ import org.elasticsearch.common.util.concurrent.CountDown;
|
|||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.license.LicenseUtils;
|
||||
import org.elasticsearch.xpack.ccr.Ccr;
|
||||
import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
|
||||
import org.elasticsearch.xpack.ccr.CcrSettings;
|
||||
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
|
||||
|
@ -365,8 +366,8 @@ public class AutoFollowCoordinator implements ClusterStateListener {
|
|||
Map<String, String> headers = autoFollowMetadata.getHeaders().get(autoFollowPatternName);
|
||||
List<String> followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get(autoFollowPatternName);
|
||||
|
||||
final List<Index> leaderIndicesToFollow = getLeaderIndicesToFollow(autoFollowPattern, remoteClusterState,
|
||||
clusterState, followedIndices);
|
||||
final List<Index> leaderIndicesToFollow =
|
||||
getLeaderIndicesToFollow(autoFollowPattern, remoteClusterState, followedIndices);
|
||||
if (leaderIndicesToFollow.isEmpty()) {
|
||||
finalise(slot, new AutoFollowResult(autoFollowPatternName));
|
||||
} else {
|
||||
|
@ -379,7 +380,7 @@ public class AutoFollowCoordinator implements ClusterStateListener {
|
|||
|
||||
Consumer<AutoFollowResult> resultHandler = result -> finalise(slot, result);
|
||||
checkAutoFollowPattern(autoFollowPatternName, remoteCluster, autoFollowPattern, leaderIndicesToFollow, headers,
|
||||
patternsForTheSameRemoteCluster, remoteClusterState.metaData(), resultHandler);
|
||||
patternsForTheSameRemoteCluster, remoteClusterState.metaData(), clusterState.metaData(), resultHandler);
|
||||
}
|
||||
i++;
|
||||
}
|
||||
|
@ -393,6 +394,7 @@ public class AutoFollowCoordinator implements ClusterStateListener {
|
|||
Map<String, String> headers,
|
||||
List<Tuple<String, AutoFollowPattern>> patternsForTheSameRemoteCluster,
|
||||
MetaData remoteMetadata,
|
||||
MetaData localMetadata,
|
||||
Consumer<AutoFollowResult> resultHandler) {
|
||||
|
||||
final CountDown leaderIndicesCountDown = new CountDown(leaderIndicesToFollow.size());
|
||||
|
@ -430,7 +432,16 @@ public class AutoFollowCoordinator implements ClusterStateListener {
|
|||
}
|
||||
});
|
||||
continue;
|
||||
} else if (leaderIndexAlreadyFollowed(autoFollowPattern, indexToFollow, localMetadata)) {
|
||||
updateAutoFollowMetadata(recordLeaderIndexAsFollowFunction(autoFollowPattenName, indexToFollow), error -> {
|
||||
results.set(slot, new Tuple<>(indexToFollow, error));
|
||||
if (leaderIndicesCountDown.countDown()) {
|
||||
resultHandler.accept(new AutoFollowResult(autoFollowPattenName, results.asList()));
|
||||
}
|
||||
});
|
||||
continue;
|
||||
}
|
||||
|
||||
followLeaderIndex(autoFollowPattenName, remoteCluster, indexToFollow, autoFollowPattern, headers, error -> {
|
||||
results.set(slot, new Tuple<>(indexToFollow, error));
|
||||
if (leaderIndicesCountDown.countDown()) {
|
||||
|
@ -441,6 +452,25 @@ public class AutoFollowCoordinator implements ClusterStateListener {
|
|||
}
|
||||
}
|
||||
|
||||
private static boolean leaderIndexAlreadyFollowed(AutoFollowPattern autoFollowPattern,
|
||||
Index leaderIndex,
|
||||
MetaData localMetadata) {
|
||||
String followIndexName = getFollowerIndexName(autoFollowPattern, leaderIndex.getName());
|
||||
IndexMetaData indexMetaData = localMetadata.index(followIndexName);
|
||||
if (indexMetaData != null) {
|
||||
// If an index with the same name exists, but it is not a follow index for this leader index then
|
||||
// we should let the auto follower attempt to auto follow it, so it can fail later and
|
||||
// it is then visible in the auto follow stats. For example a cluster can just happen to have
|
||||
// an index with the same name as the new follower index.
|
||||
Map<String, String> customData = indexMetaData.getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY);
|
||||
if (customData != null) {
|
||||
String recordedLeaderIndexUUID = customData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY);
|
||||
return leaderIndex.getUUID().equals(recordedLeaderIndexUUID);
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private void followLeaderIndex(String autoFollowPattenName,
|
||||
String remoteCluster,
|
||||
Index indexToFollow,
|
||||
|
@ -492,7 +522,6 @@ public class AutoFollowCoordinator implements ClusterStateListener {
|
|||
|
||||
static List<Index> getLeaderIndicesToFollow(AutoFollowPattern autoFollowPattern,
|
||||
ClusterState remoteClusterState,
|
||||
ClusterState followerClusterState,
|
||||
List<String> followedIndexUUIDs) {
|
||||
List<Index> leaderIndicesToFollow = new ArrayList<>();
|
||||
for (IndexMetaData leaderIndexMetaData : remoteClusterState.getMetaData()) {
|
||||
|
@ -505,10 +534,7 @@ public class AutoFollowCoordinator implements ClusterStateListener {
|
|||
// this index will be auto followed.
|
||||
indexRoutingTable.allPrimaryShardsActive() &&
|
||||
followedIndexUUIDs.contains(leaderIndexMetaData.getIndex().getUUID()) == false) {
|
||||
// TODO: iterate over the indices in the followerClusterState and check whether a IndexMetaData
|
||||
// has a leader index uuid custom metadata entry that matches with uuid of leaderIndexMetaData variable
|
||||
// If so then handle it differently: not follow it, but just add an entry to
|
||||
// AutoFollowMetadata#followedLeaderIndexUUIDs
|
||||
|
||||
leaderIndicesToFollow.add(leaderIndexMetaData.getIndex());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,6 +24,7 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.xpack.ccr.Ccr;
|
||||
import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
|
||||
import org.elasticsearch.xpack.ccr.CcrSettings;
|
||||
import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower;
|
||||
|
@ -345,8 +346,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
|||
.routingTable(routingTableBuilder.build())
|
||||
.build();
|
||||
|
||||
List<Index> result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, remoteState, clusterState,
|
||||
Collections.emptyList());
|
||||
List<Index> result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, remoteState, Collections.emptyList());
|
||||
result.sort(Comparator.comparing(Index::getName));
|
||||
assertThat(result.size(), equalTo(5));
|
||||
assertThat(result.get(0).getName(), equalTo("metrics-0"));
|
||||
|
@ -356,7 +356,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
|||
assertThat(result.get(4).getName(), equalTo("metrics-4"));
|
||||
|
||||
List<String> followedIndexUUIDs = Collections.singletonList(remoteState.metaData().index("metrics-2").getIndexUUID());
|
||||
result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, remoteState, clusterState, followedIndexUUIDs);
|
||||
result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, remoteState, followedIndexUUIDs);
|
||||
result.sort(Comparator.comparing(Index::getName));
|
||||
assertThat(result.size(), equalTo(4));
|
||||
assertThat(result.get(0).getName(), equalTo("metrics-0"));
|
||||
|
@ -390,8 +390,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
|||
.routingTable(RoutingTable.builder(remoteState.routingTable()).add(indexRoutingTable).build())
|
||||
.build();
|
||||
|
||||
List<Index> result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, remoteState, clusterState,
|
||||
Collections.emptyList());
|
||||
List<Index> result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, remoteState, Collections.emptyList());
|
||||
assertThat(result.size(), equalTo(1));
|
||||
assertThat(result.get(0).getName(), equalTo("index1"));
|
||||
|
||||
|
@ -404,7 +403,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
|||
.routingTable(RoutingTable.builder(remoteState.routingTable()).add(indexRoutingTable).build())
|
||||
.build();
|
||||
|
||||
result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, remoteState, clusterState, Collections.emptyList());
|
||||
result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, remoteState, Collections.emptyList());
|
||||
assertThat(result.size(), equalTo(2));
|
||||
result.sort(Comparator.comparing(Index::getName));
|
||||
assertThat(result.get(0).getName(), equalTo("index1"));
|
||||
|
@ -864,6 +863,83 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
|||
"because soft deletes are not enabled"));
|
||||
}
|
||||
|
||||
public void testAutoFollowerFollowerIndexAlreadyExists() {
|
||||
Client client = mock(Client.class);
|
||||
when(client.getRemoteClusterClient(anyString())).thenReturn(client);
|
||||
|
||||
ClusterState remoteState = createRemoteClusterState("logs-20190101", true);
|
||||
|
||||
AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"),
|
||||
null, null, null, null, null, null, null, null, null, null, null);
|
||||
Map<String, AutoFollowPattern> patterns = new HashMap<>();
|
||||
patterns.put("remote", autoFollowPattern);
|
||||
Map<String, List<String>> followedLeaderIndexUUIDS = new HashMap<>();
|
||||
followedLeaderIndexUUIDS.put("remote", new ArrayList<>());
|
||||
Map<String, Map<String, String>> autoFollowHeaders = new HashMap<>();
|
||||
autoFollowHeaders.put("remote", Collections.singletonMap("key", "val"));
|
||||
AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(patterns, followedLeaderIndexUUIDS, autoFollowHeaders);
|
||||
|
||||
ClusterState currentState = ClusterState.builder(new ClusterName("name"))
|
||||
.metaData(MetaData.builder()
|
||||
.put(IndexMetaData.builder("logs-20190101")
|
||||
.settings(settings(Version.CURRENT).put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true))
|
||||
.putCustom(Ccr.CCR_CUSTOM_METADATA_KEY, Collections.singletonMap(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY,
|
||||
remoteState.metaData().index("logs-20190101").getIndexUUID()))
|
||||
.numberOfShards(1)
|
||||
.numberOfReplicas(0))
|
||||
.putCustom(AutoFollowMetadata.TYPE, autoFollowMetadata))
|
||||
.build();
|
||||
|
||||
|
||||
final Object[] resultHolder = new Object[1];
|
||||
Consumer<List<AutoFollowCoordinator.AutoFollowResult>> handler = results -> {
|
||||
resultHolder[0] = results;
|
||||
};
|
||||
AutoFollower autoFollower = new AutoFollower("remote", handler, localClusterStateSupplier(currentState), () -> 1L) {
|
||||
@Override
|
||||
void getRemoteClusterState(String remoteCluster,
|
||||
long metadataVersion,
|
||||
BiConsumer<ClusterStateResponse, Exception> handler) {
|
||||
assertThat(remoteCluster, equalTo("remote"));
|
||||
handler.accept(new ClusterStateResponse(new ClusterName("name"), remoteState, 1L, false), null);
|
||||
}
|
||||
|
||||
@Override
|
||||
void createAndFollow(Map<String, String> headers,
|
||||
PutFollowAction.Request followRequest,
|
||||
Runnable successHandler,
|
||||
Consumer<Exception> failureHandler) {
|
||||
fail("this should not be invoked");
|
||||
}
|
||||
|
||||
@Override
|
||||
void updateAutoFollowMetadata(Function<ClusterState, ClusterState> updateFunction,
|
||||
Consumer<Exception> handler) {
|
||||
ClusterState resultCs = updateFunction.apply(currentState);
|
||||
AutoFollowMetadata result = resultCs.metaData().custom(AutoFollowMetadata.TYPE);
|
||||
assertThat(result.getFollowedLeaderIndexUUIDs().size(), equalTo(1));
|
||||
assertThat(result.getFollowedLeaderIndexUUIDs().get("remote").size(), equalTo(1));
|
||||
handler.accept(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
void cleanFollowedRemoteIndices(ClusterState remoteClusterState, List<String> patterns) {
|
||||
// Ignore, to avoid invoking updateAutoFollowMetadata(...) twice
|
||||
}
|
||||
};
|
||||
autoFollower.start();
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
List<AutoFollowCoordinator.AutoFollowResult> results = (List<AutoFollowCoordinator.AutoFollowResult>) resultHolder[0];
|
||||
assertThat(results, notNullValue());
|
||||
assertThat(results.size(), equalTo(1));
|
||||
assertThat(results.get(0).clusterStateFetchException, nullValue());
|
||||
List<Map.Entry<Index, Exception>> entries = new ArrayList<>(results.get(0).autoFollowExecutionResults.entrySet());
|
||||
assertThat(entries.size(), equalTo(1));
|
||||
assertThat(entries.get(0).getKey().getName(), equalTo("logs-20190101"));
|
||||
assertThat(entries.get(0).getValue(), nullValue());
|
||||
}
|
||||
|
||||
private static ClusterState createRemoteClusterState(String indexName, Boolean enableSoftDeletes) {
|
||||
Settings.Builder indexSettings;
|
||||
if (enableSoftDeletes != null) {
|
||||
|
|
Loading…
Reference in New Issue