Fix LocalIndexFollowingIT#testRemoveRemoteConnection() test (#38709)

* During fetching remote mapping if remote client is missing then
`NoSuchRemoteClusterException` was not handled.
* When adding remote connection, check that it is really connected
before continue-ing to run the tests.

Relates to #38695
This commit is contained in:
Martijn van Groningen 2019-02-14 10:19:26 +01:00
parent 03b2ec6ee6
commit ed08bc3537
No known key found for this signature in database
GPG Key ID: AB236F4FCF2AF12A
5 changed files with 58 additions and 6 deletions

View File

@ -43,6 +43,10 @@ public final class RemoteInfoResponse extends ActionResponse implements ToXConte
this.infos = Collections.unmodifiableList(new ArrayList<>(infos));
}
public List<RemoteConnectionInfo> getInfos() {
return infos;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);

View File

@ -92,6 +92,30 @@ public final class RemoteConnectionInfo implements ToXContentFragment, Writeable
skipUnavailable = input.readBoolean();
}
public List<String> getSeedNodes() {
return seedNodes;
}
public int getConnectionsPerCluster() {
return connectionsPerCluster;
}
public TimeValue getInitialConnectionTimeout() {
return initialConnectionTimeout;
}
public int getNumNodesConnected() {
return numNodesConnected;
}
public String getClusterAlias() {
return clusterAlias;
}
public boolean isSkipUnavailable() {
return skipUnavailable;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {

View File

@ -46,6 +46,7 @@ import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.persistent.PersistentTasksExecutor;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.NoSuchRemoteClusterException;
import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.xpack.ccr.CcrSettings;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction;
@ -114,7 +115,16 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
final Index followerIndex = params.getFollowShardId().getIndex();
final Index leaderIndex = params.getLeaderShardId().getIndex();
final Supplier<TimeValue> timeout = () -> isStopped() ? TimeValue.MINUS_ONE : waitForMetadataTimeOut;
CcrRequests.getIndexMetadata(remoteClient(params), leaderIndex, minRequiredMappingVersion, 0L, timeout, ActionListener.wrap(
final Client remoteClient;
try {
remoteClient = remoteClient(params);
} catch (NoSuchRemoteClusterException e) {
errorHandler.accept(e);
return;
}
CcrRequests.getIndexMetadata(remoteClient, leaderIndex, minRequiredMappingVersion, 0L, timeout, ActionListener.wrap(
indexMetaData -> {
if (indexMetaData.getMappings().isEmpty()) {
assert indexMetaData.getMappingVersion() == 1;
@ -173,7 +183,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
};
try {
remoteClient(params).admin().cluster().state(clusterStateRequest, ActionListener.wrap(onResponse, errorHandler));
} catch (Exception e) {
} catch (NoSuchRemoteClusterException e) {
errorHandler.accept(e);
}
}
@ -231,7 +241,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
request.setPollTimeout(params.getReadPollTimeout());
try {
remoteClient(params).execute(ShardChangesAction.INSTANCE, request, ActionListener.wrap(handler::accept, errorHandler));
} catch (Exception e) {
} catch (NoSuchRemoteClusterException e) {
errorHandler.accept(e);
}
}

View File

@ -6,6 +6,8 @@
package org.elasticsearch.xpack;
import org.elasticsearch.action.admin.cluster.remote.RemoteInfoAction;
import org.elasticsearch.action.admin.cluster.remote.RemoteInfoRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.cluster.service.ClusterService;
@ -15,6 +17,7 @@ import org.elasticsearch.license.LicenseService;
import org.elasticsearch.license.LicensesMetaData;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.transport.RemoteConnectionInfo;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ccr.CcrSettings;
import org.elasticsearch.xpack.ccr.LocalStateCcr;
@ -30,6 +33,7 @@ import org.junit.Before;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.xpack.CcrIntegTestCase.removeCCRRelatedMetadataFromClusterState;
@ -57,11 +61,17 @@ public abstract class CcrSingleNodeTestCase extends ESSingleNodeTestCase {
}
@Before
public void setupLocalRemote() {
public void setupLocalRemote() throws Exception {
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
String address = getInstanceFromNode(TransportService.class).boundAddress().publishAddress().toString();
updateSettingsRequest.transientSettings(Settings.builder().put("cluster.remote.local.seeds", address));
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
assertBusy(() -> {
List<RemoteConnectionInfo> infos = client().execute(RemoteInfoAction.INSTANCE, new RemoteInfoRequest()).get().getInfos();
assertThat(infos.size(), equalTo(1));
assertThat(infos.get(0).getNumNodesConnected(), equalTo(1));
});
}
@Before
@ -76,10 +86,15 @@ public abstract class CcrSingleNodeTestCase extends ESSingleNodeTestCase {
}
@After
public void removeLocalRemote() {
public void removeLocalRemote() throws Exception {
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.transientSettings(Settings.builder().put("cluster.remote.local.seeds", (String) null));
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
assertBusy(() -> {
List<RemoteConnectionInfo> infos = client().execute(RemoteInfoAction.INSTANCE, new RemoteInfoRequest()).get().getInfos();
assertThat(infos.size(), equalTo(0));
});
}
protected AutoFollowStats getAutoFollowStats() {

View File

@ -92,7 +92,6 @@ public class LocalIndexFollowingIT extends CcrSingleNodeTestCase {
assertThat(client().admin().indices().prepareExists("follower-index").get().isExists(), equalTo(false));
}
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/38695")
public void testRemoveRemoteConnection() throws Exception {
PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request();
request.setName("my_pattern");