[CCR] Make shard follow tasks more resilient for restarts (#37239)

If a running shard follow task needs to be restarted and
the remote connection seeds have changed then
a shard follow task currently fails with a fatal error.

The change creates the remote client lazily and adjusts
the errors a shard follow task should retry.

This issue was found in test failures in the recently added
ccr rolling upgrade tests. The reason why this issue occurs
more frequently in the rolling upgrade test is because ccr
is setup in local mode (so remote connection seed will become stale) and
all nodes are restarted, which forces the shard follow tasks to get
restarted at some point during the test. Note that these tests
cannot be enabled yet, because this change will need to be backported
to 6.x first. (otherwise the issue still occurs on non upgraded nodes)

I also changed the RestartIndexFollowingIT to setup remote cluster
via persistent settings and to also restart the leader cluster. This
way what happens during the ccr rolling upgrade qa tests, also happens
in this test.

Relates to #37231
This commit is contained in:
Martijn van Groningen 2019-01-10 15:02:30 +01:00 committed by GitHub
parent 26cb7466ef
commit df488720e0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 60 additions and 18 deletions

View File

@ -29,8 +29,7 @@ import org.elasticsearch.indices.IndexClosedException;
import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.persistent.AllocatedPersistentTask; import org.elasticsearch.persistent.AllocatedPersistentTask;
import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.transport.NodeDisconnectedException; import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.NodeNotConnectedException;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse; import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse;
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus; import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
@ -448,7 +447,10 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
return true; return true;
} }
// This is thrown when using a Client and its remote cluster alias went MIA
String noSuchRemoteClusterMessage = "no such remote cluster: " + remoteCluster; String noSuchRemoteClusterMessage = "no such remote cluster: " + remoteCluster;
// This is thrown when creating a Client and the remote cluster does not exist:
String unknownClusterMessage = "unknown cluster alias [" + remoteCluster + "]";
final Throwable actual = ExceptionsHelper.unwrapCause(e); final Throwable actual = ExceptionsHelper.unwrapCause(e);
return actual instanceof ShardNotFoundException || return actual instanceof ShardNotFoundException ||
actual instanceof IllegalIndexShardStateException || actual instanceof IllegalIndexShardStateException ||
@ -458,11 +460,11 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
actual instanceof ElasticsearchSecurityException || // If user does not have sufficient privileges actual instanceof ElasticsearchSecurityException || // If user does not have sufficient privileges
actual instanceof ClusterBlockException || // If leader index is closed or no elected master actual instanceof ClusterBlockException || // If leader index is closed or no elected master
actual instanceof IndexClosedException || // If follow index is closed actual instanceof IndexClosedException || // If follow index is closed
actual instanceof NodeDisconnectedException || actual instanceof ConnectTransportException ||
actual instanceof NodeNotConnectedException ||
actual instanceof NodeClosedException || actual instanceof NodeClosedException ||
(actual.getMessage() != null && actual.getMessage().contains("TransportService is closed")) || (actual.getMessage() != null && actual.getMessage().contains("TransportService is closed")) ||
(actual instanceof IllegalArgumentException && noSuchRemoteClusterMessage.equals(actual.getMessage())); (actual instanceof IllegalArgumentException && (noSuchRemoteClusterMessage.equals(actual.getMessage()) ||
unknownClusterMessage.equals(actual.getMessage())));
} }
// These methods are protected for testing purposes: // These methods are protected for testing purposes:

View File

@ -94,12 +94,6 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask> taskInProgress, PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask> taskInProgress,
Map<String, String> headers) { Map<String, String> headers) {
ShardFollowTask params = taskInProgress.getParams(); ShardFollowTask params = taskInProgress.getParams();
final Client remoteClient;
if (params.getRemoteCluster() != null) {
remoteClient = wrapClient(client.getRemoteClusterClient(params.getRemoteCluster()), params.getHeaders());
} else {
remoteClient = wrapClient(client, params.getHeaders());
}
Client followerClient = wrapClient(client, params.getHeaders()); Client followerClient = wrapClient(client, params.getHeaders());
BiConsumer<TimeValue, Runnable> scheduler = (delay, command) -> { BiConsumer<TimeValue, Runnable> scheduler = (delay, command) -> {
try { try {
@ -123,8 +117,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
Index followIndex = params.getFollowShardId().getIndex(); Index followIndex = params.getFollowShardId().getIndex();
ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex.getName()); ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex.getName());
CheckedConsumer<ClusterStateResponse, Exception> onResponse = clusterStateResponse -> {
remoteClient.admin().cluster().state(clusterStateRequest, ActionListener.wrap(clusterStateResponse -> {
IndexMetaData indexMetaData = clusterStateResponse.getState().metaData().getIndexSafe(leaderIndex); IndexMetaData indexMetaData = clusterStateResponse.getState().metaData().getIndexSafe(leaderIndex);
if (indexMetaData.getMappings().isEmpty()) { if (indexMetaData.getMappings().isEmpty()) {
assert indexMetaData.getMappingVersion() == 1; assert indexMetaData.getMappingVersion() == 1;
@ -140,7 +133,12 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
followerClient.admin().indices().putMapping(putMappingRequest, ActionListener.wrap( followerClient.admin().indices().putMapping(putMappingRequest, ActionListener.wrap(
putMappingResponse -> handler.accept(indexMetaData.getMappingVersion()), putMappingResponse -> handler.accept(indexMetaData.getMappingVersion()),
errorHandler)); errorHandler));
}, errorHandler)); };
try {
remoteClient(params).admin().cluster().state(clusterStateRequest, ActionListener.wrap(onResponse, errorHandler));
} catch (Exception e) {
errorHandler.accept(e);
}
} }
@Override @Override
@ -181,7 +179,11 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
} }
} }
}; };
remoteClient.admin().cluster().state(clusterStateRequest, ActionListener.wrap(onResponse, errorHandler)); try {
remoteClient(params).admin().cluster().state(clusterStateRequest, ActionListener.wrap(onResponse, errorHandler));
} catch (Exception e) {
errorHandler.accept(e);
}
} }
private void closeIndexUpdateSettingsAndOpenIndex(String followIndex, private void closeIndexUpdateSettingsAndOpenIndex(String followIndex,
@ -236,7 +238,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
request.setMaxBatchSize(params.getMaxReadRequestSize()); request.setMaxBatchSize(params.getMaxReadRequestSize());
request.setPollTimeout(params.getReadPollTimeout()); request.setPollTimeout(params.getReadPollTimeout());
try { try {
remoteClient.execute(ShardChangesAction.INSTANCE, request, ActionListener.wrap(handler::accept, errorHandler)); remoteClient(params).execute(ShardChangesAction.INSTANCE, request, ActionListener.wrap(handler::accept, errorHandler));
} catch (Exception e) { } catch (Exception e) {
errorHandler.accept(e); errorHandler.accept(e);
} }
@ -251,6 +253,10 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
return recordedLeaderShardHistoryUUIDs[params.getLeaderShardId().id()]; return recordedLeaderShardHistoryUUIDs[params.getLeaderShardId().id()];
} }
private Client remoteClient(ShardFollowTask params) {
return wrapClient(client.getRemoteClusterClient(params.getRemoteCluster()), params.getHeaders());
}
interface FollowerStatsInfoHandler { interface FollowerStatsInfoHandler {
void accept(String followerHistoryUUID, long globalCheckpoint, long maxSeqNo); void accept(String followerHistoryUUID, long globalCheckpoint, long maxSeqNo);
} }

