[CCR] Adjust list retryable errors (#33985)

The following changes were made:
* Added ElasticsearchSecurityException. For in the case the current user has insufficient privileges while an index is being followed. Prior to following ccr checks whether the current user has sufficient privileges and if not the follow api fails with an error.
* Added Index block exception. If the leader index gets closed, this exception is returned.
* Added ClusterBlockException service unavailable. In case for example the leader cluster is without elected master.
* Removed IndexNotFoundException. If the leader / follower index has been deleted, ccr will need to stop the shard follow tasks with an error.

Closes #33954
This commit is contained in:
Martijn van Groningen 2018-09-28 13:33:09 +02:00 committed by GitHub
parent e2f310b56c
commit eb00348b57
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 143 additions and 7 deletions

View File

@ -6,19 +6,26 @@
package org.elasticsearch.xpack.ccr.action;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchSecurityException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.transport.NetworkExceptionHelper;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndexClosedException;
import org.elasticsearch.persistent.AllocatedPersistentTask;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse;
@ -48,7 +55,7 @@ import java.util.stream.Collectors;
public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
private static final int DELAY_MILLIS = 50;
private static final Logger LOGGER = Loggers.getLogger(ShardFollowNodeTask.class);
private static final Logger LOGGER = LogManager.getLogger(ShardFollowNodeTask.class);
private final String leaderIndex;
private final ShardFollowTask params;
@ -377,9 +384,21 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
}
static boolean shouldRetry(Exception e) {
return NetworkExceptionHelper.isConnectException(e) ||
NetworkExceptionHelper.isCloseConnectionException(e) ||
TransportActions.isShardNotAvailableException(e);
if (NetworkExceptionHelper.isConnectException(e)) {
return true;
} else if (NetworkExceptionHelper.isCloseConnectionException(e)) {
return true;
}
final Throwable actual = ExceptionsHelper.unwrapCause(e);
return actual instanceof ShardNotFoundException ||
actual instanceof IllegalIndexShardStateException ||
actual instanceof NoShardAvailableActionException ||
actual instanceof UnavailableShardsException ||
actual instanceof AlreadyClosedException ||
actual instanceof ElasticsearchSecurityException || // If user does not have sufficient privileges
actual instanceof ClusterBlockException || // If leader index is closed or no elected master
actual instanceof IndexClosedException; // If follow index is closed
}
// These methods are protected for testing purposes:

View File

