mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-20 03:45:02 +00:00
[CCR] Only auto follow indices when all primary shards have started (#35814)
This change adds an extra check that verifies that all primary shards have been started of an index that is about to be auto followed. If not all primary shards have been started for an index then the next auto follow run will try to follow to auto follow this index again. Closes #35480
This commit is contained in:
parent
fbdfec4305
commit
1390f366d4
@ -19,6 +19,7 @@ import org.elasticsearch.cluster.ClusterStateApplier;
|
|||||||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
import org.elasticsearch.cluster.metadata.MetaData;
|
import org.elasticsearch.cluster.metadata.MetaData;
|
||||||
|
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
||||||
import org.elasticsearch.cluster.service.ClusterService;
|
import org.elasticsearch.cluster.service.ClusterService;
|
||||||
import org.elasticsearch.common.collect.Tuple;
|
import org.elasticsearch.common.collect.Tuple;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
@ -164,6 +165,7 @@ public class AutoFollowCoordinator implements ClusterStateApplier {
|
|||||||
final ClusterStateRequest request = new ClusterStateRequest();
|
final ClusterStateRequest request = new ClusterStateRequest();
|
||||||
request.clear();
|
request.clear();
|
||||||
request.metaData(true);
|
request.metaData(true);
|
||||||
|
request.routingTable(true);
|
||||||
// TODO: set non-compliant status on auto-follow coordination that can be viewed via a stats API
|
// TODO: set non-compliant status on auto-follow coordination that can be viewed via a stats API
|
||||||
ccrLicenseChecker.checkRemoteClusterLicenseAndFetchClusterState(
|
ccrLicenseChecker.checkRemoteClusterLicenseAndFetchClusterState(
|
||||||
client,
|
client,
|
||||||
@ -367,7 +369,14 @@ public class AutoFollowCoordinator implements ClusterStateApplier {
|
|||||||
List<Index> leaderIndicesToFollow = new ArrayList<>();
|
List<Index> leaderIndicesToFollow = new ArrayList<>();
|
||||||
for (IndexMetaData leaderIndexMetaData : leaderClusterState.getMetaData()) {
|
for (IndexMetaData leaderIndexMetaData : leaderClusterState.getMetaData()) {
|
||||||
if (autoFollowPattern.match(leaderIndexMetaData.getIndex().getName())) {
|
if (autoFollowPattern.match(leaderIndexMetaData.getIndex().getName())) {
|
||||||
if (followedIndexUUIDs.contains(leaderIndexMetaData.getIndex().getUUID()) == false) {
|
IndexRoutingTable indexRoutingTable = leaderClusterState.routingTable().index(leaderIndexMetaData.getIndex());
|
||||||
|
if (indexRoutingTable != null &&
|
||||||
|
// Leader indices can be in the cluster state, but not all primary shards may be ready yet.
|
||||||
|
// This checks ensures all primary shards have started, so that index following does not fail.
|
||||||
|
// If not all primary shards are ready, then the next time the auto follow coordinator runs
|
||||||
|
// this index will be auto followed.
|
||||||
|
indexRoutingTable.allPrimaryShardsActive() &&
|
||||||
|
followedIndexUUIDs.contains(leaderIndexMetaData.getIndex().getUUID()) == false) {
|
||||||
// TODO: iterate over the indices in the followerClusterState and check whether a IndexMetaData
|
// TODO: iterate over the indices in the followerClusterState and check whether a IndexMetaData
|
||||||
// has a leader index uuid custom metadata entry that matches with uuid of leaderIndexMetaData variable
|
// has a leader index uuid custom metadata entry that matches with uuid of leaderIndexMetaData variable
|
||||||
// If so then handle it differently: not follow it, but just add an entry to
|
// If so then handle it differently: not follow it, but just add an entry to
|
||||||
|
@ -11,6 +11,11 @@ import org.elasticsearch.cluster.ClusterName;
|
|||||||
import org.elasticsearch.cluster.ClusterState;
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
import org.elasticsearch.cluster.metadata.MetaData;
|
import org.elasticsearch.cluster.metadata.MetaData;
|
||||||
|
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
||||||
|
import org.elasticsearch.cluster.routing.RoutingTable;
|
||||||
|
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||||
|
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
||||||
|
import org.elasticsearch.cluster.routing.TestShardRouting;
|
||||||
import org.elasticsearch.cluster.service.ClusterService;
|
import org.elasticsearch.cluster.service.ClusterService;
|
||||||
import org.elasticsearch.common.collect.Tuple;
|
import org.elasticsearch.common.collect.Tuple;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
@ -49,12 +54,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
|||||||
Client client = mock(Client.class);
|
Client client = mock(Client.class);
|
||||||
when(client.getRemoteClusterClient(anyString())).thenReturn(client);
|
when(client.getRemoteClusterClient(anyString())).thenReturn(client);
|
||||||
|
|
||||||
ClusterState leaderState = ClusterState.builder(new ClusterName("remote"))
|
ClusterState leaderState = createRemoteClusterState("logs-20190101");
|
||||||
.metaData(MetaData.builder().put(IndexMetaData.builder("logs-20190101")
|
|
||||||
.settings(settings(Version.CURRENT).put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true))
|
|
||||||
.numberOfShards(1)
|
|
||||||
.numberOfReplicas(0)))
|
|
||||||
.build();
|
|
||||||
|
|
||||||
AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"),
|
AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"),
|
||||||
null, null, null, null, null, null, null, null, null, null, null);
|
null, null, null, null, null, null, null, null, null, null, null);
|
||||||
@ -168,13 +168,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
|||||||
public void testAutoFollowerUpdateClusterStateFailure() {
|
public void testAutoFollowerUpdateClusterStateFailure() {
|
||||||
Client client = mock(Client.class);
|
Client client = mock(Client.class);
|
||||||
when(client.getRemoteClusterClient(anyString())).thenReturn(client);
|
when(client.getRemoteClusterClient(anyString())).thenReturn(client);
|
||||||
|
ClusterState leaderState = createRemoteClusterState("logs-20190101");
|
||||||
ClusterState leaderState = ClusterState.builder(new ClusterName("remote"))
|
|
||||||
.metaData(MetaData.builder().put(IndexMetaData.builder("logs-20190101")
|
|
||||||
.settings(settings(Version.CURRENT).put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true))
|
|
||||||
.numberOfShards(1)
|
|
||||||
.numberOfReplicas(0)))
|
|
||||||
.build();
|
|
||||||
|
|
||||||
AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"),
|
AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"),
|
||||||
null, null, null, null, null, null, null, null, null, null, null);
|
null, null, null, null, null, null, null, null, null, null, null);
|
||||||
@ -230,13 +224,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
|||||||
public void testAutoFollowerCreateAndFollowApiCallFailure() {
|
public void testAutoFollowerCreateAndFollowApiCallFailure() {
|
||||||
Client client = mock(Client.class);
|
Client client = mock(Client.class);
|
||||||
when(client.getRemoteClusterClient(anyString())).thenReturn(client);
|
when(client.getRemoteClusterClient(anyString())).thenReturn(client);
|
||||||
|
ClusterState leaderState = createRemoteClusterState("logs-20190101");
|
||||||
ClusterState leaderState = ClusterState.builder(new ClusterName("remote"))
|
|
||||||
.metaData(MetaData.builder().put(IndexMetaData.builder("logs-20190101")
|
|
||||||
.settings(settings(Version.CURRENT).put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true))
|
|
||||||
.numberOfShards(1)
|
|
||||||
.numberOfReplicas(0)))
|
|
||||||
.build();
|
|
||||||
|
|
||||||
AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"),
|
AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"),
|
||||||
null, null, null, null, null, null, null, null, null, null, null);
|
null, null, null, null, null, null, null, null, null, null, null);
|
||||||
@ -299,24 +287,39 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
|||||||
new AutoFollowMetadata(Collections.singletonMap("remote", autoFollowPattern), Collections.emptyMap(), headers)))
|
new AutoFollowMetadata(Collections.singletonMap("remote", autoFollowPattern), Collections.emptyMap(), headers)))
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
|
RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
|
||||||
MetaData.Builder imdBuilder = MetaData.builder();
|
MetaData.Builder imdBuilder = MetaData.builder();
|
||||||
for (int i = 0; i < 5; i++) {
|
for (int i = 0; i < 5; i++) {
|
||||||
|
String indexName = "metrics-" + i;
|
||||||
Settings.Builder builder = Settings.builder()
|
Settings.Builder builder = Settings.builder()
|
||||||
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
|
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
|
||||||
.put(IndexMetaData.SETTING_INDEX_UUID, "metrics-" + i)
|
.put(IndexMetaData.SETTING_INDEX_UUID, indexName)
|
||||||
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), i % 2 == 0);
|
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), i % 2 == 0);
|
||||||
imdBuilder.put(IndexMetaData.builder("metrics-" + i)
|
imdBuilder.put(IndexMetaData.builder("metrics-" + i)
|
||||||
.settings(builder)
|
.settings(builder)
|
||||||
.numberOfShards(1)
|
.numberOfShards(1)
|
||||||
.numberOfReplicas(0));
|
.numberOfReplicas(0));
|
||||||
|
|
||||||
|
ShardRouting shardRouting =
|
||||||
|
TestShardRouting.newShardRouting(indexName, 0, "1", true, ShardRoutingState.INITIALIZING).moveToStarted();
|
||||||
|
IndexRoutingTable indexRoutingTable = IndexRoutingTable.builder(imdBuilder.get(indexName).getIndex())
|
||||||
|
.addShard(shardRouting)
|
||||||
|
.build();
|
||||||
|
routingTableBuilder.add(indexRoutingTable);
|
||||||
}
|
}
|
||||||
|
|
||||||
imdBuilder.put(IndexMetaData.builder("logs-0")
|
imdBuilder.put(IndexMetaData.builder("logs-0")
|
||||||
.settings(settings(Version.CURRENT))
|
.settings(settings(Version.CURRENT))
|
||||||
.numberOfShards(1)
|
.numberOfShards(1)
|
||||||
.numberOfReplicas(0));
|
.numberOfReplicas(0));
|
||||||
|
ShardRouting shardRouting =
|
||||||
|
TestShardRouting.newShardRouting("logs-0", 0, "1", true, ShardRoutingState.INITIALIZING).moveToStarted();
|
||||||
|
IndexRoutingTable indexRoutingTable = IndexRoutingTable.builder(imdBuilder.get("logs-0").getIndex()).addShard(shardRouting).build();
|
||||||
|
routingTableBuilder.add(indexRoutingTable);
|
||||||
|
|
||||||
ClusterState leaderState = ClusterState.builder(new ClusterName("remote"))
|
ClusterState leaderState = ClusterState.builder(new ClusterName("remote"))
|
||||||
.metaData(imdBuilder)
|
.metaData(imdBuilder)
|
||||||
|
.routingTable(routingTableBuilder.build())
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
List<Index> result = AutoFollower.getLeaderIndicesToFollow("remote", autoFollowPattern, leaderState, followerState,
|
List<Index> result = AutoFollower.getLeaderIndicesToFollow("remote", autoFollowPattern, leaderState, followerState,
|
||||||
@ -335,6 +338,52 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
|||||||
assertThat(result.get(1).getName(), equalTo("metrics-4"));
|
assertThat(result.get(1).getName(), equalTo("metrics-4"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testGetLeaderIndicesToFollow_shardsNotStarted() {
|
||||||
|
AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("*"),
|
||||||
|
null, null, null, null, null, null, null, null, null, null, null);
|
||||||
|
Map<String, Map<String, String>> headers = new HashMap<>();
|
||||||
|
ClusterState followerState = ClusterState.builder(new ClusterName("remote"))
|
||||||
|
.metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE,
|
||||||
|
new AutoFollowMetadata(Collections.singletonMap("remote", autoFollowPattern), Collections.emptyMap(), headers)))
|
||||||
|
.build();
|
||||||
|
|
||||||
|
// 1 shard started and another not started:
|
||||||
|
ClusterState leaderState = createRemoteClusterState("index1");
|
||||||
|
MetaData.Builder mBuilder= MetaData.builder(leaderState.metaData());
|
||||||
|
mBuilder.put(IndexMetaData.builder("index2")
|
||||||
|
.settings(settings(Version.CURRENT).put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true))
|
||||||
|
.numberOfShards(1)
|
||||||
|
.numberOfReplicas(0));
|
||||||
|
ShardRouting shardRouting =
|
||||||
|
TestShardRouting.newShardRouting("index2", 0, "1", true, ShardRoutingState.INITIALIZING);
|
||||||
|
IndexRoutingTable indexRoutingTable = IndexRoutingTable.builder(mBuilder.get("index2").getIndex()
|
||||||
|
).addShard(shardRouting).build();
|
||||||
|
leaderState = ClusterState.builder(leaderState.getClusterName())
|
||||||
|
.metaData(mBuilder)
|
||||||
|
.routingTable(RoutingTable.builder(leaderState.routingTable()).add(indexRoutingTable).build())
|
||||||
|
.build();
|
||||||
|
|
||||||
|
List<Index> result = AutoFollower.getLeaderIndicesToFollow("remote", autoFollowPattern, leaderState, followerState,
|
||||||
|
Collections.emptyList());
|
||||||
|
assertThat(result.size(), equalTo(1));
|
||||||
|
assertThat(result.get(0).getName(), equalTo("index1"));
|
||||||
|
|
||||||
|
// Start second shard:
|
||||||
|
shardRouting = shardRouting.moveToStarted();
|
||||||
|
indexRoutingTable = IndexRoutingTable.builder(leaderState.metaData().indices().get("index2").getIndex())
|
||||||
|
.addShard(shardRouting).build();
|
||||||
|
leaderState = ClusterState.builder(leaderState.getClusterName())
|
||||||
|
.metaData(leaderState.metaData())
|
||||||
|
.routingTable(RoutingTable.builder(leaderState.routingTable()).add(indexRoutingTable).build())
|
||||||
|
.build();
|
||||||
|
|
||||||
|
result = AutoFollower.getLeaderIndicesToFollow("remote", autoFollowPattern, leaderState, followerState, Collections.emptyList());
|
||||||
|
assertThat(result.size(), equalTo(2));
|
||||||
|
result.sort(Comparator.comparing(Index::getName));
|
||||||
|
assertThat(result.get(0).getName(), equalTo("index1"));
|
||||||
|
assertThat(result.get(1).getName(), equalTo("index2"));
|
||||||
|
}
|
||||||
|
|
||||||
public void testGetFollowerIndexName() {
|
public void testGetFollowerIndexName() {
|
||||||
AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("metrics-*"), null, null,
|
AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("metrics-*"), null, null,
|
||||||
null, null, null, null, null, null, null, null, null);
|
null, null, null, null, null, null, null, null, null);
|
||||||
@ -408,4 +457,21 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
|||||||
assertThat(autoFollowStats.getRecentAutoFollowErrors().get("_alias2:index2").getCause().getMessage(), equalTo("error"));
|
assertThat(autoFollowStats.getRecentAutoFollowErrors().get("_alias2:index2").getCause().getMessage(), equalTo("error"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static ClusterState createRemoteClusterState(String indexName) {
|
||||||
|
IndexMetaData indexMetaData = IndexMetaData.builder(indexName)
|
||||||
|
.settings(settings(Version.CURRENT).put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true))
|
||||||
|
.numberOfShards(1)
|
||||||
|
.numberOfReplicas(0)
|
||||||
|
.build();
|
||||||
|
ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("remote"))
|
||||||
|
.metaData(MetaData.builder().put(indexMetaData, true));
|
||||||
|
|
||||||
|
ShardRouting shardRouting =
|
||||||
|
TestShardRouting.newShardRouting(indexName, 0, "1", true, ShardRoutingState.INITIALIZING).moveToStarted();
|
||||||
|
IndexRoutingTable indexRoutingTable = IndexRoutingTable.builder(indexMetaData.getIndex()).addShard(shardRouting).build();
|
||||||
|
csBuilder.routingTable(RoutingTable.builder().add(indexRoutingTable).build()).build();
|
||||||
|
|
||||||
|
return csBuilder.build();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user