diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index 13b12d4b96f..b1d6467168c 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -29,8 +29,7 @@ import org.elasticsearch.indices.IndexClosedException; import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.persistent.AllocatedPersistentTask; import org.elasticsearch.tasks.TaskId; -import org.elasticsearch.transport.NodeDisconnectedException; -import org.elasticsearch.transport.NodeNotConnectedException; +import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse; import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus; @@ -448,7 +447,10 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { return true; } + // This is thrown when using a Client and its remote cluster alias went MIA String noSuchRemoteClusterMessage = "no such remote cluster: " + remoteCluster; + // This is thrown when creating a Client and the remote cluster does not exist: + String unknownClusterMessage = "unknown cluster alias [" + remoteCluster + "]"; final Throwable actual = ExceptionsHelper.unwrapCause(e); return actual instanceof ShardNotFoundException || actual instanceof IllegalIndexShardStateException || @@ -458,11 +460,11 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { actual instanceof ElasticsearchSecurityException || // If user does not have sufficient privileges actual instanceof ClusterBlockException || // If leader index is closed or no elected master actual instanceof IndexClosedException || // If follow index is closed - actual instanceof NodeDisconnectedException || - actual instanceof NodeNotConnectedException || + actual instanceof ConnectTransportException || actual instanceof NodeClosedException || (actual.getMessage() != null && actual.getMessage().contains("TransportService is closed")) || - (actual instanceof IllegalArgumentException && noSuchRemoteClusterMessage.equals(actual.getMessage())); + (actual instanceof IllegalArgumentException && (noSuchRemoteClusterMessage.equals(actual.getMessage()) || + unknownClusterMessage.equals(actual.getMessage()))); } // These methods are protected for testing purposes: diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index 14ec147a536..97308126ffb 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -94,12 +94,6 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor taskInProgress, Map headers) { ShardFollowTask params = taskInProgress.getParams(); - final Client remoteClient; - if (params.getRemoteCluster() != null) { - remoteClient = wrapClient(client.getRemoteClusterClient(params.getRemoteCluster()), params.getHeaders()); - } else { - remoteClient = wrapClient(client, params.getHeaders()); - } Client followerClient = wrapClient(client, params.getHeaders()); BiConsumer scheduler = (delay, command) -> { try { @@ -123,8 +117,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor { + CheckedConsumer onResponse = clusterStateResponse -> { IndexMetaData indexMetaData = clusterStateResponse.getState().metaData().getIndexSafe(leaderIndex); if (indexMetaData.getMappings().isEmpty()) { assert indexMetaData.getMappingVersion() == 1; @@ -140,7 +133,12 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor handler.accept(indexMetaData.getMappingVersion()), errorHandler)); - }, errorHandler)); + }; + try { + remoteClient(params).admin().cluster().state(clusterStateRequest, ActionListener.wrap(onResponse, errorHandler)); + } catch (Exception e) { + errorHandler.accept(e); + } } @Override @@ -181,7 +179,11 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor { + assertThat(followerClient().prepareSearch("index2").get().getHits().getTotalHits().value, + equalTo(firstBatchNumDocs + secondBatchNumDocs + thirdBatchNumDocs)); + }); + } + + private void setupRemoteCluster() { + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + String address = getLeaderCluster().getMasterNodeInstance(TransportService.class).boundAddress().publishAddress().toString(); + updateSettingsRequest.persistentSettings(Settings.builder().put("cluster.remote.leader_cluster.seeds", address)); + assertAcked(followerClient().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); } }