diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index 61db5954496..7caf144d533 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -148,7 +148,7 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E @Override public List> getPersistentTasksExecutor(ClusterService clusterService, ThreadPool threadPool, Client client) { - return Collections.singletonList(new ShardFollowTasksExecutor(settings, client, threadPool)); + return Collections.singletonList(new ShardFollowTasksExecutor(settings, client, threadPool, clusterService)); } public List> getActions() { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index 155771b4724..781fb359a45 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -62,6 +62,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { private final BiConsumer scheduler; private final LongSupplier relativeTimeProvider; + private String followerHistoryUUID; private long leaderGlobalCheckpoint; private long leaderMaxSeqNo; private long leaderMaxSeqNoOfUpdatesOrDeletes = SequenceNumbers.UNASSIGNED_SEQ_NO; @@ -110,15 +111,17 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { } void start( - final long leaderGlobalCheckpoint, - final long leaderMaxSeqNo, - final long followerGlobalCheckpoint, - final long followerMaxSeqNo) { + final String followerHistoryUUID, + final long leaderGlobalCheckpoint, + final long leaderMaxSeqNo, + final long followerGlobalCheckpoint, + final long followerMaxSeqNo) { /* * While this should only ever be called once and before any other threads can touch these fields, we use synchronization here to * avoid the need to declare these fields as volatile. That is, we are ensuring thesefields are always accessed under the same lock. */ synchronized (this) { + this.followerHistoryUUID = followerHistoryUUID; this.leaderGlobalCheckpoint = leaderGlobalCheckpoint; this.leaderMaxSeqNo = leaderMaxSeqNo; this.followerGlobalCheckpoint = followerGlobalCheckpoint; @@ -305,7 +308,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { AtomicInteger retryCounter) { assert leaderMaxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "mus is not replicated"; final long startTime = relativeTimeProvider.getAsLong(); - innerSendBulkShardOperationsRequest(operations, leaderMaxSeqNoOfUpdatesOrDeletes, + innerSendBulkShardOperationsRequest(followerHistoryUUID, operations, leaderMaxSeqNoOfUpdatesOrDeletes, response -> { synchronized (ShardFollowNodeTask.this) { totalIndexTimeMillis += TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTime); @@ -404,8 +407,11 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { // These methods are protected for testing purposes: protected abstract void innerUpdateMapping(LongConsumer handler, Consumer errorHandler); - protected abstract void innerSendBulkShardOperationsRequest(List operations, long leaderMaxSeqNoOfUpdatesOrDeletes, - Consumer handler, Consumer errorHandler); + protected abstract void innerSendBulkShardOperationsRequest(String followerHistoryUUID, + List operations, + long leaderMaxSeqNoOfUpdatesOrDeletes, + Consumer handler, + Consumer errorHandler); protected abstract void innerSendShardChangesRequest(long from, int maxOperationCount, Consumer handler, Consumer errorHandler); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java index cc9d074786d..85afd8be28a 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java @@ -51,13 +51,12 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { public static final ParseField MAX_WRITE_BUFFER_SIZE = new ParseField("max_write_buffer_size"); public static final ParseField MAX_RETRY_DELAY = new ParseField("max_retry_delay"); public static final ParseField POLL_TIMEOUT = new ParseField("poll_timeout"); - public static final ParseField RECORDED_HISTORY_UUID = new ParseField("recorded_history_uuid"); @SuppressWarnings("unchecked") private static ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, (a) -> new ShardFollowTask((String) a[0], new ShardId((String) a[1], (String) a[2], (int) a[3]), new ShardId((String) a[4], (String) a[5], (int) a[6]), (int) a[7], (int) a[8], (ByteSizeValue) a[9], - (int) a[10], (int) a[11], (TimeValue) a[12], (TimeValue) a[13], (String) a[14], (Map) a[15])); + (int) a[10], (int) a[11], (TimeValue) a[12], (TimeValue) a[13], (Map) a[14])); static { PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), LEADER_CLUSTER_ALIAS_FIELD); @@ -82,7 +81,6 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { PARSER.declareField(ConstructingObjectParser.constructorArg(), (p, c) -> TimeValue.parseTimeValue(p.text(), POLL_TIMEOUT.getPreferredName()), POLL_TIMEOUT, ObjectParser.ValueType.STRING); - PARSER.declareString(ConstructingObjectParser.constructorArg(), RECORDED_HISTORY_UUID); PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> p.mapStrings(), HEADERS); } @@ -96,7 +94,6 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { private final int maxWriteBufferSize; private final TimeValue maxRetryDelay; private final TimeValue pollTimeout; - private final String recordedLeaderIndexHistoryUUID; private final Map headers; ShardFollowTask( @@ -110,7 +107,6 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { final int maxWriteBufferSize, final TimeValue maxRetryDelay, final TimeValue pollTimeout, - final String recordedLeaderIndexHistoryUUID, final Map headers) { this.leaderClusterAlias = leaderClusterAlias; this.followShardId = followShardId; @@ -122,7 +118,6 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { this.maxWriteBufferSize = maxWriteBufferSize; this.maxRetryDelay = maxRetryDelay; this.pollTimeout = pollTimeout; - this.recordedLeaderIndexHistoryUUID = recordedLeaderIndexHistoryUUID; this.headers = headers != null ? Collections.unmodifiableMap(headers) : Collections.emptyMap(); } @@ -137,7 +132,6 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { this.maxWriteBufferSize = in.readVInt(); this.maxRetryDelay = in.readTimeValue(); this.pollTimeout = in.readTimeValue(); - this.recordedLeaderIndexHistoryUUID = in.readString(); this.headers = Collections.unmodifiableMap(in.readMap(StreamInput::readString, StreamInput::readString)); } @@ -185,10 +179,6 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { return followShardId.getIndex().getUUID() + "-" + followShardId.getId(); } - public String getRecordedLeaderIndexHistoryUUID() { - return recordedLeaderIndexHistoryUUID; - } - public Map getHeaders() { return headers; } @@ -210,7 +200,6 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { out.writeVInt(maxWriteBufferSize); out.writeTimeValue(maxRetryDelay); out.writeTimeValue(pollTimeout); - out.writeString(recordedLeaderIndexHistoryUUID); out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString); } @@ -237,7 +226,6 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize); builder.field(MAX_RETRY_DELAY.getPreferredName(), maxRetryDelay.getStringRep()); builder.field(POLL_TIMEOUT.getPreferredName(), pollTimeout.getStringRep()); - builder.field(RECORDED_HISTORY_UUID.getPreferredName(), recordedLeaderIndexHistoryUUID); builder.field(HEADERS.getPreferredName(), headers); return builder.endObject(); } @@ -257,7 +245,6 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { maxWriteBufferSize == that.maxWriteBufferSize && Objects.equals(maxRetryDelay, that.maxRetryDelay) && Objects.equals(pollTimeout, that.pollTimeout) && - Objects.equals(recordedLeaderIndexHistoryUUID, that.recordedLeaderIndexHistoryUUID) && Objects.equals(headers, that.headers); } @@ -274,8 +261,8 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { maxWriteBufferSize, maxRetryDelay, pollTimeout, - recordedLeaderIndexHistoryUUID, - headers); + headers + ); } public String toString() { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index 27958070aae..fddb779fdc2 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -17,12 +17,15 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.engine.CommitStats; +import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardNotFoundException; @@ -47,16 +50,19 @@ import java.util.function.Consumer; import java.util.function.LongConsumer; import static org.elasticsearch.xpack.ccr.CcrLicenseChecker.wrapClient; +import static org.elasticsearch.xpack.ccr.action.TransportResumeFollowAction.extractLeaderShardHistoryUUIDs; public class ShardFollowTasksExecutor extends PersistentTasksExecutor { private final Client client; private final ThreadPool threadPool; + private final ClusterService clusterService; - public ShardFollowTasksExecutor(Settings settings, Client client, ThreadPool threadPool) { + public ShardFollowTasksExecutor(Settings settings, Client client, ThreadPool threadPool, ClusterService clusterService) { super(settings, ShardFollowTask.NAME, Ccr.CCR_THREAD_POOL_NAME); this.client = client; this.threadPool = threadPool; + this.clusterService = clusterService; } @Override @@ -99,8 +105,10 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor errorHandler) { @@ -135,12 +143,14 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor operations, - final long maxSeqNoOfUpdatesOrDeletes, - final Consumer handler, - final Consumer errorHandler) { - final BulkShardOperationsRequest request = new BulkShardOperationsRequest( - params.getFollowShardId(), operations, maxSeqNoOfUpdatesOrDeletes); + final String followerHistoryUUID, + final List operations, + final long maxSeqNoOfUpdatesOrDeletes, + final Consumer handler, + final Consumer errorHandler) { + + final BulkShardOperationsRequest request = new BulkShardOperationsRequest(params.getFollowShardId(), + followerHistoryUUID, operations, maxSeqNoOfUpdatesOrDeletes); followerClient.execute(BulkShardOperationsAction.INSTANCE, request, ActionListener.wrap(response -> handler.accept(response), errorHandler)); } @@ -149,7 +159,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor handler, Consumer errorHandler) { ShardChangesAction.Request request = - new ShardChangesAction.Request(params.getLeaderShardId(), params.getRecordedLeaderIndexHistoryUUID()); + new ShardChangesAction.Request(params.getLeaderShardId(), recordedLeaderShardHistoryUUID); request.setFromSeqNo(from); request.setMaxOperationCount(maxOperationCount); request.setMaxBatchSize(params.getMaxBatchSize()); @@ -159,8 +169,15 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor ccrIndexMetadata = followIndexMetaData.getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY); + String[] recordedLeaderShardHistoryUUIDs = extractLeaderShardHistoryUUIDs(ccrIndexMetadata); + return recordedLeaderShardHistoryUUIDs[params.getLeaderShardId().id()]; + } + + interface FollowerStatsInfoHandler { + void accept(String followerHistoryUUID, long globalCheckpoint, long maxSeqNo); } @Override @@ -169,7 +186,9 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor shardFollowNodeTask.start(followerGCP, maxSeqNo, followerGCP, maxSeqNo); + FollowerStatsInfoHandler handler = (followerHistoryUUID, followerGCP, maxSeqNo) -> { + shardFollowNodeTask.start(followerHistoryUUID, followerGCP, maxSeqNo, followerGCP, maxSeqNo); + }; Consumer errorHandler = e -> { if (shardFollowNodeTask.isStopped()) { return; @@ -184,13 +203,13 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor errorHandler) { client.admin().indices().stats(new IndicesStatsRequest().indices(shardId.getIndexName()), ActionListener.wrap(r -> { IndexStats indexStats = r.getIndex(shardId.getIndexName()); @@ -204,10 +223,14 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor shardStats.getShardRouting().primary()) .findAny(); if (filteredShardStats.isPresent()) { - final SeqNoStats seqNoStats = filteredShardStats.get().getSeqNoStats(); + final ShardStats shardStats = filteredShardStats.get(); + final CommitStats commitStats = shardStats.getCommitStats(); + final String historyUUID = commitStats.getUserData().get(Engine.HISTORY_UUID_KEY); + + final SeqNoStats seqNoStats = shardStats.getSeqNoStats(); final long globalCheckpoint = seqNoStats.getGlobalCheckpoint(); final long maxSeqNo = seqNoStats.getMaxSeqNo(); - handler.accept(globalCheckpoint, maxSeqNo); + handler.accept(historyUUID, globalCheckpoint, maxSeqNo); } else { errorHandler.accept(new ShardNotFoundException(shardId)); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java index dc22c5d89bc..122aef0b3e8 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java @@ -174,7 +174,7 @@ public final class TransportPutFollowAction listener::onFailure); // Can't use create index api here, because then index templates can alter the mappings / settings. // And index templates could introduce settings / mappings that are incompatible with the leader index. - clusterService.submitStateUpdateTask("follow_index_action", new AckedClusterStateUpdateTask(request, handler) { + clusterService.submitStateUpdateTask("create_following_index", new AckedClusterStateUpdateTask(request, handler) { @Override protected Boolean newResponse(final boolean acknowledged) { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java index e106467af4b..24a5891dd37 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java @@ -192,12 +192,9 @@ public class TransportResumeFollowAction extends HandledTransportAction ccrIndexMetadata = followIndexMetadata.getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY); - String[] recordedLeaderShardHistoryUUIDs = extractIndexShardHistoryUUIDs(ccrIndexMetadata); - String recordedLeaderShardHistoryUUID = recordedLeaderShardHistoryUUIDs[shardId]; - final ShardFollowTask shardFollowTask = createShardFollowTask(shardId, clusterNameAlias, request, - leaderIndexMetadata, followIndexMetadata, recordedLeaderShardHistoryUUID, filteredHeaders); + final ShardFollowTask shardFollowTask = createShardFollowTask(shardId, clusterNameAlias, request, + leaderIndexMetadata, followIndexMetadata, filteredHeaders); persistentTasksService.sendStartRequest(taskId, ShardFollowTask.NAME, shardFollowTask, new ActionListener>() { @Override @@ -263,7 +260,7 @@ public class TransportResumeFollowAction extends HandledTransportAction filteredHeaders ) { int maxBatchOperationCount; @@ -363,13 +359,16 @@ public class TransportResumeFollowAction extends HandledTransportAction ccrIndexMetaData) { + static String[] extractLeaderShardHistoryUUIDs(Map ccrIndexMetaData) { String historyUUIDs = ccrIndexMetaData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_SHARD_HISTORY_UUIDS); + if (historyUUIDs == null) { + throw new IllegalArgumentException("leader index shard UUIDs are missing"); + } + return historyUUIDs.split(","); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsRequest.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsRequest.java index 80efba7831e..cf9239af740 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsRequest.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsRequest.java @@ -16,19 +16,28 @@ import java.util.List; public final class BulkShardOperationsRequest extends ReplicatedWriteRequest { + private String historyUUID; private List operations; private long maxSeqNoOfUpdatesOrDeletes; public BulkShardOperationsRequest() { } - public BulkShardOperationsRequest(ShardId shardId, List operations, long maxSeqNoOfUpdatesOrDeletes) { + public BulkShardOperationsRequest(final ShardId shardId, + final String historyUUID, + final List operations, + long maxSeqNoOfUpdatesOrDeletes) { super(shardId); setRefreshPolicy(RefreshPolicy.NONE); + this.historyUUID = historyUUID; this.operations = operations; this.maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletes; } + public String getHistoryUUID() { + return historyUUID; + } + public List getOperations() { return operations; } @@ -40,6 +49,7 @@ public final class BulkShardOperationsRequest extends ReplicatedWriteRequest targetOperations = sourceOperations.stream().map(operation -> { final Translog.Operation operationWithPrimaryTerm; switch (operation.opType()) { @@ -110,7 +116,7 @@ public class TransportBulkShardOperationsAction primary.advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNoOfUpdatesOrDeletes); final Translog.Location location = applyTranslogOperations(targetOperations, primary, Engine.Operation.Origin.PRIMARY); final BulkShardOperationsRequest replicaRequest = new BulkShardOperationsRequest( - shardId, targetOperations, maxSeqNoOfUpdatesOrDeletes); + shardId, historyUUID, targetOperations, maxSeqNoOfUpdatesOrDeletes); return new CcrWritePrimaryResult(replicaRequest, location, primary, logger); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java index 8d400edb04c..d4b2d630966 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java @@ -51,7 +51,7 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase { } private void startAndAssertAndStopTask(ShardFollowNodeTask task, TestRun testRun) throws Exception { - task.start(testRun.startSeqNo - 1, testRun.startSeqNo - 1, testRun.startSeqNo - 1, testRun.startSeqNo - 1); + task.start("uuid", testRun.startSeqNo - 1, testRun.startSeqNo - 1, testRun.startSeqNo - 1, testRun.startSeqNo - 1); assertBusy(() -> { ShardFollowNodeTaskStatus status = task.getStatus(); assertThat(status.leaderGlobalCheckpoint(), equalTo(testRun.finalExpectedGlobalCheckpoint)); @@ -85,7 +85,6 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase { 10240, TimeValue.timeValueMillis(10), TimeValue.timeValueMillis(10), - "uuid", Collections.emptyMap() ); @@ -111,10 +110,10 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase { @Override protected void innerSendBulkShardOperationsRequest( - List operations, - long maxSeqNoOfUpdates, - Consumer handler, - Consumer errorHandler) { + String followerHistoryUUID, List operations, + long maxSeqNoOfUpdates, + Consumer handler, + Consumer errorHandler) { for(Translog.Operation op : operations) { tracker.markSeqNoAsCompleted(op.seqNo()); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java index e08335eb596..8727f8b907b 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java @@ -125,7 +125,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase { shardChangesRequests.clear(); // The call the updateMapping is a noop, so noting happens. - task.start(128L, 128L, task.getStatus().followerGlobalCheckpoint(), task.getStatus().followerMaxSeqNo()); + task.start("uuid", 128L, 128L, task.getStatus().followerGlobalCheckpoint(), task.getStatus().followerMaxSeqNo()); task.markAsCompleted(); task.coordinateReads(); assertThat(shardChangesRequests.size(), equalTo(0)); @@ -682,7 +682,6 @@ public class ShardFollowNodeTaskTests extends ESTestCase { bufferWriteLimit, TimeValue.ZERO, TimeValue.ZERO, - "uuid", Collections.emptyMap() ); @@ -715,10 +714,10 @@ public class ShardFollowNodeTaskTests extends ESTestCase { @Override protected void innerSendBulkShardOperationsRequest( - final List operations, - final long maxSeqNoOfUpdates, - final Consumer handler, - final Consumer errorHandler) { + String followerHistoryUUID, final List operations, + final long maxSeqNoOfUpdates, + final Consumer handler, + final Consumer errorHandler) { bulkShardOperationRequests.add(operations); Exception writeFailure = ShardFollowNodeTaskTests.this.writeFailures.poll(); if (writeFailure != null) { @@ -796,7 +795,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase { void startTask(ShardFollowNodeTask task, long leaderGlobalCheckpoint, long followerGlobalCheckpoint) { // The call the updateMapping is a noop, so noting happens. - task.start(leaderGlobalCheckpoint, leaderGlobalCheckpoint, followerGlobalCheckpoint, followerGlobalCheckpoint); + task.start("uuid", leaderGlobalCheckpoint, leaderGlobalCheckpoint, followerGlobalCheckpoint, followerGlobalCheckpoint); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java index 7580c2bed8c..055005b9e7d 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java @@ -63,6 +63,7 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest final SeqNoStats leaderSeqNoStats = leaderGroup.getPrimary().seqNoStats(); final SeqNoStats followerSeqNoStats = followerGroup.getPrimary().seqNoStats(); shardFollowTask.start( + followerGroup.getPrimary().getHistoryUUID(), leaderSeqNoStats.getGlobalCheckpoint(), leaderSeqNoStats.getMaxSeqNo(), followerSeqNoStats.getGlobalCheckpoint(), @@ -103,6 +104,7 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest final SeqNoStats leaderSeqNoStats = leaderGroup.getPrimary().seqNoStats(); final SeqNoStats followerSeqNoStats = followerGroup.getPrimary().seqNoStats(); shardFollowTask.start( + followerGroup.getPrimary().getHistoryUUID(), leaderSeqNoStats.getGlobalCheckpoint(), leaderSeqNoStats.getMaxSeqNo(), followerSeqNoStats.getGlobalCheckpoint(), @@ -137,7 +139,7 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest } } - public void testChangeHistoryUUID() throws Exception { + public void testChangeLeaderHistoryUUID() throws Exception { try (ReplicationGroup leaderGroup = createGroup(0); ReplicationGroup followerGroup = createFollowGroup(0)) { leaderGroup.startAll(); @@ -148,6 +150,7 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest final SeqNoStats leaderSeqNoStats = leaderGroup.getPrimary().seqNoStats(); final SeqNoStats followerSeqNoStats = followerGroup.getPrimary().seqNoStats(); shardFollowTask.start( + followerGroup.getPrimary().getHistoryUUID(), leaderSeqNoStats.getGlobalCheckpoint(), leaderSeqNoStats.getMaxSeqNo(), followerSeqNoStats.getGlobalCheckpoint(), @@ -177,6 +180,47 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest } } + public void testChangeFollowerHistoryUUID() throws Exception { + try (ReplicationGroup leaderGroup = createGroup(0); + ReplicationGroup followerGroup = createFollowGroup(0)) { + leaderGroup.startAll(); + int docCount = leaderGroup.appendDocs(randomInt(64)); + leaderGroup.assertAllEqual(docCount); + followerGroup.startAll(); + ShardFollowNodeTask shardFollowTask = createShardFollowTask(leaderGroup, followerGroup); + final SeqNoStats leaderSeqNoStats = leaderGroup.getPrimary().seqNoStats(); + final SeqNoStats followerSeqNoStats = followerGroup.getPrimary().seqNoStats(); + shardFollowTask.start( + followerGroup.getPrimary().getHistoryUUID(), + leaderSeqNoStats.getGlobalCheckpoint(), + leaderSeqNoStats.getMaxSeqNo(), + followerSeqNoStats.getGlobalCheckpoint(), + followerSeqNoStats.getMaxSeqNo()); + leaderGroup.syncGlobalCheckpoint(); + leaderGroup.assertAllEqual(docCount); + Set indexedDocIds = getShardDocUIDs(leaderGroup.getPrimary()); + assertBusy(() -> { + assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leaderGroup.getPrimary().getGlobalCheckpoint())); + followerGroup.assertAllEqual(indexedDocIds.size()); + }); + + String oldHistoryUUID = followerGroup.getPrimary().getHistoryUUID(); + followerGroup.reinitPrimaryShard(); + followerGroup.getPrimary().store().bootstrapNewHistory(); + recoverShardFromStore(followerGroup.getPrimary()); + String newHistoryUUID = followerGroup.getPrimary().getHistoryUUID(); + + // force the global checkpoint on the leader to advance + leaderGroup.appendDocs(64); + + assertBusy(() -> { + assertThat(shardFollowTask.isStopped(), is(true)); + assertThat(shardFollowTask.getFailure().getMessage(), equalTo("unexpected history uuid, expected [" + oldHistoryUUID + + "], actual [" + newHistoryUUID + "], shard is likely restored from snapshot or force allocated")); + }); + } + } + @Override protected ReplicationGroup createGroup(int replicas, Settings settings) throws IOException { Settings newSettings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) @@ -217,9 +261,9 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest between(1, 4), 10240, TimeValue.timeValueMillis(10), TimeValue.timeValueMillis(10), - leaderGroup.getPrimary().getHistoryUUID(), Collections.emptyMap() ); + final String recordedLeaderIndexHistoryUUID = leaderGroup.getPrimary().getHistoryUUID(); BiConsumer scheduler = (delay, task) -> threadPool.schedule(delay, ThreadPool.Names.GENERIC, task); AtomicBoolean stopped = new AtomicBoolean(false); @@ -245,13 +289,14 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest @Override protected void innerSendBulkShardOperationsRequest( - final List operations, - final long maxSeqNoOfUpdates, - final Consumer handler, - final Consumer errorHandler) { + final String followerHistoryUUID, + final List operations, + final long maxSeqNoOfUpdates, + final Consumer handler, + final Consumer errorHandler) { Runnable task = () -> { - BulkShardOperationsRequest request = new BulkShardOperationsRequest( - params.getFollowShardId(), operations, maxSeqNoOfUpdates); + BulkShardOperationsRequest request = new BulkShardOperationsRequest(params.getFollowShardId(), + followerHistoryUUID, operations, maxSeqNoOfUpdates); ActionListener listener = ActionListener.wrap(handler::accept, errorHandler); new CCRAction(request, listener, followerGroup).execute(); }; @@ -277,7 +322,7 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest return; } Translog.Operation[] ops = ShardChangesAction.getOperations(indexShard, seqNoStats.getGlobalCheckpoint(), from, - maxOperationCount, params.getRecordedLeaderIndexHistoryUUID(), params.getMaxBatchSize()); + maxOperationCount, recordedLeaderIndexHistoryUUID, params.getMaxBatchSize()); // hard code mapping version; this is ok, as mapping updates are not tested here final ShardChangesAction.Response response = new ShardChangesAction.Response( 1L, @@ -340,8 +385,8 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest @Override protected PrimaryResult performOnPrimary(IndexShard primary, BulkShardOperationsRequest request) throws Exception { TransportWriteAction.WritePrimaryResult result = - TransportBulkShardOperationsAction.shardOperationOnPrimary(primary.shardId(), request.getOperations(), - request.getMaxSeqNoOfUpdatesOrDeletes(), primary, logger); + TransportBulkShardOperationsAction.shardOperationOnPrimary(primary.shardId(), request.getHistoryUUID(), + request.getOperations(), request.getMaxSeqNoOfUpdatesOrDeletes(), primary, logger); return new PrimaryResult(result.replicaRequest(), result.finalResponseIfSuccessful); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskTests.java index b3a0272a1b6..865d18e6067 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskTests.java @@ -36,7 +36,6 @@ public class ShardFollowTaskTests extends AbstractSerializingTestCase task = diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java index 88e6d4113d3..fe85e8a7445 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java @@ -59,7 +59,8 @@ public class BulkShardOperationsTests extends IndexShardTestCase { } final TransportWriteAction.WritePrimaryResult result = - TransportBulkShardOperationsAction.shardOperationOnPrimary(followerPrimary.shardId(), operations, + TransportBulkShardOperationsAction.shardOperationOnPrimary(followerPrimary.shardId(), followerPrimary.getHistoryUUID(), + operations, numOps - 1, followerPrimary, logger); try (Translog.Snapshot snapshot = followerPrimary.getHistoryOperations("test", 0)) {