@ -7,10 +7,13 @@
package org.elasticsearch.xpack.ccr;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.bulk.BulkProcessor;
@ -51,6 +54,9 @@ import org.elasticsearch.xpack.ccr.action.ShardChangesAction;
import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction;
import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction.StatsRequest;
import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction.StatsResponses;
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;
import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction;
@ -72,7 +78,10 @@ import static java.util.Collections.singletonMap;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
@ -542,6 +551,110 @@ public class ShardChangesIT extends ESIntegTestCase {
"this setting is managed via a dedicated API"));
}
public void testCloseLeaderIndex() throws Exception {
assertAcked(client().admin().indices().prepareCreate("index1")
.setSettings(Settings.builder()
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.build()));
final ResumeFollowAction.Request followRequest = createFollowRequest("index1", "index2");
final PutFollowAction.Request createAndFollowRequest = new PutFollowAction.Request(followRequest);
client().execute(PutFollowAction.INSTANCE, createAndFollowRequest).get();
client().prepareIndex("index1", "doc", "1").setSource("{}", XContentType.JSON).get();
assertBusy(() -> assertThat(client().prepareSearch("index2").get().getHits().totalHits, equalTo(1L)));
client().admin().indices().close(new CloseIndexRequest("index1")).actionGet();
assertBusy(() -> {
StatsResponses response = client().execute(CcrStatsAction.INSTANCE, new StatsRequest()).actionGet();
assertThat(response.getNodeFailures(), empty());
assertThat(response.getTaskFailures(), empty());
assertThat(response.getStatsResponses(), hasSize(1));
assertThat(response.getStatsResponses().get(0).status().numberOfFailedFetches(), greaterThanOrEqualTo(1L));
assertThat(response.getStatsResponses().get(0).status().fetchExceptions().size(), equalTo(1));
ElasticsearchException exception = response.getStatsResponses().get(0).status()
.fetchExceptions().entrySet().iterator().next().getValue().v2();
assertThat(exception.getMessage(), equalTo("blocked by: [FORBIDDEN/4/index closed];"));
});
client().admin().indices().open(new OpenIndexRequest("index1")).actionGet();
client().prepareIndex("index1", "doc", "2").setSource("{}", XContentType.JSON).get();
assertBusy(() -> assertThat(client().prepareSearch("index2").get().getHits().totalHits, equalTo(2L)));
unfollowIndex("index2");
}
public void testCloseFollowIndex() throws Exception {
assertAcked(client().admin().indices().prepareCreate("index1")
.setSettings(Settings.builder()
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.build()));
final ResumeFollowAction.Request followRequest = createFollowRequest("index1", "index2");
final PutFollowAction.Request createAndFollowRequest = new PutFollowAction.Request(followRequest);
client().execute(PutFollowAction.INSTANCE, createAndFollowRequest).get();
client().prepareIndex("index1", "doc", "1").setSource("{}", XContentType.JSON).get();
assertBusy(() -> assertThat(client().prepareSearch("index2").get().getHits().totalHits, equalTo(1L)));
client().admin().indices().close(new CloseIndexRequest("index2")).actionGet();
client().prepareIndex("index1", "doc", "2").setSource("{}", XContentType.JSON).get();
assertBusy(() -> {
StatsResponses response = client().execute(CcrStatsAction.INSTANCE, new StatsRequest()).actionGet();
assertThat(response.getNodeFailures(), empty());
assertThat(response.getTaskFailures(), empty());
assertThat(response.getStatsResponses(), hasSize(1));
assertThat(response.getStatsResponses().get(0).status().numberOfFailedBulkOperations(), greaterThanOrEqualTo(1L));
});
client().admin().indices().open(new OpenIndexRequest("index2")).actionGet();
assertBusy(() -> assertThat(client().prepareSearch("index2").get().getHits().totalHits, equalTo(2L)));
unfollowIndex("index2");
}
public void testDeleteLeaderIndex() throws Exception {
assertAcked(client().admin().indices().prepareCreate("index1")
.setSettings(Settings.builder()
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.build()));
final ResumeFollowAction.Request followRequest = createFollowRequest("index1", "index2");
final PutFollowAction.Request createAndFollowRequest = new PutFollowAction.Request(followRequest);
client().execute(PutFollowAction.INSTANCE, createAndFollowRequest).get();
client().prepareIndex("index1", "doc", "1").setSource("{}", XContentType.JSON).get();
assertBusy(() -> assertThat(client().prepareSearch("index2").get().getHits().totalHits, equalTo(1L)));
client().admin().indices().delete(new DeleteIndexRequest("index1")).actionGet();
ensureNoCcrTasks();
}
public void testDeleteFollowerIndex() throws Exception {
assertAcked(client().admin().indices().prepareCreate("index1")
.setSettings(Settings.builder()
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.build()));
final ResumeFollowAction.Request followRequest = createFollowRequest("index1", "index2");
final PutFollowAction.Request createAndFollowRequest = new PutFollowAction.Request(followRequest);
client().execute(PutFollowAction.INSTANCE, createAndFollowRequest).get();
client().prepareIndex("index1", "doc", "1").setSource("{}", XContentType.JSON).get();
assertBusy(() -> assertThat(client().prepareSearch("index2").get().getHits().totalHits, equalTo(1L)));
client().admin().indices().delete(new DeleteIndexRequest("index2")).actionGet();
client().prepareIndex("index1", "doc", "2").setSource("{}", XContentType.JSON).get();
ensureNoCcrTasks();
}
private CheckedRunnable<Exception> assertTask(final int numberOfPrimaryShards, final Map<ShardId, Long> numDocsPerShard) {
return () -> {
final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
@ -584,10 +697,14 @@ public class ShardChangesIT extends ESIntegTestCase {
unfollowRequest.setFollowIndex(index);
client().execute(PauseFollowAction.INSTANCE, unfollowRequest).get();
}
ensureNoCcrTasks();
}
private void ensureNoCcrTasks() throws Exception {
assertBusy(() -> {
final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
final PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
assertThat(tasks.tasks().size(), equalTo(0));
assertThat(tasks.tasks(), empty());
ListTasksRequest listTasksRequest = new ListTasksRequest();
listTasksRequest.setDetailed(true);