Fail with a dedicated exception if remote connection is missing or (#37767)
or connectivity to the remote connection is failing. Relates to #37681
This commit is contained in:
parent
df8fa9781e
commit
1151f3b3ff
|
@ -1010,7 +1010,9 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
|
|||
COORDINATION_STATE_REJECTED_EXCEPTION(org.elasticsearch.cluster.coordination.CoordinationStateRejectedException.class,
|
||||
org.elasticsearch.cluster.coordination.CoordinationStateRejectedException::new, 150, Version.V_7_0_0),
|
||||
SNAPSHOT_IN_PROGRESS_EXCEPTION(org.elasticsearch.snapshots.SnapshotInProgressException.class,
|
||||
org.elasticsearch.snapshots.SnapshotInProgressException::new, 151, Version.V_7_0_0);
|
||||
org.elasticsearch.snapshots.SnapshotInProgressException::new, 151, Version.V_7_0_0),
|
||||
NO_SUCH_REMOTE_CLUSTER_EXCEPTION(org.elasticsearch.transport.NoSuchRemoteClusterException.class,
|
||||
org.elasticsearch.transport.NoSuchRemoteClusterException::new, 152, Version.V_7_0_0);
|
||||
|
||||
final Class<? extends ElasticsearchException> exceptionClass;
|
||||
final CheckedFunction<StreamInput, ? extends ElasticsearchException, IOException> constructor;
|
||||
|
|
|
@ -0,0 +1,42 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.transport;
|
||||
|
||||
import org.elasticsearch.ResourceNotFoundException;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* An exception that remote cluster is missing or
|
||||
* connectivity to the remote connection is failing
|
||||
*/
|
||||
public final class NoSuchRemoteClusterException extends ResourceNotFoundException {
|
||||
|
||||
NoSuchRemoteClusterException(String clusterName) {
|
||||
//No node available for cluster
|
||||
super("no such remote cluster: [" + clusterName + "]");
|
||||
}
|
||||
|
||||
public NoSuchRemoteClusterException(StreamInput in) throws IOException {
|
||||
super(in);
|
||||
}
|
||||
|
||||
}
|
|
@ -659,7 +659,7 @@ final class RemoteClusterConnection implements TransportConnectionListener, Clos
|
|||
if (currentIterator.hasNext()) {
|
||||
return currentIterator.next();
|
||||
} else {
|
||||
throw new IllegalStateException("No node available for cluster: " + clusterAlias);
|
||||
throw new NoSuchRemoteClusterException(clusterAlias);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -337,7 +337,7 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
|
|||
RemoteClusterConnection getRemoteClusterConnection(String cluster) {
|
||||
RemoteClusterConnection connection = remoteClusters.get(cluster);
|
||||
if (connection == null) {
|
||||
throw new IllegalArgumentException("no such remote cluster: " + cluster);
|
||||
throw new NoSuchRemoteClusterException(cluster);
|
||||
}
|
||||
return connection;
|
||||
}
|
||||
|
@ -415,7 +415,7 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
|
|||
Map<String, RemoteClusterConnection> remoteClusters = this.remoteClusters;
|
||||
for (String cluster : clusters) {
|
||||
if (remoteClusters.containsKey(cluster) == false) {
|
||||
listener.onFailure(new IllegalArgumentException("no such remote cluster: [" + cluster + "]"));
|
||||
listener.onFailure(new NoSuchRemoteClusterException(cluster));
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
@ -456,7 +456,7 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
|
|||
*/
|
||||
public Client getRemoteClusterClient(ThreadPool threadPool, String clusterAlias) {
|
||||
if (transportService.getRemoteClusterService().getRemoteClusterNames().contains(clusterAlias) == false) {
|
||||
throw new IllegalArgumentException("unknown cluster alias [" + clusterAlias + "]");
|
||||
throw new NoSuchRemoteClusterException(clusterAlias);
|
||||
}
|
||||
return new RemoteClusterAwareClient(settings, threadPool, transportService, clusterAlias);
|
||||
}
|
||||
|
|
|
@ -85,6 +85,7 @@ import org.elasticsearch.test.VersionUtils;
|
|||
import org.elasticsearch.transport.ActionNotFoundTransportException;
|
||||
import org.elasticsearch.transport.ActionTransportException;
|
||||
import org.elasticsearch.transport.ConnectTransportException;
|
||||
import org.elasticsearch.transport.NoSuchRemoteClusterException;
|
||||
import org.elasticsearch.transport.TcpTransport;
|
||||
|
||||
import java.io.EOFException;
|
||||
|
@ -809,6 +810,7 @@ public class ExceptionSerializationTests extends ESTestCase {
|
|||
ids.put(149, MultiBucketConsumerService.TooManyBucketsException.class);
|
||||
ids.put(150, CoordinationStateRejectedException.class);
|
||||
ids.put(151, SnapshotInProgressException.class);
|
||||
ids.put(152, NoSuchRemoteClusterException.class);
|
||||
|
||||
Map<Class<? extends ElasticsearchException>, Integer> reverse = new HashMap<>();
|
||||
for (Map.Entry<Integer, Class<? extends ElasticsearchException>> entry : ids.entrySet()) {
|
||||
|
|
|
@ -1000,10 +1000,8 @@ public class RemoteClusterConnectionTests extends ESTestCase {
|
|||
try {
|
||||
DiscoveryNode node = connection.getAnyConnectedNode();
|
||||
assertNotNull(node);
|
||||
} catch (IllegalStateException e) {
|
||||
if (e.getMessage().startsWith("No node available for cluster:") == false) {
|
||||
throw e;
|
||||
}
|
||||
} catch (NoSuchRemoteClusterException e) {
|
||||
// ignore, this is an expected exception
|
||||
}
|
||||
}
|
||||
} catch (Exception ex) {
|
||||
|
|
|
@ -657,7 +657,7 @@ public class RemoteClusterServiceTests extends ESTestCase {
|
|||
});
|
||||
failLatch.await();
|
||||
assertNotNull(ex.get());
|
||||
assertTrue(ex.get() instanceof IllegalArgumentException);
|
||||
assertTrue(ex.get() instanceof NoSuchRemoteClusterException);
|
||||
assertEquals("no such remote cluster: [no such cluster]", ex.get().getMessage());
|
||||
}
|
||||
{
|
||||
|
|
|
@ -33,6 +33,7 @@ import org.elasticsearch.common.util.concurrent.CountDown;
|
|||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.license.LicenseUtils;
|
||||
import org.elasticsearch.transport.NoSuchRemoteClusterException;
|
||||
import org.elasticsearch.xpack.ccr.Ccr;
|
||||
import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
|
||||
import org.elasticsearch.xpack.ccr.CcrSettings;
|
||||
|
@ -374,9 +375,7 @@ public class AutoFollowCoordinator implements ClusterStateListener {
|
|||
autoFollowIndices(autoFollowMetadata, clusterState, remoteClusterState, patterns);
|
||||
} else {
|
||||
assert remoteError != null;
|
||||
String expectedErrorMessage = "unknown cluster alias [" + remoteCluster + "]";
|
||||
if (remoteError instanceof IllegalArgumentException &&
|
||||
expectedErrorMessage.equals(remoteError.getMessage())) {
|
||||
if (remoteError instanceof NoSuchRemoteClusterException) {
|
||||
LOGGER.info("AutoFollower for cluster [{}] has stopped, because remote connection is gone", remoteCluster);
|
||||
remoteClusterConnectionMissing = true;
|
||||
return;
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.elasticsearch.node.NodeClosedException;
|
|||
import org.elasticsearch.persistent.AllocatedPersistentTask;
|
||||
import org.elasticsearch.tasks.TaskId;
|
||||
import org.elasticsearch.transport.ConnectTransportException;
|
||||
import org.elasticsearch.transport.NoSuchRemoteClusterException;
|
||||
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse;
|
||||
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
|
||||
|
||||
|
@ -451,10 +452,6 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
|
|||
return true;
|
||||
}
|
||||
|
||||
// This is thrown when using a Client and its remote cluster alias went MIA
|
||||
String noSuchRemoteClusterMessage = "no such remote cluster: " + remoteCluster;
|
||||
// This is thrown when creating a Client and the remote cluster does not exist:
|
||||
String unknownClusterMessage = "unknown cluster alias [" + remoteCluster + "]";
|
||||
final Throwable actual = ExceptionsHelper.unwrapCause(e);
|
||||
return actual instanceof ShardNotFoundException ||
|
||||
actual instanceof IllegalIndexShardStateException ||
|
||||
|
@ -466,9 +463,8 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
|
|||
actual instanceof IndexClosedException || // If follow index is closed
|
||||
actual instanceof ConnectTransportException ||
|
||||
actual instanceof NodeClosedException ||
|
||||
(actual.getMessage() != null && actual.getMessage().contains("TransportService is closed")) ||
|
||||
(actual instanceof IllegalArgumentException && (noSuchRemoteClusterMessage.equals(actual.getMessage()) ||
|
||||
unknownClusterMessage.equals(actual.getMessage())));
|
||||
actual instanceof NoSuchRemoteClusterException ||
|
||||
(actual.getMessage() != null && actual.getMessage().contains("TransportService is closed"));
|
||||
}
|
||||
|
||||
// These methods are protected for testing purposes:
|
||||
|
|
|
@ -49,6 +49,7 @@ import org.elasticsearch.index.shard.ShardId;
|
|||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.tasks.TaskInfo;
|
||||
import org.elasticsearch.transport.NoSuchRemoteClusterException;
|
||||
import org.elasticsearch.xpack.CcrIntegTestCase;
|
||||
import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
|
||||
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
|
||||
|
@ -574,16 +575,16 @@ public class IndexFollowingIT extends CcrIntegTestCase {
|
|||
ensureLeaderGreen("index1");
|
||||
PutFollowAction.Request followRequest = putFollow("index1", "index2");
|
||||
followRequest.setRemoteCluster("another_cluster");
|
||||
Exception e = expectThrows(IllegalArgumentException.class,
|
||||
Exception e = expectThrows(NoSuchRemoteClusterException.class,
|
||||
() -> followerClient().execute(PutFollowAction.INSTANCE, followRequest).actionGet());
|
||||
assertThat(e.getMessage(), equalTo("unknown cluster alias [another_cluster]"));
|
||||
assertThat(e.getMessage(), equalTo("no such remote cluster: [another_cluster]"));
|
||||
PutAutoFollowPatternAction.Request putAutoFollowRequest = new PutAutoFollowPatternAction.Request();
|
||||
putAutoFollowRequest.setName("name");
|
||||
putAutoFollowRequest.setRemoteCluster("another_cluster");
|
||||
putAutoFollowRequest.setLeaderIndexPatterns(Collections.singletonList("logs-*"));
|
||||
e = expectThrows(IllegalArgumentException.class,
|
||||
e = expectThrows(NoSuchRemoteClusterException.class,
|
||||
() -> followerClient().execute(PutAutoFollowPatternAction.INSTANCE, putAutoFollowRequest).actionGet());
|
||||
assertThat(e.getMessage(), equalTo("unknown cluster alias [another_cluster]"));
|
||||
assertThat(e.getMessage(), equalTo("no such remote cluster: [another_cluster]"));
|
||||
}
|
||||
|
||||
public void testLeaderIndexRed() throws Exception {
|
||||
|
|
Loading…
Reference in New Issue