It is possible that the Unfollow API may fail to release shard history retention leases when unfollowing, so this needs to be handled by the ILM Unfollow action. There's nothing much that can be done automatically about it from the follower side, so this change makes the ILM unfollow action simply ignore those failures.
This commit is contained in:
parent
a9e86bc941
commit
f4c5abe4d4
|
@ -5,11 +5,17 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.core.indexlifecycle;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.xpack.core.ccr.action.UnfollowAction;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
final class UnfollowFollowIndexStep extends AbstractUnfollowIndexStep {
|
||||
private static final Logger logger = LogManager.getLogger(UnfollowFollowIndexStep.class);
|
||||
|
||||
static final String NAME = "unfollow-follower-index";
|
||||
|
||||
|
@ -25,7 +31,19 @@ final class UnfollowFollowIndexStep extends AbstractUnfollowIndexStep {
|
|||
assert r.isAcknowledged() : "unfollow response is not acknowledged";
|
||||
listener.onResponse(true);
|
||||
},
|
||||
listener::onFailure
|
||||
exception -> {
|
||||
if (exception instanceof ElasticsearchException
|
||||
&& ((ElasticsearchException) exception).getMetadata("es.failed_to_remove_retention_leases") != null) {
|
||||
List<String> leasesNotRemoved = ((ElasticsearchException) exception)
|
||||
.getMetadata("es.failed_to_remove_retention_leases");
|
||||
logger.debug("failed to remove leader retention lease(s) {} while unfollowing index [{}], " +
|
||||
"continuing with lifecycle execution",
|
||||
leasesNotRemoved, followerIndex);
|
||||
listener.onResponse(true);
|
||||
} else {
|
||||
listener.onFailure(exception);
|
||||
}
|
||||
}
|
||||
));
|
||||
}
|
||||
|
||||
|
|
|
@ -5,6 +5,7 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.core.indexlifecycle;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
|
@ -16,6 +17,8 @@ import org.elasticsearch.xpack.core.ccr.action.UnfollowAction;
|
|||
import org.mockito.Mockito;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static org.elasticsearch.xpack.core.indexlifecycle.UnfollowAction.CCR_METADATA_KEY;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
@ -112,4 +115,47 @@ public class UnfollowFollowIndexStepTests extends AbstractUnfollowIndexStepTestC
|
|||
assertThat(completed[0], nullValue());
|
||||
assertThat(failure[0], sameInstance(error));
|
||||
}
|
||||
|
||||
public void testFailureToReleaseRetentionLeases() {
|
||||
IndexMetaData indexMetadata = IndexMetaData.builder("follower-index")
|
||||
.settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE, "true"))
|
||||
.putCustom(CCR_METADATA_KEY, Collections.emptyMap())
|
||||
.numberOfShards(1)
|
||||
.numberOfReplicas(0)
|
||||
.build();
|
||||
|
||||
Client client = Mockito.mock(Client.class);
|
||||
AdminClient adminClient = Mockito.mock(AdminClient.class);
|
||||
Mockito.when(client.admin()).thenReturn(adminClient);
|
||||
IndicesAdminClient indicesClient = Mockito.mock(IndicesAdminClient.class);
|
||||
Mockito.when(adminClient.indices()).thenReturn(indicesClient);
|
||||
|
||||
// Mock unfollow api call:
|
||||
ElasticsearchException error = new ElasticsearchException("text exception");
|
||||
error.addMetadata("es.failed_to_remove_retention_leases", randomAlphaOfLength(10));
|
||||
Mockito.doAnswer(invocation -> {
|
||||
UnfollowAction.Request request = (UnfollowAction.Request) invocation.getArguments()[1];
|
||||
assertThat(request.getFollowerIndex(), equalTo("follower-index"));
|
||||
ActionListener listener = (ActionListener) invocation.getArguments()[2];
|
||||
listener.onFailure(error);
|
||||
return null;
|
||||
}).when(client).execute(Mockito.same(UnfollowAction.INSTANCE), Mockito.any(), Mockito.any());
|
||||
|
||||
AtomicBoolean completed = new AtomicBoolean(false);
|
||||
AtomicReference<Exception> failure = new AtomicReference<>();
|
||||
UnfollowFollowIndexStep step = new UnfollowFollowIndexStep(randomStepKey(), randomStepKey(), client);
|
||||
step.performAction(indexMetadata, null, null, new AsyncActionStep.Listener() {
|
||||
@Override
|
||||
public void onResponse(boolean complete) {
|
||||
completed.set(complete);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
failure.set(e);
|
||||
}
|
||||
});
|
||||
assertThat(completed.get(), equalTo(true));
|
||||
assertThat(failure.get(), nullValue());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -51,6 +51,7 @@ followClusterTestCluster {
|
|||
followClusterTestRunner {
|
||||
systemProperty 'tests.target_cluster', 'follow'
|
||||
systemProperty 'tests.leader_host', "${-> leaderClusterTest.nodes.get(0).httpUri()}"
|
||||
systemProperty 'tests.leader_remote_cluster_seed', "${-> leaderClusterTest.nodes.get(0).transportUri()}"
|
||||
/* To support taking index snapshots, we have to set path.repo setting */
|
||||
systemProperty 'tests.path.repo', new File(buildDir, "cluster/shared/repo")
|
||||
finalizedBy 'leaderClusterTestCluster#stop'
|
||||
|
|
|
@ -16,6 +16,7 @@ import org.elasticsearch.client.RestClient;
|
|||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.ObjectPath;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
|
@ -30,6 +31,7 @@ import org.elasticsearch.xpack.core.indexlifecycle.UnfollowAction;
|
|||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
@ -37,6 +39,7 @@ import static java.util.Collections.singletonMap;
|
|||
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
|
||||
public class CCRIndexLifecycleIT extends ESCCRRestTestCase {
|
||||
|
@ -403,6 +406,97 @@ public class CCRIndexLifecycleIT extends ESCCRRestTestCase {
|
|||
|
||||
}
|
||||
|
||||
public void testILMUnfollowFailsToRemoveRetentionLeases() throws Exception {
|
||||
final String leaderIndex = "leader";
|
||||
final String followerIndex = "follower";
|
||||
final String policyName = "unfollow_only_policy";
|
||||
|
||||
if ("leader".equals(targetCluster)) {
|
||||
Settings indexSettings = Settings.builder()
|
||||
.put("index.soft_deletes.enabled", true)
|
||||
.put("index.number_of_shards", 1)
|
||||
.put("index.number_of_replicas", 0)
|
||||
.put("index.lifecycle.name", policyName) // this policy won't exist on the leader, that's fine
|
||||
.build();
|
||||
createIndex(leaderIndex, indexSettings, "", "");
|
||||
ensureGreen(leaderIndex);
|
||||
} else if ("follow".equals(targetCluster)) {
|
||||
try (RestClient leaderClient = buildLeaderClient()) {
|
||||
String leaderRemoteClusterSeed = System.getProperty("tests.leader_remote_cluster_seed");
|
||||
configureRemoteClusters("other_remote", leaderRemoteClusterSeed);
|
||||
assertBusy(() -> {
|
||||
Map<?, ?> localConnection = (Map<?, ?>) toMap(client()
|
||||
.performRequest(new Request("GET", "/_remote/info")))
|
||||
.get("other_remote");
|
||||
assertThat(localConnection, notNullValue());
|
||||
assertThat(localConnection.get("connected"), is(true));
|
||||
});
|
||||
putUnfollowOnlyPolicy(client(), policyName);
|
||||
// Set up the follower
|
||||
followIndex("other_remote", leaderIndex, followerIndex);
|
||||
ensureGreen(followerIndex);
|
||||
// Pause ILM so that this policy doesn't proceed until we want it to
|
||||
client().performRequest(new Request("POST", "/_ilm/stop"));
|
||||
|
||||
// Set indexing complete and wait for it to be replicated
|
||||
updateIndexSettings(leaderClient, leaderIndex, Settings.builder()
|
||||
.put("index.lifecycle.indexing_complete", true)
|
||||
.build()
|
||||
);
|
||||
assertBusy(() -> {
|
||||
assertThat(getIndexSetting(client(), followerIndex, "index.lifecycle.indexing_complete"), is("true"));
|
||||
});
|
||||
|
||||
// Remove remote cluster alias:
|
||||
configureRemoteClusters("other_remote", null);
|
||||
assertBusy(() -> {
|
||||
Map<?, ?> localConnection = (Map<?, ?>) toMap(client()
|
||||
.performRequest(new Request("GET", "/_remote/info")))
|
||||
.get("other_remote");
|
||||
assertThat(localConnection, nullValue());
|
||||
});
|
||||
// Then add it back with an incorrect seed node:
|
||||
// (unfollow api needs a remote cluster alias)
|
||||
configureRemoteClusters("other_remote", "localhost:9999");
|
||||
assertBusy(() -> {
|
||||
Map<?, ?> localConnection = (Map<?, ?>) toMap(client()
|
||||
.performRequest(new Request("GET", "/_remote/info")))
|
||||
.get("other_remote");
|
||||
assertThat(localConnection, notNullValue());
|
||||
assertThat(localConnection.get("connected"), is(false));
|
||||
|
||||
Request statsRequest = new Request("GET", "/" + followerIndex + "/_ccr/stats");
|
||||
Map<?, ?> response = toMap(client().performRequest(statsRequest));
|
||||
logger.info("follow shards response={}", response);
|
||||
String expectedIndex = ObjectPath.eval("indices.0.index", response);
|
||||
assertThat(expectedIndex, equalTo(followerIndex));
|
||||
Object fatalError = ObjectPath.eval("indices.0.shards.0.read_exceptions.0", response);
|
||||
assertThat(fatalError, notNullValue());
|
||||
});
|
||||
|
||||
// Start ILM back up and let it unfollow
|
||||
client().performRequest(new Request("POST", "/_ilm/start"));
|
||||
// Wait for the policy to be complete
|
||||
assertBusy(() -> {
|
||||
assertILMPolicy(client(), followerIndex, policyName, "completed", "completed", "completed");
|
||||
});
|
||||
|
||||
// Ensure the "follower" index has successfully unfollowed
|
||||
assertBusy(() -> {
|
||||
assertThat(getIndexSetting(client(), followerIndex, "index.xpack.ccr.following_index"), nullValue());
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void configureRemoteClusters(String name, String leaderRemoteClusterSeed) throws IOException {
|
||||
logger.info("Configuring leader remote cluster [{}]", leaderRemoteClusterSeed);
|
||||
Request request = new Request("PUT", "/_cluster/settings");
|
||||
request.setJsonEntity("{\"persistent\": {\"cluster.remote." + name + ".seeds\": " +
|
||||
(leaderRemoteClusterSeed != null ? String.format(Locale.ROOT, "\"%s\"", leaderRemoteClusterSeed) : null) + "}}");
|
||||
assertThat(client().performRequest(request).getStatusLine().getStatusCode(), equalTo(200));
|
||||
}
|
||||
|
||||
private static void putILMPolicy(String name, String maxSize, Integer maxDocs, TimeValue maxAge) throws IOException {
|
||||
final Request request = new Request("PUT", "_ilm/policy/" + name);
|
||||
XContentBuilder builder = jsonBuilder();
|
||||
|
|
Loading…
Reference in New Issue