From bd92a28cfc664b9e4b1407978fe14744d445ba3d Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 19 Oct 2018 13:56:00 -0400 Subject: [PATCH] CCR: Replicate existing ops with old term on follower (#34412) Since #34288, we might hit deadlock if the FollowTask has more fetchers than writers. This can happen in the following scenario: Suppose the leader has two operations [seq#0, seq#1]; the FollowTask has two fetchers and one writer. 1. The FollowTask issues two concurrent fetch requests: {from_seq_no: 0, num_ops:1} and {from_seq_no: 1, num_ops:1} to read seq#0 and seq#1 respectively. 2. The second request which fetches seq#1 completes before, and then it triggers a write request containing only seq#1. 3. The primary of a follower fails after it has replicated seq#1 to replicas. 4. Since the old primary did not respond, the FollowTask issues another write request containing seq#1 (resend the previous write request). 5. The new primary has seq#1 already; thus it won't replicate seq#1 to replicas but will wait for the global checkpoint to advance at least seq#1. The problem is that the FollowTask has only one writer and that writer is waiting for seq#0 which won't be delivered until the writer completed. This PR proposes to replicate existing operations with the old primary term (instead of the current term) on the follower. In particular, when the following primary detects that it has processed an process already, it will look up the term of an existing operation with the same seq_no in the Lucene index, then rewrite that operation with the old term before replicating it to the following replicas. This approach is wait-free but requires soft-deletes on the follower. Relates #34288 --- .../index/engine/InternalEngine.java | 13 +- .../index/engine/EngineTestCase.java | 28 ++++ .../TransportBulkShardOperationsAction.java | 117 +++++++-------- ...eadyProcessedFollowingEngineException.java | 23 ++- .../ccr/index/engine/FollowingEngine.java | 69 ++++++++- .../xpack/ccr/IndexFollowingIT.java | 2 - .../ShardFollowTaskReplicationTests.java | 5 +- .../action/bulk/BulkShardOperationsTests.java | 135 ++++++------------ .../index/engine/FollowingEngineTests.java | 36 ++++- 9 files changed, 247 insertions(+), 181 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 2930fce4c02..2c3988fc240 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2427,9 +2427,7 @@ public class InternalEngine extends Engine { long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException { // TODO: Should we defer the refresh until we really need it? ensureOpen(); - if (lastRefreshedCheckpoint() < toSeqNo) { - refresh(source, SearcherScope.INTERNAL); - } + refreshIfNeeded(source, toSeqNo); Searcher searcher = acquireSearcher(source, SearcherScope.INTERNAL); try { LuceneChangesSnapshot snapshot = new LuceneChangesSnapshot( @@ -2539,6 +2537,15 @@ public class InternalEngine extends Engine { return lastRefreshedCheckpointListener.refreshedCheckpoint.get(); } + /** + * Refresh this engine **internally** iff the requesting seq_no is greater than the last refreshed checkpoint. + */ + protected final void refreshIfNeeded(String source, long requestingSeqNo) { + if (lastRefreshedCheckpoint() < requestingSeqNo) { + refresh(source, SearcherScope.INTERNAL); + } + } + private final class LastRefreshedCheckpointListener implements ReferenceManager.RefreshListener { final AtomicLong refreshedCheckpoint; private long pendingCheckpoint; diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index bb1efd69973..3e563e6d538 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -49,15 +49,20 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.cluster.ClusterModule; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.AllocationId; +import org.elasticsearch.common.CheckedFunction; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.Index; @@ -65,6 +70,7 @@ import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.MapperTestUtils; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.codec.CodecService; +import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.IdFieldMapper; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.Mapping; @@ -72,6 +78,7 @@ import org.elasticsearch.index.mapper.ParseContext; import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.mapper.SourceFieldMapper; +import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.mapper.VersionFieldMapper; import org.elasticsearch.index.seqno.LocalCheckpointTracker; @@ -307,6 +314,27 @@ public abstract class EngineTestCase extends ESTestCase { mappingUpdate); } + public static CheckedFunction nestedParsedDocFactory() throws Exception { + final MapperService mapperService = createMapperService("type"); + final String nestedMapping = Strings.toString(XContentFactory.jsonBuilder().startObject().startObject("type") + .startObject("properties").startObject("nested_field").field("type", "nested").endObject().endObject() + .endObject().endObject()); + final DocumentMapper nestedMapper = mapperService.documentMapperParser().parse("type", new CompressedXContent(nestedMapping)); + return docId -> { + final XContentBuilder source = XContentFactory.jsonBuilder().startObject().field("field", "value"); + final int nestedValues = between(0, 3); + if (nestedValues > 0) { + XContentBuilder nestedField = source.startObject("nested_field"); + for (int i = 0; i < nestedValues; i++) { + nestedField.field("field-" + i, "value-" + i); + } + source.endObject(); + } + source.endObject(); + return nestedMapper.parse(SourceToParse.source("test", "type", docId, BytesReference.bytes(source), XContentType.JSON)); + }; + } + /** * Creates a tombstone document that only includes uid, seq#, term and version fields. */ diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java index 6d5df143eea..4a4b4648776 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java @@ -31,7 +31,6 @@ import org.elasticsearch.xpack.ccr.index.engine.AlreadyProcessedFollowingEngineE import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.function.Function; public class TransportBulkShardOperationsAction extends TransportWriteAction { @@ -68,6 +67,41 @@ public class TransportBulkShardOperationsAction request.getMaxSeqNoOfUpdatesOrDeletes(), primary, logger); } + static Translog.Operation rewriteOperationWithPrimaryTerm(Translog.Operation operation, long primaryTerm) { + final Translog.Operation operationWithPrimaryTerm; + switch (operation.opType()) { + case INDEX: + final Translog.Index index = (Translog.Index) operation; + operationWithPrimaryTerm = new Translog.Index( + index.type(), + index.id(), + index.seqNo(), + primaryTerm, + index.version(), + BytesReference.toBytes(index.source()), + index.routing(), + index.getAutoGeneratedIdTimestamp()); + break; + case DELETE: + final Translog.Delete delete = (Translog.Delete) operation; + operationWithPrimaryTerm = new Translog.Delete( + delete.type(), + delete.id(), + delete.uid(), + delete.seqNo(), + primaryTerm, + delete.version()); + break; + case NO_OP: + final Translog.NoOp noOp = (Translog.NoOp) operation; + operationWithPrimaryTerm = new Translog.NoOp(noOp.seqNo(), primaryTerm, noOp.reason()); + break; + default: + throw new IllegalStateException("unexpected operation type [" + operation.opType() + "]"); + } + return operationWithPrimaryTerm; + } + // public for testing purposes only public static CcrWritePrimaryResult shardOperationOnPrimary( final ShardId shardId, @@ -81,49 +115,13 @@ public class TransportBulkShardOperationsAction "], actual [" + primary.getHistoryUUID() + "], shard is likely restored from snapshot or force allocated"); } - final Function rewriteWithTerm = operation -> { - final Translog.Operation operationWithPrimaryTerm; - switch (operation.opType()) { - case INDEX: - final Translog.Index index = (Translog.Index) operation; - operationWithPrimaryTerm = new Translog.Index( - index.type(), - index.id(), - index.seqNo(), - primary.getOperationPrimaryTerm(), - index.version(), - BytesReference.toBytes(index.source()), - index.routing(), - index.getAutoGeneratedIdTimestamp()); - break; - case DELETE: - final Translog.Delete delete = (Translog.Delete) operation; - operationWithPrimaryTerm = new Translog.Delete( - delete.type(), - delete.id(), - delete.uid(), - delete.seqNo(), - primary.getOperationPrimaryTerm(), - delete.version()); - break; - case NO_OP: - final Translog.NoOp noOp = (Translog.NoOp) operation; - operationWithPrimaryTerm = new Translog.NoOp(noOp.seqNo(), primary.getOperationPrimaryTerm(), noOp.reason()); - break; - default: - throw new IllegalStateException("unexpected operation type [" + operation.opType() + "]"); - } - return operationWithPrimaryTerm; - }; - assert maxSeqNoOfUpdatesOrDeletes >= SequenceNumbers.NO_OPS_PERFORMED : "invalid msu [" + maxSeqNoOfUpdatesOrDeletes + "]"; primary.advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNoOfUpdatesOrDeletes); final List appliedOperations = new ArrayList<>(sourceOperations.size()); Translog.Location location = null; - long waitingForGlobalCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO; for (Translog.Operation sourceOp : sourceOperations) { - final Translog.Operation targetOp = rewriteWithTerm.apply(sourceOp); + final Translog.Operation targetOp = rewriteOperationWithPrimaryTerm(sourceOp, primary.getOperationPrimaryTerm()); final Engine.Result result = primary.applyTranslogOperation(targetOp, Engine.Operation.Origin.PRIMARY); if (result.getResultType() == Engine.Result.Type.SUCCESS) { assert result.getSeqNo() == targetOp.seqNo(); @@ -131,23 +129,28 @@ public class TransportBulkShardOperationsAction location = locationToSync(location, result.getTranslogLocation()); } else { if (result.getFailure() instanceof AlreadyProcessedFollowingEngineException) { - // Skipped operations will be delivered to replicas via primary-replica resync or peer-recovery. - // The primary must not acknowledge this request until the global checkpoint is at least the highest - // seqno of all skipped operations (i.e., all skipped operations have been processed on every replica). - waitingForGlobalCheckpoint = SequenceNumbers.max(waitingForGlobalCheckpoint, targetOp.seqNo()); + // The existing operations below the global checkpoint won't be replicated as they were processed + // in every replicas already. However, the existing operations above the global checkpoint will be + // replicated to replicas but with the existing primary term (not the current primary term) in order + // to guarantee the consistency between the primary and replicas, and between translog and Lucene index. + final AlreadyProcessedFollowingEngineException failure = (AlreadyProcessedFollowingEngineException) result.getFailure(); + assert failure.getSeqNo() == targetOp.seqNo() : targetOp.seqNo() + " != " + failure.getSeqNo(); + if (failure.getExistingPrimaryTerm().isPresent()) { + appliedOperations.add(rewriteOperationWithPrimaryTerm(sourceOp, failure.getExistingPrimaryTerm().getAsLong())); + } else if (targetOp.seqNo() > primary.getGlobalCheckpoint()) { + assert false : "can't find primary_term for existing op=" + targetOp + " gcp=" + primary.getGlobalCheckpoint(); + throw new IllegalStateException("can't find primary_term for existing op=" + targetOp + + " global_checkpoint=" + primary.getGlobalCheckpoint(), failure); + } } else { assert false : "Only already-processed error should happen; op=[" + targetOp + "] error=[" + result.getFailure() + "]"; throw ExceptionsHelper.convertToElastic(result.getFailure()); } } } - assert appliedOperations.size() == sourceOperations.size() || waitingForGlobalCheckpoint != SequenceNumbers.UNASSIGNED_SEQ_NO : - "waiting global checkpoint is not assigned; waiting_gcp=" + waitingForGlobalCheckpoint + - " source_ops=" + sourceOperations.size() + " applied_ops=" + sourceOperations.size(); - assert appliedOperations.size() == 0 || location != null; final BulkShardOperationsRequest replicaRequest = new BulkShardOperationsRequest( shardId, historyUUID, appliedOperations, maxSeqNoOfUpdatesOrDeletes); - return new CcrWritePrimaryResult(replicaRequest, location, primary, waitingForGlobalCheckpoint, logger); + return new CcrWritePrimaryResult(replicaRequest, location, primary, logger); } @Override @@ -184,12 +187,8 @@ public class TransportBulkShardOperationsAction * Custom write result to include global checkpoint after ops have been replicated. */ static final class CcrWritePrimaryResult extends WritePrimaryResult { - final long waitingForGlobalCheckpoint; - - CcrWritePrimaryResult(BulkShardOperationsRequest request, Translog.Location location, IndexShard primary, - long waitingForGlobalCheckpoint, Logger logger) { + CcrWritePrimaryResult(BulkShardOperationsRequest request, Translog.Location location, IndexShard primary, Logger logger) { super(request, new BulkShardOperationsResponse(), location, null, primary, logger); - this.waitingForGlobalCheckpoint = waitingForGlobalCheckpoint; } @Override @@ -201,19 +200,7 @@ public class TransportBulkShardOperationsAction response.setMaxSeqNo(seqNoStats.getMaxSeqNo()); listener.onResponse(response); }, listener::onFailure); - - if (waitingForGlobalCheckpoint != SequenceNumbers.UNASSIGNED_SEQ_NO) { - primary.addGlobalCheckpointListener(waitingForGlobalCheckpoint, (gcp, e) -> { - if (e != null) { - listener.onFailure(e); - } else { - assert waitingForGlobalCheckpoint <= gcp : waitingForGlobalCheckpoint + " > " + gcp; - super.respond(wrappedListener); - } - }, null); - } else { - super.respond(wrappedListener); - } + super.respond(wrappedListener); } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/AlreadyProcessedFollowingEngineException.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/AlreadyProcessedFollowingEngineException.java index 9e19c93b286..3033ba31c82 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/AlreadyProcessedFollowingEngineException.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/AlreadyProcessedFollowingEngineException.java @@ -9,8 +9,27 @@ package org.elasticsearch.xpack.ccr.index.engine; import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.shard.ShardId; +import java.util.OptionalLong; + +/** + * An exception represents that an operation was processed before on the {@link FollowingEngine} of the primary of a follower. + * The field {@code existingPrimaryTerm} is empty only if the operation is below the global checkpoint; otherwise it should be non-empty. + */ public final class AlreadyProcessedFollowingEngineException extends VersionConflictEngineException { - AlreadyProcessedFollowingEngineException(ShardId shardId, long seqNo) { - super(shardId, "operation [{}] was processed before", null, seqNo); + private final long seqNo; + private final OptionalLong existingPrimaryTerm; + + AlreadyProcessedFollowingEngineException(ShardId shardId, long seqNo, OptionalLong existingPrimaryTerm) { + super(shardId, "operation [{}] was processed before with term [{}]", null, seqNo, existingPrimaryTerm); + this.seqNo = seqNo; + this.existingPrimaryTerm = existingPrimaryTerm; + } + + public long getSeqNo() { + return seqNo; + } + + public OptionalLong getExistingPrimaryTerm() { + return existingPrimaryTerm; } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java index 8a413ce4980..84aa141c80d 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java @@ -5,14 +5,28 @@ */ package org.elasticsearch.xpack.ccr.index.engine; +import org.apache.lucene.document.LongPoint; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.index.ReaderUtil; +import org.apache.lucene.search.BooleanClause; +import org.apache.lucene.search.BooleanQuery; +import org.apache.lucene.search.DocValuesFieldExistsQuery; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.TopDocs; +import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.InternalEngine; +import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.xpack.ccr.CcrSettings; import java.io.IOException; +import java.util.OptionalLong; /** * An engine implementation for following shards. @@ -62,13 +76,13 @@ public final class FollowingEngine extends InternalEngine { /* * The existing operation in this engine was probably assigned the term of the previous primary shard which is different * from the term of the current operation. If the current operation arrives on replicas before the previous operation, - * then the Lucene content between the primary and replicas are not identical (primary terms are different). Since the - * existing operations are guaranteed to be replicated to replicas either via peer-recovery or primary-replica resync, - * we can safely skip this operation here and let the caller know the decision via AlreadyProcessedFollowingEngineException. - * The caller then waits for the global checkpoint to advance at least the seq_no of this operation to make sure that - * the existing operation was replicated to all replicas (see TransportBulkShardOperationsAction#shardOperationOnPrimary). + * then the Lucene content between the primary and replicas are not identical (primary terms are different). We can safely + * skip the existing operations below the global checkpoint, however must replicate the ones above the global checkpoint + * but with the previous primary term (not the current term of the operation) in order to guarantee the consistency + * between the primary and replicas (see TransportBulkShardOperationsAction#shardOperationOnPrimary). */ - final AlreadyProcessedFollowingEngineException error = new AlreadyProcessedFollowingEngineException(shardId, index.seqNo()); + final AlreadyProcessedFollowingEngineException error = new AlreadyProcessedFollowingEngineException( + shardId, index.seqNo(), lookupPrimaryTerm(index.seqNo())); return IndexingStrategy.skipDueToVersionConflict(error, false, index.version(), index.primaryTerm()); } else { return IndexingStrategy.processButSkipLucene(false, index.seqNo(), index.version()); @@ -88,7 +102,8 @@ public final class FollowingEngine extends InternalEngine { preFlight(delete); if (delete.origin() == Operation.Origin.PRIMARY && hasBeenProcessedBefore(delete)) { // See the comment in #indexingStrategyForOperation for the explanation why we can safely skip this operation. - final AlreadyProcessedFollowingEngineException error = new AlreadyProcessedFollowingEngineException(shardId, delete.seqNo()); + final AlreadyProcessedFollowingEngineException error = new AlreadyProcessedFollowingEngineException( + shardId, delete.seqNo(), lookupPrimaryTerm(delete.seqNo())); return DeletionStrategy.skipDueToVersionConflict(error, delete.version(), delete.primaryTerm(), false); } else { return planDeletionAsNonPrimary(delete); @@ -126,6 +141,46 @@ public final class FollowingEngine extends InternalEngine { return true; } + private OptionalLong lookupPrimaryTerm(final long seqNo) throws IOException { + refreshIfNeeded("lookup_primary_term", seqNo); + try (Searcher engineSearcher = acquireSearcher("lookup_primary_term", SearcherScope.INTERNAL)) { + // We have to acquire a searcher before execute this check to ensure that the requesting seq_no is always found in the else + // branch. If the operation is at most the global checkpoint, we should not look up its term as we may have merged away the + // operation. Moreover, we won't need to replicate this operation to replicas since it was processed on every copies already. + if (seqNo <= engineConfig.getGlobalCheckpointSupplier().getAsLong()) { + return OptionalLong.empty(); + } else { + final DirectoryReader reader = Lucene.wrapAllDocsLive(engineSearcher.getDirectoryReader()); + final IndexSearcher searcher = new IndexSearcher(reader); + searcher.setQueryCache(null); + final Query query = new BooleanQuery.Builder() + .add(LongPoint.newExactQuery(SeqNoFieldMapper.NAME, seqNo), BooleanClause.Occur.FILTER) + // excludes the non-root nested documents which don't have primary_term. + .add(new DocValuesFieldExistsQuery(SeqNoFieldMapper.PRIMARY_TERM_NAME), BooleanClause.Occur.FILTER) + .build(); + final TopDocs topDocs = searcher.search(query, 1); + if (topDocs.scoreDocs.length == 1) { + final int docId = topDocs.scoreDocs[0].doc; + final LeafReaderContext leaf = reader.leaves().get(ReaderUtil.subIndex(docId, reader.leaves())); + final NumericDocValues primaryTermDV = leaf.reader().getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME); + if (primaryTermDV != null && primaryTermDV.advanceExact(docId - leaf.docBase)) { + assert primaryTermDV.longValue() > 0 : "invalid term [" + primaryTermDV.longValue() + "]"; + return OptionalLong.of(primaryTermDV.longValue()); + } + } + assert false : "seq_no[" + seqNo + "] does not have primary_term, total_hits=[" + topDocs.totalHits + "]"; + throw new IllegalStateException("seq_no[" + seqNo + "] does not have primary_term (total_hits=" + topDocs.totalHits + ")"); + } + } catch (IOException e) { + try { + maybeFailEngine("lookup_primary_term", e); + } catch (Exception inner) { + e.addSuppressed(inner); + } + throw e; + } + } + /** * Returns the number of indexing operations that have been optimized (bypass version lookup) using sequence numbers in this engine. * This metric is not persisted, and started from 0 when the engine is opened. diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java index 8a1ad645c36..2d0b9e80934 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java @@ -271,7 +271,6 @@ public class IndexFollowingIT extends CCRIntegTestCase { assertMaxSeqNoOfUpdatesIsTransferred(resolveLeaderIndex("index1"), resolveFollowerIndex("index2"), numberOfShards); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/34412") public void testFollowIndexAndCloseNode() throws Exception { getFollowerCluster().ensureAtLeastNumDataNodes(3); String leaderIndexSettings = getIndexSettings(3, 1, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); @@ -619,7 +618,6 @@ public class IndexFollowingIT extends CCRIntegTestCase { assertThat(followerClient().prepareSearch("index2").get().getHits().getTotalHits(), equalTo(2L)); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/34412") public void testFailOverOnFollower() throws Exception { int numberOfReplicas = between(1, 2); getFollowerCluster().startMasterOnlyNode(); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java index ad9aebade8f..be5e2198554 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java @@ -233,7 +233,6 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest } } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/34412") public void testRetryBulkShardOperations() throws Exception { try (ReplicationGroup leaderGroup = createGroup(between(0, 1)); ReplicationGroup followerGroup = createFollowGroup(between(1, 3))) { @@ -345,7 +344,9 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest private ReplicationGroup createFollowGroup(int replicas) throws IOException { Settings.Builder settingsBuilder = Settings.builder(); - settingsBuilder.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true); + settingsBuilder.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(between(1, 1000), ByteSizeUnit.KB)); return createGroup(replicas, settingsBuilder.build()); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java index dfacb96c31c..283cf6bf42c 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java @@ -7,18 +7,18 @@ package org.elasticsearch.xpack.ccr.action.bulk; import org.apache.lucene.index.Term; -import org.elasticsearch.ElasticsearchTimeoutException; -import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.Version; +import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.support.replication.TransportWriteAction; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Randomness; -import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardTestCase; import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.xpack.ccr.CcrSettings; import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory; @@ -29,6 +29,9 @@ import java.util.List; import java.util.stream.Collectors; import java.util.stream.Stream; +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; +import static org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction.rewriteOperationWithPrimaryTerm; import static org.hamcrest.Matchers.equalTo; import static org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction.CcrWritePrimaryResult; @@ -87,60 +90,11 @@ public class BulkShardOperationsTests extends IndexShardTestCase { closeShards(followerPrimary); } - public void testPrimaryResultWaitForGlobalCheckpoint() throws Exception { - final Settings settings = Settings.builder().put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true).build(); - final IndexShard shard = newStartedShard(false, settings, new FollowingEngineFactory()); - int numOps = between(1, 100); - for (int i = 0; i < numOps; i++) { - final String id = Integer.toString(i); - final Translog.Operation op; - if (randomBoolean()) { - op = new Translog.Index("_doc", id, i, primaryTerm, 0, SOURCE, null, -1); - } else if (randomBoolean()) { - shard.advanceMaxSeqNoOfUpdatesOrDeletes(i); - op = new Translog.Delete("_doc", id, new Term("_id", Uid.encodeId(id)), i, primaryTerm, 0); - } else { - op = new Translog.NoOp(i, primaryTerm, "test"); - } - shard.applyTranslogOperation(op, Engine.Operation.Origin.REPLICA); - } - BulkShardOperationsRequest request = new BulkShardOperationsRequest(); - { - PlainActionFuture listener = new PlainActionFuture<>(); - CcrWritePrimaryResult primaryResult = new CcrWritePrimaryResult(request, null, shard, -2, logger); - primaryResult.respond(listener); - assertThat("should return intermediately if waiting_global_checkpoint is not specified", listener.isDone(), equalTo(true)); - assertThat(listener.get().getMaxSeqNo(), equalTo(shard.seqNoStats().getMaxSeqNo())); - } - { - PlainActionFuture listener = new PlainActionFuture<>(); - long waitingForGlobalCheckpoint = randomLongBetween(shard.getGlobalCheckpoint() + 1, shard.getLocalCheckpoint()); - CcrWritePrimaryResult primaryResult = new CcrWritePrimaryResult(request, null, shard, waitingForGlobalCheckpoint, logger); - primaryResult.respond(listener); - assertThat(listener.isDone(), equalTo(false)); - expectThrows(ElasticsearchTimeoutException.class, () -> listener.actionGet(TimeValue.timeValueMillis(1))); - - shard.updateGlobalCheckpointOnReplica(randomLongBetween(shard.getGlobalCheckpoint(), waitingForGlobalCheckpoint - 1), "test"); - expectThrows(ElasticsearchTimeoutException.class, () -> listener.actionGet(TimeValue.timeValueMillis(1))); - - shard.updateGlobalCheckpointOnReplica(randomLongBetween(waitingForGlobalCheckpoint, shard.getLocalCheckpoint()), "test"); - assertThat(listener.get().getMaxSeqNo(), equalTo(shard.seqNoStats().getMaxSeqNo())); - assertThat(listener.get().getGlobalCheckpoint(), equalTo(shard.getGlobalCheckpoint())); - } - { - PlainActionFuture listener = new PlainActionFuture<>(); - long waitingForGlobalCheckpoint = randomLongBetween(-1, shard.getGlobalCheckpoint()); - CcrWritePrimaryResult primaryResult = new CcrWritePrimaryResult(request, null, shard, waitingForGlobalCheckpoint, logger); - primaryResult.respond(listener); - assertThat(listener.get().getMaxSeqNo(), equalTo(shard.seqNoStats().getMaxSeqNo())); - assertThat(listener.get().getGlobalCheckpoint(), equalTo(shard.getGlobalCheckpoint())); - } - closeShards(shard); - } - public void testPrimaryResultIncludeOnlyAppliedOperations() throws Exception { - final Settings settings = Settings.builder().put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true).build(); - final IndexShard primary = newStartedShard(true, settings, new FollowingEngineFactory()); + final Settings settings = Settings.builder().put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build(); + final IndexShard oldPrimary = newStartedShard(true, settings, new FollowingEngineFactory()); + final long oldPrimaryTerm = oldPrimary.getOperationPrimaryTerm(); long seqno = 0; List firstBulk = new ArrayList<>(); List secondBulk = new ArrayList<>(); @@ -157,46 +111,41 @@ public class BulkShardOperationsTests extends IndexShardTestCase { } else { secondBulk.add(op); } + if (rarely()) { + oldPrimary.refresh("test"); + } + if (rarely()) { + oldPrimary.flush(new FlushRequest()); + } } Randomness.shuffle(firstBulk); Randomness.shuffle(secondBulk); - primary.advanceMaxSeqNoOfUpdatesOrDeletes(seqno); - - final CcrWritePrimaryResult fullResult = TransportBulkShardOperationsAction.shardOperationOnPrimary(primary.shardId(), - primary.getHistoryUUID(), firstBulk, seqno, primary, logger); + oldPrimary.advanceMaxSeqNoOfUpdatesOrDeletes(seqno); + final CcrWritePrimaryResult fullResult = TransportBulkShardOperationsAction.shardOperationOnPrimary(oldPrimary.shardId(), + oldPrimary.getHistoryUUID(), firstBulk, seqno, oldPrimary, logger); assertThat(fullResult.replicaRequest().getOperations(), - equalTo(rewriteWithPrimaryTerm(firstBulk, primary.getOperationPrimaryTerm()))); - assertThat(fullResult.waitingForGlobalCheckpoint, equalTo(-2L)); - - // This bulk includes some operations from the first bulk. These operations should not be included in the result. + equalTo(firstBulk.stream().map(op -> rewriteOperationWithPrimaryTerm(op, oldPrimaryTerm)).collect(Collectors.toList()))); + primaryTerm = randomLongBetween(primaryTerm, primaryTerm + 10); + final IndexShard newPrimary = reinitShard(oldPrimary); + DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); + newPrimary.markAsRecovering("store", new RecoveryState(newPrimary.routingEntry(), localNode, null)); + assertTrue(newPrimary.recoverFromStore()); + IndexShardTestCase.updateRoutingEntry(newPrimary, newPrimary.routingEntry().moveToStarted()); + newPrimary.advanceMaxSeqNoOfUpdatesOrDeletes(seqno); + // The second bulk includes some operations from the first bulk which were processed already; + // only a subset of these operations will be included the result but with the old primary term. final List existingOps = randomSubsetOf(firstBulk); - final CcrWritePrimaryResult partialResult = TransportBulkShardOperationsAction.shardOperationOnPrimary(primary.shardId(), - primary.getHistoryUUID(), Stream.concat(existingOps.stream(), secondBulk.stream()).collect(Collectors.toList()), - seqno, primary, logger); - assertThat(partialResult.replicaRequest().getOperations(), - equalTo(rewriteWithPrimaryTerm(secondBulk, primary.getOperationPrimaryTerm()))); - assertThat(partialResult.waitingForGlobalCheckpoint, - equalTo(existingOps.stream().mapToLong(Translog.Operation::seqNo).max().orElse(-2L))); + final CcrWritePrimaryResult partialResult = TransportBulkShardOperationsAction.shardOperationOnPrimary(newPrimary.shardId(), + newPrimary.getHistoryUUID(), Stream.concat(secondBulk.stream(), existingOps.stream()).collect(Collectors.toList()), + seqno, newPrimary, logger); + final long newPrimaryTerm = newPrimary.getOperationPrimaryTerm(); + final long globalCheckpoint = newPrimary.getGlobalCheckpoint(); + final List appliedOperations = Stream.concat( + secondBulk.stream().map(op -> rewriteOperationWithPrimaryTerm(op, newPrimaryTerm)), + existingOps.stream().filter(op -> op.seqNo() > globalCheckpoint).map(op -> rewriteOperationWithPrimaryTerm(op, oldPrimaryTerm)) + ).collect(Collectors.toList()); - closeShards(primary); - } - - private List rewriteWithPrimaryTerm(List sourceOperations, long primaryTerm) { - return sourceOperations.stream().map(op -> { - switch (op.opType()) { - case INDEX: - final Translog.Index index = (Translog.Index) op; - return new Translog.Index(index.type(), index.id(), index.seqNo(), primaryTerm, - index.version(), BytesReference.toBytes(index.source()), index.routing(), index.getAutoGeneratedIdTimestamp()); - case DELETE: - final Translog.Delete delete = (Translog.Delete) op; - return new Translog.Delete(delete.type(), delete.id(), delete.uid(), delete.seqNo(), primaryTerm, delete.version()); - case NO_OP: - final Translog.NoOp noOp = (Translog.NoOp) op; - return new Translog.NoOp(noOp.seqNo(), primaryTerm, noOp.reason()); - default: - throw new IllegalStateException("unexpected operation type [" + op.opType() + "]"); - } - }).collect(Collectors.toList()); + assertThat(partialResult.replicaRequest().getOperations(), equalTo(appliedOperations)); + closeShards(newPrimary); } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index ec59e4c5b1d..5d27c786ad4 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -14,6 +14,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.CheckedBiConsumer; +import org.elasticsearch.common.CheckedFunction; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -46,8 +47,10 @@ import java.io.IOException; import java.nio.file.Path; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; @@ -67,6 +70,7 @@ public class FollowingEngineTests extends ESTestCase { private Index index; private ShardId shardId; private AtomicLong primaryTerm = new AtomicLong(); + private AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); public void setUp() throws Exception { super.setUp(); @@ -260,7 +264,7 @@ public class FollowingEngineTests extends ESTestCase { Collections.emptyList(), null, new NoneCircuitBreakerService(), - () -> SequenceNumbers.NO_OPS_PERFORMED, + globalCheckpoint::longValue, () -> primaryTerm.get(), EngineTestCase.tombstoneDocSupplier() ); @@ -555,13 +559,16 @@ public class FollowingEngineTests extends ESTestCase { public void testProcessOnceOnPrimary() throws Exception { final Settings settings = Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0) - .put("index.version.created", Version.CURRENT).put("index.xpack.ccr.following_index", true).build(); + .put("index.version.created", Version.CURRENT).put("index.xpack.ccr.following_index", true) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build(); final IndexMetaData indexMetaData = IndexMetaData.builder(index.getName()).settings(settings).build(); final IndexSettings indexSettings = new IndexSettings(indexMetaData, settings); + final CheckedFunction nestedDocFactory = EngineTestCase.nestedParsedDocFactory(); int numOps = between(10, 100); List operations = new ArrayList<>(numOps); for (int i = 0; i < numOps; i++) { - ParsedDocument doc = EngineTestCase.createParsedDoc(Integer.toString(between(1, 100)), null); + String docId = Integer.toString(between(1, 100)); + ParsedDocument doc = randomBoolean() ? EngineTestCase.createParsedDoc(docId, null) : nestedDocFactory.apply(docId); if (randomBoolean()) { operations.add(new Engine.Index(EngineTestCase.newUid(doc), doc, i, primaryTerm.get(), 1L, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, threadPool.relativeTimeInMillis(), -1, true)); @@ -571,24 +578,39 @@ public class FollowingEngineTests extends ESTestCase { } } Randomness.shuffle(operations); + final long oldTerm = randomLongBetween(1, Integer.MAX_VALUE); + primaryTerm.set(oldTerm); try (Store store = createStore(shardId, indexSettings, newDirectory())) { final EngineConfig engineConfig = engineConfig(shardId, indexSettings, threadPool, store, logger, xContentRegistry()); try (FollowingEngine followingEngine = createEngine(store, engineConfig)) { followingEngine.advanceMaxSeqNoOfUpdatesOrDeletes(operations.size() - 1L); - final long oldTerm = randomLongBetween(1, Integer.MAX_VALUE); + final Map operationWithTerms = new HashMap<>(); for (Engine.Operation op : operations) { - Engine.Result result = applyOperation(followingEngine, op, oldTerm, randomFrom(Engine.Operation.Origin.values())); + long term = randomLongBetween(1, oldTerm); + Engine.Result result = applyOperation(followingEngine, op, term, randomFrom(Engine.Operation.Origin.values())); assertThat(result.getResultType(), equalTo(Engine.Result.Type.SUCCESS)); + operationWithTerms.put(op.seqNo(), term); + if (rarely()) { + followingEngine.refresh("test"); + } } // Primary should reject duplicates + globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), followingEngine.getLocalCheckpoint())); final long newTerm = randomLongBetween(oldTerm + 1, Long.MAX_VALUE); for (Engine.Operation op : operations) { Engine.Result result = applyOperation(followingEngine, op, newTerm, Engine.Operation.Origin.PRIMARY); assertThat(result.getResultType(), equalTo(Engine.Result.Type.FAILURE)); assertThat(result.getFailure(), instanceOf(AlreadyProcessedFollowingEngineException.class)); + AlreadyProcessedFollowingEngineException failure = (AlreadyProcessedFollowingEngineException) result.getFailure(); + if (op.seqNo() <= globalCheckpoint.get()) { + assertThat("should not look-up term for operations at most the global checkpoint", + failure.getExistingPrimaryTerm().isPresent(), equalTo(false)); + } else { + assertThat(failure.getExistingPrimaryTerm().getAsLong(), equalTo(operationWithTerms.get(op.seqNo()))); + } } for (DocIdSeqNoAndTerm docId : getDocIds(followingEngine, true)) { - assertThat(docId.getPrimaryTerm(), equalTo(oldTerm)); + assertThat(docId.getPrimaryTerm(), equalTo(operationWithTerms.get(docId.getSeqNo()))); } // Replica should accept duplicates primaryTerm.set(newTerm); @@ -600,7 +622,7 @@ public class FollowingEngineTests extends ESTestCase { assertThat(result.getResultType(), equalTo(Engine.Result.Type.SUCCESS)); } for (DocIdSeqNoAndTerm docId : getDocIds(followingEngine, true)) { - assertThat(docId.getPrimaryTerm(), equalTo(oldTerm)); + assertThat(docId.getPrimaryTerm(), equalTo(operationWithTerms.get(docId.getSeqNo()))); } } }