Make CCR resilient against missing remote cluster connections (#36682)

Both index following and auto following should be resilient against missing remote connections.
This happens in the case that they get accidentally removed by a user. When this happens
auto following and index following will retry to continue instead of failing with unrecoverable exceptions.

Both the put follow and put auto follow APIs validate whether the
remote cluster connection. The logic added in this change only exists
in case during the lifetime of a follower index or auto follow pattern
the remote connection gets removed. This retry behavior similar how CCR
deals with authorization errors.

Closes #36667
Closes #36255
This commit is contained in:
Martijn van Groningen 2018-12-24 07:28:34 +01:00 committed by GitHub
parent 1e6eff6148
commit 4fb62fcba6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 80 additions and 10 deletions

View File

@ -245,11 +245,17 @@ public class AutoFollowCoordinator implements ClusterStateListener {
} }
List<String> removedRemoteClusters = new ArrayList<>(); List<String> removedRemoteClusters = new ArrayList<>();
for (String remoteCluster : autoFollowers.keySet()) { for (Map.Entry<String, AutoFollower> entry : autoFollowers.entrySet()) {
String remoteCluster = entry.getKey();
AutoFollower autoFollower = entry.getValue();
boolean exist = autoFollowMetadata.getPatterns().values().stream() boolean exist = autoFollowMetadata.getPatterns().values().stream()
.anyMatch(pattern -> pattern.getRemoteCluster().equals(remoteCluster)); .anyMatch(pattern -> pattern.getRemoteCluster().equals(remoteCluster));
if (exist == false) { if (exist == false) {
removedRemoteClusters.add(remoteCluster); removedRemoteClusters.add(remoteCluster);
} else if (autoFollower.remoteClusterConnectionMissing) {
LOGGER.info("Retrying auto follower [{}] after remote cluster connection was missing", remoteCluster);
autoFollower.remoteClusterConnectionMissing = false;
autoFollower.start();
} }
} }
this.autoFollowers = autoFollowers this.autoFollowers = autoFollowers
@ -281,6 +287,7 @@ public class AutoFollowCoordinator implements ClusterStateListener {
private volatile long lastAutoFollowTimeInMillis = -1; private volatile long lastAutoFollowTimeInMillis = -1;
private volatile long metadataVersion = 0; private volatile long metadataVersion = 0;
private volatile boolean remoteClusterConnectionMissing = false;
private volatile CountDown autoFollowPatternsCountDown; private volatile CountDown autoFollowPatternsCountDown;
private volatile AtomicArray<AutoFollowResult> autoFollowResults; private volatile AtomicArray<AutoFollowResult> autoFollowResults;
@ -327,6 +334,14 @@ public class AutoFollowCoordinator implements ClusterStateListener {
autoFollowIndices(autoFollowMetadata, clusterState, remoteClusterState, patterns); autoFollowIndices(autoFollowMetadata, clusterState, remoteClusterState, patterns);
} else { } else {
assert remoteError != null; assert remoteError != null;
String expectedErrorMessage = "unknown cluster alias [" + remoteCluster + "]";
if (remoteError instanceof IllegalArgumentException &&
expectedErrorMessage.equals(remoteError.getMessage())) {
LOGGER.info("AutoFollower for cluster [{}] has stopped, because remote connection is gone", remoteCluster);
remoteClusterConnectionMissing = true;
return;
}
for (int i = 0; i < patterns.size(); i++) { for (int i = 0; i < patterns.size(); i++) {
String autoFollowPatternName = patterns.get(i); String autoFollowPatternName = patterns.get(i);
finalise(i, new AutoFollowResult(autoFollowPatternName, remoteError)); finalise(i, new AutoFollowResult(autoFollowPatternName, remoteError));

View File

@ -419,7 +419,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
private void handleFailure(Exception e, AtomicInteger retryCounter, Runnable task) { private void handleFailure(Exception e, AtomicInteger retryCounter, Runnable task) {
assert e != null; assert e != null;
if (shouldRetry(e) && isStopped() == false) { if (shouldRetry(params.getRemoteCluster(), e) && isStopped() == false) {
int currentRetry = retryCounter.incrementAndGet(); int currentRetry = retryCounter.incrementAndGet();
LOGGER.debug(new ParameterizedMessage("{} error during follow shard task, retrying [{}]", LOGGER.debug(new ParameterizedMessage("{} error during follow shard task, retrying [{}]",
params.getFollowShardId(), currentRetry), e); params.getFollowShardId(), currentRetry), e);
@ -441,13 +441,14 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
return Math.min(backOffDelay, maxRetryDelayInMillis); return Math.min(backOffDelay, maxRetryDelayInMillis);
} }
static boolean shouldRetry(Exception e) { static boolean shouldRetry(String remoteCluster, Exception e) {
if (NetworkExceptionHelper.isConnectException(e)) { if (NetworkExceptionHelper.isConnectException(e)) {
return true; return true;
} else if (NetworkExceptionHelper.isCloseConnectionException(e)) { } else if (NetworkExceptionHelper.isCloseConnectionException(e)) {
return true; return true;
} }
String noSuchRemoteClusterMessage = "no such remote cluster: " + remoteCluster;
final Throwable actual = ExceptionsHelper.unwrapCause(e); final Throwable actual = ExceptionsHelper.unwrapCause(e);
return actual instanceof ShardNotFoundException || return actual instanceof ShardNotFoundException ||
actual instanceof IllegalIndexShardStateException || actual instanceof IllegalIndexShardStateException ||
@ -460,7 +461,8 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
actual instanceof NodeDisconnectedException || actual instanceof NodeDisconnectedException ||
actual instanceof NodeNotConnectedException || actual instanceof NodeNotConnectedException ||
actual instanceof NodeClosedException || actual instanceof NodeClosedException ||
(actual.getMessage() != null && actual.getMessage().contains("TransportService is closed")); (actual.getMessage() != null && actual.getMessage().contains("TransportService is closed")) ||
(actual instanceof IllegalArgumentException && noSuchRemoteClusterMessage.equals(actual.getMessage()));
} }
// These methods are protected for testing purposes: // These methods are protected for testing purposes:

View File

@ -235,7 +235,11 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
request.setMaxOperationCount(maxOperationCount); request.setMaxOperationCount(maxOperationCount);
request.setMaxBatchSize(params.getMaxReadRequestSize()); request.setMaxBatchSize(params.getMaxReadRequestSize());
request.setPollTimeout(params.getReadPollTimeout()); request.setPollTimeout(params.getReadPollTimeout());
try {
remoteClient.execute(ShardChangesAction.INSTANCE, request, ActionListener.wrap(handler::accept, errorHandler)); remoteClient.execute(ShardChangesAction.INSTANCE, request, ActionListener.wrap(handler::accept, errorHandler));
} catch (Exception e) {
errorHandler.accept(e);
}
} }
}; };
} }
@ -265,7 +269,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
return; return;
} }
if (ShardFollowNodeTask.shouldRetry(e)) { if (ShardFollowNodeTask.shouldRetry(params.getRemoteCluster(), e)) {
logger.debug(new ParameterizedMessage("failed to fetch follow shard global {} checkpoint and max sequence number", logger.debug(new ParameterizedMessage("failed to fetch follow shard global {} checkpoint and max sequence number",
shardFollowNodeTask), e); shardFollowNodeTask), e);
threadPool.schedule(params.getMaxRetryDelay(), Ccr.CCR_THREAD_POOL_NAME, () -> nodeOperation(task, params, state)); threadPool.schedule(params.getMaxRetryDelay(), Ccr.CCR_THREAD_POOL_NAME, () -> nodeOperation(task, params, state));

View File

@ -6,25 +6,31 @@
package org.elasticsearch.xpack.ccr; package org.elasticsearch.xpack.ccr;
import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.xpack.CcrSingleNodeTestCase; import org.elasticsearch.xpack.CcrSingleNodeTestCase;
import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction;
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction; import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction;
import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction;
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction; import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;
import java.io.IOException; import java.io.IOException;
import java.util.Collections;
import java.util.Map; import java.util.Map;
import static java.util.Collections.singletonMap; import static java.util.Collections.singletonMap;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/36764")
public class LocalIndexFollowingIT extends CcrSingleNodeTestCase { public class LocalIndexFollowingIT extends CcrSingleNodeTestCase {
public void testFollowIndex() throws Exception { public void testFollowIndex() throws Exception {
@ -86,6 +92,51 @@ public class LocalIndexFollowingIT extends CcrSingleNodeTestCase {
assertThat(client().admin().indices().prepareExists("follower-index").get().isExists(), equalTo(false)); assertThat(client().admin().indices().prepareExists("follower-index").get().isExists(), equalTo(false));
} }
public void testRemoveRemoteConnection() throws Exception {
PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request();
request.setName("my_pattern");
request.setRemoteCluster("local");
request.setLeaderIndexPatterns(Collections.singletonList("logs-*"));
request.setFollowIndexNamePattern("copy-{{leader_index}}");
request.setReadPollTimeout(TimeValue.timeValueMillis(10));
assertTrue(client().execute(PutAutoFollowPatternAction.INSTANCE, request).actionGet().isAcknowledged());
Settings leaderIndexSettings = Settings.builder()
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0)
.build();
createIndex("logs-20200101", leaderIndexSettings);
client().prepareIndex("logs-20200101", "doc").setSource("{}", XContentType.JSON).get();
assertBusy(() -> {
CcrStatsAction.Response response = client().execute(CcrStatsAction.INSTANCE, new CcrStatsAction.Request()).actionGet();
assertThat(response.getAutoFollowStats().getNumberOfSuccessfulFollowIndices(), equalTo(1L));
assertThat(response.getFollowStats().getStatsResponses().size(), equalTo(1));
assertThat(response.getFollowStats().getStatsResponses().get(0).status().followerGlobalCheckpoint(), equalTo(0L));
});
// Both auto follow patterns and index following should be resilient to remote connection being missing:
removeLocalRemote();
// This triggers a cluster state update, which should let auto follow coordinator retry auto following:
setupLocalRemote();
// This new index should be picked up by auto follow coordinator
createIndex("logs-20200102", leaderIndexSettings);
// This new document should be replicated to follower index:
client().prepareIndex("logs-20200101", "doc").setSource("{}", XContentType.JSON).get();
assertBusy(() -> {
CcrStatsAction.Response response = client().execute(CcrStatsAction.INSTANCE, new CcrStatsAction.Request()).actionGet();
assertThat(response.getAutoFollowStats().getNumberOfSuccessfulFollowIndices(), equalTo(2L));
FollowStatsAction.StatsRequest statsRequest = new FollowStatsAction.StatsRequest();
statsRequest.setIndices(new String[]{"copy-logs-20200101"});
FollowStatsAction.StatsResponses responses = client().execute(FollowStatsAction.INSTANCE, statsRequest).actionGet();
assertThat(responses.getStatsResponses().size(), equalTo(1));
assertThat(responses.getStatsResponses().get(0).status().getFatalException(), nullValue());
assertThat(responses.getStatsResponses().get(0).status().followerGlobalCheckpoint(), equalTo(1L));
});
}
public static String getIndexSettings(final int numberOfShards, public static String getIndexSettings(final int numberOfShards,
final int numberOfReplicas, final int numberOfReplicas,
final Map<String, String> additionalIndexSettings) throws IOException { final Map<String, String> additionalIndexSettings) throws IOException {

View File

@ -6,7 +6,6 @@
package org.elasticsearch.xpack.ccr.action; package org.elasticsearch.xpack.ccr.action;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
@ -34,7 +33,6 @@ import static org.hamcrest.collection.IsEmptyCollection.empty;
* Test scope is important to ensure that other tests added to this suite do not interfere with the expectation in * Test scope is important to ensure that other tests added to this suite do not interfere with the expectation in
* testStatsWhenNoPersistentTasksMetaDataExists that the cluster state does not contain any persistent tasks metadata. * testStatsWhenNoPersistentTasksMetaDataExists that the cluster state does not contain any persistent tasks metadata.
*/ */
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/36764")
public class FollowStatsIT extends CcrSingleNodeTestCase { public class FollowStatsIT extends CcrSingleNodeTestCase {
/** /**