From 5fa81310cc1ce4506f93c984791a586a01fde4a9 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Wed, 12 Sep 2018 19:42:00 +0200 Subject: [PATCH] [CCR] Added history uuid validation (#33546) For correctness we need to verify whether the history uuid of the leader index shards never changes while that index is being followed. * The history UUIDs are recorded as custom index metadata in the follow index. * The follow api validates whether the current history UUIDs of the leader index shards are the same as the recorded history UUIDs. If not the follow api fails. * While a follow index is following a leader index; shard follow tasks on each shard changes api call verify whether their current history uuid is the same as the recorded history uuid. Relates to #30086 Co-authored-by: Nhat Nguyen --- .../ESIndexLevelReplicationTestCase.java | 4 + .../xpack/ccr/FollowIndexSecurityIT.java | 2 +- .../java/org/elasticsearch/xpack/ccr/Ccr.java | 2 + .../xpack/ccr/CcrLicenseChecker.java | 93 +++++++++++++++--- .../xpack/ccr/action/ShardChangesAction.java | 33 ++++++- .../xpack/ccr/action/ShardFollowTask.java | 46 +++++++-- .../ccr/action/ShardFollowTasksExecutor.java | 3 +- .../TransportCreateAndFollowIndexAction.java | 21 +++- .../action/TransportFollowIndexAction.java | 51 ++++++++-- .../xpack/ccr/ShardChangesIT.java | 22 ++--- .../ccr/action/ShardChangesActionTests.java | 23 +++-- .../ccr/action/ShardChangesRequestTests.java | 5 +- .../ccr/action/ShardChangesResponseTests.java | 7 +- .../ShardFollowNodeTaskRandomTests.java | 46 +++++++-- .../ccr/action/ShardFollowNodeTaskTests.java | 35 +++++-- .../ShardFollowTaskReplicationTests.java | 71 ++++++++++++-- .../ccr/action/ShardFollowTaskTests.java | 4 +- .../TransportFollowIndexActionTests.java | 96 ++++++++++++------- 18 files changed, 442 insertions(+), 122 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index 8717d7ba146..5f0909db0d3 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -443,6 +443,10 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase return primary; } + public synchronized void reinitPrimaryShard() throws IOException { + primary = reinitShard(primary); + } + public void syncGlobalCheckpoint() { PlainActionFuture listener = new PlainActionFuture<>(); try { diff --git a/x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java b/x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java index 35774443ded..d1a90a6ccec 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java +++ b/x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java @@ -113,7 +113,7 @@ public class FollowIndexSecurityIT extends ESRestTestCase { e = expectThrows(ResponseException.class, () -> followIndex("leader_cluster:" + unallowedIndex, unallowedIndex)); - assertThat(e.getMessage(), containsString("follow index [" + unallowedIndex + "] does not exist")); + assertThat(e.getMessage(), containsString("action [indices:monitor/stats] is unauthorized for user [test_ccr]")); assertThat(indexExists(adminClient(), unallowedIndex), is(false)); assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0))); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index 72782f6e0fe..6220ec07e4b 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -85,6 +85,8 @@ import static org.elasticsearch.xpack.core.XPackSettings.CCR_ENABLED_SETTING; public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, EnginePlugin { public static final String CCR_THREAD_POOL_NAME = "ccr"; + public static final String CCR_CUSTOM_METADATA_KEY = "ccr"; + public static final String CCR_CUSTOM_METADATA_LEADER_INDEX_SHARD_HISTORY_UUIDS = "leader_index_shard_history_uuids"; private final boolean enabled; private final Settings settings; diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java index f9a5d8fe830..2161d0a1423 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java @@ -10,9 +10,18 @@ import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.action.admin.indices.stats.IndexShardStats; +import org.elasticsearch.action.admin.indices.stats.IndexStats; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; +import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.CheckedConsumer; +import org.elasticsearch.index.engine.CommitStats; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.license.RemoteClusterLicenseChecker; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.rest.RestStatus; @@ -21,6 +30,7 @@ import org.elasticsearch.xpack.core.XPackPlugin; import java.util.Collections; import java.util.Locale; import java.util.Objects; +import java.util.function.BiConsumer; import java.util.function.BooleanSupplier; import java.util.function.Consumer; import java.util.function.Function; @@ -58,23 +68,24 @@ public final class CcrLicenseChecker { } /** - * Fetches the leader index metadata from the remote cluster. Before fetching the index metadata, the remote cluster is checked for - * license compatibility with CCR. If the remote cluster is not licensed for CCR, the {@code onFailure} consumer is is invoked. - * Otherwise, the specified consumer is invoked with the leader index metadata fetched from the remote cluster. + * Fetches the leader index metadata and history UUIDs for leader index shards from the remote cluster. + * Before fetching the index metadata, the remote cluster is checked for license compatibility with CCR. + * If the remote cluster is not licensed for CCR, the {@code onFailure} consumer is is invoked. Otherwise, + * the specified consumer is invoked with the leader index metadata fetched from the remote cluster. * - * @param client the client - * @param clusterAlias the remote cluster alias - * @param leaderIndex the name of the leader index - * @param onFailure the failure consumer - * @param leaderIndexMetadataConsumer the leader index metadata consumer - * @param the type of response the listener is waiting for + * @param client the client + * @param clusterAlias the remote cluster alias + * @param leaderIndex the name of the leader index + * @param onFailure the failure consumer + * @param consumer the consumer for supplying the leader index metadata and historyUUIDs of all leader shards + * @param the type of response the listener is waiting for */ - public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadata( + public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs( final Client client, final String clusterAlias, final String leaderIndex, final Consumer onFailure, - final Consumer leaderIndexMetadataConsumer) { + final BiConsumer consumer) { final ClusterStateRequest request = new ClusterStateRequest(); request.clear(); @@ -85,7 +96,13 @@ public final class CcrLicenseChecker { clusterAlias, request, onFailure, - leaderClusterState -> leaderIndexMetadataConsumer.accept(leaderClusterState.getMetaData().index(leaderIndex)), + leaderClusterState -> { + IndexMetaData leaderIndexMetaData = leaderClusterState.getMetaData().index(leaderIndex); + final Client leaderClient = client.getRemoteClusterClient(clusterAlias); + fetchLeaderHistoryUUIDs(leaderClient, leaderIndexMetaData, onFailure, historyUUIDs -> { + consumer.accept(historyUUIDs, leaderIndexMetaData); + }); + }, licenseCheck -> indexMetadataNonCompliantRemoteLicense(leaderIndex, licenseCheck), e -> indexMetadataUnknownRemoteLicense(leaderIndex, clusterAlias, e)); } @@ -168,6 +185,58 @@ public final class CcrLicenseChecker { }); } + /** + * Fetches the history UUIDs for leader index on per shard basis using the specified leaderClient. + * + * @param leaderClient the leader client + * @param leaderIndexMetaData the leader index metadata + * @param onFailure the failure consumer + * @param historyUUIDConsumer the leader index history uuid and consumer + */ + // NOTE: Placed this method here; in order to avoid duplication of logic for fetching history UUIDs + // in case of following a local or a remote cluster. + public void fetchLeaderHistoryUUIDs( + final Client leaderClient, + final IndexMetaData leaderIndexMetaData, + final Consumer onFailure, + final Consumer historyUUIDConsumer) { + + String leaderIndex = leaderIndexMetaData.getIndex().getName(); + CheckedConsumer indicesStatsHandler = indicesStatsResponse -> { + IndexStats indexStats = indicesStatsResponse.getIndices().get(leaderIndex); + String[] historyUUIDs = new String[leaderIndexMetaData.getNumberOfShards()]; + for (IndexShardStats indexShardStats : indexStats) { + for (ShardStats shardStats : indexShardStats) { + // Ignore replica shards as they may not have yet started and + // we just end up overwriting slots in historyUUIDs + if (shardStats.getShardRouting().primary() == false) { + continue; + } + + CommitStats commitStats = shardStats.getCommitStats(); + if (commitStats == null) { + onFailure.accept(new IllegalArgumentException("leader index's commit stats are missing")); + return; + } + String historyUUID = commitStats.getUserData().get(Engine.HISTORY_UUID_KEY); + ShardId shardId = shardStats.getShardRouting().shardId(); + historyUUIDs[shardId.id()] = historyUUID; + } + } + for (int i = 0; i < historyUUIDs.length; i++) { + if (historyUUIDs[i] == null) { + onFailure.accept(new IllegalArgumentException("no history uuid for [" + leaderIndex + "][" + i + "]")); + return; + } + } + historyUUIDConsumer.accept(historyUUIDs); + }; + IndicesStatsRequest request = new IndicesStatsRequest(); + request.clear(); + request.indices(leaderIndex); + leaderClient.admin().indices().stats(request, ActionListener.wrap(indicesStatsHandler, onFailure)); + } + private static ElasticsearchStatusException indexMetadataNonCompliantRemoteLicense( final String leaderIndex, final RemoteClusterLicenseChecker.LicenseCheck licenseCheck) { final String clusterAlias = licenseCheck.remoteClusterLicenseInfo().clusterAlias(); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java index b6f82783a56..eef3671d516 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java @@ -58,11 +58,13 @@ public class ShardChangesAction extends Action { private long fromSeqNo; private int maxOperationCount; private ShardId shardId; + private String expectedHistoryUUID; private long maxOperationSizeInBytes = FollowIndexAction.DEFAULT_MAX_BATCH_SIZE_IN_BYTES; - public Request(ShardId shardId) { + public Request(ShardId shardId, String expectedHistoryUUID) { super(shardId.getIndexName()); this.shardId = shardId; + this.expectedHistoryUUID = expectedHistoryUUID; } Request() { @@ -96,6 +98,10 @@ public class ShardChangesAction extends Action { this.maxOperationSizeInBytes = maxOperationSizeInBytes; } + public String getExpectedHistoryUUID() { + return expectedHistoryUUID; + } + @Override public ActionRequestValidationException validate() { ActionRequestValidationException validationException = null; @@ -119,6 +125,7 @@ public class ShardChangesAction extends Action { fromSeqNo = in.readVLong(); maxOperationCount = in.readVInt(); shardId = ShardId.readShardId(in); + expectedHistoryUUID = in.readString(); maxOperationSizeInBytes = in.readVLong(); } @@ -128,6 +135,7 @@ public class ShardChangesAction extends Action { out.writeVLong(fromSeqNo); out.writeVInt(maxOperationCount); shardId.writeTo(out); + out.writeString(expectedHistoryUUID); out.writeVLong(maxOperationSizeInBytes); } @@ -140,12 +148,13 @@ public class ShardChangesAction extends Action { return fromSeqNo == request.fromSeqNo && maxOperationCount == request.maxOperationCount && Objects.equals(shardId, request.shardId) && + Objects.equals(expectedHistoryUUID, request.expectedHistoryUUID) && maxOperationSizeInBytes == request.maxOperationSizeInBytes; } @Override public int hashCode() { - return Objects.hash(fromSeqNo, maxOperationCount, shardId, maxOperationSizeInBytes); + return Objects.hash(fromSeqNo, maxOperationCount, shardId, expectedHistoryUUID, maxOperationSizeInBytes); } @Override @@ -154,6 +163,7 @@ public class ShardChangesAction extends Action { "fromSeqNo=" + fromSeqNo + ", maxOperationCount=" + maxOperationCount + ", shardId=" + shardId + + ", expectedHistoryUUID=" + expectedHistoryUUID + ", maxOperationSizeInBytes=" + maxOperationSizeInBytes + '}'; } @@ -189,7 +199,12 @@ public class ShardChangesAction extends Action { Response() { } - Response(final long mappingVersion, final long globalCheckpoint, final long maxSeqNo, final Translog.Operation[] operations) { + Response( + final long mappingVersion, + final long globalCheckpoint, + final long maxSeqNo, + final Translog.Operation[] operations) { + this.mappingVersion = mappingVersion; this.globalCheckpoint = globalCheckpoint; this.maxSeqNo = maxSeqNo; @@ -260,6 +275,7 @@ public class ShardChangesAction extends Action { seqNoStats.getGlobalCheckpoint(), request.fromSeqNo, request.maxOperationCount, + request.expectedHistoryUUID, request.maxOperationSizeInBytes); return new Response(mappingVersion, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), operations); } @@ -293,11 +309,20 @@ public class ShardChangesAction extends Action { * Also if the sum of collected operations' size is above the specified maxOperationSizeInBytes then this method * stops collecting more operations and returns what has been collected so far. */ - static Translog.Operation[] getOperations(IndexShard indexShard, long globalCheckpoint, long fromSeqNo, int maxOperationCount, + static Translog.Operation[] getOperations(IndexShard indexShard, + long globalCheckpoint, + long fromSeqNo, + int maxOperationCount, + String expectedHistoryUUID, long maxOperationSizeInBytes) throws IOException { if (indexShard.state() != IndexShardState.STARTED) { throw new IndexShardNotStartedException(indexShard.shardId(), indexShard.state()); } + final String historyUUID = indexShard.getHistoryUUID(); + if (historyUUID.equals(expectedHistoryUUID) == false) { + throw new IllegalStateException("unexpected history uuid, expected [" + expectedHistoryUUID + "], actual [" + + historyUUID + "]"); + } if (fromSeqNo > globalCheckpoint) { return EMPTY_OPERATIONS_ARRAY; } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java index 9da19cb1998..62894b0ed99 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java @@ -50,12 +50,13 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { public static final ParseField MAX_WRITE_BUFFER_SIZE = new ParseField("max_write_buffer_size"); public static final ParseField MAX_RETRY_DELAY = new ParseField("max_retry_delay"); public static final ParseField IDLE_SHARD_RETRY_DELAY = new ParseField("idle_shard_retry_delay"); + public static final ParseField RECORDED_HISTORY_UUID = new ParseField("recorded_history_uuid"); @SuppressWarnings("unchecked") private static ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, (a) -> new ShardFollowTask((String) a[0], new ShardId((String) a[1], (String) a[2], (int) a[3]), new ShardId((String) a[4], (String) a[5], (int) a[6]), (int) a[7], (int) a[8], (long) a[9], - (int) a[10], (int) a[11], (TimeValue) a[12], (TimeValue) a[13], (Map) a[14])); + (int) a[10], (int) a[11], (TimeValue) a[12], (TimeValue) a[13], (String) a[14], (Map) a[15])); static { PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), LEADER_CLUSTER_ALIAS_FIELD); @@ -76,6 +77,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { PARSER.declareField(ConstructingObjectParser.constructorArg(), (p, c) -> TimeValue.parseTimeValue(p.text(), IDLE_SHARD_RETRY_DELAY.getPreferredName()), IDLE_SHARD_RETRY_DELAY, ObjectParser.ValueType.STRING); + PARSER.declareString(ConstructingObjectParser.constructorArg(), RECORDED_HISTORY_UUID); PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> p.mapStrings(), HEADERS); } @@ -89,11 +91,22 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { private final int maxWriteBufferSize; private final TimeValue maxRetryDelay; private final TimeValue idleShardRetryDelay; + private final String recordedLeaderIndexHistoryUUID; private final Map headers; - ShardFollowTask(String leaderClusterAlias, ShardId followShardId, ShardId leaderShardId, int maxBatchOperationCount, - int maxConcurrentReadBatches, long maxBatchSizeInBytes, int maxConcurrentWriteBatches, - int maxWriteBufferSize, TimeValue maxRetryDelay, TimeValue idleShardRetryDelay, Map headers) { + ShardFollowTask( + String leaderClusterAlias, + ShardId followShardId, + ShardId leaderShardId, + int maxBatchOperationCount, + int maxConcurrentReadBatches, + long maxBatchSizeInBytes, + int maxConcurrentWriteBatches, + int maxWriteBufferSize, + TimeValue maxRetryDelay, + TimeValue idleShardRetryDelay, + String recordedLeaderIndexHistoryUUID, + Map headers) { this.leaderClusterAlias = leaderClusterAlias; this.followShardId = followShardId; this.leaderShardId = leaderShardId; @@ -104,6 +117,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { this.maxWriteBufferSize = maxWriteBufferSize; this.maxRetryDelay = maxRetryDelay; this.idleShardRetryDelay = idleShardRetryDelay; + this.recordedLeaderIndexHistoryUUID = recordedLeaderIndexHistoryUUID; this.headers = headers != null ? Collections.unmodifiableMap(headers) : Collections.emptyMap(); } @@ -118,6 +132,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { this.maxWriteBufferSize = in.readVInt(); this.maxRetryDelay = in.readTimeValue(); this.idleShardRetryDelay = in.readTimeValue(); + this.recordedLeaderIndexHistoryUUID = in.readString(); this.headers = Collections.unmodifiableMap(in.readMap(StreamInput::readString, StreamInput::readString)); } @@ -165,6 +180,10 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { return followShardId.getIndex().getUUID() + "-" + followShardId.getId(); } + public String getRecordedLeaderIndexHistoryUUID() { + return recordedLeaderIndexHistoryUUID; + } + public Map getHeaders() { return headers; } @@ -186,6 +205,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { out.writeVInt(maxWriteBufferSize); out.writeTimeValue(maxRetryDelay); out.writeTimeValue(idleShardRetryDelay); + out.writeString(recordedLeaderIndexHistoryUUID); out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString); } @@ -212,6 +232,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize); builder.field(MAX_RETRY_DELAY.getPreferredName(), maxRetryDelay.getStringRep()); builder.field(IDLE_SHARD_RETRY_DELAY.getPreferredName(), idleShardRetryDelay.getStringRep()); + builder.field(RECORDED_HISTORY_UUID.getPreferredName(), recordedLeaderIndexHistoryUUID); builder.field(HEADERS.getPreferredName(), headers); return builder.endObject(); } @@ -231,13 +252,26 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams { maxWriteBufferSize == that.maxWriteBufferSize && Objects.equals(maxRetryDelay, that.maxRetryDelay) && Objects.equals(idleShardRetryDelay, that.idleShardRetryDelay) && + Objects.equals(recordedLeaderIndexHistoryUUID, that.recordedLeaderIndexHistoryUUID) && Objects.equals(headers, that.headers); } @Override public int hashCode() { - return Objects.hash(leaderClusterAlias, followShardId, leaderShardId, maxBatchOperationCount, maxConcurrentReadBatches, - maxConcurrentWriteBatches, maxBatchSizeInBytes, maxWriteBufferSize, maxRetryDelay, idleShardRetryDelay, headers); + return Objects.hash( + leaderClusterAlias, + followShardId, + leaderShardId, + maxBatchOperationCount, + maxConcurrentReadBatches, + maxConcurrentWriteBatches, + maxBatchSizeInBytes, + maxWriteBufferSize, + maxRetryDelay, + idleShardRetryDelay, + recordedLeaderIndexHistoryUUID, + headers + ); } public String toString() { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index 83e3e4806e1..7b63e73ee59 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -133,7 +133,8 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor handler, Consumer errorHandler) { - ShardChangesAction.Request request = new ShardChangesAction.Request(params.getLeaderShardId()); + ShardChangesAction.Request request = + new ShardChangesAction.Request(params.getLeaderShardId(), params.getRecordedLeaderIndexHistoryUUID()); request.setFromSeqNo(from); request.setMaxOperationCount(maxOperationCount); request.setMaxOperationSizeInBytes(params.getMaxBatchSizeInBytes()); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportCreateAndFollowIndexAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportCreateAndFollowIndexAction.java index b99b569a525..c6d1a7c36c5 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportCreateAndFollowIndexAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportCreateAndFollowIndexAction.java @@ -33,14 +33,17 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.transport.RemoteClusterService; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.ccr.Ccr; import org.elasticsearch.xpack.ccr.CcrLicenseChecker; import org.elasticsearch.xpack.ccr.CcrSettings; import org.elasticsearch.xpack.core.ccr.action.CreateAndFollowIndexAction; import org.elasticsearch.xpack.core.ccr.action.FollowIndexAction; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.function.Consumer; public final class TransportCreateAndFollowIndexAction extends TransportMasterNodeAction { @@ -116,8 +119,12 @@ public final class TransportCreateAndFollowIndexAction final ClusterState state, final ActionListener listener) { // following an index in local cluster, so use local cluster state to fetch leader index metadata - final IndexMetaData leaderIndexMetadata = state.getMetaData().index(request.getFollowRequest().getLeaderIndex()); - createFollowerIndex(leaderIndexMetadata, request, listener); + final String leaderIndex = request.getFollowRequest().getLeaderIndex(); + final IndexMetaData leaderIndexMetadata = state.getMetaData().index(leaderIndex); + Consumer handler = historyUUIDs -> { + createFollowerIndex(leaderIndexMetadata, historyUUIDs, request, listener); + }; + ccrLicenseChecker.fetchLeaderHistoryUUIDs(client, leaderIndexMetadata, listener::onFailure, handler); } private void createFollowerIndexAndFollowRemoteIndex( @@ -125,16 +132,17 @@ public final class TransportCreateAndFollowIndexAction final String clusterAlias, final String leaderIndex, final ActionListener listener) { - ccrLicenseChecker.checkRemoteClusterLicenseAndFetchLeaderIndexMetadata( + ccrLicenseChecker.checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs( client, clusterAlias, leaderIndex, listener::onFailure, - leaderIndexMetaData -> createFollowerIndex(leaderIndexMetaData, request, listener)); + (historyUUID, leaderIndexMetaData) -> createFollowerIndex(leaderIndexMetaData, historyUUID, request, listener)); } private void createFollowerIndex( final IndexMetaData leaderIndexMetaData, + final String[] historyUUIDs, final CreateAndFollowIndexAction.Request request, final ActionListener listener) { if (leaderIndexMetaData == null) { @@ -172,6 +180,11 @@ public final class TransportCreateAndFollowIndexAction MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData()); IndexMetaData.Builder imdBuilder = IndexMetaData.builder(followIndex); + // Adding the leader index uuid for each shard as custom metadata: + Map metadata = new HashMap<>(); + metadata.put(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_SHARD_HISTORY_UUIDS, String.join(",", historyUUIDs)); + imdBuilder.putCustom(Ccr.CCR_CUSTOM_METADATA_KEY, metadata); + // Copy all settings, but overwrite a few settings. Settings.Builder settingsBuilder = Settings.builder(); settingsBuilder.put(leaderIndexMetaData.getSettings()); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowIndexAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowIndexAction.java index 3128a63f24b..fff3f1618aa 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowIndexAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowIndexAction.java @@ -19,6 +19,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexingSlowLog; import org.elasticsearch.index.SearchSlowLog; @@ -35,6 +36,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.RemoteClusterAware; import org.elasticsearch.transport.RemoteClusterService; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.ccr.Ccr; import org.elasticsearch.xpack.ccr.CcrLicenseChecker; import org.elasticsearch.xpack.ccr.CcrSettings; import org.elasticsearch.xpack.core.ccr.action.FollowIndexAction; @@ -110,11 +112,16 @@ public class TransportFollowIndexAction extends HandledTransportAction { + try { + start(request, null, leaderIndexMetadata, followerIndexMetadata, historyUUIDs, listener); + } catch (final IOException e) { + listener.onFailure(e); + } + }); } private void followRemoteIndex( @@ -124,14 +131,14 @@ public class TransportFollowIndexAction extends HandledTransportAction listener) { final ClusterState state = clusterService.state(); final IndexMetaData followerIndexMetadata = state.getMetaData().index(request.getFollowerIndex()); - ccrLicenseChecker.checkRemoteClusterLicenseAndFetchLeaderIndexMetadata( + ccrLicenseChecker.checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs( client, clusterAlias, leaderIndex, listener::onFailure, - leaderIndexMetadata -> { + (leaderHistoryUUID, leaderIndexMetadata) -> { try { - start(request, clusterAlias, leaderIndexMetadata, followerIndexMetadata, listener); + start(request, clusterAlias, leaderIndexMetadata, followerIndexMetadata, leaderHistoryUUID, listener); } catch (final IOException e) { listener.onFailure(e); } @@ -153,18 +160,23 @@ public class TransportFollowIndexAction extends HandledTransportAction handler) throws IOException { MapperService mapperService = followIndexMetadata != null ? indicesService.createIndexMapperService(followIndexMetadata) : null; - validate(request, leaderIndexMetadata, followIndexMetadata, mapperService); + validate(request, leaderIndexMetadata, followIndexMetadata, leaderIndexHistoryUUIDs, mapperService); final int numShards = followIndexMetadata.getNumberOfShards(); final AtomicInteger counter = new AtomicInteger(numShards); final AtomicReferenceArray responses = new AtomicReferenceArray<>(followIndexMetadata.getNumberOfShards()); Map filteredHeaders = threadPool.getThreadContext().getHeaders().entrySet().stream() .filter(e -> ShardFollowTask.HEADER_FILTERS.contains(e.getKey())) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));for (int i = 0; i < numShards; i++) { + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + for (int i = 0; i < numShards; i++) { final int shardId = i; String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId; + String[] recordedLeaderShardHistoryUUIDs = extractIndexShardHistoryUUIDs(followIndexMetadata); + String recordedLeaderShardHistoryUUID = recordedLeaderShardHistoryUUIDs[shardId]; ShardFollowTask shardFollowTask = new ShardFollowTask( clusterNameAlias, @@ -177,6 +189,7 @@ public class TransportFollowIndexAction extends HandledTransportAction>() { @@ -224,6 +237,7 @@ public class TransportFollowIndexAction extends HandledTransportAction> WHITE_LISTED_SETTINGS; static { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java index c0919f25fe3..f4291ddc8dd 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java @@ -27,7 +27,9 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.support.XContentMapValues; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; @@ -116,7 +118,8 @@ public class ShardChangesIT extends ESIntegTestCase { long globalCheckPoint = shardStats.getSeqNoStats().getGlobalCheckpoint(); assertThat(globalCheckPoint, equalTo(2L)); - ShardChangesAction.Request request = new ShardChangesAction.Request(shardStats.getShardRouting().shardId()); + String historyUUID = shardStats.getCommitStats().getUserData().get(Engine.HISTORY_UUID_KEY); + ShardChangesAction.Request request = new ShardChangesAction.Request(shardStats.getShardRouting().shardId(), historyUUID); request.setFromSeqNo(0L); request.setMaxOperationCount(3); ShardChangesAction.Response response = client().execute(ShardChangesAction.INSTANCE, request).get(); @@ -141,7 +144,7 @@ public class ShardChangesIT extends ESIntegTestCase { globalCheckPoint = shardStats.getSeqNoStats().getGlobalCheckpoint(); assertThat(globalCheckPoint, equalTo(5L)); - request = new ShardChangesAction.Request(shardStats.getShardRouting().shardId()); + request = new ShardChangesAction.Request(shardStats.getShardRouting().shardId(), historyUUID); request.setFromSeqNo(3L); request.setMaxOperationCount(3); response = client().execute(ShardChangesAction.INSTANCE, request).get(); @@ -357,16 +360,11 @@ public class ShardChangesIT extends ESIntegTestCase { final String leaderIndexSettings = getIndexSettingsWithNestedMapping(1, between(0, 1), singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); - - final String followerIndexSettings = - getIndexSettingsWithNestedMapping(1, between(0, 1), singletonMap(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), "true")); - assertAcked(client().admin().indices().prepareCreate("index2").setSource(followerIndexSettings, XContentType.JSON)); - internalCluster().ensureAtLeastNumDataNodes(2); - ensureGreen("index1", "index2"); + ensureGreen("index1"); final FollowIndexAction.Request followRequest = createFollowRequest("index1", "index2"); - client().execute(FollowIndexAction.INSTANCE, followRequest).get(); + client().execute(CreateAndFollowIndexAction.INSTANCE, new CreateAndFollowIndexAction.Request(followRequest)).get(); final int numDocs = randomIntBetween(2, 64); for (int i = 0; i < numDocs; i++) { @@ -409,13 +407,13 @@ public class ShardChangesIT extends ESIntegTestCase { assertAcked(client().admin().indices().prepareCreate("test-follower").get()); // Leader index does not exist. FollowIndexAction.Request followRequest1 = createFollowRequest("non-existent-leader", "test-follower"); - expectThrows(IllegalArgumentException.class, () -> client().execute(FollowIndexAction.INSTANCE, followRequest1).actionGet()); + expectThrows(IndexNotFoundException.class, () -> client().execute(FollowIndexAction.INSTANCE, followRequest1).actionGet()); // Follower index does not exist. FollowIndexAction.Request followRequest2 = createFollowRequest("non-test-leader", "non-existent-follower"); - expectThrows(IllegalArgumentException.class, () -> client().execute(FollowIndexAction.INSTANCE, followRequest2).actionGet()); + expectThrows(IndexNotFoundException.class, () -> client().execute(FollowIndexAction.INSTANCE, followRequest2).actionGet()); // Both indices do not exist. FollowIndexAction.Request followRequest3 = createFollowRequest("non-existent-leader", "non-existent-follower"); - expectThrows(IllegalArgumentException.class, () -> client().execute(FollowIndexAction.INSTANCE, followRequest3).actionGet()); + expectThrows(IndexNotFoundException.class, () -> client().execute(FollowIndexAction.INSTANCE, followRequest3).actionGet()); } @TestLogging("_root:DEBUG") diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesActionTests.java index 430e9cb48b1..b973fbac3ce 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesActionTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesActionTests.java @@ -59,22 +59,27 @@ public class ShardChangesActionTests extends ESSingleNodeTestCase { int max = randomIntBetween(min, numWrites - 1); int size = max - min + 1; final Translog.Operation[] operations = ShardChangesAction.getOperations(indexShard, - indexShard.getGlobalCheckpoint(), min, size, Long.MAX_VALUE); + indexShard.getGlobalCheckpoint(), min, size, indexShard.getHistoryUUID(), Long.MAX_VALUE); final List seenSeqNos = Arrays.stream(operations).map(Translog.Operation::seqNo).collect(Collectors.toList()); final List expectedSeqNos = LongStream.rangeClosed(min, max).boxed().collect(Collectors.toList()); assertThat(seenSeqNos, equalTo(expectedSeqNos)); } - // get operations for a range no operations exists: Translog.Operation[] operations = ShardChangesAction.getOperations(indexShard, indexShard.getGlobalCheckpoint(), - numWrites, numWrites + 1, Long.MAX_VALUE); + numWrites, numWrites + 1, indexShard.getHistoryUUID(), Long.MAX_VALUE); assertThat(operations.length, equalTo(0)); // get operations for a range some operations do not exist: operations = ShardChangesAction.getOperations(indexShard, indexShard.getGlobalCheckpoint(), - numWrites - 10, numWrites + 10, Long.MAX_VALUE); + numWrites - 10, numWrites + 10, indexShard.getHistoryUUID(), Long.MAX_VALUE); assertThat(operations.length, equalTo(10)); + + // Unexpected history UUID: + Exception e = expectThrows(IllegalStateException.class, () -> ShardChangesAction.getOperations(indexShard, + indexShard.getGlobalCheckpoint(), 0, 10, "different-history-uuid", Long.MAX_VALUE)); + assertThat(e.getMessage(), equalTo("unexpected history uuid, expected [different-history-uuid], actual [" + + indexShard.getHistoryUUID() + "]")); } public void testGetOperationsWhenShardNotStarted() throws Exception { @@ -83,7 +88,7 @@ public class ShardChangesActionTests extends ESSingleNodeTestCase { ShardRouting shardRouting = TestShardRouting.newShardRouting("index", 0, "_node_id", true, ShardRoutingState.INITIALIZING); Mockito.when(indexShard.routingEntry()).thenReturn(shardRouting); expectThrows(IndexShardNotStartedException.class, () -> ShardChangesAction.getOperations(indexShard, - indexShard.getGlobalCheckpoint(), 0, 1, Long.MAX_VALUE)); + indexShard.getGlobalCheckpoint(), 0, 1, indexShard.getHistoryUUID(), Long.MAX_VALUE)); } public void testGetOperationsExceedByteLimit() throws Exception { @@ -100,7 +105,7 @@ public class ShardChangesActionTests extends ESSingleNodeTestCase { final IndexShard indexShard = indexService.getShard(0); final Translog.Operation[] operations = ShardChangesAction.getOperations(indexShard, indexShard.getGlobalCheckpoint(), - 0, 12, 256); + 0, 12, indexShard.getHistoryUUID(), 256); assertThat(operations.length, equalTo(12)); assertThat(operations[0].seqNo(), equalTo(0L)); assertThat(operations[1].seqNo(), equalTo(1L)); @@ -127,7 +132,7 @@ public class ShardChangesActionTests extends ESSingleNodeTestCase { final IndexShard indexShard = indexService.getShard(0); final Translog.Operation[] operations = - ShardChangesAction.getOperations(indexShard, indexShard.getGlobalCheckpoint(), 0, 1, 0); + ShardChangesAction.getOperations(indexShard, indexShard.getGlobalCheckpoint(), 0, 1, indexShard.getHistoryUUID(), 0); assertThat(operations.length, equalTo(1)); assertThat(operations[0].seqNo(), equalTo(0L)); } @@ -137,7 +142,7 @@ public class ShardChangesActionTests extends ESSingleNodeTestCase { final AtomicReference reference = new AtomicReference<>(); final ShardChangesAction.TransportAction transportAction = node().injector().getInstance(ShardChangesAction.TransportAction.class); transportAction.execute( - new ShardChangesAction.Request(new ShardId(new Index("non-existent", "uuid"), 0)), + new ShardChangesAction.Request(new ShardId(new Index("non-existent", "uuid"), 0), "uuid"), new ActionListener() { @Override public void onResponse(final ShardChangesAction.Response response) { @@ -162,7 +167,7 @@ public class ShardChangesActionTests extends ESSingleNodeTestCase { final AtomicReference reference = new AtomicReference<>(); final ShardChangesAction.TransportAction transportAction = node().injector().getInstance(ShardChangesAction.TransportAction.class); transportAction.execute( - new ShardChangesAction.Request(new ShardId(indexService.getMetaData().getIndex(), numberOfShards)), + new ShardChangesAction.Request(new ShardId(indexService.getMetaData().getIndex(), numberOfShards), "uuid"), new ActionListener() { @Override public void onResponse(final ShardChangesAction.Response response) { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesRequestTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesRequestTests.java index 19585da8851..2ea2086990b 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesRequestTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesRequestTests.java @@ -15,7 +15,8 @@ public class ShardChangesRequestTests extends AbstractStreamableTestCase scheduler = (delay, task) -> { @@ -215,8 +225,16 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase { byte[] source = "{}".getBytes(StandardCharsets.UTF_8); ops.add(new Translog.Index("doc", id, seqNo, 0, source)); } - item.add(new TestResponse(null, mappingVersion, - new ShardChangesAction.Response(mappingVersion, nextGlobalCheckPoint, nextGlobalCheckPoint, ops.toArray(EMPTY)))); + item.add(new TestResponse( + null, + mappingVersion, + new ShardChangesAction.Response( + mappingVersion, + nextGlobalCheckPoint, + nextGlobalCheckPoint, + ops.toArray(EMPTY)) + ) + ); responses.put(prevGlobalCheckpoint, item); } else { // Simulates a leader shard copy not having all the operations the shard follow task thinks it has by @@ -232,8 +250,12 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase { } // Sometimes add an empty shard changes response to also simulate a leader shard lagging behind if (sometimes()) { - ShardChangesAction.Response response = - new ShardChangesAction.Response(mappingVersion, prevGlobalCheckpoint, prevGlobalCheckpoint, EMPTY); + ShardChangesAction.Response response = new ShardChangesAction.Response( + mappingVersion, + prevGlobalCheckpoint, + prevGlobalCheckpoint, + EMPTY + ); item.add(new TestResponse(null, mappingVersion, response)); } List ops = new ArrayList<>(); @@ -244,8 +266,12 @@ public class ShardFollowNodeTaskRandomTests extends ESTestCase { } // Report toSeqNo to simulate maxBatchSizeInBytes limit being met or last op to simulate a shard lagging behind: long localLeaderGCP = randomBoolean() ? ops.get(ops.size() - 1).seqNo() : toSeqNo; - ShardChangesAction.Response response = - new ShardChangesAction.Response(mappingVersion, localLeaderGCP, localLeaderGCP, ops.toArray(EMPTY)); + ShardChangesAction.Response response = new ShardChangesAction.Response( + mappingVersion, + localLeaderGCP, + localLeaderGCP, + ops.toArray(EMPTY) + ); item.add(new TestResponse(null, mappingVersion, response)); responses.put(fromSeqNo, Collections.unmodifiableList(item)); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java index e25d95538b2..101b2580759 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java @@ -627,9 +627,20 @@ public class ShardFollowNodeTaskTests extends ESTestCase { int bufferWriteLimit, long maxBatchSizeInBytes) { AtomicBoolean stopped = new AtomicBoolean(false); - ShardFollowTask params = new ShardFollowTask(null, new ShardId("follow_index", "", 0), - new ShardId("leader_index", "", 0), maxBatchOperationCount, maxConcurrentReadBatches, maxBatchSizeInBytes, - maxConcurrentWriteBatches, bufferWriteLimit, TimeValue.ZERO, TimeValue.ZERO, Collections.emptyMap()); + ShardFollowTask params = new ShardFollowTask( + null, + new ShardId("follow_index", "", 0), + new ShardId("leader_index", "", 0), + maxBatchOperationCount, + maxConcurrentReadBatches, + maxBatchSizeInBytes, + maxConcurrentWriteBatches, + bufferWriteLimit, + TimeValue.ZERO, + TimeValue.ZERO, + "uuid", + Collections.emptyMap() + ); shardChangesRequests = new ArrayList<>(); bulkShardOperationRequests = new ArrayList<>(); @@ -690,12 +701,12 @@ public class ShardFollowNodeTaskTests extends ESTestCase { for (int i = 0; i < requestBatchSize; i++) { operations[i] = new Translog.NoOp(from + i, 0, "test"); } - final ShardChangesAction.Response response = - new ShardChangesAction.Response( - mappingVersions.poll(), - leaderGlobalCheckpoints.poll(), - maxSeqNos.poll(), - operations); + final ShardChangesAction.Response response = new ShardChangesAction.Response( + mappingVersions.poll(), + leaderGlobalCheckpoints.poll(), + maxSeqNos.poll(), + operations + ); handler.accept(response); } } @@ -727,7 +738,11 @@ public class ShardFollowNodeTaskTests extends ESTestCase { ops.add(new Translog.Index("doc", id, seqNo, 0, source)); } return new ShardChangesAction.Response( - mappingVersion, leaderGlobalCheckPoint, leaderGlobalCheckPoint, ops.toArray(new Translog.Operation[0])); + mappingVersion, + leaderGlobalCheckPoint, + leaderGlobalCheckPoint, + ops.toArray(new Translog.Operation[0]) + ); } void startTask(ShardFollowNodeTask task, long leaderGlobalCheckpoint, long followerGlobalCheckpoint) { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java index 2cd024cb03c..9b04390a3a7 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java @@ -38,11 +38,13 @@ import java.util.Collections; import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.LongConsumer; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTestCase { @@ -129,6 +131,43 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest } } + public void testChangeHistoryUUID() throws Exception { + try (ReplicationGroup leaderGroup = createGroup(0); + ReplicationGroup followerGroup = createFollowGroup(0)) { + leaderGroup.startAll(); + int docCount = leaderGroup.appendDocs(randomInt(64)); + leaderGroup.assertAllEqual(docCount); + followerGroup.startAll(); + ShardFollowNodeTask shardFollowTask = createShardFollowTask(leaderGroup, followerGroup); + final SeqNoStats leaderSeqNoStats = leaderGroup.getPrimary().seqNoStats(); + final SeqNoStats followerSeqNoStats = followerGroup.getPrimary().seqNoStats(); + shardFollowTask.start( + leaderSeqNoStats.getGlobalCheckpoint(), + leaderSeqNoStats.getMaxSeqNo(), + followerSeqNoStats.getGlobalCheckpoint(), + followerSeqNoStats.getMaxSeqNo()); + leaderGroup.syncGlobalCheckpoint(); + leaderGroup.assertAllEqual(docCount); + Set indexedDocIds = getShardDocUIDs(leaderGroup.getPrimary()); + assertBusy(() -> { + assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leaderGroup.getPrimary().getGlobalCheckpoint())); + followerGroup.assertAllEqual(indexedDocIds.size()); + }); + + String oldHistoryUUID = leaderGroup.getPrimary().getHistoryUUID(); + leaderGroup.reinitPrimaryShard(); + leaderGroup.getPrimary().store().bootstrapNewHistory(); + recoverShardFromStore(leaderGroup.getPrimary()); + String newHistoryUUID = leaderGroup.getPrimary().getHistoryUUID(); + + assertBusy(() -> { + assertThat(shardFollowTask.isStopped(), is(true)); + assertThat(shardFollowTask.getFailure().getMessage(), equalTo("unexpected history uuid, expected [" + oldHistoryUUID + + "], actual [" + newHistoryUUID + "]")); + }); + } + } + @Override protected ReplicationGroup createGroup(int replicas, Settings settings) throws IOException { Settings newSettings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) @@ -159,12 +198,23 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest } private ShardFollowNodeTask createShardFollowTask(ReplicationGroup leaderGroup, ReplicationGroup followerGroup) { - ShardFollowTask params = new ShardFollowTask(null, new ShardId("follow_index", "", 0), - new ShardId("leader_index", "", 0), between(1, 64), between(1, 8), Long.MAX_VALUE, between(1, 4), 10240, - TimeValue.timeValueMillis(10), TimeValue.timeValueMillis(10), Collections.emptyMap()); + ShardFollowTask params = new ShardFollowTask( + null, + new ShardId("follow_index", "", 0), + new ShardId("leader_index", "", 0), + between(1, 64), + between(1, 8), + Long.MAX_VALUE, + between(1, 4), 10240, + TimeValue.timeValueMillis(10), + TimeValue.timeValueMillis(10), + leaderGroup.getPrimary().getHistoryUUID(), + Collections.emptyMap() + ); BiConsumer scheduler = (delay, task) -> threadPool.schedule(delay, ThreadPool.Names.GENERIC, task); AtomicBoolean stopped = new AtomicBoolean(false); + AtomicReference failureHolder = new AtomicReference<>(); LongSet fetchOperations = new LongHashSet(); return new ShardFollowNodeTask( 1L, "type", ShardFollowTask.NAME, "description", null, Collections.emptyMap(), params, scheduler, System::nanoTime) { @@ -210,10 +260,14 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest try { final SeqNoStats seqNoStats = indexShard.seqNoStats(); Translog.Operation[] ops = ShardChangesAction.getOperations(indexShard, seqNoStats.getGlobalCheckpoint(), from, - maxOperationCount, params.getMaxBatchSizeInBytes()); + maxOperationCount, params.getRecordedLeaderIndexHistoryUUID(), params.getMaxBatchSizeInBytes()); // hard code mapping version; this is ok, as mapping updates are not tested here - final ShardChangesAction.Response response = - new ShardChangesAction.Response(1L, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), ops); + final ShardChangesAction.Response response = new ShardChangesAction.Response( + 1L, + seqNoStats.getGlobalCheckpoint(), + seqNoStats.getMaxSeqNo(), + ops + ); handler.accept(response); return; } catch (Exception e) { @@ -238,9 +292,14 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest @Override public void markAsFailed(Exception e) { + failureHolder.set(e); stopped.set(true); } + @Override + public Exception getFailure() { + return failureHolder.get(); + } }; } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskTests.java index 300794a6c00..fa11ddf4bf9 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskTests.java @@ -34,7 +34,9 @@ public class ShardFollowTaskTests extends AbstractSerializingTestCase CUSTOM_METADATA = + singletonMap(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_SHARD_HISTORY_UUIDS, "uuid"); + public void testValidation() throws IOException { FollowIndexAction.Request request = ShardChangesIT.createFollowRequest("index1", "index2"); + String[] UUIDs = new String[]{"uuid"}; { // should fail, because leader index does not exist - Exception e = expectThrows(IllegalArgumentException.class, () -> validate(request, null, null, null)); + Exception e = expectThrows(IllegalArgumentException.class, () -> validate(request, null, null, null, null)); assertThat(e.getMessage(), equalTo("leader index [index1] does not exist")); } { // should fail, because follow index does not exist - IndexMetaData leaderIMD = createIMD("index1", 5, Settings.EMPTY); - Exception e = expectThrows(IllegalArgumentException.class, () -> validate(request, leaderIMD, null, null)); + IndexMetaData leaderIMD = createIMD("index1", 5, Settings.EMPTY, emptyMap()); + Exception e = expectThrows(IllegalArgumentException.class, + () -> validate(request, leaderIMD, null, null, null)); assertThat(e.getMessage(), equalTo("follow index [index2] does not exist")); } + { + // should fail because the recorded leader index history uuid is not equal to the leader actual index history uuid: + IndexMetaData leaderIMD = createIMD("index1", 5, Settings.EMPTY, emptyMap()); + Map customMetaData = + singletonMap(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_SHARD_HISTORY_UUIDS, "another-uuid"); + IndexMetaData followIMD = createIMD("index2", 5, Settings.EMPTY, customMetaData); + Exception e = expectThrows(IllegalArgumentException.class, + () -> validate(request, leaderIMD, followIMD, UUIDs, null)); + assertThat(e.getMessage(), equalTo("leader shard [index2][0] should reference [another-uuid] as history uuid but " + + "instead reference [uuid] as history uuid")); + } { // should fail because leader index does not have soft deletes enabled - IndexMetaData leaderIMD = createIMD("index1", 5, Settings.EMPTY); - IndexMetaData followIMD = createIMD("index2", 5, Settings.EMPTY); - Exception e = expectThrows(IllegalArgumentException.class, () -> validate(request, leaderIMD, followIMD, null)); + IndexMetaData leaderIMD = createIMD("index1", 5, Settings.EMPTY, emptyMap()); + IndexMetaData followIMD = createIMD("index2", 5, Settings.EMPTY, CUSTOM_METADATA); + Exception e = expectThrows(IllegalArgumentException.class, () -> validate(request, leaderIMD, followIMD, UUIDs, null)); assertThat(e.getMessage(), equalTo("leader index [index1] does not have soft deletes enabled")); } { // should fail because the number of primary shards between leader and follow index are not equal IndexMetaData leaderIMD = createIMD("index1", 5, Settings.builder() - .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true").build()); - IndexMetaData followIMD = createIMD("index2", 4, Settings.EMPTY); - Exception e = expectThrows(IllegalArgumentException.class, () -> validate(request, leaderIMD, followIMD, null)); + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true").build(), emptyMap()); + IndexMetaData followIMD = createIMD("index2", 4, Settings.EMPTY, CUSTOM_METADATA); + Exception e = expectThrows(IllegalArgumentException.class, () -> validate(request, leaderIMD, followIMD, UUIDs, null)); assertThat(e.getMessage(), equalTo("leader index primary shards [5] does not match with the number of shards of the follow index [4]")); } { // should fail, because leader index is closed IndexMetaData leaderIMD = createIMD("index1", State.CLOSE, "{}", 5, Settings.builder() - .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true").build()); + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true").build(), emptyMap()); IndexMetaData followIMD = createIMD("index2", State.OPEN, "{}", 5, Settings.builder() - .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true").build()); - Exception e = expectThrows(IllegalArgumentException.class, () -> validate(request, leaderIMD, followIMD, null)); + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true").build(), CUSTOM_METADATA); + Exception e = expectThrows(IllegalArgumentException.class, () -> validate(request, leaderIMD, followIMD, UUIDs, null)); assertThat(e.getMessage(), equalTo("leader and follow index must be open")); } { // should fail, because leader has a field with the same name mapped as keyword and follower as text IndexMetaData leaderIMD = createIMD("index1", State.OPEN, "{\"properties\": {\"field\": {\"type\": \"keyword\"}}}", 5, - Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true").build()); + Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true").build(), emptyMap()); IndexMetaData followIMD = createIMD("index2", State.OPEN, "{\"properties\": {\"field\": {\"type\": \"text\"}}}", 5, - Settings.builder().put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true).build()); + Settings.builder().put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true).build(), CUSTOM_METADATA); MapperService mapperService = MapperTestUtils.newMapperService(xContentRegistry(), createTempDir(), Settings.EMPTY, "index2"); mapperService.updateMapping(null, followIMD); - Exception e = expectThrows(IllegalArgumentException.class, () -> validate(request, leaderIMD, followIMD, mapperService)); + Exception e = expectThrows(IllegalArgumentException.class, () -> validate(request, leaderIMD, followIMD, UUIDs, mapperService)); assertThat(e.getMessage(), equalTo("mapper [field] of different type, current_type [text], merged_type [keyword]")); } { @@ -80,38 +100,38 @@ public class TransportFollowIndexActionTests extends ESTestCase { IndexMetaData leaderIMD = createIMD("index1", State.OPEN, mapping, 5, Settings.builder() .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true") .put("index.analysis.analyzer.my_analyzer.type", "custom") - .put("index.analysis.analyzer.my_analyzer.tokenizer", "whitespace").build()); + .put("index.analysis.analyzer.my_analyzer.tokenizer", "whitespace").build(), emptyMap()); IndexMetaData followIMD = createIMD("index2", State.OPEN, mapping, 5, Settings.builder() .put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true) .put("index.analysis.analyzer.my_analyzer.type", "custom") - .put("index.analysis.analyzer.my_analyzer.tokenizer", "standard").build()); - Exception e = expectThrows(IllegalArgumentException.class, () -> validate(request, leaderIMD, followIMD, null)); + .put("index.analysis.analyzer.my_analyzer.tokenizer", "standard").build(), CUSTOM_METADATA); + Exception e = expectThrows(IllegalArgumentException.class, () -> validate(request, leaderIMD, followIMD, UUIDs, null)); assertThat(e.getMessage(), equalTo("the leader and follower index settings must be identical")); } { // should fail because the following index does not have the following_index settings IndexMetaData leaderIMD = createIMD("index1", 5, - Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true").build()); + Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true").build(), emptyMap()); Settings followingIndexSettings = randomBoolean() ? Settings.EMPTY : Settings.builder().put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), false).build(); - IndexMetaData followIMD = createIMD("index2", 5, followingIndexSettings); + IndexMetaData followIMD = createIMD("index2", 5, followingIndexSettings, CUSTOM_METADATA); MapperService mapperService = MapperTestUtils.newMapperService(xContentRegistry(), createTempDir(), followingIndexSettings, "index2"); mapperService.updateMapping(null, followIMD); IllegalArgumentException error = - expectThrows(IllegalArgumentException.class, () -> validate(request, leaderIMD, followIMD, mapperService)); + expectThrows(IllegalArgumentException.class, () -> validate(request, leaderIMD, followIMD, UUIDs, mapperService)); assertThat(error.getMessage(), equalTo("the following index [index2] is not ready to follow; " + "the setting [index.xpack.ccr.following_index] must be enabled.")); } { // should succeed IndexMetaData leaderIMD = createIMD("index1", 5, Settings.builder() - .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true").build()); + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true").build(), emptyMap()); IndexMetaData followIMD = createIMD("index2", 5, Settings.builder() - .put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true).build()); + .put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true).build(), CUSTOM_METADATA); MapperService mapperService = MapperTestUtils.newMapperService(xContentRegistry(), createTempDir(), Settings.EMPTY, "index2"); mapperService.updateMapping(null, followIMD); - validate(request, leaderIMD, followIMD, mapperService); + validate(request, leaderIMD, followIMD, UUIDs, mapperService); } { // should succeed, index settings are identical @@ -119,15 +139,15 @@ public class TransportFollowIndexActionTests extends ESTestCase { IndexMetaData leaderIMD = createIMD("index1", State.OPEN, mapping, 5, Settings.builder() .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true") .put("index.analysis.analyzer.my_analyzer.type", "custom") - .put("index.analysis.analyzer.my_analyzer.tokenizer", "standard").build()); + .put("index.analysis.analyzer.my_analyzer.tokenizer", "standard").build(), emptyMap()); IndexMetaData followIMD = createIMD("index2", State.OPEN, mapping, 5, Settings.builder() .put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true) .put("index.analysis.analyzer.my_analyzer.type", "custom") - .put("index.analysis.analyzer.my_analyzer.tokenizer", "standard").build()); + .put("index.analysis.analyzer.my_analyzer.tokenizer", "standard").build(), CUSTOM_METADATA); MapperService mapperService = MapperTestUtils.newMapperService(xContentRegistry(), createTempDir(), followIMD.getSettings(), "index2"); mapperService.updateMapping(null, followIMD); - validate(request, leaderIMD, followIMD, mapperService); + validate(request, leaderIMD, followIMD, UUIDs, mapperService); } { // should succeed despite whitelisted settings being different @@ -136,25 +156,32 @@ public class TransportFollowIndexActionTests extends ESTestCase { .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true") .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "1s") .put("index.analysis.analyzer.my_analyzer.type", "custom") - .put("index.analysis.analyzer.my_analyzer.tokenizer", "standard").build()); + .put("index.analysis.analyzer.my_analyzer.tokenizer", "standard").build(), emptyMap()); IndexMetaData followIMD = createIMD("index2", State.OPEN, mapping, 5, Settings.builder() .put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true) .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "10s") .put("index.analysis.analyzer.my_analyzer.type", "custom") - .put("index.analysis.analyzer.my_analyzer.tokenizer", "standard").build()); + .put("index.analysis.analyzer.my_analyzer.tokenizer", "standard").build(), CUSTOM_METADATA); MapperService mapperService = MapperTestUtils.newMapperService(xContentRegistry(), createTempDir(), followIMD.getSettings(), "index2"); mapperService.updateMapping(null, followIMD); - validate(request, leaderIMD, followIMD, mapperService); + validate(request, leaderIMD, followIMD, UUIDs, mapperService); } } - private static IndexMetaData createIMD(String index, int numberOfShards, Settings settings) throws IOException { - return createIMD(index, State.OPEN, "{\"properties\": {}}", numberOfShards, settings); + private static IndexMetaData createIMD(String index, + int numberOfShards, + Settings settings, + Map custom) throws IOException { + return createIMD(index, State.OPEN, "{\"properties\": {}}", numberOfShards, settings, custom); } - private static IndexMetaData createIMD(String index, State state, String mapping, int numberOfShards, - Settings settings) throws IOException { + private static IndexMetaData createIMD(String index, + State state, + String mapping, + int numberOfShards, + Settings settings, + Map custom) throws IOException { return IndexMetaData.builder(index) .settings(settings(Version.CURRENT).put(settings)) .numberOfShards(numberOfShards) @@ -162,6 +189,7 @@ public class TransportFollowIndexActionTests extends ESTestCase { .numberOfReplicas(0) .setRoutingNumShards(numberOfShards) .putMapping("_doc", mapping) + .putCustom(Ccr.CCR_CUSTOM_METADATA_KEY, custom) .build(); }