mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-17 02:14:54 +00:00
[CCR] AutoFollowCoordinator should tolerate that auto follow patterns may be removed (#35945)
AutoFollowCoordinator should take into account that after auto following an index and while updating that a leader index has been followed, that the auto follow pattern may have been removed via delete auto follow patterns api. Also fixed a bug that when a remote cluster connection has been removed, the auto follow coordinator does not die when it tries get a remote client for that cluster. Closes #35480
This commit is contained in:
parent
5d684ca473
commit
6e1ff31222
@ -39,6 +39,7 @@ import org.elasticsearch.client.ccr.PutFollowResponse;
|
|||||||
import org.elasticsearch.client.ccr.ResumeFollowRequest;
|
import org.elasticsearch.client.ccr.ResumeFollowRequest;
|
||||||
import org.elasticsearch.client.ccr.UnfollowRequest;
|
import org.elasticsearch.client.ccr.UnfollowRequest;
|
||||||
import org.elasticsearch.client.core.AcknowledgedResponse;
|
import org.elasticsearch.client.core.AcknowledgedResponse;
|
||||||
|
import org.elasticsearch.common.xcontent.ObjectPath;
|
||||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||||
import org.elasticsearch.common.xcontent.XContentType;
|
import org.elasticsearch.common.xcontent.XContentType;
|
||||||
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||||
@ -55,7 +56,7 @@ import static org.hamcrest.Matchers.notNullValue;
|
|||||||
public class CCRIT extends ESRestHighLevelClientTestCase {
|
public class CCRIT extends ESRestHighLevelClientTestCase {
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setupRemoteClusterConfig() throws IOException {
|
public void setupRemoteClusterConfig() throws Exception {
|
||||||
// Configure local cluster as remote cluster:
|
// Configure local cluster as remote cluster:
|
||||||
// TODO: replace with nodes info highlevel rest client code when it is available:
|
// TODO: replace with nodes info highlevel rest client code when it is available:
|
||||||
final Request request = new Request("GET", "/_nodes");
|
final Request request = new Request("GET", "/_nodes");
|
||||||
@ -69,6 +70,14 @@ public class CCRIT extends ESRestHighLevelClientTestCase {
|
|||||||
ClusterUpdateSettingsResponse updateSettingsResponse =
|
ClusterUpdateSettingsResponse updateSettingsResponse =
|
||||||
highLevelClient().cluster().putSettings(updateSettingsRequest, RequestOptions.DEFAULT);
|
highLevelClient().cluster().putSettings(updateSettingsRequest, RequestOptions.DEFAULT);
|
||||||
assertThat(updateSettingsResponse.isAcknowledged(), is(true));
|
assertThat(updateSettingsResponse.isAcknowledged(), is(true));
|
||||||
|
|
||||||
|
assertBusy(() -> {
|
||||||
|
Map<?, ?> localConnection = (Map<?, ?>) toMap(client()
|
||||||
|
.performRequest(new Request("GET", "/_remote/info")))
|
||||||
|
.get("local");
|
||||||
|
assertThat(localConnection, notNullValue());
|
||||||
|
assertThat(localConnection.get("connected"), is(true));
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testIndexFollowing() throws Exception {
|
public void testIndexFollowing() throws Exception {
|
||||||
@ -132,7 +141,6 @@ public class CCRIT extends ESRestHighLevelClientTestCase {
|
|||||||
assertThat(unfollowResponse.isAcknowledged(), is(true));
|
assertThat(unfollowResponse.isAcknowledged(), is(true));
|
||||||
}
|
}
|
||||||
|
|
||||||
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/35937")
|
|
||||||
public void testAutoFollowing() throws Exception {
|
public void testAutoFollowing() throws Exception {
|
||||||
CcrClient ccrClient = highLevelClient().ccr();
|
CcrClient ccrClient = highLevelClient().ccr();
|
||||||
PutAutoFollowPatternRequest putAutoFollowPatternRequest =
|
PutAutoFollowPatternRequest putAutoFollowPatternRequest =
|
||||||
@ -149,14 +157,21 @@ public class CCRIT extends ESRestHighLevelClientTestCase {
|
|||||||
|
|
||||||
assertBusy(() -> {
|
assertBusy(() -> {
|
||||||
assertThat(indexExists("copy-logs-20200101"), is(true));
|
assertThat(indexExists("copy-logs-20200101"), is(true));
|
||||||
|
// TODO: replace with HLRC follow stats when available:
|
||||||
|
Map<String, Object> rsp = toMap(client().performRequest(new Request("GET", "/copy-logs-20200101/_ccr/stats")));
|
||||||
|
String index = null;
|
||||||
|
try {
|
||||||
|
index = ObjectPath.eval("indices.0.index", rsp);
|
||||||
|
} catch (Exception e){ }
|
||||||
|
assertThat(index, equalTo("copy-logs-20200101"));
|
||||||
});
|
});
|
||||||
|
|
||||||
GetAutoFollowPatternRequest getAutoFollowPatternRequest =
|
GetAutoFollowPatternRequest getAutoFollowPatternRequest =
|
||||||
randomBoolean() ? new GetAutoFollowPatternRequest("pattern1") : new GetAutoFollowPatternRequest();
|
randomBoolean() ? new GetAutoFollowPatternRequest("pattern1") : new GetAutoFollowPatternRequest();
|
||||||
GetAutoFollowPatternResponse getAutoFollowPatternResponse =
|
GetAutoFollowPatternResponse getAutoFollowPatternResponse =
|
||||||
execute(getAutoFollowPatternRequest, ccrClient::getAutoFollowPattern, ccrClient::getAutoFollowPatternAsync);
|
execute(getAutoFollowPatternRequest, ccrClient::getAutoFollowPattern, ccrClient::getAutoFollowPatternAsync);
|
||||||
assertThat(getAutoFollowPatternResponse.getPatterns().size(), equalTo(1L));
|
assertThat(getAutoFollowPatternResponse.getPatterns().size(), equalTo(1));
|
||||||
GetAutoFollowPatternResponse.Pattern pattern = getAutoFollowPatternResponse.getPatterns().get("patterns1");
|
GetAutoFollowPatternResponse.Pattern pattern = getAutoFollowPatternResponse.getPatterns().get("pattern1");
|
||||||
assertThat(pattern, notNullValue());
|
assertThat(pattern, notNullValue());
|
||||||
assertThat(pattern.getRemoteCluster(), equalTo(putAutoFollowPatternRequest.getRemoteCluster()));
|
assertThat(pattern.getRemoteCluster(), equalTo(putAutoFollowPatternRequest.getRemoteCluster()));
|
||||||
assertThat(pattern.getLeaderIndexPatterns(), equalTo(putAutoFollowPatternRequest.getLeaderIndexPatterns()));
|
assertThat(pattern.getLeaderIndexPatterns(), equalTo(putAutoFollowPatternRequest.getLeaderIndexPatterns()));
|
||||||
|
@ -160,15 +160,22 @@ public final class CcrLicenseChecker {
|
|||||||
final ClusterStateRequest request,
|
final ClusterStateRequest request,
|
||||||
final Consumer<Exception> onFailure,
|
final Consumer<Exception> onFailure,
|
||||||
final Consumer<ClusterState> leaderClusterStateConsumer) {
|
final Consumer<ClusterState> leaderClusterStateConsumer) {
|
||||||
checkRemoteClusterLicenseAndFetchClusterState(
|
try {
|
||||||
|
Client remoteClient = systemClient(client.getRemoteClusterClient(clusterAlias));
|
||||||
|
checkRemoteClusterLicenseAndFetchClusterState(
|
||||||
client,
|
client,
|
||||||
clusterAlias,
|
clusterAlias,
|
||||||
systemClient(client.getRemoteClusterClient(clusterAlias)),
|
remoteClient,
|
||||||
request,
|
request,
|
||||||
onFailure,
|
onFailure,
|
||||||
leaderClusterStateConsumer,
|
leaderClusterStateConsumer,
|
||||||
CcrLicenseChecker::clusterStateNonCompliantRemoteLicense,
|
CcrLicenseChecker::clusterStateNonCompliantRemoteLicense,
|
||||||
e -> clusterStateUnknownRemoteLicense(clusterAlias, e));
|
e -> clusterStateUnknownRemoteLicense(clusterAlias, e));
|
||||||
|
} catch (Exception e) {
|
||||||
|
// client.getRemoteClusterClient(...) can fail with a IllegalArgumentException if remote
|
||||||
|
// connection is unknown
|
||||||
|
onFailure.accept(e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -403,6 +403,13 @@ public class AutoFollowCoordinator implements ClusterStateApplier {
|
|||||||
return currentState -> {
|
return currentState -> {
|
||||||
AutoFollowMetadata currentAutoFollowMetadata = currentState.metaData().custom(AutoFollowMetadata.TYPE);
|
AutoFollowMetadata currentAutoFollowMetadata = currentState.metaData().custom(AutoFollowMetadata.TYPE);
|
||||||
Map<String, List<String>> newFollowedIndexUUIDS = new HashMap<>(currentAutoFollowMetadata.getFollowedLeaderIndexUUIDs());
|
Map<String, List<String>> newFollowedIndexUUIDS = new HashMap<>(currentAutoFollowMetadata.getFollowedLeaderIndexUUIDs());
|
||||||
|
if (newFollowedIndexUUIDS.containsKey(name) == false) {
|
||||||
|
// A delete auto follow pattern request can have removed the auto follow pattern while we want to update
|
||||||
|
// the auto follow metadata with the fact that an index was successfully auto followed. If this
|
||||||
|
// happens, we can just skip this step.
|
||||||
|
return currentState;
|
||||||
|
}
|
||||||
|
|
||||||
newFollowedIndexUUIDS.compute(name, (key, existingUUIDs) -> {
|
newFollowedIndexUUIDS.compute(name, (key, existingUUIDs) -> {
|
||||||
assert existingUUIDs != null;
|
assert existingUUIDs != null;
|
||||||
List<String> newUUIDs = new ArrayList<>(existingUUIDs);
|
List<String> newUUIDs = new ArrayList<>(existingUUIDs);
|
||||||
|
@ -40,8 +40,10 @@ import java.util.function.BiConsumer;
|
|||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
|
||||||
|
import static org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower.recordLeaderIndexAsFollowFunction;
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
import static org.hamcrest.Matchers.is;
|
import static org.hamcrest.Matchers.is;
|
||||||
|
import static org.hamcrest.Matchers.notNullValue;
|
||||||
import static org.hamcrest.Matchers.nullValue;
|
import static org.hamcrest.Matchers.nullValue;
|
||||||
import static org.hamcrest.Matchers.sameInstance;
|
import static org.hamcrest.Matchers.sameInstance;
|
||||||
import static org.mockito.Matchers.anyString;
|
import static org.mockito.Matchers.anyString;
|
||||||
@ -384,6 +386,33 @@ public class AutoFollowCoordinatorTests extends ESTestCase {
|
|||||||
assertThat(result.get(1).getName(), equalTo("index2"));
|
assertThat(result.get(1).getName(), equalTo("index2"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testRecordLeaderIndexAsFollowFunction() {
|
||||||
|
AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(Collections.emptyMap(),
|
||||||
|
Collections.singletonMap("pattern1", Collections.emptyList()), Collections.emptyMap());
|
||||||
|
ClusterState clusterState = new ClusterState.Builder(new ClusterName("name"))
|
||||||
|
.metaData(new MetaData.Builder().putCustom(AutoFollowMetadata.TYPE, autoFollowMetadata))
|
||||||
|
.build();
|
||||||
|
Function<ClusterState, ClusterState> function = recordLeaderIndexAsFollowFunction("pattern1", new Index("index1", "index1"));
|
||||||
|
|
||||||
|
ClusterState result = function.apply(clusterState);
|
||||||
|
AutoFollowMetadata autoFollowMetadataResult = result.metaData().custom(AutoFollowMetadata.TYPE);
|
||||||
|
assertThat(autoFollowMetadataResult.getFollowedLeaderIndexUUIDs().get("pattern1"), notNullValue());
|
||||||
|
assertThat(autoFollowMetadataResult.getFollowedLeaderIndexUUIDs().get("pattern1").size(), equalTo(1));
|
||||||
|
assertThat(autoFollowMetadataResult.getFollowedLeaderIndexUUIDs().get("pattern1").get(0), equalTo("index1"));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testRecordLeaderIndexAsFollowFunctionNoEntry() {
|
||||||
|
AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata(Collections.emptyMap(), Collections.emptyMap(),
|
||||||
|
Collections.emptyMap());
|
||||||
|
ClusterState clusterState = new ClusterState.Builder(new ClusterName("name"))
|
||||||
|
.metaData(new MetaData.Builder().putCustom(AutoFollowMetadata.TYPE, autoFollowMetadata))
|
||||||
|
.build();
|
||||||
|
Function<ClusterState, ClusterState> function = recordLeaderIndexAsFollowFunction("pattern1", new Index("index1", "index1"));
|
||||||
|
|
||||||
|
ClusterState result = function.apply(clusterState);
|
||||||
|
assertThat(result, sameInstance(clusterState));
|
||||||
|
}
|
||||||
|
|
||||||
public void testGetFollowerIndexName() {
|
public void testGetFollowerIndexName() {
|
||||||
AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("metrics-*"), null, null,
|
AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("metrics-*"), null, null,
|
||||||
null, null, null, null, null, null, null, null, null);
|
null, null, null, null, null, null, null, null, null);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user