[CCR] Sync mappings between leader and follow index (#30115)

The shard changes api returns the minimum IndexMetadata version the leader
index needs to have. If the leader side is behind on IndexMetadata version
then follow shard task waits with processing write operations until the
mapping has been fetched from leader index and applied in follower index
in the background.

The cluster state api is used to fetch the leader mapping and put mapping api
to apply the mapping in the follower index. This works because put mapping
api accepts fields that are already defined.

Relates to #30086
This commit is contained in:
Martijn van Groningen 2018-05-28 07:37:27 +02:00 committed by GitHub
parent e477147143
commit 51caefe46c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 337 additions and 90 deletions

View File

@ -156,15 +156,21 @@ public class ShardChangesAction extends Action<ShardChangesAction.Request, Shard
public static final class Response extends ActionResponse {
private long indexMetadataVersion;
private Translog.Operation[] operations;
Response() {
}
Response(final Translog.Operation[] operations) {
Response(long indexMetadataVersion, final Translog.Operation[] operations) {
this.indexMetadataVersion = indexMetadataVersion;
this.operations = operations;
}
public long getIndexMetadataVersion() {
return indexMetadataVersion;
}
public Translog.Operation[] getOperations() {
return operations;
}
@ -172,12 +178,14 @@ public class ShardChangesAction extends Action<ShardChangesAction.Request, Shard
@Override
public void readFrom(final StreamInput in) throws IOException {
super.readFrom(in);
indexMetadataVersion = in.readVLong();
operations = in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new);
}
@Override
public void writeTo(final StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVLong(indexMetadataVersion);
out.writeArray(Translog.Operation::writeOperation, operations);
}
@ -186,12 +194,16 @@ public class ShardChangesAction extends Action<ShardChangesAction.Request, Shard
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
final Response response = (Response) o;
return Arrays.equals(operations, response.operations);
return indexMetadataVersion == response.indexMetadataVersion &&
Arrays.equals(operations, response.operations);
}
@Override
public int hashCode() {
return Arrays.hashCode(operations);
int result = 1;
result += Objects.hashCode(indexMetadataVersion);
result += Arrays.hashCode(operations);
return result;
}
}
@ -224,8 +236,11 @@ public class ShardChangesAction extends Action<ShardChangesAction.Request, Shard
IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex());
IndexShard indexShard = indexService.getShard(request.getShard().id());
final long indexMetaDataVersion = clusterService.state().metaData().index(shardId.getIndex()).getVersion();
request.maxSeqNo = Math.min(request.maxSeqNo, indexShard.getGlobalCheckpoint());
return getOperationsBetween(indexShard, request.minSeqNo, request.maxSeqNo, request.maxTranslogsBytes);
final Translog.Operation[] operations =
getOperationsBetween(indexShard, request.minSeqNo, request.maxSeqNo, request.maxTranslogsBytes);
return new Response(indexMetaDataVersion, operations);
}
@Override
@ -250,7 +265,8 @@ public class ShardChangesAction extends Action<ShardChangesAction.Request, Shard
private static final Translog.Operation[] EMPTY_OPERATIONS_ARRAY = new Translog.Operation[0];
static Response getOperationsBetween(IndexShard indexShard, long minSeqNo, long maxSeqNo, long byteLimit) throws IOException {
static Translog.Operation[] getOperationsBetween(IndexShard indexShard, long minSeqNo, long maxSeqNo,
long byteLimit) throws IOException {
if (indexShard.state() != IndexShardState.STARTED) {
throw new IndexShardNotStartedException(indexShard.shardId(), indexShard.state());
}
@ -266,6 +282,6 @@ public class ShardChangesAction extends Action<ShardChangesAction.Request, Shard
}
}
}
return new Response(operations.toArray(EMPTY_OPERATIONS_ARRAY));
return operations.toArray(EMPTY_OPERATIONS_ARRAY);
}
}

View File