View File

@ -202,7 +202,7 @@ public abstract class CcrIntegTestCase extends ESTestCase {
builder.put(LicenseService.SELF_GENERATED_LICENSE_TYPE.getKey(), "trial"); builder.put(LicenseService.SELF_GENERATED_LICENSE_TYPE.getKey(), "trial");
// Let cluster state api return quickly in order to speed up auto follow tests: // Let cluster state api return quickly in order to speed up auto follow tests:
builder.put(CcrSettings.CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT.getKey(), TimeValue.timeValueMillis(100)); builder.put(CcrSettings.CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT.getKey(), TimeValue.timeValueMillis(100));
if (leaderSeedAddress != null) { if (configureRemoteClusterViaNodeSettings() && leaderSeedAddress != null) {
builder.put("cluster.remote.leader_cluster.seeds", leaderSeedAddress); builder.put("cluster.remote.leader_cluster.seeds", leaderSeedAddress);
} }
return new NodeConfigurationSource() { return new NodeConfigurationSource() {
@ -247,6 +247,10 @@ public abstract class CcrIntegTestCase extends ESTestCase {
return true; return true;
} }
protected boolean configureRemoteClusterViaNodeSettings() {
return true;
}
protected final Client leaderClient() { protected final Client leaderClient() {
return clusterGroup.leaderCluster.client(); return clusterGroup.leaderCluster.client();
} }

View File

@ -6,8 +6,11 @@
package org.elasticsearch.xpack.ccr; package org.elasticsearch.xpack.ccr;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.CcrIntegTestCase; import org.elasticsearch.xpack.CcrIntegTestCase;
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
@ -24,12 +27,17 @@ public class RestartIndexFollowingIT extends CcrIntegTestCase {
return 1; return 1;
} }
@Override
protected boolean configureRemoteClusterViaNodeSettings() {
return false;
}
public void testFollowIndex() throws Exception { public void testFollowIndex() throws Exception {
final String leaderIndexSettings = getIndexSettings(1, 0, final String leaderIndexSettings = getIndexSettings(1, 0,
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true");
assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); assertAcked(leaderClient().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
ensureLeaderGreen("index1"); ensureLeaderGreen("index1");
setupRemoteCluster();
final PutFollowAction.Request followRequest = putFollow("index1", "index2"); final PutFollowAction.Request followRequest = putFollow("index1", "index2");
followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); followerClient().execute(PutFollowAction.INSTANCE, followRequest).get();
@ -57,6 +65,28 @@ public class RestartIndexFollowingIT extends CcrIntegTestCase {
assertThat(followerClient().prepareSearch("index2").get().getHits().getTotalHits().value, assertThat(followerClient().prepareSearch("index2").get().getHits().getTotalHits().value,
equalTo(firstBatchNumDocs + secondBatchNumDocs)); equalTo(firstBatchNumDocs + secondBatchNumDocs));
}); });
getLeaderCluster().fullRestart();
ensureLeaderGreen("index1");
// Remote connection needs to be re-configured, because all the nodes in leader cluster have been restarted:
setupRemoteCluster();
final long thirdBatchNumDocs = randomIntBetween(2, 64);
for (int i = 0; i < thirdBatchNumDocs; i++) {
leaderClient().prepareIndex("index1", "doc").setSource("{}", XContentType.JSON).get();
}
assertBusy(() -> {
assertThat(followerClient().prepareSearch("index2").get().getHits().getTotalHits().value,
equalTo(firstBatchNumDocs + secondBatchNumDocs + thirdBatchNumDocs));
});
}
private void setupRemoteCluster() {
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
String address = getLeaderCluster().getMasterNodeInstance(TransportService.class).boundAddress().publishAddress().toString();
updateSettingsRequest.persistentSettings(Settings.builder().put("cluster.remote.leader_cluster.seeds", address));
assertAcked(followerClient().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
} }
} }