Add the ability to the follow index to follow an index in a remote cluster.

The follow index api completely reuses CCS infrastructure that was exposed via:
https://github.com/elastic/elasticsearch/pull/29495

This means that the leader index parameter support the same ccs index
to indicate that an index resides in a different cluster.

I also added a qa module that smoke tests the cross cluster nature of ccr.
The idea is that this test just verifies that ccr can read data from a
remote leader index and that is it, no crazy randomization or indirectly
testing other features.
This commit is contained in:
Martijn van Groningen 2018-04-13 19:11:28 +02:00
parent c0d42e9cd1
commit 56ca59a513
9 changed files with 373 additions and 116 deletions

View File

@ -32,7 +32,11 @@ task internalClusterTest(type: RandomizedTestingTask,
include '**/*IT.class'
systemProperty 'es.set.netty.runtime.available.processors', 'false'
}
check.dependsOn internalClusterTest
check {
dependsOn = [internalClusterTest, 'qa:multi-cluster:followClusterTest']
}
internalClusterTest.mustRunAfter test
dependencies {

View File

@ -0,0 +1,12 @@
/* Remove assemble on all qa projects because we don't need to publish
* artifacts for them. */
gradle.projectsEvaluated {
subprojects {
Task assemble = project.tasks.findByName('assemble')
if (assemble) {
project.tasks.remove(assemble)
project.build.dependsOn.remove('assemble')
}
}
}

View File

@ -0,0 +1,40 @@
import org.elasticsearch.gradle.test.RestIntegTestTask
apply plugin: 'elasticsearch.standalone-test'
dependencies {
testCompile project(path: xpackModule('core'), configuration: 'runtime')
testCompile project(path: xpackModule('ccr'), configuration: 'runtime')
}
task leaderClusterTest(type: RestIntegTestTask) {
mustRunAfter(precommit)
}
leaderClusterTestCluster {
numNodes = 1
clusterName = 'leader-cluster'
plugin xpackProject('plugin').path
}
leaderClusterTestRunner {
systemProperty 'tests.is_leader_cluster', 'true'
}
task followClusterTest(type: RestIntegTestTask) {}
followClusterTestCluster {
dependsOn leaderClusterTestRunner
numNodes = 1
clusterName = 'follow-cluster'
plugin xpackProject('plugin').path
setting 'search.remote.leader_cluster.seeds', "\"${-> leaderClusterTest.nodes.get(0).transportUri()}\""
}
followClusterTestRunner {
systemProperty 'tests.is_leader_cluster', 'false'
systemProperty 'tests.leader_host', "${-> leaderClusterTest.nodes.get(0).httpUri()}"
finalizedBy 'leaderClusterTestCluster#stop'
}
test.enabled = false // no unit tests for multi-cluster-search, only the rest integration test

View File

@ -0,0 +1,139 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr;
import org.apache.http.HttpHost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.test.rest.ESRestTestCase;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static java.util.Collections.emptyMap;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.equalTo;
public class FollowIndexIT extends ESRestTestCase {
private final boolean runningAgainstLeaderCluster = Booleans.parseBoolean(System.getProperty("tests.is_leader_cluster"));
@Override
protected boolean preserveClusterUponCompletion() {
return true;
}
public void testFollowIndex() throws Exception {
final int numDocs = 128;
final String leaderIndexName = "test_index1";
if (runningAgainstLeaderCluster) {
logger.info("Running against leader cluster");
for (int i = 0; i < numDocs; i++) {
logger.info("Indexing doc [{}]", i);
index(client(), leaderIndexName, Integer.toString(i), "field", i);
}
refresh(leaderIndexName);
verifyDocuments(leaderIndexName, numDocs);
} else {
logger.info("Running against follow cluster");
final String followIndexName = "test_index2";
Settings indexSettings = Settings.builder()
.put("index.xpack.ccr.following_index", true)
.build();
// TODO: remove mapping here when ccr syncs mappings too
createIndex(followIndexName, indexSettings, "\"doc\": { \"properties\": { \"field\": { \"type\": \"integer\" }}}");
ensureYellow(followIndexName);
followIndex("leader_cluster:" + leaderIndexName, followIndexName);
assertBusy(() -> verifyDocuments(followIndexName, numDocs));
try (RestClient leaderClient = buildLeaderClient()) {
int id = numDocs;
index(leaderClient, leaderIndexName, Integer.toString(id), "field", id);
index(leaderClient, leaderIndexName, Integer.toString(id + 1), "field", id + 1);
index(leaderClient, leaderIndexName, Integer.toString(id + 2), "field", id + 2);
}
assertBusy(() -> verifyDocuments(followIndexName, numDocs + 3));
}
}
private static void index(RestClient client, String index, String id, Object... fields) throws IOException {
XContentBuilder document = jsonBuilder().startObject();
for (int i = 0; i < fields.length; i += 2) {
document.field((String) fields[i], fields[i + 1]);
}
document.endObject();
assertOK(client.performRequest("POST", "/" + index + "/doc/" + id, emptyMap(),
new StringEntity(Strings.toString(document), ContentType.APPLICATION_JSON)));
}
private static void refresh(String index) throws IOException {
assertOK(client().performRequest("POST", "/" + index + "/_refresh"));
}
private static void followIndex(String leaderIndex, String followIndex) throws IOException {
Map<String, String> params = Collections.singletonMap("leader_index", leaderIndex);
assertOK(client().performRequest("POST", "/_xpack/ccr/" + followIndex + "/_follow", params));
}
private static void verifyDocuments(String index, int expectedNumDocs) throws IOException {
Map<String, String> params = new HashMap<>();
params.put("size", Integer.toString(expectedNumDocs));
params.put("sort", "field:asc");
Map<String, ?> response = toMap(client().performRequest("GET", "/" + index + "/_search", params));
int numDocs = (int) XContentMapValues.extractValue("hits.total", response);
assertThat(numDocs, equalTo(expectedNumDocs));
List<?> hits = (List<?>) XContentMapValues.extractValue("hits.hits", response);
assertThat(hits.size(), equalTo(expectedNumDocs));
for (int i = 0; i < expectedNumDocs; i++) {
int value = (int) XContentMapValues.extractValue("_source.field", (Map<?, ?>) hits.get(i));
assertThat(i, equalTo(value));
}
}
private static Map<String, Object> toMap(Response response) throws IOException {
return toMap(EntityUtils.toString(response.getEntity()));
}
private static Map<String, Object> toMap(String response) {
return XContentHelper.convertToMap(JsonXContent.jsonXContent, response, false);
}
private static void ensureYellow(String index) throws IOException {
Map<String, String> params = new HashMap<>();
params.put("wait_for_status", "yellow");
params.put("wait_for_no_relocating_shards", "true");
params.put("timeout", "30s");
params.put("level", "shards");
assertOK(client().performRequest("GET", "_cluster/health/" + index, params));
}
private RestClient buildLeaderClient() throws IOException {
assert runningAgainstLeaderCluster == false;
String leaderUrl = System.getProperty("tests.leader_host");
int portSeparator = leaderUrl.lastIndexOf(':');
HttpHost httpHost = new HttpHost(leaderUrl.substring(0, portSeparator),
Integer.parseInt(leaderUrl.substring(portSeparator + 1)), getProtocol());
return buildClient(Settings.EMPTY, new HttpHost[]{httpHost});
}
}

View File

@ -12,24 +12,29 @@ import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.persistent.PersistentTasksService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
@ -129,7 +134,7 @@ public class FollowExistingIndexAction extends Action<FollowExistingIndexAction.
public static class RequestBuilder extends ActionRequestBuilder<Request, Response, FollowExistingIndexAction.RequestBuilder> {
public RequestBuilder(ElasticsearchClient client, Action<Request, Response, RequestBuilder> action) {
RequestBuilder(ElasticsearchClient client, Action<Request, Response, RequestBuilder> action) {
super(client, action, new Request());
}
}
@ -141,55 +146,91 @@ public class FollowExistingIndexAction extends Action<FollowExistingIndexAction.
public static class TransportAction extends HandledTransportAction<Request, Response> {
private final Client client;
private final ClusterService clusterService;
private final RemoteClusterService remoteClusterService;
private final PersistentTasksService persistentTasksService;
@Inject
public TransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Client client,
IndexNameExpressionResolver indexNameExpressionResolver, Client client, ClusterService clusterService,
PersistentTasksService persistentTasksService) {
super(settings, NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, Request::new);
this.client = client;
this.clusterService = clusterService;
this.remoteClusterService = transportService.getRemoteClusterService();
this.persistentTasksService = persistentTasksService;
}
@Override
protected void doExecute(Request request, ActionListener<Response> listener) {
client.admin().cluster().state(new ClusterStateRequest(), new ActionListener<ClusterStateResponse>() {
@Override
public void onResponse(ClusterStateResponse clusterStateResponse) {
IndexMetaData leaderIndexMetadata = clusterStateResponse.getState().getMetaData()
.index(request.leaderIndex);
ClusterState localClusterState = clusterService.state();
IndexMetaData followIndexMetadata = localClusterState.getMetaData().index(request.followIndex);
String[] indices = new String[]{request.getLeaderIndex()};
Map<String, List<String>> remoteClusterIndices = remoteClusterService.groupClusterIndices(indices, s -> false);
if (remoteClusterIndices.containsKey(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY)) {
// Following an index in local cluster, so use local cluster state to fetch leader IndexMetaData:
IndexMetaData leaderIndexMetadata = localClusterState.getMetaData().index(request.leaderIndex);
start(request, null, leaderIndexMetadata, followIndexMetadata, listener);
} else {
// Following an index in remote cluster, so use remote client to fetch leader IndexMetaData:
assert remoteClusterIndices.size() == 1;
Map.Entry<String, List<String>> entry = remoteClusterIndices.entrySet().iterator().next();
assert entry.getValue().size() == 1;
String clusterNameAlias = entry.getKey();
String leaderIndex = entry.getValue().get(0);
Client remoteClient = client.getRemoteClusterClient(clusterNameAlias);
ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
clusterStateRequest.clear();
clusterStateRequest.metaData(true);
clusterStateRequest.indices(leaderIndex);
remoteClient.admin().cluster().state(clusterStateRequest, ActionListener.wrap(r -> {
ClusterState remoteClusterState = r.getState();
IndexMetaData leaderIndexMetadata = remoteClusterState.getMetaData().index(leaderIndex);
start(request, clusterNameAlias, leaderIndexMetadata, followIndexMetadata, listener);
}, listener::onFailure));
}
}
/**
* Performs validation on the provided leader and follow {@link IndexMetaData} instances and then
* creates a persistent task for each leader primary shard. This persistent tasks track changes in the leader
* shard and replicate these changes to a follower shard.
*
* Currently the following validation is performed:
* <ul>
* <li>The leader index and follow index need to have the same number of primary shards</li>
* </ul>
*/
void start(Request request, String clusterNameAlias, IndexMetaData leaderIndexMetadata, IndexMetaData followIndexMetadata,
ActionListener<Response> handler) {
if (leaderIndexMetadata == null) {
listener.onFailure(new IllegalArgumentException("leader index [" + request.leaderIndex + "] does not exist"));
handler.onFailure(new IllegalArgumentException("leader index [" + request.leaderIndex + "] does not exist"));
return;
}
IndexMetaData followIndexMetadata = clusterStateResponse.getState().getMetaData()
.index(request.followIndex);
if (followIndexMetadata == null) {
listener.onFailure(new IllegalArgumentException("follow index [" + request.followIndex + "] does not exist"));
handler.onFailure(new IllegalArgumentException("follow index [" + request.followIndex + "] does not exist"));
return;
}
if (leaderIndexMetadata.getNumberOfShards() != followIndexMetadata.getNumberOfShards()) {
listener.onFailure(new IllegalArgumentException("leader index primary shards [" +
handler.onFailure(new IllegalArgumentException("leader index primary shards [" +
leaderIndexMetadata.getNumberOfShards() + "] does not match with the number of " +
"shards of the follow index [" + followIndexMetadata.getNumberOfShards() + "]"));
return;
}
// TODO: other validation checks
} else {
final int numShards = followIndexMetadata.getNumberOfShards();
final AtomicInteger counter = new AtomicInteger(numShards);
final AtomicReferenceArray<Object> responses = new AtomicReferenceArray<>(followIndexMetadata.getNumberOfShards());
for (int i = 0; i < numShards; i++) {
final int shardId = i;
String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId;
ShardFollowTask shardFollowTask = new ShardFollowTask(new ShardId(followIndexMetadata.getIndex(), shardId),
new ShardId(leaderIndexMetadata.getIndex(), shardId), request.batchSize, request.concurrentProcessors,
request.processorMaxTranslogBytes);
ShardFollowTask shardFollowTask = new ShardFollowTask(clusterNameAlias,
new ShardId(followIndexMetadata.getIndex(), shardId),
new ShardId(leaderIndexMetadata.getIndex(), shardId),
request.batchSize, request.concurrentProcessors, request.processorMaxTranslogBytes);
persistentTasksService.startPersistentTask(taskId, ShardFollowTask.NAME, shardFollowTask,
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask>>() {
@Override
@ -220,24 +261,17 @@ public class FollowExistingIndexAction extends Action<FollowExistingIndexAction.
if (error == null) {
// include task ids?
listener.onResponse(new Response());
handler.onResponse(new Response());
} else {
// TODO: cancel all started tasks
listener.onFailure(error);
handler.onFailure(error);
}
}
}
});
}
);
}
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}
}
}

View File

@ -24,6 +24,7 @@ public class ShardFollowTask implements PersistentTaskParams {
public static final String NAME = "shard_follow";
static final ParseField LEADER_CLUSTER_ALIAS_FIELD = new ParseField("leader_cluster_alias");
static final ParseField FOLLOW_SHARD_INDEX_FIELD = new ParseField("follow_shard_index");
static final ParseField FOLLOW_SHARD_INDEX_UUID_FIELD = new ParseField("follow_shard_index_uuid");
static final ParseField FOLLOW_SHARD_SHARDID_FIELD = new ParseField("follow_shard_shard");
@ -35,10 +36,11 @@ public class ShardFollowTask implements PersistentTaskParams {
public static final ParseField PROCESSOR_MAX_TRANSLOG_BYTES_PER_REQUEST = new ParseField("processor_max_translog_bytes");
public static ConstructingObjectParser<ShardFollowTask, Void> PARSER = new ConstructingObjectParser<>(NAME,
(a) -> new ShardFollowTask(new ShardId((String) a[0], (String) a[1], (int) a[2]),
new ShardId((String) a[3], (String) a[4], (int) a[5]), (long) a[6], (int) a[7], (long) a[8]));
(a) -> new ShardFollowTask((String) a[0], new ShardId((String) a[1], (String) a[2], (int) a[3]),
new ShardId((String) a[4], (String) a[5], (int) a[6]), (long) a[7], (int) a[8], (long) a[9]));
static {
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), LEADER_CLUSTER_ALIAS_FIELD);
PARSER.declareString(ConstructingObjectParser.constructorArg(), FOLLOW_SHARD_INDEX_FIELD);
PARSER.declareString(ConstructingObjectParser.constructorArg(), FOLLOW_SHARD_INDEX_UUID_FIELD);
PARSER.declareInt(ConstructingObjectParser.constructorArg(), FOLLOW_SHARD_SHARDID_FIELD);
@ -50,14 +52,16 @@ public class ShardFollowTask implements PersistentTaskParams {
PARSER.declareLong(ConstructingObjectParser.constructorArg(), PROCESSOR_MAX_TRANSLOG_BYTES_PER_REQUEST);
}
private final String leaderClusterAlias;
private final ShardId followShardId;
private final ShardId leaderShardId;
private final long maxChunkSize;
private final int numConcurrentChunks;
private final long processorMaxTranslogBytes;
ShardFollowTask(ShardId followShardId, ShardId leaderShardId, long maxChunkSize, int numConcurrentChunks,
long processorMaxTranslogBytes) {
ShardFollowTask(String leaderClusterAlias, ShardId followShardId, ShardId leaderShardId, long maxChunkSize,
int numConcurrentChunks, long processorMaxTranslogBytes) {
this.leaderClusterAlias = leaderClusterAlias;
this.followShardId = followShardId;
this.leaderShardId = leaderShardId;
this.maxChunkSize = maxChunkSize;
@ -66,6 +70,7 @@ public class ShardFollowTask implements PersistentTaskParams {
}
public ShardFollowTask(StreamInput in) throws IOException {
this.leaderClusterAlias = in.readOptionalString();
this.followShardId = ShardId.readShardId(in);
this.leaderShardId = ShardId.readShardId(in);
this.maxChunkSize = in.readVLong();
@ -73,6 +78,10 @@ public class ShardFollowTask implements PersistentTaskParams {
this.processorMaxTranslogBytes = in.readVLong();
}
public String getLeaderClusterAlias() {
return leaderClusterAlias;
}
public ShardId getFollowShardId() {
return followShardId;
}
@ -100,6 +109,7 @@ public class ShardFollowTask implements PersistentTaskParams {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalString(leaderClusterAlias);
followShardId.writeTo(out);
leaderShardId.writeTo(out);
out.writeVLong(maxChunkSize);
@ -114,6 +124,9 @@ public class ShardFollowTask implements PersistentTaskParams {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
if (leaderClusterAlias != null) {
builder.field(LEADER_CLUSTER_ALIAS_FIELD.getPreferredName(), leaderClusterAlias);
}
builder.field(FOLLOW_SHARD_INDEX_FIELD.getPreferredName(), followShardId.getIndex().getName());
builder.field(FOLLOW_SHARD_INDEX_UUID_FIELD.getPreferredName(), followShardId.getIndex().getUUID());
builder.field(FOLLOW_SHARD_SHARDID_FIELD.getPreferredName(), followShardId.id());
@ -131,7 +144,8 @@ public class ShardFollowTask implements PersistentTaskParams {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ShardFollowTask that = (ShardFollowTask) o;
return Objects.equals(followShardId, that.followShardId) &&
return Objects.equals(leaderClusterAlias, that.leaderClusterAlias) &&
Objects.equals(followShardId, that.followShardId) &&
Objects.equals(leaderShardId, that.leaderShardId) &&
maxChunkSize == that.maxChunkSize &&
numConcurrentChunks == that.numConcurrentChunks &&
@ -140,7 +154,8 @@ public class ShardFollowTask implements PersistentTaskParams {
@Override
public int hashCode() {
return Objects.hash(followShardId, leaderShardId, maxChunkSize, numConcurrentChunks, processorMaxTranslogBytes);
return Objects.hash(leaderClusterAlias, followShardId, leaderShardId, maxChunkSize, numConcurrentChunks,
processorMaxTranslogBytes);
}
public String toString() {

View File

@ -64,12 +64,16 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
@Override
public void validate(ShardFollowTask params, ClusterState clusterState) {
if (params.getLeaderClusterAlias() == null) {
// We can only validate IndexRoutingTable in local cluster,
// for remote cluster we would need to make a remote call and we cannot do this here.
IndexRoutingTable routingTable = clusterState.getRoutingTable().index(params.getLeaderShardId().getIndex());
if (routingTable.shard(params.getLeaderShardId().id()).primaryShard().started() == false) {
throw new IllegalArgumentException("Not all copies of leader shard are started");
}
}
routingTable = clusterState.getRoutingTable().index(params.getFollowShardId().getIndex());
IndexRoutingTable routingTable = clusterState.getRoutingTable().index(params.getFollowShardId().getIndex());
if (routingTable.shard(params.getFollowShardId().id()).primaryShard().started() == false) {
throw new IllegalArgumentException("Not all copies of follow shard are started");
}
@ -85,12 +89,14 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
@Override
protected void nodeOperation(AllocatedPersistentTask task, ShardFollowTask params, Task.Status status) {
ShardFollowNodeTask shardFollowNodeTask = (ShardFollowNodeTask) task;
Client leaderClient = params.getLeaderClusterAlias() != null ?
this.client.getRemoteClusterClient(params.getLeaderClusterAlias()) : this.client;
logger.info("Starting shard following [{}]", params);
fetchGlobalCheckpoint(params.getFollowShardId(),
followGlobalCheckPoint -> prepare(shardFollowNodeTask, params, followGlobalCheckPoint), task::markAsFailed);
fetchGlobalCheckpoint(client, params.getFollowShardId(),
followGlobalCheckPoint -> prepare(leaderClient, shardFollowNodeTask, params, followGlobalCheckPoint), task::markAsFailed);
}
void prepare(ShardFollowNodeTask task, ShardFollowTask params, long followGlobalCheckPoint) {
void prepare(Client leaderClient, ShardFollowNodeTask task, ShardFollowTask params, long followGlobalCheckPoint) {
if (task.getState() != AllocatedPersistentTask.State.STARTED) {
// TODO: need better cancellation control
return;
@ -98,10 +104,10 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
final ShardId leaderShard = params.getLeaderShardId();
final ShardId followerShard = params.getFollowShardId();
fetchGlobalCheckpoint(leaderShard, leaderGlobalCheckPoint -> {
fetchGlobalCheckpoint(leaderClient, leaderShard, leaderGlobalCheckPoint -> {
// TODO: check if both indices have the same history uuid
if (leaderGlobalCheckPoint == followGlobalCheckPoint) {
retry(task, params, followGlobalCheckPoint);
retry(leaderClient, task, params, followGlobalCheckPoint);
} else {
assert followGlobalCheckPoint < leaderGlobalCheckPoint : "followGlobalCheckPoint [" + followGlobalCheckPoint +
"] is not below leaderGlobalCheckPoint [" + leaderGlobalCheckPoint + "]";
@ -109,12 +115,12 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
Consumer<Exception> handler = e -> {
if (e == null) {
task.updateProcessedGlobalCheckpoint(leaderGlobalCheckPoint);
prepare(task, params, leaderGlobalCheckPoint);
prepare(leaderClient, task, params, leaderGlobalCheckPoint);
} else {
task.markAsFailed(e);
}
};
ChunksCoordinator coordinator = new ChunksCoordinator(client, ccrExecutor, params.getMaxChunkSize(),
ChunksCoordinator coordinator = new ChunksCoordinator(client, leaderClient, ccrExecutor, params.getMaxChunkSize(),
params.getNumConcurrentChunks(), params.getProcessorMaxTranslogBytes(), leaderShard, followerShard, handler);
coordinator.createChucks(followGlobalCheckPoint, leaderGlobalCheckPoint);
coordinator.start();
@ -122,7 +128,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
}, task::markAsFailed);
}
private void retry(ShardFollowNodeTask task, ShardFollowTask params, long followGlobalCheckPoint) {
private void retry(Client leaderClient, ShardFollowNodeTask task, ShardFollowTask params, long followGlobalCheckPoint) {
threadPool.schedule(RETRY_TIMEOUT, Ccr.CCR_THREAD_POOL_NAME, new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
@ -131,12 +137,12 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
@Override
protected void doRun() throws Exception {
prepare(task, params, followGlobalCheckPoint);
prepare(leaderClient, task, params, followGlobalCheckPoint);
}
});
}
private void fetchGlobalCheckpoint(ShardId shardId, LongConsumer handler, Consumer<Exception> errorHandler) {
private void fetchGlobalCheckpoint(Client client, ShardId shardId, LongConsumer handler, Consumer<Exception> errorHandler) {
client.admin().indices().stats(new IndicesStatsRequest().indices(shardId.getIndexName()), ActionListener.wrap(r -> {
IndexStats indexStats = r.getIndex(shardId.getIndexName());
Optional<ShardStats> filteredShardStats = Arrays.stream(indexStats.getShards())
@ -158,7 +164,8 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
private static final Logger LOGGER = Loggers.getLogger(ChunksCoordinator.class);
private final Client client;
private final Client followerClient;
private final Client leaderClient;
private final Executor ccrExecutor;
private final long batchSize;
@ -172,9 +179,10 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
private final Queue<long[]> chunks = new ConcurrentLinkedQueue<>();
private final AtomicReference<Exception> failureHolder = new AtomicReference<>();
ChunksCoordinator(Client client, Executor ccrExecutor, long batchSize, int concurrentProcessors,
ChunksCoordinator(Client followerClient, Client leaderClient, Executor ccrExecutor, long batchSize, int concurrentProcessors,
long processorMaxTranslogBytes, ShardId leaderShard, ShardId followerShard, Consumer<Exception> handler) {
this.client = client;
this.followerClient = followerClient;
this.leaderClient = leaderClient;
this.ccrExecutor = ccrExecutor;
this.batchSize = batchSize;
this.concurrentProcessors = concurrentProcessors;
@ -220,7 +228,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
return;
}
LOGGER.debug("{} Processing chunk [{}/{}]", leaderShard, chunk[0], chunk[1]);
ChunkProcessor processor = new ChunkProcessor(client, chunks, ccrExecutor, leaderShard, followerShard, e -> {
Consumer<Exception> processorHandler = e -> {
if (e == null) {
LOGGER.debug("{} Successfully processed chunk [{}/{}]", leaderShard, chunk[0], chunk[1]);
processNextChunk();
@ -229,7 +237,9 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
leaderShard, chunk[0], chunk[1]), e);
postProcessChuck(e);
}
});
};
ChunkProcessor processor = new ChunkProcessor(leaderClient, followerClient, chunks, ccrExecutor, leaderShard,
followerShard, processorHandler);
processor.start(chunk[0], chunk[1], processorMaxTranslogBytes);
}
@ -251,7 +261,8 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
static class ChunkProcessor {
private final Client client;
private final Client leaderClient;
private final Client followerClient;
private final Queue<long[]> chunks;
private final Executor ccrExecutor;
@ -260,9 +271,10 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
private final Consumer<Exception> handler;
final AtomicInteger retryCounter = new AtomicInteger(0);
ChunkProcessor(Client client, Queue<long[]> chunks, Executor ccrExecutor, ShardId leaderShard,
ChunkProcessor(Client leaderClient, Client followerClient, Queue<long[]> chunks, Executor ccrExecutor, ShardId leaderShard,
ShardId followerShard, Consumer<Exception> handler) {
this.client = client;
this.leaderClient = leaderClient;
this.followerClient = followerClient;
this.chunks = chunks;
this.ccrExecutor = ccrExecutor;
this.leaderShard = leaderShard;
@ -275,7 +287,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
request.setMinSeqNo(from);
request.setMaxSeqNo(to);
request.setMaxTranslogsBytes(maxTranslogsBytes);
client.execute(ShardChangesAction.INSTANCE, request, new ActionListener<ShardChangesAction.Response>() {
leaderClient.execute(ShardChangesAction.INSTANCE, request, new ActionListener<ShardChangesAction.Response>() {
@Override
public void onResponse(ShardChangesAction.Response response) {
handleResponse(to, response);
@ -317,7 +329,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
@Override
protected void doRun() throws Exception {
final BulkShardOperationsRequest request = new BulkShardOperationsRequest(followerShard, response.getOperations());
client.execute(BulkShardOperationsAction.INSTANCE, request, new ActionListener<BulkShardOperationsResponse>() {
followerClient.execute(BulkShardOperationsAction.INSTANCE, request, new ActionListener<BulkShardOperationsResponse>() {
@Override
public void onResponse(final BulkShardOperationsResponse bulkShardOperationsResponse) {
handler.accept(null);

View File

@ -49,7 +49,7 @@ public class ChunksCoordinatorTests extends ESTestCase {
ShardId followShardId = new ShardId("index2", "index1", 0);
ChunksCoordinator coordinator =
new ChunksCoordinator(client, ccrExecutor, 1024, 1, Long.MAX_VALUE, leaderShardId, followShardId, e -> {});
new ChunksCoordinator(client, client, ccrExecutor, 1024, 1, Long.MAX_VALUE, leaderShardId, followShardId, e -> {});
coordinator.createChucks(0, 1024);
List<long[]> result = new ArrayList<>(coordinator.getChunks());
assertThat(result.size(), equalTo(1));
@ -105,7 +105,7 @@ public class ChunksCoordinatorTests extends ESTestCase {
Consumer<Exception> handler = e -> assertThat(e, nullValue());
int concurrentProcessors = randomIntBetween(1, 4);
int batchSize = randomIntBetween(1, 1000);
ChunksCoordinator coordinator = new ChunksCoordinator(client, ccrExecutor, batchSize, concurrentProcessors,
ChunksCoordinator coordinator = new ChunksCoordinator(client, client, ccrExecutor, batchSize, concurrentProcessors,
Long.MAX_VALUE, leaderShardId, followShardId, handler);
int numberOfOps = randomIntBetween(batchSize, batchSize * 20);
@ -149,7 +149,7 @@ public class ChunksCoordinatorTests extends ESTestCase {
assertThat(e, sameInstance(expectedException));
};
ChunksCoordinator coordinator =
new ChunksCoordinator(client, ccrExecutor, 10, 1, Long.MAX_VALUE, leaderShardId, followShardId, handler);
new ChunksCoordinator(client, client, ccrExecutor, 10, 1, Long.MAX_VALUE, leaderShardId, followShardId, handler);
coordinator.createChucks(0, 20);
assertThat(coordinator.getChunks().size(), equalTo(2));
@ -180,7 +180,7 @@ public class ChunksCoordinatorTests extends ESTestCase {
latch.countDown();
};
ChunksCoordinator coordinator =
new ChunksCoordinator(client, ccrExecutor, 1000, 4, Long.MAX_VALUE, leaderShardId, followShardId, handler);
new ChunksCoordinator(client, client, ccrExecutor, 1000, 4, Long.MAX_VALUE, leaderShardId, followShardId, handler);
coordinator.createChucks(0, 1000000);
assertThat(coordinator.getChunks().size(), equalTo(1000));
@ -206,7 +206,7 @@ public class ChunksCoordinatorTests extends ESTestCase {
boolean[] invoked = new boolean[1];
Exception[] exception = new Exception[1];
Consumer<Exception> handler = e -> {invoked[0] = true;exception[0] = e;};
ChunkProcessor chunkProcessor = new ChunkProcessor(client, chunks, ccrExecutor, leaderShardId, followShardId, handler);
ChunkProcessor chunkProcessor = new ChunkProcessor(client, client, chunks, ccrExecutor, leaderShardId, followShardId, handler);
chunkProcessor.start(0, 10, Long.MAX_VALUE);
assertThat(invoked[0], is(true));
assertThat(exception[0], nullValue());
@ -226,7 +226,7 @@ public class ChunksCoordinatorTests extends ESTestCase {
boolean[] invoked = new boolean[1];
Exception[] exception = new Exception[1];
Consumer<Exception> handler = e -> {invoked[0] = true;exception[0] = e;};
ChunkProcessor chunkProcessor = new ChunkProcessor(client, chunks, ccrExecutor, leaderShardId, followShardId, handler);
ChunkProcessor chunkProcessor = new ChunkProcessor(client, client, chunks, ccrExecutor, leaderShardId, followShardId, handler);
chunkProcessor.start(0, 10, Long.MAX_VALUE);
assertThat(invoked[0], is(true));
assertThat(exception[0], nullValue());
@ -247,7 +247,7 @@ public class ChunksCoordinatorTests extends ESTestCase {
boolean[] invoked = new boolean[1];
Exception[] exception = new Exception[1];
Consumer<Exception> handler = e -> {invoked[0] = true;exception[0] = e;};
ChunkProcessor chunkProcessor = new ChunkProcessor(client, chunks, ccrExecutor, leaderShardId, followShardId, handler);
ChunkProcessor chunkProcessor = new ChunkProcessor(client, client, chunks, ccrExecutor, leaderShardId, followShardId, handler);
chunkProcessor.start(0, 10, Long.MAX_VALUE);
assertThat(invoked[0], is(true));
assertThat(exception[0], notNullValue());
@ -269,7 +269,7 @@ public class ChunksCoordinatorTests extends ESTestCase {
boolean[] invoked = new boolean[1];
Exception[] exception = new Exception[1];
Consumer<Exception> handler = e -> {invoked[0] = true;exception[0] = e;};
ChunkProcessor chunkProcessor = new ChunkProcessor(client, chunks, ccrExecutor, leaderShardId, followShardId, handler);
ChunkProcessor chunkProcessor = new ChunkProcessor(client, client, chunks, ccrExecutor, leaderShardId, followShardId, handler);
chunkProcessor.start(0, 10, Long.MAX_VALUE);
assertThat(invoked[0], is(true));
assertThat(exception[0], notNullValue());
@ -305,7 +305,7 @@ public class ChunksCoordinatorTests extends ESTestCase {
boolean[] invoked = new boolean[1];
Exception[] exception = new Exception[1];
Consumer<Exception> handler = e -> {invoked[0] = true;exception[0] = e;};
ChunkProcessor chunkProcessor = new ChunkProcessor(client, chunks, ccrExecutor, leaderShardId, followShardId, handler);
ChunkProcessor chunkProcessor = new ChunkProcessor(client, client, chunks, ccrExecutor, leaderShardId, followShardId, handler);
chunkProcessor.start(from, to, Long.MAX_VALUE);
assertThat(invoked[0], is(true));
assertThat(exception[0], nullValue());

View File

@ -22,6 +22,7 @@ public class ShardFollowTaskTests extends AbstractSerializingTestCase<ShardFollo
@Override
protected ShardFollowTask createTestInstance() {
return new ShardFollowTask(
randomAlphaOfLength(4),
new ShardId(randomAlphaOfLength(4), randomAlphaOfLength(4), randomInt(5)),
new ShardId(randomAlphaOfLength(4), randomAlphaOfLength(4), randomInt(5)),
randomIntBetween(1, Integer.MAX_VALUE), randomIntBetween(1, Integer.MAX_VALUE),