@ -10,6 +10,8 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
@ -20,6 +22,8 @@ import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.FilterClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
@ -27,10 +31,14 @@ import org.elasticsearch.common.transport.NetworkExceptionHelper;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.Index;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.persistent.AllocatedPersistentTask;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.persistent.PersistentTasksExecutor;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
@ -38,8 +46,6 @@ import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsRequest;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse;
import org.elasticsearch.persistent.AllocatedPersistentTask;
import org.elasticsearch.persistent.PersistentTasksExecutor;
import java.util.Arrays;
import java.util.Map;
@ -47,8 +53,11 @@ import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import java.util.function.Supplier;
@ -101,16 +110,26 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
Client leaderClient = wrapClient(params.getLeaderClusterAlias() != null ?
this.client.getRemoteClusterClient(params.getLeaderClusterAlias()) : this.client, params);
Client followerClient = wrapClient(this.client, params);
logger.info("Starting shard following [{}]", params);
fetchGlobalCheckpoint(followerClient, params.getFollowShardId(),
followGlobalCheckPoint -> {
shardFollowNodeTask.updateProcessedGlobalCheckpoint(followGlobalCheckPoint);
prepare(leaderClient, followerClient,shardFollowNodeTask, params, followGlobalCheckPoint);
}, task::markAsFailed);
IndexMetadataVersionChecker imdVersionChecker = new IndexMetadataVersionChecker(params.getLeaderShardId().getIndex(),
params.getFollowShardId().getIndex(), client, leaderClient);
logger.info("[{}] initial leader mapping with follower mapping syncing", params);
imdVersionChecker.updateMapping(1L /* Force update, version is initially 0L */, e -> {
if (e == null) {
logger.info("Starting shard following [{}]", params);
fetchGlobalCheckpoint(followerClient, params.getFollowShardId(),
followGlobalCheckPoint -> {
shardFollowNodeTask.updateProcessedGlobalCheckpoint(followGlobalCheckPoint);
prepare(leaderClient, followerClient,shardFollowNodeTask, params, followGlobalCheckPoint, imdVersionChecker);
}, task::markAsFailed);
} else {
shardFollowNodeTask.markAsFailed(e);
}
});
}
void prepare(Client leaderClient, Client followerClient, ShardFollowNodeTask task, ShardFollowTask params,
long followGlobalCheckPoint) {
long followGlobalCheckPoint,
IndexMetadataVersionChecker imdVersionChecker) {
if (task.getState() != AllocatedPersistentTask.State.STARTED) {
// TODO: need better cancellation control
return;
@ -122,7 +141,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
// TODO: check if both indices have the same history uuid
if (leaderGlobalCheckPoint == followGlobalCheckPoint) {
logger.debug("{} no write operations to fetch", followerShard);
retry(leaderClient, followerClient, task, params, followGlobalCheckPoint);
retry(leaderClient, followerClient, task, params, followGlobalCheckPoint, imdVersionChecker);
} else {
assert followGlobalCheckPoint < leaderGlobalCheckPoint : "followGlobalCheckPoint [" + followGlobalCheckPoint +
"] is not below leaderGlobalCheckPoint [" + leaderGlobalCheckPoint + "]";
@ -132,13 +151,14 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
Consumer<Exception> handler = e -> {
if (e == null) {
task.updateProcessedGlobalCheckpoint(leaderGlobalCheckPoint);
prepare(leaderClient, followerClient, task, params, leaderGlobalCheckPoint);
prepare(leaderClient, followerClient, task, params, leaderGlobalCheckPoint, imdVersionChecker);
} else {
task.markAsFailed(e);
}
};
ChunksCoordinator coordinator = new ChunksCoordinator(followerClient, leaderClient, ccrExecutor, params.getMaxChunkSize(),
params.getNumConcurrentChunks(), params.getProcessorMaxTranslogBytes(), leaderShard, followerShard, handler);
ChunksCoordinator coordinator = new ChunksCoordinator(followerClient, leaderClient, ccrExecutor, imdVersionChecker,
params.getMaxChunkSize(), params.getNumConcurrentChunks(), params.getProcessorMaxTranslogBytes(), leaderShard,
followerShard, handler);
coordinator.createChucks(followGlobalCheckPoint, leaderGlobalCheckPoint);
coordinator.start();
}
@ -146,7 +166,8 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
}
private void retry(Client leaderClient, Client followerClient, ShardFollowNodeTask task, ShardFollowTask params,
long followGlobalCheckPoint) {
long followGlobalCheckPoint,
IndexMetadataVersionChecker imdVersionChecker) {
threadPool.schedule(RETRY_TIMEOUT, Ccr.CCR_THREAD_POOL_NAME, new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
@ -155,7 +176,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
@Override
protected void doRun() throws Exception {
prepare(leaderClient, followerClient, task, params, followGlobalCheckPoint);
prepare(leaderClient, followerClient, task, params, followGlobalCheckPoint, imdVersionChecker);
}
});
}
@ -184,6 +205,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
private final Client followerClient;
private final Client leaderClient;
private final Executor ccrExecutor;
private final IndexMetadataVersionChecker imdVersionChecker;
private final long batchSize;
private final int concurrentProcessors;
@ -196,11 +218,13 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
private final Queue<long[]> chunks = new ConcurrentLinkedQueue<>();
private final AtomicReference<Exception> failureHolder = new AtomicReference<>();
ChunksCoordinator(Client followerClient, Client leaderClient, Executor ccrExecutor, long batchSize, int concurrentProcessors,
long processorMaxTranslogBytes, ShardId leaderShard, ShardId followerShard, Consumer<Exception> handler) {
ChunksCoordinator(Client followerClient, Client leaderClient, Executor ccrExecutor, IndexMetadataVersionChecker imdVersionChecker,
long batchSize, int concurrentProcessors, long processorMaxTranslogBytes, ShardId leaderShard,
ShardId followerShard, Consumer<Exception> handler) {
this.followerClient = followerClient;
this.leaderClient = leaderClient;
this.ccrExecutor = ccrExecutor;
this.imdVersionChecker = imdVersionChecker;
this.batchSize = batchSize;
this.concurrentProcessors = concurrentProcessors;
this.processorMaxTranslogBytes = processorMaxTranslogBytes;
@ -255,8 +279,8 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
postProcessChuck(e);
}
};
ChunkProcessor processor = new ChunkProcessor(leaderClient, followerClient, chunks, ccrExecutor, leaderShard,
followerShard, processorHandler);
ChunkProcessor processor = new ChunkProcessor(leaderClient, followerClient, chunks, ccrExecutor, imdVersionChecker,
leaderShard, followerShard, processorHandler);
processor.start(chunk[0], chunk[1], processorMaxTranslogBytes);
}
@ -282,18 +306,21 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
private final Client followerClient;
private final Queue<long[]> chunks;
private final Executor ccrExecutor;
private final BiConsumer<Long, Consumer<Exception>> indexVersionChecker;
private final ShardId leaderShard;
private final ShardId followerShard;
private final Consumer<Exception> handler;
final AtomicInteger retryCounter = new AtomicInteger(0);
ChunkProcessor(Client leaderClient, Client followerClient, Queue<long[]> chunks, Executor ccrExecutor, ShardId leaderShard,
ShardId followerShard, Consumer<Exception> handler) {
ChunkProcessor(Client leaderClient, Client followerClient, Queue<long[]> chunks, Executor ccrExecutor,
BiConsumer<Long, Consumer<Exception>> indexVersionChecker,
ShardId leaderShard, ShardId followerShard, Consumer<Exception> handler) {
this.leaderClient = leaderClient;
this.followerClient = followerClient;
this.chunks = chunks;
this.ccrExecutor = ccrExecutor;
this.indexVersionChecker = indexVersionChecker;
this.leaderShard = leaderShard;
this.followerShard = followerShard;
this.handler = handler;
@ -347,20 +374,33 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
@Override
protected void doRun() throws Exception {
final BulkShardOperationsRequest request = new BulkShardOperationsRequest(followerShard, response.getOperations());
followerClient.execute(BulkShardOperationsAction.INSTANCE, request, new ActionListener<BulkShardOperationsResponse>() {
@Override
public void onResponse(final BulkShardOperationsResponse bulkShardOperationsResponse) {
handler.accept(null);
indexVersionChecker.accept(response.getIndexMetadataVersion(), e -> {
if (e != null) {
if (shouldRetry(e) && retryCounter.incrementAndGet() <= PROCESSOR_RETRY_LIMIT) {
handleResponse(to, response);
} else {
handler.accept(new ElasticsearchException("retrying failed [" + retryCounter.get() +
"] times, aborting...", e));
}
return;
}
final BulkShardOperationsRequest request = new BulkShardOperationsRequest(followerShard, response.getOperations());
followerClient.execute(BulkShardOperationsAction.INSTANCE, request,
new ActionListener<BulkShardOperationsResponse>() {
@Override
public void onResponse(final BulkShardOperationsResponse bulkShardOperationsResponse) {
handler.accept(null);
}
@Override
public void onFailure(final Exception e) {
// No retry mechanism here, because if a failure is being redirected to this place it is considered
// non recoverable.
assert e != null;
handler.accept(e);
}
@Override
public void onFailure(final Exception e) {
// No retry mechanism here, because if a failure is being redirected to this place it is considered
// non recoverable.
assert e != null;
handler.accept(e);
}
}
);
});
}
});
@ -404,4 +444,76 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
return storedContext;
}
static final class IndexMetadataVersionChecker implements BiConsumer<Long, Consumer<Exception>> {
private static final Logger LOGGER = Loggers.getLogger(IndexMetadataVersionChecker.class);
private final Client followClient;
private final Client leaderClient;
private final Index leaderIndex;
private final Index followIndex;
private final AtomicLong currentIndexMetadataVersion;
private final Semaphore updateMappingSemaphore = new Semaphore(1);
IndexMetadataVersionChecker(Index leaderIndex, Index followIndex, Client followClient, Client leaderClient) {
this.followClient = followClient;
this.leaderIndex = leaderIndex;
this.followIndex = followIndex;
this.leaderClient = leaderClient;
this.currentIndexMetadataVersion = new AtomicLong();
}
public void accept(Long minimumRequiredIndexMetadataVersion, Consumer<Exception> handler) {
if (currentIndexMetadataVersion.get() >= minimumRequiredIndexMetadataVersion) {
LOGGER.debug("Current index metadata version [{}] is higher or equal than minimum required index metadata version [{}]",
currentIndexMetadataVersion.get(), minimumRequiredIndexMetadataVersion);
handler.accept(null);
} else {
updateMapping(minimumRequiredIndexMetadataVersion, handler);
}
}
void updateMapping(long minimumRequiredIndexMetadataVersion, Consumer<Exception> handler) {
try {
updateMappingSemaphore.acquire();
} catch (InterruptedException e) {
handler.accept(e);
return;
}
if (currentIndexMetadataVersion.get() >= minimumRequiredIndexMetadataVersion) {
updateMappingSemaphore.release();
LOGGER.debug("Current index metadata version [{}] is higher or equal than minimum required index metadata version [{}]",
currentIndexMetadataVersion.get(), minimumRequiredIndexMetadataVersion);
handler.accept(null);
return;
}
ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
clusterStateRequest.clear();
clusterStateRequest.metaData(true);
clusterStateRequest.indices(leaderIndex.getName());
leaderClient.admin().cluster().state(clusterStateRequest, ActionListener.wrap(clusterStateResponse -> {
IndexMetaData indexMetaData = clusterStateResponse.getState().metaData().getIndexSafe(leaderIndex);
assert indexMetaData.getMappings().size() == 1;
MappingMetaData mappingMetaData = indexMetaData.getMappings().iterator().next().value;
PutMappingRequest putMappingRequest = new PutMappingRequest(followIndex.getName());
putMappingRequest.type(mappingMetaData.type());
putMappingRequest.source(mappingMetaData.source().string(), XContentType.JSON);
followClient.admin().indices().putMapping(putMappingRequest, ActionListener.wrap(putMappingResponse -> {
currentIndexMetadataVersion.set(indexMetaData.getVersion());
updateMappingSemaphore.release();
handler.accept(null);
}, e -> {
updateMappingSemaphore.release();
handler.accept(e);
}));
}, e -> {
updateMappingSemaphore.release();
handler.accept(e);
}));
}
}
}

View File

@ -12,6 +12,7 @@ import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.analysis.common.CommonAnalysisPlugin;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
@ -27,6 +28,7 @@ import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.MockHttpTransport;
import org.elasticsearch.test.discovery.TestZenDiscovery;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.xpack.ccr.action.CreateAndFollowIndexAction;
import org.elasticsearch.xpack.ccr.action.FollowIndexAction;
import org.elasticsearch.xpack.ccr.action.ShardChangesAction;
@ -43,11 +45,13 @@ import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, transportClientRatio = 0)
public class ShardChangesIT extends ESIntegTestCase {
@ -141,6 +145,7 @@ public class ShardChangesIT extends ESIntegTestCase {
assertThat(operation.id(), equalTo("5"));
}
// @TestLogging("org.elasticsearch.xpack.ccr.action:DEBUG")
public void testFollowIndex() throws Exception {
final int numberOfPrimaryShards = randomIntBetween(1, 3);
final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards,
@ -202,6 +207,67 @@ public class ShardChangesIT extends ESIntegTestCase {
unfollowIndex("index2");
}
public void testSyncMappings() throws Exception {
final String leaderIndexSettings = getIndexSettings(2,
Collections.singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
ensureGreen("index1");
final FollowIndexAction.Request followRequest = new FollowIndexAction.Request();
followRequest.setLeaderIndex("index1");
followRequest.setFollowIndex("index2");
final CreateAndFollowIndexAction.Request createAndFollowRequest = new CreateAndFollowIndexAction.Request();
createAndFollowRequest.setFollowRequest(followRequest);
client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get();
final long firstBatchNumDocs = randomIntBetween(2, 64);
for (long i = 0; i < firstBatchNumDocs; i++) {
final String source = String.format(Locale.ROOT, "{\"f\":%d}", i);
client().prepareIndex("index1", "doc", Long.toString(i)).setSource(source, XContentType.JSON).get();
}
assertBusy(() -> assertThat(client().prepareSearch("index2").get().getHits().totalHits, equalTo(firstBatchNumDocs)));
MappingMetaData mappingMetaData = client().admin().indices().prepareGetMappings("index2").get().getMappings()
.get("index2").get("doc");
assertThat(XContentMapValues.extractValue("properties.f.type", mappingMetaData.sourceAsMap()), equalTo("integer"));
assertThat(XContentMapValues.extractValue("properties.k", mappingMetaData.sourceAsMap()), nullValue());
final int secondBatchNumDocs = randomIntBetween(2, 64);
for (long i = firstBatchNumDocs; i < firstBatchNumDocs + secondBatchNumDocs; i++) {
final String source = String.format(Locale.ROOT, "{\"k\":%d}", i);
client().prepareIndex("index1", "doc", Long.toString(i)).setSource(source, XContentType.JSON).get();
}
assertBusy(() -> assertThat(client().prepareSearch("index2").get().getHits().totalHits,
equalTo(firstBatchNumDocs + secondBatchNumDocs)));
mappingMetaData = client().admin().indices().prepareGetMappings("index2").get().getMappings()
.get("index2").get("doc");
assertThat(XContentMapValues.extractValue("properties.f.type", mappingMetaData.sourceAsMap()), equalTo("integer"));
assertThat(XContentMapValues.extractValue("properties.k.type", mappingMetaData.sourceAsMap()), equalTo("long"));
final UnfollowIndexAction.Request unfollowRequest = new UnfollowIndexAction.Request();
unfollowRequest.setFollowIndex("index2");
client().execute(UnfollowIndexAction.INSTANCE, unfollowRequest).get();
assertBusy(() -> {
final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
final PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
assertThat(tasks.tasks().size(), equalTo(0));
ListTasksRequest listTasksRequest = new ListTasksRequest();
listTasksRequest.setDetailed(true);
ListTasksResponse listTasksResponse = client().admin().cluster().listTasks(listTasksRequest).get();
int numNodeTasks = 0;
for (TaskInfo taskInfo : listTasksResponse.getTasks()) {
if (taskInfo.getAction().startsWith(ListTasksAction.NAME) == false) {
numNodeTasks++;
}
}
assertThat(numNodeTasks, equalTo(0));
});
}
public void testFollowIndexWithNestedField() throws Exception {
final String leaderIndexSettings =
getIndexSettingsWithNestedMapping(1, Collections.singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
@ -284,8 +350,7 @@ public class ShardChangesIT extends ESIntegTestCase {
private CheckedRunnable<Exception> assertTask(final int numberOfPrimaryShards, final Map<ShardId, Long> numDocsPerShard) {
return () -> {
final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
final PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
assertThat(tasks.tasks().size(), equalTo(numberOfPrimaryShards));
final PersistentTasksCustomMetaData taskMetadata = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
ListTasksRequest listTasksRequest = new ListTasksRequest();
listTasksRequest.setDetailed(true);
@ -296,11 +361,12 @@ public class ShardChangesIT extends ESIntegTestCase {
List<TaskInfo> taskInfos = listTasksResponse.getTasks();
assertThat(taskInfos.size(), equalTo(numberOfPrimaryShards));
for (PersistentTasksCustomMetaData.PersistentTask<?> task : tasks.tasks()) {
final ShardFollowTask shardFollowTask = (ShardFollowTask) task.getParams();
Collection<PersistentTasksCustomMetaData.PersistentTask<?>> shardFollowTasks =
taskMetadata.findTasks(ShardFollowTask.NAME, Objects::nonNull);
for (PersistentTasksCustomMetaData.PersistentTask<?> shardFollowTask : shardFollowTasks) {
final ShardFollowTask shardFollowTaskParams = (ShardFollowTask) shardFollowTask.getParams();
TaskInfo taskInfo = null;
String expectedId = "id=" + task.getId();
String expectedId = "id=" + shardFollowTask.getId();
for (TaskInfo info : taskInfos) {
if (expectedId.equals(info.getDescription())) {
taskInfo = info;
@ -312,7 +378,7 @@ public class ShardChangesIT extends ESIntegTestCase {
assertThat(status, notNullValue());
assertThat(
status.getProcessedGlobalCheckpoint(),
equalTo(numDocsPerShard.get(shardFollowTask.getLeaderShardId())));
equalTo(numDocsPerShard.get(shardFollowTaskParams.getLeaderShardId())));
}
};
}
@ -342,7 +408,7 @@ public class ShardChangesIT extends ESIntegTestCase {
private CheckedRunnable<Exception> assertExpectedDocumentRunnable(final int value) {
return () -> {
final GetResponse getResponse = client().prepareGet("index2", "doc", Integer.toString(value)).get();
assertTrue(getResponse.isExists());
assertTrue("doc with id [" + value + "] does not exist", getResponse.isExists());
assertTrue((getResponse.getSource().containsKey("f")));
assertThat(getResponse.getSource().get("f"), equalTo(value));
};

View File

@ -6,12 +6,15 @@
package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ClusterAdminClient;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor.ChunkProcessor;
import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor.ChunksCoordinator;
import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor.IndexMetadataVersionChecker;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsRequest;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse;
@ -25,8 +28,10 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import static org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor.PROCESSOR_RETRY_LIMIT;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.CoreMatchers.sameInstance;
@ -39,6 +44,7 @@ import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class ChunksCoordinatorTests extends ESTestCase {
@ -48,8 +54,10 @@ public class ChunksCoordinatorTests extends ESTestCase {
ShardId leaderShardId = new ShardId("index1", "index1", 0);
ShardId followShardId = new ShardId("index2", "index1", 0);
ChunksCoordinator coordinator =
new ChunksCoordinator(client, client, ccrExecutor, 1024, 1, Long.MAX_VALUE, leaderShardId, followShardId, e -> {});
IndexMetadataVersionChecker checker = new IndexMetadataVersionChecker(leaderShardId.getIndex(),
followShardId.getIndex(), client, client);
ChunksCoordinator coordinator = new ChunksCoordinator(client, client, ccrExecutor, checker, 1024, 1,
Long.MAX_VALUE, leaderShardId, followShardId, e -> {});
coordinator.createChucks(0, 1024);
List<long[]> result = new ArrayList<>(coordinator.getChunks());
assertThat(result.size(), equalTo(1));
@ -95,7 +103,8 @@ public class ChunksCoordinatorTests extends ESTestCase {
}
public void testCoordinator() throws Exception {
Client client = mock(Client.class);
Client client = createClientMock();
mockShardChangesApiCall(client);
mockBulkShardOperationsApiCall(client);
Executor ccrExecutor = Runnable::run;
@ -105,8 +114,10 @@ public class ChunksCoordinatorTests extends ESTestCase {
Consumer<Exception> handler = e -> assertThat(e, nullValue());
int concurrentProcessors = randomIntBetween(1, 4);
int batchSize = randomIntBetween(1, 1000);
ChunksCoordinator coordinator = new ChunksCoordinator(client, client, ccrExecutor, batchSize, concurrentProcessors,
Long.MAX_VALUE, leaderShardId, followShardId, handler);
IndexMetadataVersionChecker checker = new IndexMetadataVersionChecker(leaderShardId.getIndex(),
followShardId.getIndex(), client, client);
ChunksCoordinator coordinator = new ChunksCoordinator(client, client, ccrExecutor, checker, batchSize,
concurrentProcessors, Long.MAX_VALUE, leaderShardId, followShardId, handler);
int numberOfOps = randomIntBetween(batchSize, batchSize * 20);
long from = randomInt(1000);
@ -128,7 +139,7 @@ public class ChunksCoordinatorTests extends ESTestCase {
public void testCoordinator_failure() throws Exception {
Exception expectedException = new RuntimeException("throw me");
Client client = mock(Client.class);
Client client = createClientMock();
boolean shardChangesActionApiCallFailed;
if (randomBoolean()) {
shardChangesActionApiCallFailed = true;
@ -148,8 +159,10 @@ public class ChunksCoordinatorTests extends ESTestCase {
assertThat(e, notNullValue());
assertThat(e, sameInstance(expectedException));
};
ChunksCoordinator coordinator =
new ChunksCoordinator(client, client, ccrExecutor, 10, 1, Long.MAX_VALUE, leaderShardId, followShardId, handler);
IndexMetadataVersionChecker checker = new IndexMetadataVersionChecker(leaderShardId.getIndex(),
followShardId.getIndex(), client, client);
ChunksCoordinator coordinator = new ChunksCoordinator(client, client, ccrExecutor, checker, 10, 1, Long.MAX_VALUE,
leaderShardId, followShardId, handler);
coordinator.createChucks(0, 20);
assertThat(coordinator.getChunks().size(), equalTo(2));
@ -162,7 +175,7 @@ public class ChunksCoordinatorTests extends ESTestCase {
}
public void testCoordinator_concurrent() throws Exception {
Client client = mock(Client.class);
Client client = createClientMock();
mockShardChangesApiCall(client);
mockBulkShardOperationsApiCall(client);
Executor ccrExecutor = command -> new Thread(command).start();
@ -179,8 +192,10 @@ public class ChunksCoordinatorTests extends ESTestCase {
}
latch.countDown();
};
ChunksCoordinator coordinator =
new ChunksCoordinator(client, client, ccrExecutor, 1000, 4, Long.MAX_VALUE, leaderShardId, followShardId, handler);
IndexMetadataVersionChecker checker = new IndexMetadataVersionChecker(leaderShardId.getIndex(),
followShardId.getIndex(), client, client);
ChunksCoordinator coordinator = new ChunksCoordinator(client, client, ccrExecutor, checker, 1000, 4, Long.MAX_VALUE,
leaderShardId, followShardId, handler);
coordinator.createChucks(0, 1000000);
assertThat(coordinator.getChunks().size(), equalTo(1000));
@ -193,38 +208,44 @@ public class ChunksCoordinatorTests extends ESTestCase {
}
public void testChunkProcessor() {
Client client = mock(Client.class);
Client client = createClientMock();
Queue<long[]> chunks = new LinkedList<>();
mockShardChangesApiCall(client);
mockBulkShardOperationsApiCall(client);
Executor ccrExecutor = Runnable::run;
ShardId leaderShardId = new ShardId("index1", "index1", 0);
ShardId followShardId = new ShardId("index2", "index1", 0);
IndexMetadataVersionChecker checker = new IndexMetadataVersionChecker(leaderShardId.getIndex(),
followShardId.getIndex(), client, client);
boolean[] invoked = new boolean[1];
Exception[] exception = new Exception[1];
Consumer<Exception> handler = e -> {invoked[0] = true;exception[0] = e;};
ChunkProcessor chunkProcessor = new ChunkProcessor(client, client, chunks, ccrExecutor, leaderShardId, followShardId, handler);
ChunkProcessor chunkProcessor = new ChunkProcessor(client, client, chunks, ccrExecutor, checker, leaderShardId,
followShardId, handler);
chunkProcessor.start(0, 10, Long.MAX_VALUE);
assertThat(invoked[0], is(true));
assertThat(exception[0], nullValue());
}
public void testChunkProcessorRetry() {
Client client = mock(Client.class);
Client client = createClientMock();
Queue<long[]> chunks = new LinkedList<>();
mockBulkShardOperationsApiCall(client);
int testRetryLimit = randomIntBetween(1, ShardFollowTasksExecutor.PROCESSOR_RETRY_LIMIT - 1);
int testRetryLimit = randomIntBetween(1, PROCESSOR_RETRY_LIMIT - 1);
mockShardCangesApiCallWithRetry(client, testRetryLimit, new ConnectException("connection exception"));
Executor ccrExecutor = Runnable::run;
ShardId leaderShardId = new ShardId("index1", "index1", 0);
ShardId followShardId = new ShardId("index2", "index1", 0);
IndexMetadataVersionChecker checker = new IndexMetadataVersionChecker(leaderShardId.getIndex(),
followShardId.getIndex(), client, client);
boolean[] invoked = new boolean[1];
Exception[] exception = new Exception[1];
Consumer<Exception> handler = e -> {invoked[0] = true;exception[0] = e;};
ChunkProcessor chunkProcessor = new ChunkProcessor(client, client, chunks, ccrExecutor, leaderShardId, followShardId, handler);
ChunkProcessor chunkProcessor = new ChunkProcessor(client, client, chunks, ccrExecutor, checker, leaderShardId,
followShardId, handler);
chunkProcessor.start(0, 10, Long.MAX_VALUE);
assertThat(invoked[0], is(true));
assertThat(exception[0], nullValue());
@ -232,20 +253,23 @@ public class ChunksCoordinatorTests extends ESTestCase {
}
public void testChunkProcessorRetryTooManyTimes() {
Client client = mock(Client.class);
Client client = createClientMock();
Queue<long[]> chunks = new LinkedList<>();
mockBulkShardOperationsApiCall(client);
int testRetryLimit = ShardFollowTasksExecutor.PROCESSOR_RETRY_LIMIT + 1;
int testRetryLimit = PROCESSOR_RETRY_LIMIT + 1;
mockShardCangesApiCallWithRetry(client, testRetryLimit, new ConnectException("connection exception"));
Executor ccrExecutor = Runnable::run;
ShardId leaderShardId = new ShardId("index1", "index1", 0);
ShardId followShardId = new ShardId("index2", "index1", 0);
IndexMetadataVersionChecker checker = new IndexMetadataVersionChecker(leaderShardId.getIndex(),
followShardId.getIndex(), client, client);
boolean[] invoked = new boolean[1];
Exception[] exception = new Exception[1];
Consumer<Exception> handler = e -> {invoked[0] = true;exception[0] = e;};
ChunkProcessor chunkProcessor = new ChunkProcessor(client, client, chunks, ccrExecutor, leaderShardId, followShardId, handler);
ChunkProcessor chunkProcessor = new ChunkProcessor(client, client, chunks, ccrExecutor, checker, leaderShardId,
followShardId, handler);
chunkProcessor.start(0, 10, Long.MAX_VALUE);
assertThat(invoked[0], is(true));
assertThat(exception[0], notNullValue());
@ -255,7 +279,7 @@ public class ChunksCoordinatorTests extends ESTestCase {
}
public void testChunkProcessorNoneRetryableError() {
Client client = mock(Client.class);
Client client = createClientMock();
Queue<long[]> chunks = new LinkedList<>();
mockBulkShardOperationsApiCall(client);
mockShardCangesApiCallWithRetry(client, 3, new RuntimeException("unexpected"));
@ -263,11 +287,14 @@ public class ChunksCoordinatorTests extends ESTestCase {
Executor ccrExecutor = Runnable::run;
ShardId leaderShardId = new ShardId("index1", "index1", 0);
ShardId followShardId = new ShardId("index2", "index1", 0);
IndexMetadataVersionChecker checker = new IndexMetadataVersionChecker(leaderShardId.getIndex(),
followShardId.getIndex(), client, client);
boolean[] invoked = new boolean[1];
Exception[] exception = new Exception[1];
Consumer<Exception> handler = e -> {invoked[0] = true;exception[0] = e;};
ChunkProcessor chunkProcessor = new ChunkProcessor(client, client, chunks, ccrExecutor, leaderShardId, followShardId, handler);
ChunkProcessor chunkProcessor = new ChunkProcessor(client, client, chunks, ccrExecutor, checker, leaderShardId,
followShardId, handler);
chunkProcessor.start(0, 10, Long.MAX_VALUE);
assertThat(invoked[0], is(true));
assertThat(exception[0], notNullValue());
@ -279,7 +306,7 @@ public class ChunksCoordinatorTests extends ESTestCase {
long from = 0;
long to = 20;
long actualTo = 10;
Client client = mock(Client.class);
Client client = createClientMock();
Queue<long[]> chunks = new LinkedList<>();
doAnswer(invocation -> {
Object[] args = invocation.getArguments();
@ -291,7 +318,7 @@ public class ChunksCoordinatorTests extends ESTestCase {
for (int i = 0; i <= actualTo; i++) {
operations.add(new Translog.NoOp(i, 1, "test"));
}
listener.onResponse(new ShardChangesAction.Response(operations.toArray(new Translog.Operation[0])));
listener.onResponse(new ShardChangesAction.Response(1L, operations.toArray(new Translog.Operation[0])));
return null;
}).when(client).execute(same(ShardChangesAction.INSTANCE), any(ShardChangesAction.Request.class), any());
@ -299,11 +326,15 @@ public class ChunksCoordinatorTests extends ESTestCase {
Executor ccrExecutor = Runnable::run;
ShardId leaderShardId = new ShardId("index1", "index1", 0);
ShardId followShardId = new ShardId("index2", "index1", 0);
IndexMetadataVersionChecker checker = new IndexMetadataVersionChecker(leaderShardId.getIndex(),
followShardId.getIndex(), client, client);
boolean[] invoked = new boolean[1];
Exception[] exception = new Exception[1];
Consumer<Exception> handler = e -> {invoked[0] = true;exception[0] = e;};
ChunkProcessor chunkProcessor = new ChunkProcessor(client, client, chunks, ccrExecutor, leaderShardId, followShardId, handler);
BiConsumer<Long, Consumer<Exception>> versionChecker = (indexVersiuon, consumer) -> consumer.accept(null);
ChunkProcessor chunkProcessor =
new ChunkProcessor(client, client, chunks, ccrExecutor, versionChecker, leaderShardId, followShardId, handler);
chunkProcessor.start(from, to, Long.MAX_VALUE);
assertThat(invoked[0], is(true));
assertThat(exception[0], nullValue());
@ -312,6 +343,15 @@ public class ChunksCoordinatorTests extends ESTestCase {
assertThat(chunks.peek()[1], equalTo(20L));
}
private Client createClientMock() {
Client client = mock(Client.class);
ClusterAdminClient clusterAdminClient = mock(ClusterAdminClient.class);
AdminClient adminClient = mock(AdminClient.class);
when(adminClient.cluster()).thenReturn(clusterAdminClient);
when(client.admin()).thenReturn(adminClient);
return client;
}
private void mockShardCangesApiCallWithRetry(Client client, int testRetryLimit, Exception e) {
int[] retryCounter = new int[1];
doAnswer(invocation -> {
@ -328,7 +368,7 @@ public class ChunksCoordinatorTests extends ESTestCase {
for (int i = 0; i < operations.length; i++) {
operations[i] = new Translog.NoOp(request.getMinSeqNo() + i, 1, "test");
}
ShardChangesAction.Response response = new ShardChangesAction.Response(operations);
ShardChangesAction.Response response = new ShardChangesAction.Response(0L, operations);
listener.onResponse(response);
}
return null;
@ -347,7 +387,7 @@ public class ChunksCoordinatorTests extends ESTestCase {
for (long i = request.getMinSeqNo(); i <= request.getMaxSeqNo(); i++) {
operations.add(new Translog.NoOp(request.getMinSeqNo() + i, 1, "test"));
}
ShardChangesAction.Response response = new ShardChangesAction.Response(operations.toArray(new Translog.Operation[0]));
ShardChangesAction.Response response = new ShardChangesAction.Response(0L, operations.toArray(new Translog.Operation[0]));
listener.onResponse(response);
return null;
}).when(client).execute(same(ShardChangesAction.INSTANCE), any(ShardChangesAction.Request.class), any());

View File

@ -5,9 +5,12 @@
*/
package org.elasticsearch.xpack.ccr.action;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexService;
@ -33,6 +36,7 @@ public class ShardChangesActionTests extends ESSingleNodeTestCase {
.put("index.number_of_replicas", 0)
.build();
final IndexService indexService = createIndex("index", settings);
IndexMetaData indexMetaData = indexService.getMetaData();
final int numWrites = randomIntBetween(2, 8192);
for (int i = 0; i < numWrites; i++) {
@ -45,8 +49,8 @@ public class ShardChangesActionTests extends ESSingleNodeTestCase {
for (int iter = 0; iter < iters; iter++) {
int min = randomIntBetween(0, numWrites - 1);
int max = randomIntBetween(min, numWrites - 1);
final ShardChangesAction.Response r = ShardChangesAction.getOperationsBetween(indexShard, min, max, Long.MAX_VALUE);
final List<Long> seenSeqNos = Arrays.stream(r.getOperations()).map(Translog.Operation::seqNo).collect(Collectors.toList());
final Translog.Operation[] operations = ShardChangesAction.getOperationsBetween(indexShard, min, max, Long.MAX_VALUE);
final List<Long> seenSeqNos = Arrays.stream(operations).map(Translog.Operation::seqNo).collect(Collectors.toList());
final List<Long> expectedSeqNos = LongStream.range(min, max + 1).boxed().collect(Collectors.toList());
assertThat(seenSeqNos, equalTo(expectedSeqNos));
}
@ -65,6 +69,14 @@ public class ShardChangesActionTests extends ESSingleNodeTestCase {
}
public void testGetOperationsBetweenWhenShardNotStarted() throws Exception {
IndexMetaData indexMetaData = IndexMetaData.builder("index")
.settings(Settings.builder()
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID())
.build())
.build();
IndexShard indexShard = Mockito.mock(IndexShard.class);
ShardRouting shardRouting = TestShardRouting.newShardRouting("index", 0, "_node_id", true, ShardRoutingState.INITIALIZING);
@ -85,20 +97,20 @@ public class ShardChangesActionTests extends ESSingleNodeTestCase {
}
final IndexShard indexShard = indexService.getShard(0);
final ShardChangesAction.Response r = ShardChangesAction.getOperationsBetween(indexShard, 0, numWrites - 1, 256);
assertThat(r.getOperations().length, equalTo(12));
assertThat(r.getOperations()[0].seqNo(), equalTo(0L));
assertThat(r.getOperations()[1].seqNo(), equalTo(1L));
assertThat(r.getOperations()[2].seqNo(), equalTo(2L));
assertThat(r.getOperations()[3].seqNo(), equalTo(3L));
assertThat(r.getOperations()[4].seqNo(), equalTo(4L));
assertThat(r.getOperations()[5].seqNo(), equalTo(5L));
assertThat(r.getOperations()[6].seqNo(), equalTo(6L));
assertThat(r.getOperations()[7].seqNo(), equalTo(7L));
assertThat(r.getOperations()[8].seqNo(), equalTo(8L));
assertThat(r.getOperations()[9].seqNo(), equalTo(9L));
assertThat(r.getOperations()[10].seqNo(), equalTo(10L));
assertThat(r.getOperations()[11].seqNo(), equalTo(11L));
final Translog.Operation[] operations = ShardChangesAction.getOperationsBetween(indexShard, 0, numWrites - 1, 256);
assertThat(operations.length, equalTo(12));
assertThat(operations[0].seqNo(), equalTo(0L));
assertThat(operations[1].seqNo(), equalTo(1L));
assertThat(operations[2].seqNo(), equalTo(2L));
assertThat(operations[3].seqNo(), equalTo(3L));
assertThat(operations[4].seqNo(), equalTo(4L));
assertThat(operations[5].seqNo(), equalTo(5L));
assertThat(operations[6].seqNo(), equalTo(6L));
assertThat(operations[7].seqNo(), equalTo(7L));
assertThat(operations[8].seqNo(), equalTo(8L));
assertThat(operations[9].seqNo(), equalTo(9L));
assertThat(operations[10].seqNo(), equalTo(10L));
assertThat(operations[11].seqNo(), equalTo(11L));
}
}

View File

@ -12,12 +12,13 @@ public class ShardChangesResponseTests extends AbstractStreamableTestCase<ShardC
@Override
protected ShardChangesAction.Response createTestInstance() {
final long indexMetadataVersion = randomNonNegativeLong();
final int numOps = randomInt(8);
final Translog.Operation[] operations = new Translog.Operation[numOps];
for (int i = 0; i < numOps; i++) {
operations[i] = new Translog.NoOp(i, 0, "test");
}
return new ShardChangesAction.Response(operations);
return new ShardChangesAction.Response(indexMetadataVersion, operations);
}
@Override