Retry CCR shard follow task when no seed node left (#63225)
If the connection between clusters is disconnected or the leader cluster is offline, then CCR shard-follow tasks can stop with "no seed node left". CCR should retry on this error.
This commit is contained in:
parent
5c3a4c13dd
commit
25fbc01459
|
@ -1042,7 +1042,12 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
|
|||
org.elasticsearch.cluster.coordination.NodeHealthCheckFailureException.class,
|
||||
org.elasticsearch.cluster.coordination.NodeHealthCheckFailureException::new,
|
||||
159,
|
||||
Version.V_7_9_0);
|
||||
Version.V_7_9_0),
|
||||
NO_SEED_NODE_LEFT_EXCEPTION(
|
||||
org.elasticsearch.transport.NoSeedNodeLeftException.class,
|
||||
org.elasticsearch.transport.NoSeedNodeLeftException::new,
|
||||
160,
|
||||
Version.V_7_10_0);
|
||||
|
||||
final Class<? extends ElasticsearchException> exceptionClass;
|
||||
final CheckedFunction<StreamInput, ? extends ElasticsearchException, IOException> constructor;
|
||||
|
|
|
@ -0,0 +1,39 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.transport;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Thrown after failing to connect to any of the seed nodes of the remote cluster.
|
||||
*/
|
||||
public class NoSeedNodeLeftException extends ElasticsearchException {
|
||||
|
||||
public NoSeedNodeLeftException(String clusterName) {
|
||||
super("no seed node left for cluster: [" + clusterName + "]");
|
||||
}
|
||||
|
||||
public NoSeedNodeLeftException(StreamInput in) throws IOException {
|
||||
super(in);
|
||||
}
|
||||
}
|
|
@ -373,7 +373,7 @@ public class SniffConnectionStrategy extends RemoteConnectionStrategy {
|
|||
onFailure.accept(e);
|
||||
});
|
||||
} else {
|
||||
listener.onFailure(new IllegalStateException("no seed node left"));
|
||||
listener.onFailure(new NoSeedNodeLeftException(clusterAlias));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -91,6 +91,7 @@ import org.elasticsearch.test.VersionUtils;
|
|||
import org.elasticsearch.transport.ActionNotFoundTransportException;
|
||||
import org.elasticsearch.transport.ActionTransportException;
|
||||
import org.elasticsearch.transport.ConnectTransportException;
|
||||
import org.elasticsearch.transport.NoSeedNodeLeftException;
|
||||
import org.elasticsearch.transport.NoSuchRemoteClusterException;
|
||||
import org.elasticsearch.transport.TcpTransport;
|
||||
|
||||
|
@ -830,6 +831,7 @@ public class ExceptionSerializationTests extends ESTestCase {
|
|||
ids.put(157, IngestProcessorException.class);
|
||||
ids.put(158, PeerRecoveryNotFound.class);
|
||||
ids.put(159, NodeHealthCheckFailureException.class);
|
||||
ids.put(160, NoSeedNodeLeftException.class);
|
||||
|
||||
Map<Class<? extends ElasticsearchException>, Integer> reverse = new HashMap<>();
|
||||
for (Map.Entry<Integer, Class<? extends ElasticsearchException>> entry : ids.entrySet()) {
|
||||
|
|
|
@ -661,12 +661,13 @@ public class RemoteClusterServiceTests extends ESTestCase {
|
|||
failLatch.await();
|
||||
assertNotNull(ex.get());
|
||||
if (ex.get() instanceof IllegalStateException) {
|
||||
assertThat(ex.get().getMessage(), either(equalTo("no seed node left"))
|
||||
.or(equalTo("Unable to open any connections to remote cluster [cluster_1]"))
|
||||
.or(equalTo("Unable to open any connections to remote cluster [cluster_2]")));
|
||||
assertThat(ex.get().getMessage(),
|
||||
either(equalTo("Unable to open any connections to remote cluster [cluster_1]"))
|
||||
.or(equalTo("Unable to open any connections to remote cluster [cluster_2]")));
|
||||
} else {
|
||||
assertThat(ex.get(),
|
||||
either(instanceOf(TransportException.class)).or(instanceOf(NoSuchRemoteClusterException.class)));
|
||||
assertThat(ex.get(), either(instanceOf(TransportException.class))
|
||||
.or(instanceOf(NoSuchRemoteClusterException.class))
|
||||
.or(instanceOf(NoSeedNodeLeftException.class)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -34,6 +34,7 @@ import org.elasticsearch.persistent.AllocatedPersistentTask;
|
|||
import org.elasticsearch.tasks.TaskId;
|
||||
import org.elasticsearch.threadpool.Scheduler;
|
||||
import org.elasticsearch.transport.ConnectTransportException;
|
||||
import org.elasticsearch.transport.NoSeedNodeLeftException;
|
||||
import org.elasticsearch.transport.NoSuchRemoteClusterException;
|
||||
import org.elasticsearch.xpack.ccr.Ccr;
|
||||
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse;
|
||||
|
@ -568,6 +569,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
|
|||
actual instanceof ConnectTransportException ||
|
||||
actual instanceof NodeClosedException ||
|
||||
actual instanceof NoSuchRemoteClusterException ||
|
||||
actual instanceof NoSeedNodeLeftException ||
|
||||
actual instanceof EsRejectedExecutionException ||
|
||||
actual instanceof CircuitBreakingException;
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@ import org.elasticsearch.test.ESTestCase;
|
|||
import org.elasticsearch.threadpool.Scheduler;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.NoSeedNodeLeftException;
|
||||
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse;
|
||||
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
|
||||
|
||||
|
@ -316,6 +317,7 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase {
|
|||
if (sometimes()) {
|
||||
Exception error = randomFrom(
|
||||
new UnavailableShardsException(new ShardId("test", "test", 0), ""),
|
||||
new NoSeedNodeLeftException("cluster_a"),
|
||||
new CircuitBreakingException("test", randomInt(), randomInt(), randomFrom(CircuitBreaker.Durability.values())),
|
||||
new EsRejectedExecutionException("test"));
|
||||
item.add(new TestResponse(error, mappingVersion, settingsVersion, null));
|
||||
|
|
Loading…
Reference in New Issue