mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-03-09 14:34:43 +00:00
Introduce forget follower API (#39718)
This commit introduces the forget follower API. This API is needed in cases that unfollowing a following index fails to remove the shard history retention leases on the leader index. This can happen explicitly through user action, or implicitly through an index managed by ILM. When this occurs, history will be retained longer than necessary. While the retention lease will eventually expire, it can be expensive to allow history to persist for that long, and also prevent ILM from performing actions like shrink on the leader index. As such, we introduce an API to allow for manual removal of the shard history retention leases in this case.
This commit is contained in:
parent
6c75a2f2b0
commit
0250d554b6
@ -104,7 +104,7 @@ public class RestTestsFromSnippetsTask extends SnippetsTask {
|
||||
* format of the response is incompatible i.e. it is not a JSON object.
|
||||
*/
|
||||
static shouldAddShardFailureCheck(String path) {
|
||||
return path.startsWith('_cat') == false && path.startsWith('_ml/datafeeds/') == false
|
||||
return path.startsWith('_cat') == false && path.startsWith('_ml/datafeeds/') == false
|
||||
}
|
||||
|
||||
/**
|
||||
@ -294,7 +294,7 @@ public class RestTestsFromSnippetsTask extends SnippetsTask {
|
||||
}
|
||||
|
||||
void emitDo(String method, String pathAndQuery, String body,
|
||||
String catchPart, List warnings, boolean inSetup) {
|
||||
String catchPart, List warnings, boolean inSetup, boolean skipShardFailures) {
|
||||
def (String path, String query) = pathAndQuery.tokenize('?')
|
||||
if (path == null) {
|
||||
path = '' // Catch requests to the root...
|
||||
@ -346,7 +346,7 @@ public class RestTestsFromSnippetsTask extends SnippetsTask {
|
||||
* section so we have to skip it there. We also omit the assertion
|
||||
* from APIs that don't return a JSON object
|
||||
*/
|
||||
if (false == inSetup && shouldAddShardFailureCheck(path)) {
|
||||
if (false == inSetup && skipShardFailures == false && shouldAddShardFailureCheck(path)) {
|
||||
current.println(" - is_false: _shards.failures")
|
||||
}
|
||||
}
|
||||
@ -394,7 +394,7 @@ public class RestTestsFromSnippetsTask extends SnippetsTask {
|
||||
pathAndQuery = pathAndQuery.substring(1)
|
||||
}
|
||||
emitDo(method, pathAndQuery, body, catchPart, snippet.warnings,
|
||||
inSetup)
|
||||
inSetup, snippet.skipShardsFailures)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -45,7 +45,7 @@ public class SnippetsTask extends DefaultTask {
|
||||
private static final String WARNING = /warning:(.+)/
|
||||
private static final String CAT = /(_cat)/
|
||||
private static final String TEST_SYNTAX =
|
||||
/(?:$CATCH|$SUBSTITUTION|$SKIP|(continued)|$SETUP|$WARNING) ?/
|
||||
/(?:$CATCH|$SUBSTITUTION|$SKIP|(continued)|$SETUP|$WARNING|(skip_shard_failures)) ?/
|
||||
|
||||
/**
|
||||
* Action to take on each snippet. Called with a single parameter, an
|
||||
@ -233,6 +233,10 @@ public class SnippetsTask extends DefaultTask {
|
||||
snippet.warnings.add(it.group(7))
|
||||
return
|
||||
}
|
||||
if (it.group(8) != null) {
|
||||
snippet.skipShardsFailures = true
|
||||
return
|
||||
}
|
||||
throw new InvalidUserDataException(
|
||||
"Invalid test marker: $line")
|
||||
}
|
||||
@ -329,6 +333,7 @@ public class SnippetsTask extends DefaultTask {
|
||||
String setup = null
|
||||
boolean curl
|
||||
List warnings = new ArrayList()
|
||||
boolean skipShardsFailures = false
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
@ -359,6 +364,9 @@ public class SnippetsTask extends DefaultTask {
|
||||
for (String warning in warnings) {
|
||||
result += "[warning:$warning]"
|
||||
}
|
||||
if (skipShardsFailures) {
|
||||
result += '[skip_shard_failures]'
|
||||
}
|
||||
}
|
||||
if (testResponse) {
|
||||
result += '// TESTRESPONSE'
|
||||
|
@ -27,6 +27,7 @@ import org.elasticsearch.client.ccr.FollowInfoRequest;
|
||||
import org.elasticsearch.client.ccr.FollowInfoResponse;
|
||||
import org.elasticsearch.client.ccr.FollowStatsRequest;
|
||||
import org.elasticsearch.client.ccr.FollowStatsResponse;
|
||||
import org.elasticsearch.client.ccr.ForgetFollowerRequest;
|
||||
import org.elasticsearch.client.ccr.GetAutoFollowPatternRequest;
|
||||
import org.elasticsearch.client.ccr.GetAutoFollowPatternResponse;
|
||||
import org.elasticsearch.client.ccr.PauseFollowRequest;
|
||||
@ -36,6 +37,7 @@ import org.elasticsearch.client.ccr.PutFollowResponse;
|
||||
import org.elasticsearch.client.ccr.ResumeFollowRequest;
|
||||
import org.elasticsearch.client.ccr.UnfollowRequest;
|
||||
import org.elasticsearch.client.core.AcknowledgedResponse;
|
||||
import org.elasticsearch.client.core.BroadcastResponse;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
@ -233,6 +235,48 @@ public final class CcrClient {
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Instructs an index acting as a leader index to forget the specified follower index.
|
||||
*
|
||||
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/ccr-forget-follower.html">the docs</a> for more details
|
||||
* on the intended usage of this API.
|
||||
*
|
||||
* @param request the request
|
||||
* @param options the request options (e.g., headers), use {@link RequestOptions#DEFAULT} if the defaults are acceptable.
|
||||
* @return the response
|
||||
* @throws IOException if an I/O exception occurs while executing this request
|
||||
*/
|
||||
public BroadcastResponse forgetFollower(final ForgetFollowerRequest request, final RequestOptions options) throws IOException {
|
||||
return restHighLevelClient.performRequestAndParseEntity(
|
||||
request,
|
||||
CcrRequestConverters::forgetFollower,
|
||||
options,
|
||||
BroadcastResponse::fromXContent,
|
||||
Collections.emptySet());
|
||||
}
|
||||
|
||||
/**
|
||||
* Asynchronously instructs an index acting as a leader index to forget the specified follower index.
|
||||
*
|
||||
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/ccr-forget-follower.html">the docs</a> for more details
|
||||
* on the intended usage of this API.
|
||||
*
|
||||
* @param request the request
|
||||
* @param options the request options (e.g., headers), use {@link RequestOptions#DEFAULT} if the defaults are acceptable.
|
||||
*/
|
||||
public void forgetFollowerAsync(
|
||||
final ForgetFollowerRequest request,
|
||||
final RequestOptions options,
|
||||
final ActionListener<BroadcastResponse> listener) {
|
||||
restHighLevelClient.performRequestAsyncAndParseEntity(
|
||||
request,
|
||||
CcrRequestConverters::forgetFollower,
|
||||
options,
|
||||
BroadcastResponse::fromXContent,
|
||||
listener,
|
||||
Collections.emptySet());
|
||||
}
|
||||
|
||||
/**
|
||||
* Stores an auto follow pattern.
|
||||
*
|
||||
|
@ -27,6 +27,7 @@ import org.elasticsearch.client.ccr.CcrStatsRequest;
|
||||
import org.elasticsearch.client.ccr.DeleteAutoFollowPatternRequest;
|
||||
import org.elasticsearch.client.ccr.FollowInfoRequest;
|
||||
import org.elasticsearch.client.ccr.FollowStatsRequest;
|
||||
import org.elasticsearch.client.ccr.ForgetFollowerRequest;
|
||||
import org.elasticsearch.client.ccr.GetAutoFollowPatternRequest;
|
||||
import org.elasticsearch.client.ccr.PauseFollowRequest;
|
||||
import org.elasticsearch.client.ccr.PutAutoFollowPatternRequest;
|
||||
@ -79,6 +80,17 @@ final class CcrRequestConverters {
|
||||
return new Request(HttpPost.METHOD_NAME, endpoint);
|
||||
}
|
||||
|
||||
static Request forgetFollower(final ForgetFollowerRequest forgetFollowerRequest) throws IOException {
|
||||
final String endpoint = new RequestConverters.EndpointBuilder()
|
||||
.addPathPart(forgetFollowerRequest.leaderIndex())
|
||||
.addPathPartAsIs("_ccr")
|
||||
.addPathPartAsIs("forget_follower")
|
||||
.build();
|
||||
final Request request = new Request(HttpPost.METHOD_NAME, endpoint);
|
||||
request.setEntity(createEntity(forgetFollowerRequest, REQUEST_BODY_CONTENT_TYPE));
|
||||
return request;
|
||||
}
|
||||
|
||||
static Request putAutoFollowPattern(PutAutoFollowPatternRequest putAutoFollowPatternRequest) throws IOException {
|
||||
String endpoint = new RequestConverters.EndpointBuilder()
|
||||
.addPathPartAsIs("_ccr", "auto_follow")
|
||||
|
@ -0,0 +1,89 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.client.ccr;
|
||||
|
||||
import org.elasticsearch.client.Validatable;
|
||||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* Represents a forget follower request. Note that this an expert API intended to be used only when unfollowing a follower index fails to
|
||||
* remove the follower retention leases. Please be sure that you understand the purpose this API before using.
|
||||
*/
|
||||
public final class ForgetFollowerRequest implements ToXContentObject, Validatable {
|
||||
|
||||
private final String followerCluster;
|
||||
|
||||
private final String followerIndex;
|
||||
|
||||
private final String followerIndexUUID;
|
||||
|
||||
private final String leaderRemoteCluster;
|
||||
|
||||
private final String leaderIndex;
|
||||
|
||||
/**
|
||||
* The name of the leader index.
|
||||
*
|
||||
* @return the name of the leader index
|
||||
*/
|
||||
public String leaderIndex() {
|
||||
return leaderIndex;
|
||||
}
|
||||
|
||||
/**
|
||||
* Construct a forget follower request.
|
||||
*
|
||||
* @param followerCluster the name of the cluster containing the follower index to forget
|
||||
* @param followerIndex the name of follower index
|
||||
* @param followerIndexUUID the UUID of the follower index
|
||||
* @param leaderRemoteCluster the alias of the remote cluster containing the leader index from the perspective of the follower index
|
||||
* @param leaderIndex the name of the leader index
|
||||
*/
|
||||
public ForgetFollowerRequest(
|
||||
final String followerCluster,
|
||||
final String followerIndex,
|
||||
final String followerIndexUUID,
|
||||
final String leaderRemoteCluster,
|
||||
final String leaderIndex) {
|
||||
this.followerCluster = Objects.requireNonNull(followerCluster);
|
||||
this.followerIndex = Objects.requireNonNull(followerIndex);
|
||||
this.followerIndexUUID = Objects.requireNonNull(followerIndexUUID);
|
||||
this.leaderRemoteCluster = Objects.requireNonNull(leaderRemoteCluster);
|
||||
this.leaderIndex = Objects.requireNonNull(leaderIndex);
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
|
||||
builder.startObject();
|
||||
{
|
||||
builder.field("follower_cluster", followerCluster);
|
||||
builder.field("follower_index", followerIndex);
|
||||
builder.field("follower_index_uuid", followerIndexUUID);
|
||||
builder.field("leader_remote_cluster", leaderRemoteCluster);
|
||||
}
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,175 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.client.core;
|
||||
|
||||
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* Represents a response to a request that is broadcast to a collection of shards.
|
||||
*/
|
||||
public class BroadcastResponse {
|
||||
|
||||
private final Shards shards;
|
||||
|
||||
/**
|
||||
* Represents the shard-level summary of the response execution.
|
||||
*
|
||||
* @return the shard-level response summary
|
||||
*/
|
||||
public Shards shards() {
|
||||
return shards;
|
||||
}
|
||||
|
||||
BroadcastResponse(final Shards shards) {
|
||||
this.shards = Objects.requireNonNull(shards);
|
||||
}
|
||||
|
||||
private static final ParseField SHARDS_FIELD = new ParseField("_shards");
|
||||
|
||||
static final ConstructingObjectParser<BroadcastResponse, Void> PARSER = new ConstructingObjectParser<>(
|
||||
"broadcast_response",
|
||||
a -> new BroadcastResponse((Shards) a[0]));
|
||||
|
||||
static {
|
||||
PARSER.declareObject(ConstructingObjectParser.constructorArg(), Shards.SHARDS_PARSER, SHARDS_FIELD);
|
||||
}
|
||||
|
||||
/**
|
||||
* Parses a broadcast response.
|
||||
*
|
||||
* @param parser the parser
|
||||
* @return a broadcast response parsed from the specified parser
|
||||
* @throws IOException if an I/O exception occurs parsing the response
|
||||
*/
|
||||
public static BroadcastResponse fromXContent(final XContentParser parser) throws IOException {
|
||||
return PARSER.parse(parser, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Represents the results of a collection of shards on which a request was executed against.
|
||||
*/
|
||||
public static class Shards {
|
||||
|
||||
private final int total;
|
||||
|
||||
/**
|
||||
* The total number of shards on which a request was executed against.
|
||||
*
|
||||
* @return the total number of shards
|
||||
*/
|
||||
public int total() {
|
||||
return total;
|
||||
}
|
||||
|
||||
private final int successful;
|
||||
|
||||
/**
|
||||
* The number of successful shards on which a request was executed against.
|
||||
*
|
||||
* @return the number of successful shards
|
||||
*/
|
||||
public int successful() {
|
||||
return successful;
|
||||
}
|
||||
|
||||
private final int skipped;
|
||||
|
||||
/**
|
||||
* The number of shards skipped by the request.
|
||||
*
|
||||
* @return the number of skipped shards
|
||||
*/
|
||||
public int skipped() {
|
||||
return skipped;
|
||||
}
|
||||
|
||||
private final int failed;
|
||||
|
||||
/**
|
||||
* The number of shards on which a request failed to be executed against.
|
||||
*
|
||||
* @return the number of failed shards
|
||||
*/
|
||||
public int failed() {
|
||||
return failed;
|
||||
}
|
||||
|
||||
private final Collection<DefaultShardOperationFailedException> failures;
|
||||
|
||||
/**
|
||||
* The failures corresponding to the shards on which a request failed to be executed against. Note that the number of failures might
|
||||
* not match {@link #failed()} as some responses group together shard failures.
|
||||
*
|
||||
* @return the failures
|
||||
*/
|
||||
public Collection<DefaultShardOperationFailedException> failures() {
|
||||
return failures;
|
||||
}
|
||||
|
||||
Shards(
|
||||
final int total,
|
||||
final int successful,
|
||||
final int skipped,
|
||||
final int failed,
|
||||
final Collection<DefaultShardOperationFailedException> failures) {
|
||||
this.total = total;
|
||||
this.successful = successful;
|
||||
this.skipped = skipped;
|
||||
this.failed = failed;
|
||||
this.failures = Collections.unmodifiableCollection(Objects.requireNonNull(failures));
|
||||
}
|
||||
|
||||
private static final ParseField TOTAL_FIELD = new ParseField("total");
|
||||
private static final ParseField SUCCESSFUL_FIELD = new ParseField("successful");
|
||||
private static final ParseField SKIPPED_FIELD = new ParseField("skipped");
|
||||
private static final ParseField FAILED_FIELD = new ParseField("failed");
|
||||
private static final ParseField FAILURES_FIELD = new ParseField("failures");
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
static final ConstructingObjectParser<Shards, Void> SHARDS_PARSER = new ConstructingObjectParser<>(
|
||||
"shards",
|
||||
a -> new Shards(
|
||||
(int) a[0], // total
|
||||
(int) a[1], // successful
|
||||
a[2] == null ? 0 : (int) a[2], // skipped
|
||||
(int) a[3], // failed
|
||||
a[4] == null ? Collections.emptyList() : (Collection<DefaultShardOperationFailedException>) a[4])); // failures
|
||||
|
||||
static {
|
||||
SHARDS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), TOTAL_FIELD);
|
||||
SHARDS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), SUCCESSFUL_FIELD);
|
||||
SHARDS_PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), SKIPPED_FIELD);
|
||||
SHARDS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), FAILED_FIELD);
|
||||
SHARDS_PARSER.declareObjectArray(
|
||||
ConstructingObjectParser.optionalConstructorArg(),
|
||||
DefaultShardOperationFailedException.PARSER, FAILURES_FIELD);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
@ -36,6 +36,7 @@ import org.elasticsearch.client.ccr.FollowInfoRequest;
|
||||
import org.elasticsearch.client.ccr.FollowInfoResponse;
|
||||
import org.elasticsearch.client.ccr.FollowStatsRequest;
|
||||
import org.elasticsearch.client.ccr.FollowStatsResponse;
|
||||
import org.elasticsearch.client.ccr.ForgetFollowerRequest;
|
||||
import org.elasticsearch.client.ccr.GetAutoFollowPatternRequest;
|
||||
import org.elasticsearch.client.ccr.GetAutoFollowPatternResponse;
|
||||
import org.elasticsearch.client.ccr.IndicesFollowStats;
|
||||
@ -47,19 +48,24 @@ import org.elasticsearch.client.ccr.PutFollowResponse;
|
||||
import org.elasticsearch.client.ccr.ResumeFollowRequest;
|
||||
import org.elasticsearch.client.ccr.UnfollowRequest;
|
||||
import org.elasticsearch.client.core.AcknowledgedResponse;
|
||||
import org.elasticsearch.client.core.BroadcastResponse;
|
||||
import org.elasticsearch.client.indices.CreateIndexRequest;
|
||||
import org.elasticsearch.client.indices.CreateIndexResponse;
|
||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||
import org.elasticsearch.test.rest.yaml.ObjectPath;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.hamcrest.Matchers.empty;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
|
||||
@ -203,6 +209,61 @@ public class CCRIT extends ESRestHighLevelClientTestCase {
|
||||
assertThat(unfollowResponse.isAcknowledged(), is(true));
|
||||
}
|
||||
|
||||
public void testForgetFollower() throws IOException {
|
||||
final CcrClient ccrClient = highLevelClient().ccr();
|
||||
|
||||
final CreateIndexRequest createIndexRequest = new CreateIndexRequest("leader");
|
||||
final Map<String, String> settings = new HashMap<>(3);
|
||||
final int numberOfShards = randomIntBetween(1, 2);
|
||||
settings.put("index.number_of_replicas", "0");
|
||||
settings.put("index.number_of_shards", Integer.toString(numberOfShards));
|
||||
settings.put("index.soft_deletes.enabled", Boolean.TRUE.toString());
|
||||
createIndexRequest.settings(settings);
|
||||
final CreateIndexResponse response = highLevelClient().indices().create(createIndexRequest, RequestOptions.DEFAULT);
|
||||
assertThat(response.isAcknowledged(), is(true));
|
||||
|
||||
final PutFollowRequest putFollowRequest = new PutFollowRequest("local_cluster", "leader", "follower", ActiveShardCount.ONE);
|
||||
final PutFollowResponse putFollowResponse = execute(putFollowRequest, ccrClient::putFollow, ccrClient::putFollowAsync);
|
||||
assertTrue(putFollowResponse.isFollowIndexCreated());
|
||||
assertTrue(putFollowResponse.isFollowIndexShardsAcked());
|
||||
assertTrue(putFollowResponse.isIndexFollowingStarted());
|
||||
|
||||
final String clusterName = highLevelClient().info(RequestOptions.DEFAULT).getClusterName().value();
|
||||
|
||||
final Request statsRequest = new Request("GET", "/follower/_stats");
|
||||
final Response statsResponse = client().performRequest(statsRequest);
|
||||
final ObjectPath statsObjectPath = ObjectPath.createFromResponse(statsResponse);
|
||||
final String followerIndexUUID = statsObjectPath.evaluate("indices.follower.uuid");
|
||||
|
||||
final PauseFollowRequest pauseFollowRequest = new PauseFollowRequest("follower");
|
||||
AcknowledgedResponse pauseFollowResponse = execute(pauseFollowRequest, ccrClient::pauseFollow, ccrClient::pauseFollowAsync);
|
||||
assertTrue(pauseFollowResponse.isAcknowledged());
|
||||
|
||||
final ForgetFollowerRequest forgetFollowerRequest =
|
||||
new ForgetFollowerRequest(clusterName, "follower", followerIndexUUID, "local_cluster", "leader");
|
||||
final BroadcastResponse forgetFollowerResponse =
|
||||
execute(forgetFollowerRequest, ccrClient::forgetFollower, ccrClient::forgetFollowerAsync);
|
||||
assertThat(forgetFollowerResponse.shards().total(), equalTo(numberOfShards));
|
||||
assertThat(forgetFollowerResponse.shards().successful(), equalTo(numberOfShards));
|
||||
assertThat(forgetFollowerResponse.shards().skipped(), equalTo(0));
|
||||
assertThat(forgetFollowerResponse.shards().failed(), equalTo(0));
|
||||
assertThat(forgetFollowerResponse.shards().failures(), empty());
|
||||
|
||||
final Request retentionLeasesRequest = new Request("GET", "/leader/_stats");
|
||||
retentionLeasesRequest.addParameter("level", "shards");
|
||||
final Response retentionLeasesResponse = client().performRequest(retentionLeasesRequest);
|
||||
final Map<?, ?> shardsStats = ObjectPath.createFromResponse(retentionLeasesResponse).evaluate("indices.leader.shards");
|
||||
assertThat(shardsStats.keySet(), hasSize(numberOfShards));
|
||||
for (int i = 0; i < numberOfShards; i++) {
|
||||
final List<?> shardStats = (List<?>) shardsStats.get(Integer.toString(i));
|
||||
assertThat(shardStats, hasSize(1));
|
||||
final Map<?, ?> shardStatsAsMap = (Map<?, ?>) shardStats.get(0);
|
||||
final Map<?, ?> retentionLeasesStats = (Map<?, ?>) shardStatsAsMap.get("retention_leases");
|
||||
final List<?> leases = (List<?>) retentionLeasesStats.get("leases");
|
||||
assertThat(leases, empty());
|
||||
}
|
||||
}
|
||||
|
||||
public void testAutoFollowing() throws Exception {
|
||||
CcrClient ccrClient = highLevelClient().ccr();
|
||||
PutAutoFollowPatternRequest putAutoFollowPatternRequest =
|
||||
|
@ -29,6 +29,7 @@ import org.elasticsearch.client.ccr.DeleteAutoFollowPatternRequest;
|
||||
import org.elasticsearch.client.ccr.FollowConfig;
|
||||
import org.elasticsearch.client.ccr.FollowInfoRequest;
|
||||
import org.elasticsearch.client.ccr.FollowStatsRequest;
|
||||
import org.elasticsearch.client.ccr.ForgetFollowerRequest;
|
||||
import org.elasticsearch.client.ccr.GetAutoFollowPatternRequest;
|
||||
import org.elasticsearch.client.ccr.PauseFollowRequest;
|
||||
import org.elasticsearch.client.ccr.PutAutoFollowPatternRequest;
|
||||
@ -39,9 +40,11 @@ import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Locale;
|
||||
|
||||
import static org.hamcrest.Matchers.empty;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
|
||||
@ -91,6 +94,20 @@ public class CcrRequestConvertersTests extends ESTestCase {
|
||||
assertThat(result.getEntity(), nullValue());
|
||||
}
|
||||
|
||||
public void testForgetFollower() throws IOException {
|
||||
final ForgetFollowerRequest request = new ForgetFollowerRequest(
|
||||
randomAlphaOfLength(8),
|
||||
randomAlphaOfLength(8),
|
||||
randomAlphaOfLength(8),
|
||||
randomAlphaOfLength(8),
|
||||
randomAlphaOfLength(8));
|
||||
final Request convertedRequest = CcrRequestConverters.forgetFollower(request);
|
||||
assertThat(convertedRequest.getMethod(), equalTo(HttpPost.METHOD_NAME));
|
||||
assertThat(convertedRequest.getEndpoint(), equalTo("/" + request.leaderIndex() + "/_ccr/forget_follower"));
|
||||
assertThat(convertedRequest.getParameters().keySet(), empty());
|
||||
RequestConvertersTests.assertToXContentBody(request, convertedRequest.getEntity());
|
||||
}
|
||||
|
||||
public void testPutAutofollowPattern() throws Exception {
|
||||
PutAutoFollowPatternRequest putAutoFollowPatternRequest = new PutAutoFollowPatternRequest(randomAlphaOfLength(4),
|
||||
randomAlphaOfLength(4), Arrays.asList(generateRandomStringArray(4, 4, false)));
|
||||
|
@ -0,0 +1,90 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.client.core;
|
||||
|
||||
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
|
||||
import org.elasticsearch.cluster.ClusterModule;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
|
||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContent;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.index.seqno.RetentionLeaseNotFoundException;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.hamcrest.Matchers.isIn;
|
||||
|
||||
public class BroadcastResponseTests extends ESTestCase {
|
||||
|
||||
public void testFromXContent() throws IOException {
|
||||
final String index = randomAlphaOfLength(8);
|
||||
final String id = randomAlphaOfLength(8);
|
||||
final int total = randomIntBetween(1, 16);
|
||||
final int successful = total - scaledRandomIntBetween(0, total);
|
||||
final int failed = scaledRandomIntBetween(0, total - successful);
|
||||
final List<DefaultShardOperationFailedException> failures = new ArrayList<>();
|
||||
final Set<Integer> shardIds = new HashSet<>();
|
||||
for (int i = 0; i < failed; i++) {
|
||||
final DefaultShardOperationFailedException failure = new DefaultShardOperationFailedException(
|
||||
index,
|
||||
randomValueOtherThanMany(shardIds::contains, () -> randomIntBetween(0, total - 1)),
|
||||
new RetentionLeaseNotFoundException(id));
|
||||
failures.add(failure);
|
||||
shardIds.add(failure.shardId());
|
||||
}
|
||||
|
||||
final org.elasticsearch.action.support.broadcast.BroadcastResponse to =
|
||||
new org.elasticsearch.action.support.broadcast.BroadcastResponse(total, successful, failed, failures);
|
||||
|
||||
final XContentType xContentType = randomFrom(XContentType.values());
|
||||
final BytesReference bytes = toShuffledXContent(to, xContentType, ToXContent.EMPTY_PARAMS, randomBoolean());
|
||||
|
||||
final XContent xContent = XContentFactory.xContent(xContentType);
|
||||
final XContentParser parser = xContent.createParser(
|
||||
new NamedXContentRegistry(ClusterModule.getNamedXWriteables()),
|
||||
LoggingDeprecationHandler.INSTANCE,
|
||||
bytes.streamInput());
|
||||
final BroadcastResponse from = BroadcastResponse.fromXContent(parser);
|
||||
assertThat(from.shards().total(), equalTo(total));
|
||||
assertThat(from.shards().successful(), equalTo(successful));
|
||||
assertThat(from.shards().skipped(), equalTo(0));
|
||||
assertThat(from.shards().failed(), equalTo(failed));
|
||||
assertThat(from.shards().failures(), hasSize(failed == 0 ? failed : 1)); // failures are grouped
|
||||
if (failed > 0) {
|
||||
final DefaultShardOperationFailedException groupedFailure = from.shards().failures().iterator().next();
|
||||
assertThat(groupedFailure.index(), equalTo(index));
|
||||
assertThat(groupedFailure.shardId(), isIn(shardIds));
|
||||
assertThat(groupedFailure.reason(), containsString("reason=retention lease with ID [" + id + "] not found"));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -40,6 +40,7 @@ import org.elasticsearch.client.ccr.FollowInfoRequest;
|
||||
import org.elasticsearch.client.ccr.FollowInfoResponse;
|
||||
import org.elasticsearch.client.ccr.FollowStatsRequest;
|
||||
import org.elasticsearch.client.ccr.FollowStatsResponse;
|
||||
import org.elasticsearch.client.ccr.ForgetFollowerRequest;
|
||||
import org.elasticsearch.client.ccr.GetAutoFollowPatternRequest;
|
||||
import org.elasticsearch.client.ccr.GetAutoFollowPatternResponse;
|
||||
import org.elasticsearch.client.ccr.GetAutoFollowPatternResponse.Pattern;
|
||||
@ -51,15 +52,18 @@ import org.elasticsearch.client.ccr.PutFollowResponse;
|
||||
import org.elasticsearch.client.ccr.ResumeFollowRequest;
|
||||
import org.elasticsearch.client.ccr.UnfollowRequest;
|
||||
import org.elasticsearch.client.core.AcknowledgedResponse;
|
||||
import org.elasticsearch.client.core.BroadcastResponse;
|
||||
import org.elasticsearch.client.indices.CreateIndexRequest;
|
||||
import org.elasticsearch.client.indices.CreateIndexResponse;
|
||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||
import org.elasticsearch.test.rest.yaml.ObjectPath;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
@ -395,6 +399,101 @@ public class CCRDocumentationIT extends ESRestHighLevelClientTestCase {
|
||||
assertTrue(latch.await(30L, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
public void testForgetFollower() throws InterruptedException, IOException {
|
||||
final RestHighLevelClient client = highLevelClient();
|
||||
final String leaderIndex = "leader";
|
||||
{
|
||||
// create leader index
|
||||
final CreateIndexRequest createIndexRequest = new CreateIndexRequest(leaderIndex);
|
||||
final Map<String, String> settings = new HashMap<>(2);
|
||||
final int numberOfShards = randomIntBetween(1, 2);
|
||||
settings.put("index.number_of_shards", Integer.toString(numberOfShards));
|
||||
settings.put("index.soft_deletes.enabled", Boolean.TRUE.toString());
|
||||
createIndexRequest.settings(settings);
|
||||
final CreateIndexResponse response = client.indices().create(createIndexRequest, RequestOptions.DEFAULT);
|
||||
assertThat(response.isAcknowledged(), is(true));
|
||||
}
|
||||
final String followerIndex = "follower";
|
||||
|
||||
final PutFollowRequest putFollowRequest = new PutFollowRequest("local", "leader", followerIndex, ActiveShardCount.ONE);
|
||||
final PutFollowResponse putFollowResponse = client.ccr().putFollow(putFollowRequest, RequestOptions.DEFAULT);
|
||||
assertTrue(putFollowResponse.isFollowIndexCreated());
|
||||
assertTrue((putFollowResponse.isFollowIndexShardsAcked()));
|
||||
assertTrue(putFollowResponse.isIndexFollowingStarted());
|
||||
|
||||
final PauseFollowRequest pauseFollowRequest = new PauseFollowRequest("follower");
|
||||
AcknowledgedResponse pauseFollowResponse = client.ccr().pauseFollow(pauseFollowRequest, RequestOptions.DEFAULT);
|
||||
assertTrue(pauseFollowResponse.isAcknowledged());
|
||||
|
||||
final String followerCluster = highLevelClient().info(RequestOptions.DEFAULT).getClusterName().value();
|
||||
final Request statsRequest = new Request("GET", "/follower/_stats");
|
||||
final Response statsResponse = client().performRequest(statsRequest);
|
||||
final ObjectPath statsObjectPath = ObjectPath.createFromResponse(statsResponse);
|
||||
final String followerIndexUUID = statsObjectPath.evaluate("indices.follower.uuid");
|
||||
|
||||
final String leaderCluster = "local";
|
||||
|
||||
// tag::ccr-forget-follower-request
|
||||
final ForgetFollowerRequest request = new ForgetFollowerRequest(
|
||||
followerCluster, // <1>
|
||||
followerIndex, // <2>
|
||||
followerIndexUUID, // <3>
|
||||
leaderCluster, // <4>
|
||||
leaderIndex); // <5>
|
||||
// end::ccr-forget-follower-request
|
||||
|
||||
// tag::ccr-forget-follower-execute
|
||||
final BroadcastResponse response = client
|
||||
.ccr()
|
||||
.forgetFollower(request, RequestOptions.DEFAULT);
|
||||
// end::ccr-forget-follower-execute
|
||||
|
||||
// tag::ccr-forget-follower-response
|
||||
final BroadcastResponse.Shards shards = response.shards(); // <1>
|
||||
final int total = shards.total(); // <2>
|
||||
final int successful = shards.successful(); // <3>
|
||||
final int skipped = shards.skipped(); // <4>
|
||||
final int failed = shards.failed(); // <5>
|
||||
shards.failures().forEach(failure -> {}); // <6>
|
||||
// end::ccr-forget-follower-response
|
||||
|
||||
// tag::ccr-forget-follower-execute-listener
|
||||
ActionListener<BroadcastResponse> listener =
|
||||
new ActionListener<BroadcastResponse>() {
|
||||
|
||||
@Override
|
||||
public void onResponse(final BroadcastResponse response) {
|
||||
final BroadcastResponse.Shards shards = // <1>
|
||||
response.shards();
|
||||
final int total = shards.total();
|
||||
final int successful = shards.successful();
|
||||
final int skipped = shards.skipped();
|
||||
final int failed = shards.failed();
|
||||
shards.failures().forEach(failure -> {});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(final Exception e) {
|
||||
// <2>
|
||||
}
|
||||
|
||||
};
|
||||
// end::ccr-forget-follower-execute-listener
|
||||
|
||||
// replace the empty listener by a blocking listener in test
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
listener = new LatchedActionListener<>(listener, latch);
|
||||
|
||||
// tag::ccr-forget-follower-execute-async
|
||||
client.ccr().forgetFollowerAsync(
|
||||
request,
|
||||
RequestOptions.DEFAULT,
|
||||
listener); // <1>
|
||||
// end::ccr-forget-follower-execute-async
|
||||
|
||||
assertTrue(latch.await(30L, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
public void testPutAutoFollowPattern() throws Exception {
|
||||
RestHighLevelClient client = highLevelClient();
|
||||
|
||||
|
45
docs/java-rest/high-level/ccr/forget_follower.asciidoc
Normal file
45
docs/java-rest/high-level/ccr/forget_follower.asciidoc
Normal file
@ -0,0 +1,45 @@
|
||||
--
|
||||
:api: ccr-forget-follower
|
||||
:request: ForgetFollowerRequest
|
||||
:response: BroadcastResponse
|
||||
--
|
||||
|
||||
[id="{upid}-{api}"]
|
||||
=== Forget Follower API
|
||||
|
||||
[id="{upid}-{api}-request"]
|
||||
==== Request
|
||||
|
||||
The Forget Follower API allows you to manually remove the follower retention
|
||||
leases from the leader. Note that these retention leases are automatically
|
||||
managed by the following index. This API exists only for cases when invoking
|
||||
the unfollow API on the follower index is unable to remove the follower
|
||||
retention leases.
|
||||
|
||||
["source","java",subs="attributes,callouts,macros"]
|
||||
--------------------------------------------------
|
||||
include-tagged::{doc-tests-file}[{api}-request]
|
||||
--------------------------------------------------
|
||||
<1> The name of the cluster containing the follower index.
|
||||
<2> The name of the follower index.
|
||||
<3> The UUID of the follower index (can be obtained from index stats).
|
||||
<4> The alias of the remote cluster containing the leader index.
|
||||
<5> The name of the leader index.
|
||||
|
||||
[id="{upid}-{api}-response"]
|
||||
==== Response
|
||||
|
||||
The returned +{response}+ indicates if the response was successful.
|
||||
|
||||
["source","java",subs="attributes,callouts,macros"]
|
||||
--------------------------------------------------
|
||||
include-tagged::{doc-tests-file}[{api}-response]
|
||||
--------------------------------------------------
|
||||
<1> The high-level shards summary.
|
||||
<2> The total number of shards the request was executed on.
|
||||
<3> The total number of shards the request was successful on.
|
||||
<4> The total number of shards the request was skipped on (should always be zero).
|
||||
<5> The total number of shards the request failed on.
|
||||
<6> The shard-level failures.
|
||||
|
||||
include::../execution.asciidoc[]
|
@ -502,6 +502,7 @@ The Java High Level REST Client supports the following CCR APIs:
|
||||
* <<{upid}-ccr-pause-follow>>
|
||||
* <<{upid}-ccr-resume-follow>>
|
||||
* <<{upid}-ccr-unfollow>>
|
||||
* <<{upid}-ccr-forget-follower>>
|
||||
* <<{upid}-ccr-put-auto-follow-pattern>>
|
||||
* <<{upid}-ccr-delete-auto-follow-pattern>>
|
||||
* <<{upid}-ccr-get-auto-follow-pattern>>
|
||||
|
@ -19,6 +19,7 @@ You can use the following APIs to perform {ccr} operations.
|
||||
* <<ccr-post-pause-follow,Pause follower>>
|
||||
* <<ccr-post-resume-follow,Resume follower>>
|
||||
* <<ccr-post-unfollow,Convert follower index to a regular index>>
|
||||
* <<ccr-post-forget-follower,Remove follower retention leases from the leader>>
|
||||
* <<ccr-get-follow-stats,Get stats about follower indices>>
|
||||
* <<ccr-get-follow-info,Get info about follower indices>>
|
||||
|
||||
@ -38,6 +39,7 @@ include::follow/put-follow.asciidoc[]
|
||||
include::follow/post-pause-follow.asciidoc[]
|
||||
include::follow/post-resume-follow.asciidoc[]
|
||||
include::follow/post-unfollow.asciidoc[]
|
||||
include::follow/post-forget-follower.asciidoc[]
|
||||
include::follow/get-follow-stats.asciidoc[]
|
||||
include::follow/get-follow-info.asciidoc[]
|
||||
|
||||
|
152
docs/reference/ccr/apis/follow/post-forget-follower.asciidoc
Normal file
152
docs/reference/ccr/apis/follow/post-forget-follower.asciidoc
Normal file
@ -0,0 +1,152 @@
|
||||
[role="xpack"]
|
||||
[testenv="platinum"]
|
||||
[[ccr-post-forget-follower]]
|
||||
=== Forget Follower API
|
||||
++++
|
||||
<titleabbrev>Forget Follower</titleabbrev>
|
||||
++++
|
||||
|
||||
Removes the follower retention leases from the leader.
|
||||
|
||||
==== Description
|
||||
|
||||
A following index takes out retention leases on its leader index. These
|
||||
retention leases are used to increase the likelihood that the shards of the
|
||||
leader index retain the history of operations that the shards of the following
|
||||
index need to execute replication. When a follower index is converted to a
|
||||
regular index via the <<ccr-post-unfollow,unfollow API>> (either via explicit
|
||||
execution of this API, or implicitly via {ilm}), these retention leases are
|
||||
removed. However, removing these retention leases can fail (e.g., if the remote
|
||||
cluster containing the leader index is unavailable). While these retention
|
||||
leases will eventually expire on their own, their extended existence can cause
|
||||
the leader index to hold more history than necessary, and prevent {ilm} from
|
||||
performing some operations on the leader index. This API exists to enable
|
||||
manually removing these retention leases when the unfollow API was unable to do
|
||||
so.
|
||||
|
||||
NOTE: This API does not stop replication by a following index. If you use this
|
||||
API targeting a follower index that is still actively following, the following
|
||||
index will add back retention leases on the leader. The only purpose of this API
|
||||
is to handle the case of failure to remove the following retention leases after
|
||||
the <<ccr-post-unfollow,unfollow API>> is invoked.
|
||||
|
||||
==== Request
|
||||
|
||||
//////////////////////////
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
PUT /follower_index/_ccr/follow?wait_for_active_shards=1
|
||||
{
|
||||
"remote_cluster" : "remote_cluster",
|
||||
"leader_index" : "leader_index"
|
||||
}
|
||||
--------------------------------------------------
|
||||
// CONSOLE
|
||||
// TESTSETUP
|
||||
// TEST[setup:remote_cluster_and_leader_index]
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
POST /follower_index/_ccr/pause_follow
|
||||
--------------------------------------------------
|
||||
// CONSOLE
|
||||
// TEARDOWN
|
||||
|
||||
//////////////////////////
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
POST /<leader_index>/_ccr/forget_follower
|
||||
{
|
||||
"follower_cluster" : "<follower_cluster>",
|
||||
"follower_index" : "<follower_index>",
|
||||
"follower_index_uuid" : "<follower_index_uuid>",
|
||||
"leader_remote_cluster" : "<leader_remote_cluster>"
|
||||
}
|
||||
--------------------------------------------------
|
||||
// CONSOLE
|
||||
// TEST[s/<leader_index>/leader_index/]
|
||||
// TEST[s/<follower_cluster>/follower_cluster/]
|
||||
// TEST[s/<follower_index>/follower_index/]
|
||||
// TEST[s/<follower_index_uuid>/follower_index_uuid/]
|
||||
// TEST[s/<leader_remote_cluster>/leader_remote_cluster/]
|
||||
// TEST[skip_shard_failures]
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
{
|
||||
"_shards" : {
|
||||
"total" : 1,
|
||||
"successful" : 1,
|
||||
"failed" : 0,
|
||||
"failures" : [ ]
|
||||
}
|
||||
}
|
||||
--------------------------------------------------
|
||||
// TESTRESPONSE[s/"total" : 1/"total" : $body._shards.total/]
|
||||
// TESTRESPONSE[s/"successful" : 1/"successful" : $body._shards.successful/]
|
||||
// TESTRESPONSE[s/"failed" : 0/"failed" : $body._shards.failed/]
|
||||
// TESTRESPONSE[s/"failures" : \[ \]/"failures" : $body._shards.failures/]
|
||||
|
||||
==== Path Parameters
|
||||
|
||||
`leader_index` (required)::
|
||||
(string) the name of the leader index
|
||||
|
||||
==== Request Body
|
||||
`follower_cluster` (required)::
|
||||
(string) the name of the cluster containing the follower index
|
||||
|
||||
`follower_index` (required)::
|
||||
(string) the name of the follower index
|
||||
|
||||
`follower_index_uuid` (required)::
|
||||
(string) the UUID of the follower index
|
||||
|
||||
`leader_remote_cluster` (required)::
|
||||
(string) the alias (from the perspective of the cluster containing the
|
||||
follower index) of the <<modules-remote-clusters,remote cluster>> containing
|
||||
the leader index
|
||||
|
||||
==== Authorization
|
||||
|
||||
If the {es} {security-features} are enabled, you must have `manage_leader_index`
|
||||
index privileges for the leader index. For more information, see
|
||||
{stack-ov}/security-privileges.html[Security privileges].
|
||||
|
||||
==== Example
|
||||
|
||||
This example removes the follower retention leases for `follower_index` from
|
||||
`leader_index`.
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
POST /leader_index/_ccr/forget_follower
|
||||
{
|
||||
"follower_cluster" : "",
|
||||
"follower_index" : "follower_index",
|
||||
"follower_index_uuid" : "",
|
||||
"leader_remote_cluster" : ""
|
||||
}
|
||||
--------------------------------------------------
|
||||
// CONSOLE
|
||||
// TEST[skip_shard_failures]
|
||||
|
||||
The API returns the following result:
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
{
|
||||
"_shards" : {
|
||||
"total" : 1,
|
||||
"successful" : 1,
|
||||
"failed" : 0,
|
||||
"failures" : [ ]
|
||||
}
|
||||
}
|
||||
--------------------------------------------------
|
||||
// TESTRESPONSE[s/"total" : 1/"total" : $body._shards.total/]
|
||||
// TESTRESPONSE[s/"successful" : 1/"successful" : $body._shards.successful/]
|
||||
// TESTRESPONSE[s/"failed" : 0/"failed" : $body._shards.failed/]
|
||||
// TESTRESPONSE[s/"failures" : \[ \]/"failures" : $body._shards.failures/]
|
@ -41,7 +41,7 @@ public class DefaultShardOperationFailedException extends ShardOperationFailedEx
|
||||
private static final String SHARD_ID = "shard";
|
||||
private static final String REASON = "reason";
|
||||
|
||||
private static final ConstructingObjectParser<DefaultShardOperationFailedException, Void> PARSER = new ConstructingObjectParser<>(
|
||||
public static final ConstructingObjectParser<DefaultShardOperationFailedException, Void> PARSER = new ConstructingObjectParser<>(
|
||||
"failures", true, arg -> new DefaultShardOperationFailedException((String) arg[0], (int) arg[1] ,(Throwable) arg[2]));
|
||||
|
||||
static {
|
||||
|
@ -0,0 +1,80 @@
|
||||
---
|
||||
"Test forget follower":
|
||||
- do:
|
||||
cluster.state: {}
|
||||
|
||||
- set: {master_node: master}
|
||||
|
||||
- do:
|
||||
nodes.info: {}
|
||||
|
||||
- set: {nodes.$master.transport_address: local_ip}
|
||||
|
||||
- do:
|
||||
cluster.put_settings:
|
||||
body:
|
||||
transient:
|
||||
cluster.remote.remote_cluster.seeds: $local_ip
|
||||
flat_settings: true
|
||||
|
||||
- match: {transient: {cluster.remote.remote_cluster.seeds: $local_ip}}
|
||||
|
||||
- do:
|
||||
indices.create:
|
||||
index: leader_index
|
||||
body:
|
||||
settings:
|
||||
index:
|
||||
number_of_shards: 1
|
||||
soft_deletes:
|
||||
enabled: true
|
||||
- is_true: acknowledged
|
||||
|
||||
- do:
|
||||
ccr.follow:
|
||||
index: follower_index
|
||||
wait_for_active_shards: 1
|
||||
body:
|
||||
remote_cluster: remote_cluster
|
||||
leader_index: leader_index
|
||||
- is_true: follow_index_created
|
||||
- is_true: follow_index_shards_acked
|
||||
- is_true: index_following_started
|
||||
|
||||
- do:
|
||||
info: {}
|
||||
|
||||
- set: {cluster_name: cluster_name}
|
||||
|
||||
- do:
|
||||
indices.stats: {index: follower_index}
|
||||
|
||||
- set: {indices.follower_index.uuid: follower_index_uuid}
|
||||
|
||||
- do:
|
||||
ccr.forget_follower:
|
||||
index: leader_index
|
||||
body:
|
||||
follower_cluster: $cluster_name
|
||||
follower_index: follower_index
|
||||
follower_index_uuid: $follower_index_uuid
|
||||
leader_remote_cluster: remote_cluster
|
||||
- match: { _shards.total: 1 }
|
||||
- match: { _shards.successful: 1}
|
||||
- match: { _shards.failed: 0}
|
||||
- is_false: _shards.failure
|
||||
|
||||
- do:
|
||||
ccr.pause_follow:
|
||||
index: follower_index
|
||||
- is_true: acknowledged
|
||||
|
||||
- do:
|
||||
indices.close:
|
||||
index: follower_index
|
||||
- is_true: acknowledged
|
||||
|
||||
- do:
|
||||
ccr.unfollow:
|
||||
index: follower_index
|
||||
- is_true: acknowledged
|
@ -22,7 +22,7 @@ leaderClusterTestCluster {
|
||||
setupCommand 'setupTestAdmin',
|
||||
'bin/elasticsearch-users', 'useradd', "test_admin", '-p', 'x-pack-test-password', '-r', "superuser"
|
||||
setupCommand 'setupCcrUser',
|
||||
'bin/elasticsearch-users', 'useradd', "test_ccr", '-p', 'x-pack-test-password', '-r', "manage_ccr"
|
||||
'bin/elasticsearch-users', 'useradd', "test_ccr", '-p', 'x-pack-test-password', '-r', "ccruser"
|
||||
waitCondition = { node, ant ->
|
||||
File tmpFile = new File(node.cwd, 'wait.success')
|
||||
ant.get(src: "http://${node.httpUri()}/_cluster/health?wait_for_nodes=>=${numNodes}&wait_for_status=yellow",
|
||||
|
@ -2,7 +2,7 @@ ccruser:
|
||||
cluster:
|
||||
- manage_ccr
|
||||
indices:
|
||||
- names: [ 'allowed-index', 'logs-eu-*' ]
|
||||
- names: [ 'allowed-index', 'forget-follower', 'logs-eu-*' ]
|
||||
privileges:
|
||||
- monitor
|
||||
- read
|
||||
|
@ -2,7 +2,8 @@ ccruser:
|
||||
cluster:
|
||||
- read_ccr
|
||||
indices:
|
||||
- names: [ 'allowed-index', 'logs-eu-*' ]
|
||||
- names: [ 'allowed-index', 'forget-leader', 'logs-eu-*' ]
|
||||
privileges:
|
||||
- monitor
|
||||
- read
|
||||
- manage_leader_index
|
||||
|
@ -6,6 +6,7 @@
|
||||
package org.elasticsearch.xpack.ccr;
|
||||
|
||||
import org.elasticsearch.client.Request;
|
||||
import org.elasticsearch.client.Response;
|
||||
import org.elasticsearch.client.ResponseException;
|
||||
import org.elasticsearch.client.RestClient;
|
||||
import org.elasticsearch.common.Strings;
|
||||
@ -13,14 +14,19 @@ import org.elasticsearch.common.settings.SecureString;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.common.xcontent.support.XContentMapValues;
|
||||
import org.elasticsearch.test.rest.yaml.ObjectPath;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.empty;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
|
||||
public class FollowIndexSecurityIT extends ESCCRRestTestCase {
|
||||
@ -176,4 +182,55 @@ public class FollowIndexSecurityIT extends ESCCRRestTestCase {
|
||||
pauseFollow(client(), allowedIndex);
|
||||
}
|
||||
|
||||
public void testForgetFollower() throws IOException {
|
||||
final String forgetLeader = "forget-leader";
|
||||
final String forgetFollower = "forget-follower";
|
||||
if ("leader".equals(targetCluster)) {
|
||||
logger.info("running against leader cluster");
|
||||
final Settings indexSettings = Settings.builder()
|
||||
.put("index.number_of_replicas", 0)
|
||||
.put("index.number_of_shards", 1)
|
||||
.put("index.soft_deletes.enabled", true)
|
||||
.build();
|
||||
createIndex(forgetLeader, indexSettings);
|
||||
} else {
|
||||
logger.info("running against follower cluster");
|
||||
followIndex(client(), "leader_cluster", forgetLeader, forgetFollower);
|
||||
|
||||
final Response response = client().performRequest(new Request("GET", "/" + forgetFollower + "/_stats"));
|
||||
final String followerIndexUUID = ObjectPath.createFromResponse(response).evaluate("indices." + forgetFollower + ".uuid");
|
||||
|
||||
assertOK(client().performRequest(new Request("POST", "/" + forgetFollower + "/_ccr/pause_follow")));
|
||||
|
||||
try (RestClient leaderClient = buildLeaderClient(restClientSettings())) {
|
||||
final Request request = new Request("POST", "/" + forgetLeader + "/_ccr/forget_follower");
|
||||
final String requestBody = "{" +
|
||||
"\"follower_cluster\":\"follow-cluster\"," +
|
||||
"\"follower_index\":\"" + forgetFollower + "\"," +
|
||||
"\"follower_index_uuid\":\"" + followerIndexUUID + "\"," +
|
||||
"\"leader_remote_cluster\":\"leader_cluster\"" +
|
||||
"}";
|
||||
request.setJsonEntity(requestBody);
|
||||
final Response forgetFollowerResponse = leaderClient.performRequest(request);
|
||||
assertOK(forgetFollowerResponse);
|
||||
final Map<?, ?> shards = ObjectPath.createFromResponse(forgetFollowerResponse).evaluate("_shards");
|
||||
assertNull(shards.get("failures"));
|
||||
assertThat(shards.get("total"), equalTo(1));
|
||||
assertThat(shards.get("successful"), equalTo(1));
|
||||
assertThat(shards.get("failed"), equalTo(0));
|
||||
|
||||
final Request retentionLeasesRequest = new Request("GET", "/" + forgetLeader + "/_stats");
|
||||
retentionLeasesRequest.addParameter("level", "shards");
|
||||
final Response retentionLeasesResponse = leaderClient.performRequest(retentionLeasesRequest);
|
||||
final ArrayList<Object> shardsStats =
|
||||
ObjectPath.createFromResponse(retentionLeasesResponse).evaluate("indices." + forgetLeader + ".shards.0");
|
||||
assertThat(shardsStats, hasSize(1));
|
||||
final Map<?, ?> shardStatsAsMap = (Map<?, ?>) shardsStats.get(0);
|
||||
final Map<?, ?> retentionLeasesStats = (Map<?, ?>) shardStatsAsMap.get("retention_leases");
|
||||
final List<?> leases = (List<?>) retentionLeasesStats.get("leases");
|
||||
assertThat(leases, empty());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -255,16 +255,25 @@ public class ESCCRRestTestCase extends ESRestTestCase {
|
||||
return buildClient(System.getProperty("tests.leader_host"));
|
||||
}
|
||||
|
||||
protected RestClient buildLeaderClient(final Settings settings) throws IOException {
|
||||
assert "leader".equals(targetCluster) == false;
|
||||
return buildClient(System.getProperty("tests.leader_host"), settings);
|
||||
}
|
||||
|
||||
protected RestClient buildMiddleClient() throws IOException {
|
||||
assert "middle".equals(targetCluster) == false;
|
||||
return buildClient(System.getProperty("tests.middle_host"));
|
||||
}
|
||||
|
||||
private RestClient buildClient(final String url) throws IOException {
|
||||
return buildClient(url, restAdminSettings());
|
||||
}
|
||||
|
||||
private RestClient buildClient(final String url, final Settings settings) throws IOException {
|
||||
int portSeparator = url.lastIndexOf(':');
|
||||
HttpHost httpHost = new HttpHost(url.substring(0, portSeparator),
|
||||
Integer.parseInt(url.substring(portSeparator + 1)), getProtocol());
|
||||
return buildClient(restAdminSettings(), new HttpHost[]{httpHost});
|
||||
Integer.parseInt(url.substring(portSeparator + 1)), getProtocol());
|
||||
return buildClient(settings, new HttpHost[]{httpHost});
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -55,6 +55,7 @@ import org.elasticsearch.xpack.ccr.action.TransportCcrStatsAction;
|
||||
import org.elasticsearch.xpack.ccr.action.TransportDeleteAutoFollowPatternAction;
|
||||
import org.elasticsearch.xpack.ccr.action.TransportFollowInfoAction;
|
||||
import org.elasticsearch.xpack.ccr.action.TransportFollowStatsAction;
|
||||
import org.elasticsearch.xpack.ccr.action.TransportForgetFollowerAction;
|
||||
import org.elasticsearch.xpack.ccr.action.TransportGetAutoFollowPatternAction;
|
||||
import org.elasticsearch.xpack.ccr.action.TransportPauseFollowAction;
|
||||
import org.elasticsearch.xpack.ccr.action.TransportPutAutoFollowPatternAction;
|
||||
@ -75,6 +76,7 @@ import org.elasticsearch.xpack.ccr.rest.RestCcrStatsAction;
|
||||
import org.elasticsearch.xpack.ccr.rest.RestDeleteAutoFollowPatternAction;
|
||||
import org.elasticsearch.xpack.ccr.rest.RestFollowInfoAction;
|
||||
import org.elasticsearch.xpack.ccr.rest.RestFollowStatsAction;
|
||||
import org.elasticsearch.xpack.ccr.rest.RestForgetFollowerAction;
|
||||
import org.elasticsearch.xpack.ccr.rest.RestGetAutoFollowPatternAction;
|
||||
import org.elasticsearch.xpack.ccr.rest.RestPauseFollowAction;
|
||||
import org.elasticsearch.xpack.ccr.rest.RestPutAutoFollowPatternAction;
|
||||
@ -88,6 +90,7 @@ import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.DeleteAutoFollowPatternAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.ForgetFollowerAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.GetAutoFollowPatternAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction;
|
||||
@ -223,7 +226,9 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
|
||||
// auto-follow actions
|
||||
new ActionHandler<>(DeleteAutoFollowPatternAction.INSTANCE, TransportDeleteAutoFollowPatternAction.class),
|
||||
new ActionHandler<>(PutAutoFollowPatternAction.INSTANCE, TransportPutAutoFollowPatternAction.class),
|
||||
new ActionHandler<>(GetAutoFollowPatternAction.INSTANCE, TransportGetAutoFollowPatternAction.class));
|
||||
new ActionHandler<>(GetAutoFollowPatternAction.INSTANCE, TransportGetAutoFollowPatternAction.class),
|
||||
// forget follower action
|
||||
new ActionHandler<>(ForgetFollowerAction.INSTANCE, TransportForgetFollowerAction.class));
|
||||
}
|
||||
|
||||
public List<RestHandler> getRestHandlers(Settings settings, RestController restController, ClusterSettings clusterSettings,
|
||||
@ -247,7 +252,9 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
|
||||
// auto-follow APIs
|
||||
new RestDeleteAutoFollowPatternAction(settings, restController),
|
||||
new RestPutAutoFollowPatternAction(settings, restController),
|
||||
new RestGetAutoFollowPatternAction(settings, restController));
|
||||
new RestGetAutoFollowPatternAction(settings, restController),
|
||||
// forget follower API
|
||||
new RestForgetFollowerAction(settings, restController));
|
||||
}
|
||||
|
||||
public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
|
||||
|
@ -0,0 +1,151 @@
|
||||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.xpack.ccr.action;
|
||||
|
||||
import org.elasticsearch.Assertions;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
|
||||
import org.elasticsearch.action.support.broadcast.node.TransportBroadcastByNodeAction;
|
||||
import org.elasticsearch.action.support.replication.ReplicationResponse;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.routing.GroupShardsIterator;
|
||||
import org.elasticsearch.cluster.routing.PlainShardsIterator;
|
||||
import org.elasticsearch.cluster.routing.ShardIterator;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.ShardsIterator;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.xpack.ccr.CcrRetentionLeases;
|
||||
import org.elasticsearch.xpack.core.ccr.action.ForgetFollowerAction;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
public class TransportForgetFollowerAction extends TransportBroadcastByNodeAction<
|
||||
ForgetFollowerAction.Request,
|
||||
BroadcastResponse,
|
||||
TransportBroadcastByNodeAction.EmptyResult> {
|
||||
|
||||
private final ClusterService clusterService;
|
||||
private final IndicesService indicesService;
|
||||
|
||||
@Inject
|
||||
public TransportForgetFollowerAction(
|
||||
final ClusterService clusterService,
|
||||
final TransportService transportService,
|
||||
final ActionFilters actionFilters,
|
||||
final IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
final IndicesService indicesService) {
|
||||
super(
|
||||
ForgetFollowerAction.NAME,
|
||||
Objects.requireNonNull(clusterService),
|
||||
Objects.requireNonNull(transportService),
|
||||
Objects.requireNonNull(actionFilters),
|
||||
Objects.requireNonNull(indexNameExpressionResolver),
|
||||
ForgetFollowerAction.Request::new,
|
||||
ThreadPool.Names.MANAGEMENT);
|
||||
this.clusterService = clusterService;
|
||||
this.indicesService = Objects.requireNonNull(indicesService);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected EmptyResult readShardResult(final StreamInput in) {
|
||||
return EmptyResult.readEmptyResultFrom(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected BroadcastResponse newResponse(
|
||||
final ForgetFollowerAction.Request request,
|
||||
final int totalShards,
|
||||
final int successfulShards,
|
||||
final int failedShards, List<EmptyResult> emptyResults,
|
||||
final List<DefaultShardOperationFailedException> shardFailures,
|
||||
final ClusterState clusterState) {
|
||||
return new BroadcastResponse(totalShards, successfulShards, failedShards, shardFailures);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ForgetFollowerAction.Request readRequestFrom(final StreamInput in) throws IOException {
|
||||
return new ForgetFollowerAction.Request(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected EmptyResult shardOperation(final ForgetFollowerAction.Request request, final ShardRouting shardRouting) {
|
||||
final Index followerIndex = new Index(request.followerIndex(), request.followerIndexUUID());
|
||||
final Index leaderIndex = clusterService.state().metaData().index(request.leaderIndex()).getIndex();
|
||||
final String id = CcrRetentionLeases.retentionLeaseId(
|
||||
request.followerCluster(),
|
||||
followerIndex,
|
||||
request.leaderRemoteCluster(),
|
||||
leaderIndex);
|
||||
|
||||
final IndexShard indexShard = indicesService.indexServiceSafe(leaderIndex).getShard(shardRouting.shardId().id());
|
||||
|
||||
final PlainActionFuture<Releasable> permit = new PlainActionFuture<>();
|
||||
indexShard.acquirePrimaryOperationPermit(permit, ThreadPool.Names.SAME, request);
|
||||
try (Releasable ignored = permit.get()) {
|
||||
final PlainActionFuture<ReplicationResponse> future = new PlainActionFuture<>();
|
||||
indexShard.removeRetentionLease(id, future);
|
||||
future.get();
|
||||
} catch (final ExecutionException | InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
return EmptyResult.INSTANCE;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ShardsIterator shards(
|
||||
final ClusterState clusterState,
|
||||
final ForgetFollowerAction.Request request,
|
||||
final String[] concreteIndices) {
|
||||
final GroupShardsIterator<ShardIterator> activePrimaryShards =
|
||||
clusterState.routingTable().activePrimaryShardsGrouped(concreteIndices, false);
|
||||
final List<ShardRouting> shardRoutings = new ArrayList<>();
|
||||
final Iterator<ShardIterator> it = activePrimaryShards.iterator();
|
||||
while (it.hasNext()) {
|
||||
final ShardIterator shardIterator = it.next();
|
||||
final ShardRouting primaryShard = shardIterator.nextOrNull();
|
||||
assert primaryShard != null;
|
||||
shardRoutings.add(primaryShard);
|
||||
if (Assertions.ENABLED) {
|
||||
final ShardRouting maybeNextPrimaryShard = shardIterator.nextOrNull();
|
||||
assert maybeNextPrimaryShard == null : maybeNextPrimaryShard;
|
||||
}
|
||||
}
|
||||
return new PlainShardsIterator(shardRoutings);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ClusterBlockException checkGlobalBlock(final ClusterState state, final ForgetFollowerAction.Request request) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ClusterBlockException checkRequestBlock(
|
||||
final ClusterState state,
|
||||
final ForgetFollowerAction.Request request,
|
||||
final String[] concreteIndices) {
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,51 @@
|
||||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.xpack.ccr.rest;
|
||||
|
||||
import org.elasticsearch.client.node.NodeClient;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.rest.BaseRestHandler;
|
||||
import org.elasticsearch.rest.BytesRestResponse;
|
||||
import org.elasticsearch.rest.RestController;
|
||||
import org.elasticsearch.rest.RestRequest;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.rest.action.RestToXContentListener;
|
||||
import org.elasticsearch.xpack.core.ccr.action.ForgetFollowerAction;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
|
||||
public class RestForgetFollowerAction extends BaseRestHandler {
|
||||
|
||||
public RestForgetFollowerAction(final Settings settings, final RestController restController) {
|
||||
super(Objects.requireNonNull(settings));
|
||||
Objects.requireNonNull(restController);
|
||||
restController.registerHandler(RestRequest.Method.POST, "/{index}/_ccr/forget_follower", this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return "forget_follower_action";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) {
|
||||
final String leaderIndex = restRequest.param("index");
|
||||
|
||||
return channel -> {
|
||||
try (XContentParser parser = restRequest.contentOrSourceParamParser()) {
|
||||
final ForgetFollowerAction.Request request = ForgetFollowerAction.Request.fromXContent(parser, leaderIndex);
|
||||
client.execute(ForgetFollowerAction.INSTANCE, request, new RestToXContentListener<>(channel));
|
||||
} catch (final IOException e) {
|
||||
channel.sendResponse(new BytesRestResponse(channel, RestStatus.INTERNAL_SERVER_ERROR, e));
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
}
|
@ -19,6 +19,7 @@ import org.elasticsearch.action.admin.indices.stats.ShardStats;
|
||||
import org.elasticsearch.action.get.GetResponse;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
@ -53,6 +54,7 @@ import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.xpack.CcrIntegTestCase;
|
||||
import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionAction;
|
||||
import org.elasticsearch.xpack.ccr.repository.CcrRepository;
|
||||
import org.elasticsearch.xpack.core.ccr.action.ForgetFollowerAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.UnfollowAction;
|
||||
@ -80,6 +82,7 @@ import static org.elasticsearch.xpack.ccr.CcrRetentionLeases.retentionLeaseId;
|
||||
import static org.hamcrest.Matchers.arrayWithSize;
|
||||
import static org.hamcrest.Matchers.contains;
|
||||
import static org.hamcrest.Matchers.empty;
|
||||
import static org.hamcrest.Matchers.emptyArray;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
@ -918,6 +921,79 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase {
|
||||
}
|
||||
}
|
||||
|
||||
public void testForgetFollower() throws Exception {
|
||||
final String leaderIndex = "leader";
|
||||
final String followerIndex = "follower";
|
||||
final int numberOfShards = randomIntBetween(1, 4);
|
||||
final String leaderIndexSettings =
|
||||
getIndexSettings(numberOfShards, 0, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
|
||||
assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON).get());
|
||||
ensureLeaderYellow(leaderIndex);
|
||||
final PutFollowAction.Request followRequest = putFollow(leaderIndex, followerIndex);
|
||||
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
|
||||
|
||||
ensureFollowerGreen(true, followerIndex);
|
||||
|
||||
pauseFollow(followerIndex);
|
||||
followerClient().admin().indices().close(new CloseIndexRequest(followerIndex)).actionGet();
|
||||
|
||||
final ClusterStateResponse followerClusterState = followerClient().admin().cluster().prepareState().clear().setNodes(true).get();
|
||||
try {
|
||||
for (final ObjectCursor<DiscoveryNode> senderNode : followerClusterState.getState().nodes().getNodes().values()) {
|
||||
final MockTransportService senderTransportService =
|
||||
(MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName());
|
||||
senderTransportService.addSendBehavior(
|
||||
(connection, requestId, action, request, options) -> {
|
||||
if (RetentionLeaseActions.Remove.ACTION_NAME.equals(action)
|
||||
|| TransportActionProxy.getProxyAction(RetentionLeaseActions.Remove.ACTION_NAME).equals(action)) {
|
||||
final RetentionLeaseActions.RemoveRequest removeRequest = (RetentionLeaseActions.RemoveRequest) request;
|
||||
if (randomBoolean()) {
|
||||
throw new ConnectTransportException(connection.getNode(), "connection failed");
|
||||
} else {
|
||||
throw new IndexShardClosedException(removeRequest.getShardId());
|
||||
}
|
||||
}
|
||||
connection.sendRequest(requestId, action, request, options);
|
||||
});
|
||||
}
|
||||
|
||||
expectThrows(
|
||||
ElasticsearchException.class,
|
||||
() -> followerClient().execute(UnfollowAction.INSTANCE, new UnfollowAction.Request(followerIndex)).actionGet());
|
||||
|
||||
final ClusterStateResponse followerIndexClusterState =
|
||||
followerClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(followerIndex).get();
|
||||
final String followerUUID = followerIndexClusterState.getState().metaData().index(followerIndex).getIndexUUID();
|
||||
|
||||
final BroadcastResponse forgetFollowerResponse = leaderClient().execute(
|
||||
ForgetFollowerAction.INSTANCE,
|
||||
new ForgetFollowerAction.Request(
|
||||
getFollowerCluster().getClusterName(),
|
||||
followerIndex,
|
||||
followerUUID,
|
||||
"leader_cluster",
|
||||
leaderIndex)).actionGet();
|
||||
|
||||
assertThat(forgetFollowerResponse.getTotalShards(), equalTo(numberOfShards));
|
||||
assertThat(forgetFollowerResponse.getSuccessfulShards(), equalTo(numberOfShards));
|
||||
assertThat(forgetFollowerResponse.getFailedShards(), equalTo(0));
|
||||
assertThat(forgetFollowerResponse.getShardFailures(), emptyArray());
|
||||
|
||||
final IndicesStatsResponse afterForgetFollowerStats =
|
||||
leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet();
|
||||
final List<ShardStats> afterForgetFollowerShardsStats = getShardsStats(afterForgetFollowerStats);
|
||||
for (final ShardStats shardStats : afterForgetFollowerShardsStats) {
|
||||
assertThat(shardStats.getRetentionLeaseStats().retentionLeases().leases(), empty());
|
||||
}
|
||||
} finally {
|
||||
for (final ObjectCursor<DiscoveryNode> senderNode : followerClusterState.getState().nodes().getDataNodes().values()) {
|
||||
final MockTransportService senderTransportService =
|
||||
(MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName());
|
||||
senderTransportService.clearAllRules();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void assertRetentionLeaseRenewal(
|
||||
final int numberOfShards,
|
||||
final int numberOfReplicas,
|
||||
|
@ -0,0 +1,171 @@
|
||||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.xpack.core.ccr.action;
|
||||
|
||||
import org.elasticsearch.action.Action;
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.action.support.broadcast.BroadcastRequest;
|
||||
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.xcontent.ObjectParser;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
|
||||
public class ForgetFollowerAction extends Action<BroadcastResponse> {
|
||||
|
||||
public static final String NAME = "indices:admin/xpack/ccr/forget_follower";
|
||||
public static final ForgetFollowerAction INSTANCE = new ForgetFollowerAction();
|
||||
|
||||
private ForgetFollowerAction() {
|
||||
super(NAME);
|
||||
}
|
||||
|
||||
@Override
|
||||
public BroadcastResponse newResponse() {
|
||||
return new BroadcastResponse();
|
||||
}
|
||||
|
||||
/**
|
||||
* Represents a forget follower request. Note that this an expert API intended to be used only when unfollowing a follower index fails
|
||||
* to emove the follower retention leases. Please be sure that you understand the purpose this API before using.
|
||||
*/
|
||||
public static class Request extends BroadcastRequest<Request> {
|
||||
|
||||
private static final ParseField FOLLOWER_CLUSTER = new ParseField("follower_cluster");
|
||||
private static final ParseField FOLLOWER_INDEX = new ParseField("follower_index");
|
||||
private static final ParseField FOLLOWER_INDEX_UUID = new ParseField("follower_index_uuid");
|
||||
private static final ParseField LEADER_REMOTE_CLUSTER = new ParseField("leader_remote_cluster");
|
||||
|
||||
private static final ObjectParser<String[], Void> PARSER = new ObjectParser<>(NAME, () -> new String[4]);
|
||||
|
||||
static {
|
||||
PARSER.declareString((parameters, value) -> parameters[0] = value, FOLLOWER_CLUSTER);
|
||||
PARSER.declareString((parameters, value) -> parameters[1] = value, FOLLOWER_INDEX);
|
||||
PARSER.declareString((parameters, value) -> parameters[2] = value, FOLLOWER_INDEX_UUID);
|
||||
PARSER.declareString((parameters, value) -> parameters[3] = value, LEADER_REMOTE_CLUSTER);
|
||||
}
|
||||
|
||||
public static ForgetFollowerAction.Request fromXContent(
|
||||
final XContentParser parser,
|
||||
final String leaderIndex) throws IOException {
|
||||
final String[] parameters = PARSER.parse(parser, null);
|
||||
return new Request(parameters[0], parameters[1], parameters[2], parameters[3], leaderIndex);
|
||||
}
|
||||
|
||||
private String followerCluster;
|
||||
|
||||
/**
|
||||
* The name of the cluster containing the follower index.
|
||||
*
|
||||
* @return the name of the cluster containing the follower index
|
||||
*/
|
||||
public String followerCluster() {
|
||||
return followerCluster;
|
||||
}
|
||||
|
||||
private String followerIndex;
|
||||
|
||||
/**
|
||||
* The name of the follower index.
|
||||
*
|
||||
* @return the name of the follower index
|
||||
*/
|
||||
public String followerIndex() {
|
||||
return followerIndex;
|
||||
}
|
||||
|
||||
private String followerIndexUUID;
|
||||
|
||||
/**
|
||||
* The UUID of the follower index.
|
||||
*
|
||||
* @return the UUID of the follower index
|
||||
*/
|
||||
public String followerIndexUUID() {
|
||||
return followerIndexUUID;
|
||||
}
|
||||
|
||||
private String leaderRemoteCluster;
|
||||
|
||||
/**
|
||||
* The alias of the remote cluster containing the leader index.
|
||||
*
|
||||
* @return the alias of the remote cluster
|
||||
*/
|
||||
public String leaderRemoteCluster() {
|
||||
return leaderRemoteCluster;
|
||||
}
|
||||
|
||||
private String leaderIndex;
|
||||
|
||||
/**
|
||||
* The name of the leader index.
|
||||
*
|
||||
* @return the name of the leader index
|
||||
*/
|
||||
public String leaderIndex() {
|
||||
return leaderIndex;
|
||||
}
|
||||
|
||||
public Request() {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Construct a forget follower request.
|
||||
*
|
||||
* @param followerCluster the name of the cluster containing the follower index to forget
|
||||
* @param followerIndex the name of follower index
|
||||
* @param followerIndexUUID the UUID of the follower index
|
||||
* @param leaderRemoteCluster the alias of the remote cluster containing the leader index from the perspective of the follower index
|
||||
* @param leaderIndex the name of the leader index
|
||||
*/
|
||||
public Request(
|
||||
final String followerCluster,
|
||||
final String followerIndex,
|
||||
final String followerIndexUUID,
|
||||
final String leaderRemoteCluster,
|
||||
final String leaderIndex) {
|
||||
super(new String[]{leaderIndex});
|
||||
this.followerCluster = Objects.requireNonNull(followerCluster);
|
||||
this.leaderIndex = Objects.requireNonNull(leaderIndex);
|
||||
this.leaderRemoteCluster = Objects.requireNonNull(leaderRemoteCluster);
|
||||
this.followerIndex = Objects.requireNonNull(followerIndex);
|
||||
this.followerIndexUUID = Objects.requireNonNull(followerIndexUUID);
|
||||
}
|
||||
|
||||
public Request(final StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
followerCluster = in.readString();
|
||||
leaderIndex = in.readString();
|
||||
leaderRemoteCluster = in.readString();
|
||||
followerIndex = in.readString();
|
||||
followerIndexUUID = in.readString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeString(followerCluster);
|
||||
out.writeString(leaderIndex);
|
||||
out.writeString(leaderRemoteCluster);
|
||||
out.writeString(followerIndex);
|
||||
out.writeString(followerIndexUUID);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActionRequestValidationException validate() {
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
@ -9,11 +9,13 @@ package org.elasticsearch.xpack.core.ccr.client;
|
||||
import org.elasticsearch.action.ActionFuture;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
import org.elasticsearch.client.ElasticsearchClient;
|
||||
import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.DeleteAutoFollowPatternAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.ForgetFollowerAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.GetAutoFollowPatternAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction;
|
||||
@ -96,6 +98,16 @@ public class CcrClient {
|
||||
return listener;
|
||||
}
|
||||
|
||||
public void forgetFollower(final ForgetFollowerAction.Request request, final ActionListener<BroadcastResponse> listener) {
|
||||
client.execute(ForgetFollowerAction.INSTANCE, request, listener);
|
||||
}
|
||||
|
||||
public ActionFuture<BroadcastResponse> forgetFollower(final ForgetFollowerAction.Request request) {
|
||||
final PlainActionFuture<BroadcastResponse> listener = PlainActionFuture.newFuture();
|
||||
client.execute(ForgetFollowerAction.INSTANCE, request, listener);
|
||||
return listener;
|
||||
}
|
||||
|
||||
public void putAutoFollowPattern(
|
||||
final PutAutoFollowPatternAction.Request request,
|
||||
final ActionListener<AcknowledgedResponse> listener) {
|
||||
|
@ -22,6 +22,7 @@ import org.elasticsearch.action.admin.indices.settings.get.GetSettingsAction;
|
||||
import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryAction;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.collect.MapBuilder;
|
||||
import org.elasticsearch.xpack.core.ccr.action.ForgetFollowerAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
|
||||
import org.elasticsearch.xpack.core.ccr.action.UnfollowAction;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.action.ExplainLifecycleAction;
|
||||
@ -62,6 +63,7 @@ public final class IndexPrivilege extends Privilege {
|
||||
ExplainLifecycleAction.NAME);
|
||||
private static final Automaton MANAGE_FOLLOW_INDEX_AUTOMATON = patterns(PutFollowAction.NAME, UnfollowAction.NAME,
|
||||
CloseIndexAction.NAME + "*");
|
||||
private static final Automaton MANAGE_LEADER_INDEX_AUTOMATON = patterns(ForgetFollowerAction.NAME + "*");
|
||||
private static final Automaton MANAGE_ILM_AUTOMATON = patterns("indices:admin/ilm/*");
|
||||
|
||||
public static final IndexPrivilege NONE = new IndexPrivilege("none", Automatons.EMPTY);
|
||||
@ -78,6 +80,7 @@ public final class IndexPrivilege extends Privilege {
|
||||
public static final IndexPrivilege CREATE_INDEX = new IndexPrivilege("create_index", CREATE_INDEX_AUTOMATON);
|
||||
public static final IndexPrivilege VIEW_METADATA = new IndexPrivilege("view_index_metadata", VIEW_METADATA_AUTOMATON);
|
||||
public static final IndexPrivilege MANAGE_FOLLOW_INDEX = new IndexPrivilege("manage_follow_index", MANAGE_FOLLOW_INDEX_AUTOMATON);
|
||||
public static final IndexPrivilege MANAGE_LEADER_INDEX = new IndexPrivilege("manage_leader_index", MANAGE_LEADER_INDEX_AUTOMATON);
|
||||
public static final IndexPrivilege MANAGE_ILM = new IndexPrivilege("manage_ilm", MANAGE_ILM_AUTOMATON);
|
||||
|
||||
private static final Map<String, IndexPrivilege> VALUES = MapBuilder.<String, IndexPrivilege>newMapBuilder()
|
||||
@ -95,6 +98,7 @@ public final class IndexPrivilege extends Privilege {
|
||||
.put("view_index_metadata", VIEW_METADATA)
|
||||
.put("read_cross_cluster", READ_CROSS_CLUSTER)
|
||||
.put("manage_follow_index", MANAGE_FOLLOW_INDEX)
|
||||
.put("manage_leader_index", MANAGE_LEADER_INDEX)
|
||||
.put("manage_ilm", MANAGE_ILM)
|
||||
.immutableMap();
|
||||
|
||||
|
@ -0,0 +1,21 @@
|
||||
{
|
||||
"ccr.forget_follower": {
|
||||
"documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/current",
|
||||
"methods": [ "POST" ],
|
||||
"url": {
|
||||
"path": "/{index}/_ccr/forget_follower",
|
||||
"paths": [ "/{index}/_ccr/forget_follower" ],
|
||||
"parts": {
|
||||
"index": {
|
||||
"type": "string",
|
||||
"required": true,
|
||||
"description": "the name of the leader index for which specified follower retention leases should be removed"
|
||||
}
|
||||
}
|
||||
},
|
||||
"body": {
|
||||
"description" : "the name and UUID of the follower index, the name of the cluster containing the follower index, and the alias from the perspective of that cluster for the remote cluster containing the leader index",
|
||||
"required" : true
|
||||
}
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user