Backport of (#47721) for 7.x. Similarly to #47582, Auto-follow patterns creates following indices as long as the remote index matches the pattern and the remote primary shards are all started. But since 7.2 closed indices are also replicated, and it does not play well with CCR auto-follow patterns as they create following indices for closed leader indices too. This commit changes the getLeaderIndicesToFollow() so that closed indices are excluded from auto-follow patterns.
This commit is contained in:
parent
813f44cec8
commit
8f86469d3f
|
@ -596,6 +596,9 @@ public class AutoFollowCoordinator extends AbstractLifecycleComponent implements
|
||||||
List<String> followedIndexUUIDs) {
|
List<String> followedIndexUUIDs) {
|
||||||
List<Index> leaderIndicesToFollow = new ArrayList<>();
|
List<Index> leaderIndicesToFollow = new ArrayList<>();
|
||||||
for (IndexMetaData leaderIndexMetaData : remoteClusterState.getMetaData()) {
|
for (IndexMetaData leaderIndexMetaData : remoteClusterState.getMetaData()) {
|
||||||
|
if (leaderIndexMetaData.getState() != IndexMetaData.State.OPEN) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
if (autoFollowPattern.match(leaderIndexMetaData.getIndex().getName())) {
|
if (autoFollowPattern.match(leaderIndexMetaData.getIndex().getName())) {
|
||||||
IndexRoutingTable indexRoutingTable = remoteClusterState.routingTable().index(leaderIndexMetaData.getIndex());
|
IndexRoutingTable indexRoutingTable = remoteClusterState.routingTable().index(leaderIndexMetaData.getIndex());
|
||||||
if (indexRoutingTable != null &&
|
if (indexRoutingTable != null &&
|
||||||
|
|
|
@ -5,8 +5,10 @@
|
||||||
*/
|
*/
|
||||||
package org.elasticsearch.xpack.ccr.action;
|
package org.elasticsearch.xpack.ccr.action;
|
||||||
|
|
||||||
|
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
|
||||||
import org.elasticsearch.Version;
|
import org.elasticsearch.Version;
|
||||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
|
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
|
||||||
|
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
|
||||||
import org.elasticsearch.client.Client;
|
import org.elasticsearch.client.Client;
|
||||||
import org.elasticsearch.cluster.ClusterName;
|
import org.elasticsearch.cluster.ClusterName;
|
||||||
import org.elasticsearch.cluster.ClusterState;
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
|
@ -18,11 +20,13 @@ import org.elasticsearch.cluster.routing.ShardRouting;
|
||||||
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
||||||
import org.elasticsearch.cluster.routing.TestShardRouting;
|
import org.elasticsearch.cluster.routing.TestShardRouting;
|
||||||
import org.elasticsearch.cluster.service.ClusterService;
|
import org.elasticsearch.cluster.service.ClusterService;
|
||||||
|
import org.elasticsearch.common.UUIDs;
|
||||||
import org.elasticsearch.common.collect.Tuple;
|
import org.elasticsearch.common.collect.Tuple;
|
||||||
import org.elasticsearch.common.settings.ClusterSettings;
|
import org.elasticsearch.common.settings.ClusterSettings;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
|
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||||
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
||||||
import org.elasticsearch.index.Index;
|
import org.elasticsearch.index.Index;
|
||||||
import org.elasticsearch.index.IndexSettings;
|
import org.elasticsearch.index.IndexSettings;
|
||||||
|
@ -44,10 +48,12 @@ import java.util.HashMap;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
import java.util.function.BiConsumer;
|
import java.util.function.BiConsumer;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
@ -57,6 +63,7 @@ import static org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollo
|
||||||
import static org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower.recordLeaderIndexAsFollowFunction;
|
import static org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower.recordLeaderIndexAsFollowFunction;
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
import static org.hamcrest.Matchers.greaterThan;
|
import static org.hamcrest.Matchers.greaterThan;
|
||||||
|
import static org.hamcrest.Matchers.hasItem;
|
||||||
import static org.hamcrest.Matchers.is;
|
import static org.hamcrest.Matchers.is;
|
||||||
import static org.hamcrest.Matchers.notNullValue;
|
import static org.hamcrest.Matchers.notNullValue;
|
||||||
import static org.hamcrest.Matchers.nullValue;
|
import static org.hamcrest.Matchers.nullValue;
|
||||||
|
@ -416,6 +423,26 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
||||||
assertThat(result.get(1).getName(), equalTo("index2"));
|
assertThat(result.get(1).getName(), equalTo("index2"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testGetLeaderIndicesToFollowWithClosedIndices() {
|
||||||
|
final AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("*"),
|
||||||
|
null, null, null, null, null, null, null, null, null, null, null);
|
||||||
|
|
||||||
|
// index is opened
|
||||||
|
ClusterState remoteState = ClusterStateCreationUtils.stateWithActivePrimary("test-index", true, randomIntBetween(1, 3), 0);
|
||||||
|
List<Index> result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, remoteState, Collections.emptyList());
|
||||||
|
assertThat(result.size(), equalTo(1));
|
||||||
|
assertThat(result, hasItem(remoteState.metaData().index("test-index").getIndex()));
|
||||||
|
|
||||||
|
// index is closed
|
||||||
|
remoteState = ClusterState.builder(remoteState)
|
||||||
|
.metaData(MetaData.builder(remoteState.metaData())
|
||||||
|
.put(IndexMetaData.builder(remoteState.metaData().index("test-index")).state(IndexMetaData.State.CLOSE).build(), true)
|
||||||
|
.build())
|
||||||
|
.build();
|
||||||
|
result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, remoteState, Collections.emptyList());
|
||||||
|
assertThat(result.size(), equalTo(0));
|
||||||
|
}
|
||||||
|
|
||||||
public void testRecordLeaderIndexAsFollowFunction() {
|
public void testRecordLeaderIndexAsFollowFunction() {
|
||||||
AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(Collections.emptyMap(),
|
AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(Collections.emptyMap(),
|
||||||
Collections.singletonMap("pattern1", Collections.emptyList()), Collections.emptyMap());
|
Collections.singletonMap("pattern1", Collections.emptyList()), Collections.emptyMap());
|
||||||
|
@ -763,7 +790,9 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
||||||
autoFollower.start();
|
autoFollower.start();
|
||||||
assertThat(allResults.size(), equalTo(states.length));
|
assertThat(allResults.size(), equalTo(states.length));
|
||||||
for (int i = 0; i < states.length; i++) {
|
for (int i = 0; i < states.length; i++) {
|
||||||
assertThat(allResults.get(i).autoFollowExecutionResults.containsKey(new Index("logs-" + i, "_na_")), is(true));
|
final String indexName = "logs-" + i;
|
||||||
|
assertThat(allResults.get(i).autoFollowExecutionResults.keySet().stream()
|
||||||
|
.anyMatch(index -> index.getName().equals(indexName)), is(true));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1049,6 +1078,87 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testClosedIndicesAreNotAutoFollowed() {
|
||||||
|
final Client client = mock(Client.class);
|
||||||
|
when(client.getRemoteClusterClient(anyString())).thenReturn(client);
|
||||||
|
|
||||||
|
final String pattern = "pattern1";
|
||||||
|
final ClusterState localState = ClusterState.builder(new ClusterName("local"))
|
||||||
|
.metaData(MetaData.builder()
|
||||||
|
.putCustom(AutoFollowMetadata.TYPE,
|
||||||
|
new AutoFollowMetadata(Collections.singletonMap(pattern,
|
||||||
|
new AutoFollowPattern("remote", Collections.singletonList("docs-*"), null, null, null, null, null, null, null, null,
|
||||||
|
null, null, null)),
|
||||||
|
Collections.singletonMap(pattern, Collections.emptyList()),
|
||||||
|
Collections.singletonMap(pattern, Collections.emptyMap()))))
|
||||||
|
.build();
|
||||||
|
|
||||||
|
ClusterState remoteState = null;
|
||||||
|
final int nbLeaderIndices = randomIntBetween(1, 15);
|
||||||
|
for (int i = 0; i < nbLeaderIndices; i++) {
|
||||||
|
String indexName = "docs-" + i;
|
||||||
|
if (remoteState == null) {
|
||||||
|
remoteState = createRemoteClusterState(indexName, true);
|
||||||
|
} else {
|
||||||
|
remoteState = createRemoteClusterState(remoteState, indexName);
|
||||||
|
}
|
||||||
|
if (randomBoolean()) {
|
||||||
|
// randomly close the index
|
||||||
|
remoteState = ClusterState.builder(remoteState.getClusterName())
|
||||||
|
.routingTable(remoteState.routingTable())
|
||||||
|
.metaData(MetaData.builder(remoteState.metaData())
|
||||||
|
.put(IndexMetaData.builder(remoteState.metaData().index(indexName)).state(IndexMetaData.State.CLOSE).build(), true)
|
||||||
|
.build())
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
final ClusterState finalRemoteState = remoteState;
|
||||||
|
final AtomicReference<ClusterState> lastModifiedClusterState = new AtomicReference<>(localState);
|
||||||
|
final List<AutoFollowCoordinator.AutoFollowResult> results = new ArrayList<>();
|
||||||
|
final Set<Object> followedIndices = ConcurrentCollections.newConcurrentSet();
|
||||||
|
final AutoFollower autoFollower =
|
||||||
|
new AutoFollower("remote", results::addAll, localClusterStateSupplier(localState), () -> 1L, Runnable::run) {
|
||||||
|
@Override
|
||||||
|
void getRemoteClusterState(String remoteCluster,
|
||||||
|
long metadataVersion,
|
||||||
|
BiConsumer<ClusterStateResponse, Exception> handler) {
|
||||||
|
assertThat(remoteCluster, equalTo("remote"));
|
||||||
|
handler.accept(new ClusterStateResponse(new ClusterName("remote"), finalRemoteState, false), null);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
void createAndFollow(Map<String, String> headers,
|
||||||
|
PutFollowAction.Request followRequest,
|
||||||
|
Runnable successHandler,
|
||||||
|
Consumer<Exception> failureHandler) {
|
||||||
|
followedIndices.add(followRequest.getLeaderIndex());
|
||||||
|
successHandler.run();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
void updateAutoFollowMetadata(Function<ClusterState, ClusterState> updateFunction, Consumer<Exception> handler) {
|
||||||
|
lastModifiedClusterState.updateAndGet(updateFunction::apply);
|
||||||
|
handler.accept(null);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
void cleanFollowedRemoteIndices(ClusterState remoteClusterState, List<String> patterns) {
|
||||||
|
// Ignore, to avoid invoking updateAutoFollowMetadata(...) twice
|
||||||
|
}
|
||||||
|
};
|
||||||
|
autoFollower.start();
|
||||||
|
|
||||||
|
assertThat(results, notNullValue());
|
||||||
|
assertThat(results.size(), equalTo(1));
|
||||||
|
|
||||||
|
for (ObjectObjectCursor<String, IndexMetaData> index : remoteState.metaData().indices()) {
|
||||||
|
boolean expect = index.value.getState() == IndexMetaData.State.OPEN;
|
||||||
|
assertThat(results.get(0).autoFollowExecutionResults.containsKey(index.value.getIndex()), is(expect));
|
||||||
|
assertThat(followedIndices.contains(index.key), is(expect));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private static ClusterState createRemoteClusterState(String indexName, Boolean enableSoftDeletes) {
|
private static ClusterState createRemoteClusterState(String indexName, Boolean enableSoftDeletes) {
|
||||||
Settings.Builder indexSettings;
|
Settings.Builder indexSettings;
|
||||||
if (enableSoftDeletes != null) {
|
if (enableSoftDeletes != null) {
|
||||||
|
@ -1075,11 +1185,13 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
||||||
|
|
||||||
private static ClusterState createRemoteClusterState(ClusterState previous, String indexName) {
|
private static ClusterState createRemoteClusterState(ClusterState previous, String indexName) {
|
||||||
IndexMetaData indexMetaData = IndexMetaData.builder(indexName)
|
IndexMetaData indexMetaData = IndexMetaData.builder(indexName)
|
||||||
.settings(settings(Version.CURRENT).put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true))
|
.settings(settings(Version.CURRENT)
|
||||||
|
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
|
||||||
|
.put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random())))
|
||||||
.numberOfShards(1)
|
.numberOfShards(1)
|
||||||
.numberOfReplicas(0)
|
.numberOfReplicas(0)
|
||||||
.build();
|
.build();
|
||||||
ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("remote"))
|
ClusterState.Builder csBuilder = ClusterState.builder(previous.getClusterName())
|
||||||
.metaData(MetaData.builder(previous.metaData())
|
.metaData(MetaData.builder(previous.metaData())
|
||||||
.version(previous.metaData().version() + 1)
|
.version(previous.metaData().version() + 1)
|
||||||
.put(indexMetaData, true));
|
.put(indexMetaData, true));
|
||||||
|
@ -1087,7 +1199,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
||||||
ShardRouting shardRouting =
|
ShardRouting shardRouting =
|
||||||
TestShardRouting.newShardRouting(indexName, 0, "1", true, ShardRoutingState.INITIALIZING).moveToStarted();
|
TestShardRouting.newShardRouting(indexName, 0, "1", true, ShardRoutingState.INITIALIZING).moveToStarted();
|
||||||
IndexRoutingTable indexRoutingTable = IndexRoutingTable.builder(indexMetaData.getIndex()).addShard(shardRouting).build();
|
IndexRoutingTable indexRoutingTable = IndexRoutingTable.builder(indexMetaData.getIndex()).addShard(shardRouting).build();
|
||||||
csBuilder.routingTable(RoutingTable.builder().add(indexRoutingTable).build()).build();
|
csBuilder.routingTable(RoutingTable.builder(previous.routingTable()).add(indexRoutingTable).build()).build();
|
||||||
|
|
||||||
return csBuilder.build();
|
return csBuilder.build();
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue