CCR: Replicate existing ops with old term on follower (#34412)

Since #34288, we might hit deadlock if the FollowTask has more fetchers
than writers. This can happen in the following scenario:

Suppose the leader has two operations [seq#0, seq#1]; the FollowTask has
two fetchers and one writer.

1. The FollowTask issues two concurrent fetch requests: {from_seq_no: 0,
num_ops:1} and {from_seq_no: 1, num_ops:1} to read seq#0 and seq#1
respectively.

2. The second request, which fetches seq#1, completes first and then
triggers a write request containing only seq#1.

3. The primary of a follower fails after it has replicated seq#1 to
replicas.

4. Since the old primary did not respond, the FollowTask issues another
write request containing seq#1 (resend the previous write request).

5. The new primary has seq#1 already; thus it won't replicate seq#1 to
replicas but will wait for the global checkpoint to advance at least
seq#1.

The problem is that the FollowTask has only one writer and that writer
is waiting for seq#0 which won't be delivered until the writer completes.

This PR proposes to replicate existing operations with the old primary
term (instead of the current term) on the follower. In particular, when
the following primary detects that it has processed an operation already,
it will look up the term of an existing operation with the same seq_no
in the Lucene index, then rewrite that operation with the old term
before replicating it to the following replicas. This approach is
wait-free but requires soft-deletes on the follower.

Relates #34288
This commit is contained in:
Nhat Nguyen 2018-10-19 13:56:00 -04:00 committed by GitHub
parent 94bde37bcf
commit bd92a28cfc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 247 additions and 181 deletions

View File

@ -2427,9 +2427,7 @@ public class InternalEngine extends Engine {
long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException {
// TODO: Should we defer the refresh until we really need it?
ensureOpen();
if (lastRefreshedCheckpoint() < toSeqNo) {
refresh(source, SearcherScope.INTERNAL);
}
refreshIfNeeded(source, toSeqNo);
Searcher searcher = acquireSearcher(source, SearcherScope.INTERNAL);
try {
LuceneChangesSnapshot snapshot = new LuceneChangesSnapshot(
@ -2539,6 +2537,15 @@ public class InternalEngine extends Engine {
return lastRefreshedCheckpointListener.refreshedCheckpoint.get();
}
/**
 * Performs an **internal** refresh of this engine, but only when the last refreshed checkpoint is still below
 * the requested seq_no; otherwise nothing is done.
 *
 * @param source          the reason for the refresh, passed through to {@code refresh}
 * @param requestingSeqNo the seq_no that the caller needs to be covered by the last refreshed checkpoint
 */
protected final void refreshIfNeeded(String source, long requestingSeqNo) {
    final boolean alreadyRefreshed = lastRefreshedCheckpoint() >= requestingSeqNo;
    if (alreadyRefreshed) {
        return;
    }
    refresh(source, SearcherScope.INTERNAL);
}
private final class LastRefreshedCheckpointListener implements ReferenceManager.RefreshListener {
final AtomicLong refreshedCheckpoint;
private long pendingCheckpoint;

View File

@ -49,15 +49,20 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.Index;
@ -65,6 +70,7 @@ import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.MapperTestUtils;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.Mapping;
@ -72,6 +78,7 @@ import org.elasticsearch.index.mapper.ParseContext;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.SourceFieldMapper;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.VersionFieldMapper;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
@ -307,6 +314,27 @@ public abstract class EngineTestCase extends ESTestCase {
mappingUpdate);
}
/**
 * Creates a factory that parses a document for a given id using a mapping in which {@code nested_field}
 * is declared with type {@code nested}. Every produced document has a {@code field} entry and, when the
 * randomly chosen count (0 to 3) is positive, a {@code nested_field} object holding that many entries.
 */
public static CheckedFunction<String, ParsedDocument, IOException> nestedParsedDocFactory() throws Exception {
    final MapperService mappers = createMapperService("type");
    final XContentBuilder mapping = XContentFactory.jsonBuilder();
    mapping.startObject();
    mapping.startObject("type");
    mapping.startObject("properties");
    mapping.startObject("nested_field");
    mapping.field("type", "nested");
    mapping.endObject(); // nested_field
    mapping.endObject(); // properties
    mapping.endObject(); // type
    mapping.endObject(); // root
    final String mappingJson = Strings.toString(mapping);
    final DocumentMapper nestedMapper = mappers.documentMapperParser().parse("type", new CompressedXContent(mappingJson));
    return docId -> {
        final XContentBuilder doc = XContentFactory.jsonBuilder();
        doc.startObject();
        doc.field("field", "value");
        final int numNested = between(0, 3);
        if (numNested > 0) {
            final XContentBuilder nested = doc.startObject("nested_field");
            int n = 0;
            while (n < numNested) {
                nested.field("field-" + n, "value-" + n);
                n++;
            }
            doc.endObject();
        }
        doc.endObject();
        final BytesReference bytes = BytesReference.bytes(doc);
        return nestedMapper.parse(SourceToParse.source("test", "type", docId, bytes, XContentType.JSON));
    };
}
/**
* Creates a tombstone document that only includes uid, seq#, term and version fields.
*/

View File

@ -31,7 +31,6 @@ import org.elasticsearch.xpack.ccr.index.engine.AlreadyProcessedFollowingEngineE
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
public class TransportBulkShardOperationsAction
extends TransportWriteAction<BulkShardOperationsRequest, BulkShardOperationsRequest, BulkShardOperationsResponse> {
@ -68,6 +67,41 @@ public class TransportBulkShardOperationsAction
request.getMaxSeqNoOfUpdatesOrDeletes(), primary, logger);
}
/**
 * Returns a new translog operation that copies every attribute of {@code operation} except the primary term,
 * which is replaced by {@code primaryTerm}.
 *
 * @param operation   the operation to copy; must be an INDEX, DELETE or NO_OP operation
 * @param primaryTerm the primary term to put on the copy
 * @return the rewritten operation
 * @throws IllegalStateException if the operation type is none of the above
 */
static Translog.Operation rewriteOperationWithPrimaryTerm(Translog.Operation operation, long primaryTerm) {
    switch (operation.opType()) {
        case INDEX: {
            final Translog.Index indexOp = (Translog.Index) operation;
            return new Translog.Index(
                indexOp.type(),
                indexOp.id(),
                indexOp.seqNo(),
                primaryTerm,
                indexOp.version(),
                BytesReference.toBytes(indexOp.source()),
                indexOp.routing(),
                indexOp.getAutoGeneratedIdTimestamp());
        }
        case DELETE: {
            final Translog.Delete deleteOp = (Translog.Delete) operation;
            return new Translog.Delete(
                deleteOp.type(),
                deleteOp.id(),
                deleteOp.uid(),
                deleteOp.seqNo(),
                primaryTerm,
                deleteOp.version());
        }
        case NO_OP: {
            final Translog.NoOp noOperation = (Translog.NoOp) operation;
            return new Translog.NoOp(noOperation.seqNo(), primaryTerm, noOperation.reason());
        }
        default:
            throw new IllegalStateException("unexpected operation type [" + operation.opType() + "]");
    }
}
// public for testing purposes only
public static CcrWritePrimaryResult shardOperationOnPrimary(
final ShardId shardId,
@ -81,49 +115,13 @@ public class TransportBulkShardOperationsAction
"], actual [" + primary.getHistoryUUID() + "], shard is likely restored from snapshot or force allocated");
}
final Function<Translog.Operation, Translog.Operation> rewriteWithTerm = operation -> {
final Translog.Operation operationWithPrimaryTerm;
switch (operation.opType()) {
case INDEX:
final Translog.Index index = (Translog.Index) operation;
operationWithPrimaryTerm = new Translog.Index(
index.type(),
index.id(),
index.seqNo(),
primary.getOperationPrimaryTerm(),
index.version(),
BytesReference.toBytes(index.source()),
index.routing(),
index.getAutoGeneratedIdTimestamp());
break;
case DELETE:
final Translog.Delete delete = (Translog.Delete) operation;
operationWithPrimaryTerm = new Translog.Delete(
delete.type(),
delete.id(),
delete.uid(),
delete.seqNo(),
primary.getOperationPrimaryTerm(),
delete.version());
break;
case NO_OP:
final Translog.NoOp noOp = (Translog.NoOp) operation;
operationWithPrimaryTerm = new Translog.NoOp(noOp.seqNo(), primary.getOperationPrimaryTerm(), noOp.reason());
break;
default:
throw new IllegalStateException("unexpected operation type [" + operation.opType() + "]");
}
return operationWithPrimaryTerm;
};
assert maxSeqNoOfUpdatesOrDeletes >= SequenceNumbers.NO_OPS_PERFORMED : "invalid msu [" + maxSeqNoOfUpdatesOrDeletes + "]";
primary.advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNoOfUpdatesOrDeletes);
final List<Translog.Operation> appliedOperations = new ArrayList<>(sourceOperations.size());
Translog.Location location = null;
long waitingForGlobalCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
for (Translog.Operation sourceOp : sourceOperations) {
final Translog.Operation targetOp = rewriteWithTerm.apply(sourceOp);
final Translog.Operation targetOp = rewriteOperationWithPrimaryTerm(sourceOp, primary.getOperationPrimaryTerm());
final Engine.Result result = primary.applyTranslogOperation(targetOp, Engine.Operation.Origin.PRIMARY);
if (result.getResultType() == Engine.Result.Type.SUCCESS) {
assert result.getSeqNo() == targetOp.seqNo();
@ -131,23 +129,28 @@ public class TransportBulkShardOperationsAction
location = locationToSync(location, result.getTranslogLocation());
} else {
if (result.getFailure() instanceof AlreadyProcessedFollowingEngineException) {
// Skipped operations will be delivered to replicas via primary-replica resync or peer-recovery.
// The primary must not acknowledge this request until the global checkpoint is at least the highest
// seqno of all skipped operations (i.e., all skipped operations have been processed on every replica).
waitingForGlobalCheckpoint = SequenceNumbers.max(waitingForGlobalCheckpoint, targetOp.seqNo());
// The existing operations below the global checkpoint won't be replicated as they were processed
// in every replicas already. However, the existing operations above the global checkpoint will be
// replicated to replicas but with the existing primary term (not the current primary term) in order
// to guarantee the consistency between the primary and replicas, and between translog and Lucene index.
final AlreadyProcessedFollowingEngineException failure = (AlreadyProcessedFollowingEngineException) result.getFailure();
assert failure.getSeqNo() == targetOp.seqNo() : targetOp.seqNo() + " != " + failure.getSeqNo();
if (failure.getExistingPrimaryTerm().isPresent()) {
appliedOperations.add(rewriteOperationWithPrimaryTerm(sourceOp, failure.getExistingPrimaryTerm().getAsLong()));
} else if (targetOp.seqNo() > primary.getGlobalCheckpoint()) {
assert false : "can't find primary_term for existing op=" + targetOp + " gcp=" + primary.getGlobalCheckpoint();
throw new IllegalStateException("can't find primary_term for existing op=" + targetOp +
" global_checkpoint=" + primary.getGlobalCheckpoint(), failure);
}
} else {
assert false : "Only already-processed error should happen; op=[" + targetOp + "] error=[" + result.getFailure() + "]";
throw ExceptionsHelper.convertToElastic(result.getFailure());
}
}
}
assert appliedOperations.size() == sourceOperations.size() || waitingForGlobalCheckpoint != SequenceNumbers.UNASSIGNED_SEQ_NO :
"waiting global checkpoint is not assigned; waiting_gcp=" + waitingForGlobalCheckpoint +
" source_ops=" + sourceOperations.size() + " applied_ops=" + sourceOperations.size();
assert appliedOperations.size() == 0 || location != null;
final BulkShardOperationsRequest replicaRequest = new BulkShardOperationsRequest(
shardId, historyUUID, appliedOperations, maxSeqNoOfUpdatesOrDeletes);
return new CcrWritePrimaryResult(replicaRequest, location, primary, waitingForGlobalCheckpoint, logger);
return new CcrWritePrimaryResult(replicaRequest, location, primary, logger);
}
@Override
@ -184,12 +187,8 @@ public class TransportBulkShardOperationsAction
* Custom write result to include global checkpoint after ops have been replicated.
*/
static final class CcrWritePrimaryResult extends WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperationsResponse> {
final long waitingForGlobalCheckpoint;
CcrWritePrimaryResult(BulkShardOperationsRequest request, Translog.Location location, IndexShard primary,
long waitingForGlobalCheckpoint, Logger logger) {
CcrWritePrimaryResult(BulkShardOperationsRequest request, Translog.Location location, IndexShard primary, Logger logger) {
super(request, new BulkShardOperationsResponse(), location, null, primary, logger);
this.waitingForGlobalCheckpoint = waitingForGlobalCheckpoint;
}
@Override
@ -201,20 +200,8 @@ public class TransportBulkShardOperationsAction
response.setMaxSeqNo(seqNoStats.getMaxSeqNo());
listener.onResponse(response);
}, listener::onFailure);
if (waitingForGlobalCheckpoint != SequenceNumbers.UNASSIGNED_SEQ_NO) {
primary.addGlobalCheckpointListener(waitingForGlobalCheckpoint, (gcp, e) -> {
if (e != null) {
listener.onFailure(e);
} else {
assert waitingForGlobalCheckpoint <= gcp : waitingForGlobalCheckpoint + " > " + gcp;
super.respond(wrappedListener);
}
}, null);
} else {
super.respond(wrappedListener);
}
}
}

View File

@ -9,8 +9,27 @@ package org.elasticsearch.xpack.ccr.index.engine;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.shard.ShardId;
import java.util.OptionalLong;
/**
* An exception indicating that an operation was already processed on the {@link FollowingEngine} of the primary of a follower.
* The field {@code existingPrimaryTerm} is empty only if the operation is below the global checkpoint; otherwise it should be non-empty.
*/
public final class AlreadyProcessedFollowingEngineException extends VersionConflictEngineException {
AlreadyProcessedFollowingEngineException(ShardId shardId, long seqNo) {
super(shardId, "operation [{}] was processed before", null, seqNo);
private final long seqNo;
private final OptionalLong existingPrimaryTerm;
AlreadyProcessedFollowingEngineException(ShardId shardId, long seqNo, OptionalLong existingPrimaryTerm) {
super(shardId, "operation [{}] was processed before with term [{}]", null, seqNo, existingPrimaryTerm);
this.seqNo = seqNo;
this.existingPrimaryTerm = existingPrimaryTerm;
}
public long getSeqNo() {
return seqNo;
}
public OptionalLong getExistingPrimaryTerm() {
return existingPrimaryTerm;
}
}

View File

@ -5,14 +5,28 @@
*/
package org.elasticsearch.xpack.ccr.index.engine;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.ReaderUtil;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.DocValuesFieldExistsQuery;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.TopDocs;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.xpack.ccr.CcrSettings;
import java.io.IOException;
import java.util.OptionalLong;
/**
* An engine implementation for following shards.
@ -62,13 +76,13 @@ public final class FollowingEngine extends InternalEngine {
/*
* The existing operation in this engine was probably assigned the term of the previous primary shard which is different
* from the term of the current operation. If the current operation arrives on replicas before the previous operation,
* then the Lucene content between the primary and replicas are not identical (primary terms are different). Since the
* existing operations are guaranteed to be replicated to replicas either via peer-recovery or primary-replica resync,
* we can safely skip this operation here and let the caller know the decision via AlreadyProcessedFollowingEngineException.
* The caller then waits for the global checkpoint to advance at least the seq_no of this operation to make sure that
* the existing operation was replicated to all replicas (see TransportBulkShardOperationsAction#shardOperationOnPrimary).
* then the Lucene content between the primary and replicas are not identical (primary terms are different). We can safely
* skip the existing operations below the global checkpoint, however must replicate the ones above the global checkpoint
* but with the previous primary term (not the current term of the operation) in order to guarantee the consistency
* between the primary and replicas (see TransportBulkShardOperationsAction#shardOperationOnPrimary).
*/
final AlreadyProcessedFollowingEngineException error = new AlreadyProcessedFollowingEngineException(shardId, index.seqNo());
final AlreadyProcessedFollowingEngineException error = new AlreadyProcessedFollowingEngineException(
shardId, index.seqNo(), lookupPrimaryTerm(index.seqNo()));
return IndexingStrategy.skipDueToVersionConflict(error, false, index.version(), index.primaryTerm());
} else {
return IndexingStrategy.processButSkipLucene(false, index.seqNo(), index.version());
@ -88,7 +102,8 @@ public final class FollowingEngine extends InternalEngine {
preFlight(delete);
if (delete.origin() == Operation.Origin.PRIMARY && hasBeenProcessedBefore(delete)) {
// See the comment in #indexingStrategyForOperation for the explanation why we can safely skip this operation.
final AlreadyProcessedFollowingEngineException error = new AlreadyProcessedFollowingEngineException(shardId, delete.seqNo());
final AlreadyProcessedFollowingEngineException error = new AlreadyProcessedFollowingEngineException(
shardId, delete.seqNo(), lookupPrimaryTerm(delete.seqNo()));
return DeletionStrategy.skipDueToVersionConflict(error, delete.version(), delete.primaryTerm(), false);
} else {
return planDeletionAsNonPrimary(delete);
@ -126,6 +141,46 @@ public final class FollowingEngine extends InternalEngine {
return true;
}
/**
 * Looks up the primary term stored in the Lucene index for the existing operation with the given sequence number.
 *
 * @param seqNo the sequence number of the operation that was processed before
 * @return an empty value if {@code seqNo} is at most the global checkpoint; otherwise the primary term read from the
 *         primary_term doc values of the document matching {@code seqNo}
 * @throws IOException if searching or reading the index fails; {@code maybeFailEngine} is called before rethrowing
 * @throws IllegalStateException if {@code seqNo} is above the global checkpoint but no primary term is found for it
 */
private OptionalLong lookupPrimaryTerm(final long seqNo) throws IOException {
// Refresh first (only if the last refreshed checkpoint is below seqNo) so the searcher acquired below can see the operation.
refreshIfNeeded("lookup_primary_term", seqNo);
try (Searcher engineSearcher = acquireSearcher("lookup_primary_term", SearcherScope.INTERNAL)) {
// We have to acquire a searcher before executing this check to ensure that the requested seq_no is always found in the else
// branch. If the operation is at most the global checkpoint, we should not look up its term as the operation may have been
// merged away. Moreover, we don't need to replicate this operation to replicas since it was processed on every copy already.
if (seqNo <= engineConfig.getGlobalCheckpointSupplier().getAsLong()) {
return OptionalLong.empty();
} else {
// NOTE(review): wrapAllDocsLive presumably exposes soft-deleted documents as live so that they can be searched - confirm.
final DirectoryReader reader = Lucene.wrapAllDocsLive(engineSearcher.getDirectoryReader());
final IndexSearcher searcher = new IndexSearcher(reader);
// This is a one-off lookup, so the query cache is disabled for this searcher.
searcher.setQueryCache(null);
final Query query = new BooleanQuery.Builder()
.add(LongPoint.newExactQuery(SeqNoFieldMapper.NAME, seqNo), BooleanClause.Occur.FILTER)
// excludes the non-root nested documents which don't have primary_term.
.add(new DocValuesFieldExistsQuery(SeqNoFieldMapper.PRIMARY_TERM_NAME), BooleanClause.Occur.FILTER)
.build();
final TopDocs topDocs = searcher.search(query, 1);
if (topDocs.scoreDocs.length == 1) {
// Translate the top-level doc id into its leaf (segment) and a leaf-relative doc id to read the doc values.
final int docId = topDocs.scoreDocs[0].doc;
final LeafReaderContext leaf = reader.leaves().get(ReaderUtil.subIndex(docId, reader.leaves()));
final NumericDocValues primaryTermDV = leaf.reader().getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
if (primaryTermDV != null && primaryTermDV.advanceExact(docId - leaf.docBase)) {
assert primaryTermDV.longValue() > 0 : "invalid term [" + primaryTermDV.longValue() + "]";
return OptionalLong.of(primaryTermDV.longValue());
}
}
// Reaching this point means an operation above the global checkpoint has no primary term in the index:
// trip the assertion when assertions are enabled, and fail with an exception otherwise.
assert false : "seq_no[" + seqNo + "] does not have primary_term, total_hits=[" + topDocs.totalHits + "]";
throw new IllegalStateException("seq_no[" + seqNo + "] does not have primary_term (total_hits=" + topDocs.totalHits + ")");
}
} catch (IOException e) {
// Report the I/O failure to the engine; any exception from that call is attached as suppressed
// so that the original IOException is the one propagated to the caller.
try {
maybeFailEngine("lookup_primary_term", e);
} catch (Exception inner) {
e.addSuppressed(inner);
}
throw e;
}
}
/**
* Returns the number of indexing operations that have been optimized (bypass version lookup) using sequence numbers in this engine.
* This metric is not persisted, and started from 0 when the engine is opened.

View File

@ -271,7 +271,6 @@ public class IndexFollowingIT extends CCRIntegTestCase {
assertMaxSeqNoOfUpdatesIsTransferred(resolveLeaderIndex("index1"), resolveFollowerIndex("index2"), numberOfShards);
}
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/34412")
public void testFollowIndexAndCloseNode() throws Exception {
getFollowerCluster().ensureAtLeastNumDataNodes(3);
String leaderIndexSettings = getIndexSettings(3, 1, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
@ -619,7 +618,6 @@ public class IndexFollowingIT extends CCRIntegTestCase {
assertThat(followerClient().prepareSearch("index2").get().getHits().getTotalHits(), equalTo(2L));
}
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/34412")
public void testFailOverOnFollower() throws Exception {
int numberOfReplicas = between(1, 2);
getFollowerCluster().startMasterOnlyNode();

View File

@ -233,7 +233,6 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
}
}
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/34412")
public void testRetryBulkShardOperations() throws Exception {
try (ReplicationGroup leaderGroup = createGroup(between(0, 1));
ReplicationGroup followerGroup = createFollowGroup(between(1, 3))) {
@ -345,7 +344,9 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
private ReplicationGroup createFollowGroup(int replicas) throws IOException {
Settings.Builder settingsBuilder = Settings.builder();
settingsBuilder.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true);
settingsBuilder.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true)
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(between(1, 1000), ByteSizeUnit.KB));
return createGroup(replicas, settingsBuilder.build());
}

View File

@ -7,18 +7,18 @@
package org.elasticsearch.xpack.ccr.action.bulk;
import org.apache.lucene.index.Term;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.support.replication.TransportWriteAction;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.xpack.ccr.CcrSettings;
import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory;
@ -29,6 +29,9 @@ import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction.rewriteOperationWithPrimaryTerm;
import static org.hamcrest.Matchers.equalTo;
import static org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction.CcrWritePrimaryResult;
@ -87,60 +90,11 @@ public class BulkShardOperationsTests extends IndexShardTestCase {
closeShards(followerPrimary);
}
public void testPrimaryResultWaitForGlobalCheckpoint() throws Exception {
final Settings settings = Settings.builder().put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true).build();
final IndexShard shard = newStartedShard(false, settings, new FollowingEngineFactory());
int numOps = between(1, 100);
for (int i = 0; i < numOps; i++) {
final String id = Integer.toString(i);
final Translog.Operation op;
if (randomBoolean()) {
op = new Translog.Index("_doc", id, i, primaryTerm, 0, SOURCE, null, -1);
} else if (randomBoolean()) {
shard.advanceMaxSeqNoOfUpdatesOrDeletes(i);
op = new Translog.Delete("_doc", id, new Term("_id", Uid.encodeId(id)), i, primaryTerm, 0);
} else {
op = new Translog.NoOp(i, primaryTerm, "test");
}
shard.applyTranslogOperation(op, Engine.Operation.Origin.REPLICA);
}
BulkShardOperationsRequest request = new BulkShardOperationsRequest();
{
PlainActionFuture<BulkShardOperationsResponse> listener = new PlainActionFuture<>();
CcrWritePrimaryResult primaryResult = new CcrWritePrimaryResult(request, null, shard, -2, logger);
primaryResult.respond(listener);
assertThat("should return intermediately if waiting_global_checkpoint is not specified", listener.isDone(), equalTo(true));
assertThat(listener.get().getMaxSeqNo(), equalTo(shard.seqNoStats().getMaxSeqNo()));
}
{
PlainActionFuture<BulkShardOperationsResponse> listener = new PlainActionFuture<>();
long waitingForGlobalCheckpoint = randomLongBetween(shard.getGlobalCheckpoint() + 1, shard.getLocalCheckpoint());
CcrWritePrimaryResult primaryResult = new CcrWritePrimaryResult(request, null, shard, waitingForGlobalCheckpoint, logger);
primaryResult.respond(listener);
assertThat(listener.isDone(), equalTo(false));
expectThrows(ElasticsearchTimeoutException.class, () -> listener.actionGet(TimeValue.timeValueMillis(1)));
shard.updateGlobalCheckpointOnReplica(randomLongBetween(shard.getGlobalCheckpoint(), waitingForGlobalCheckpoint - 1), "test");
expectThrows(ElasticsearchTimeoutException.class, () -> listener.actionGet(TimeValue.timeValueMillis(1)));
shard.updateGlobalCheckpointOnReplica(randomLongBetween(waitingForGlobalCheckpoint, shard.getLocalCheckpoint()), "test");
assertThat(listener.get().getMaxSeqNo(), equalTo(shard.seqNoStats().getMaxSeqNo()));
assertThat(listener.get().getGlobalCheckpoint(), equalTo(shard.getGlobalCheckpoint()));
}
{
PlainActionFuture<BulkShardOperationsResponse> listener = new PlainActionFuture<>();
long waitingForGlobalCheckpoint = randomLongBetween(-1, shard.getGlobalCheckpoint());
CcrWritePrimaryResult primaryResult = new CcrWritePrimaryResult(request, null, shard, waitingForGlobalCheckpoint, logger);
primaryResult.respond(listener);
assertThat(listener.get().getMaxSeqNo(), equalTo(shard.seqNoStats().getMaxSeqNo()));
assertThat(listener.get().getGlobalCheckpoint(), equalTo(shard.getGlobalCheckpoint()));
}
closeShards(shard);
}
public void testPrimaryResultIncludeOnlyAppliedOperations() throws Exception {
final Settings settings = Settings.builder().put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true).build();
final IndexShard primary = newStartedShard(true, settings, new FollowingEngineFactory());
final Settings settings = Settings.builder().put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true)
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build();
final IndexShard oldPrimary = newStartedShard(true, settings, new FollowingEngineFactory());
final long oldPrimaryTerm = oldPrimary.getOperationPrimaryTerm();
long seqno = 0;
List<Translog.Operation> firstBulk = new ArrayList<>();
List<Translog.Operation> secondBulk = new ArrayList<>();
@ -157,46 +111,41 @@ public class BulkShardOperationsTests extends IndexShardTestCase {
} else {
secondBulk.add(op);
}
if (rarely()) {
oldPrimary.refresh("test");
}
if (rarely()) {
oldPrimary.flush(new FlushRequest());
}
}
Randomness.shuffle(firstBulk);
Randomness.shuffle(secondBulk);
primary.advanceMaxSeqNoOfUpdatesOrDeletes(seqno);
final CcrWritePrimaryResult fullResult = TransportBulkShardOperationsAction.shardOperationOnPrimary(primary.shardId(),
primary.getHistoryUUID(), firstBulk, seqno, primary, logger);
oldPrimary.advanceMaxSeqNoOfUpdatesOrDeletes(seqno);
final CcrWritePrimaryResult fullResult = TransportBulkShardOperationsAction.shardOperationOnPrimary(oldPrimary.shardId(),
oldPrimary.getHistoryUUID(), firstBulk, seqno, oldPrimary, logger);
assertThat(fullResult.replicaRequest().getOperations(),
equalTo(rewriteWithPrimaryTerm(firstBulk, primary.getOperationPrimaryTerm())));
assertThat(fullResult.waitingForGlobalCheckpoint, equalTo(-2L));
// This bulk includes some operations from the first bulk. These operations should not be included in the result.
equalTo(firstBulk.stream().map(op -> rewriteOperationWithPrimaryTerm(op, oldPrimaryTerm)).collect(Collectors.toList())));
primaryTerm = randomLongBetween(primaryTerm, primaryTerm + 10);
final IndexShard newPrimary = reinitShard(oldPrimary);
DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
newPrimary.markAsRecovering("store", new RecoveryState(newPrimary.routingEntry(), localNode, null));
assertTrue(newPrimary.recoverFromStore());
IndexShardTestCase.updateRoutingEntry(newPrimary, newPrimary.routingEntry().moveToStarted());
newPrimary.advanceMaxSeqNoOfUpdatesOrDeletes(seqno);
// The second bulk includes some operations from the first bulk which were processed already;
// only a subset of these operations will be included the result but with the old primary term.
final List<Translog.Operation> existingOps = randomSubsetOf(firstBulk);
final CcrWritePrimaryResult partialResult = TransportBulkShardOperationsAction.shardOperationOnPrimary(primary.shardId(),
primary.getHistoryUUID(), Stream.concat(existingOps.stream(), secondBulk.stream()).collect(Collectors.toList()),
seqno, primary, logger);
assertThat(partialResult.replicaRequest().getOperations(),
equalTo(rewriteWithPrimaryTerm(secondBulk, primary.getOperationPrimaryTerm())));
assertThat(partialResult.waitingForGlobalCheckpoint,
equalTo(existingOps.stream().mapToLong(Translog.Operation::seqNo).max().orElse(-2L)));
final CcrWritePrimaryResult partialResult = TransportBulkShardOperationsAction.shardOperationOnPrimary(newPrimary.shardId(),
newPrimary.getHistoryUUID(), Stream.concat(secondBulk.stream(), existingOps.stream()).collect(Collectors.toList()),
seqno, newPrimary, logger);
final long newPrimaryTerm = newPrimary.getOperationPrimaryTerm();
final long globalCheckpoint = newPrimary.getGlobalCheckpoint();
final List<Translog.Operation> appliedOperations = Stream.concat(
secondBulk.stream().map(op -> rewriteOperationWithPrimaryTerm(op, newPrimaryTerm)),
existingOps.stream().filter(op -> op.seqNo() > globalCheckpoint).map(op -> rewriteOperationWithPrimaryTerm(op, oldPrimaryTerm))
).collect(Collectors.toList());
closeShards(primary);
}
/**
 * Builds a new list in which every given translog operation is re-created with
 * {@code primaryTerm} in place of its own term. All other constructor arguments
 * are read from the source operation.
 *
 * @param sourceOperations the operations to copy
 * @param primaryTerm      the primary term to put on every copied operation
 * @return a new list holding one rebuilt operation per source operation
 * @throws IllegalStateException if an operation's type is not INDEX, DELETE or NO_OP
 */
private List<Translog.Operation> rewriteWithPrimaryTerm(List<Translog.Operation> sourceOperations, long primaryTerm) {
return sourceOperations.stream().map(op -> {
// Each branch casts to the concrete operation class and calls its constructor with the new term.
switch (op.opType()) {
case INDEX:
final Translog.Index index = (Translog.Index) op;
// The source is passed to the constructor as a byte[] obtained via BytesReference.toBytes.
return new Translog.Index(index.type(), index.id(), index.seqNo(), primaryTerm,
index.version(), BytesReference.toBytes(index.source()), index.routing(), index.getAutoGeneratedIdTimestamp());
case DELETE:
final Translog.Delete delete = (Translog.Delete) op;
return new Translog.Delete(delete.type(), delete.id(), delete.uid(), delete.seqNo(), primaryTerm, delete.version());
case NO_OP:
final Translog.NoOp noOp = (Translog.NoOp) op;
return new Translog.NoOp(noOp.seqNo(), primaryTerm, noOp.reason());
default:
// Any other operation type is treated as a programming error.
throw new IllegalStateException("unexpected operation type [" + op.opType() + "]");
}
}).collect(Collectors.toList());
// NOTE(review): the two statements below come after the return and use partialResult,
// appliedOperations and newPrimary, none of which are declared in this method. This text
// looks like a rendered diff, so they presumably belong to the preceding test method.
// TODO confirm against the real file.
assertThat(partialResult.replicaRequest().getOperations(), equalTo(appliedOperations));
closeShards(newPrimary);
}
}

View File

@ -14,6 +14,7 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
@ -46,8 +47,10 @@ import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
@ -67,6 +70,7 @@ public class FollowingEngineTests extends ESTestCase {
private Index index;
private ShardId shardId;
private AtomicLong primaryTerm = new AtomicLong();
private AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
public void setUp() throws Exception {
super.setUp();
@ -260,7 +264,7 @@ public class FollowingEngineTests extends ESTestCase {
Collections.emptyList(),
null,
new NoneCircuitBreakerService(),
() -> SequenceNumbers.NO_OPS_PERFORMED,
globalCheckpoint::longValue,
() -> primaryTerm.get(),
EngineTestCase.tombstoneDocSupplier()
);
@ -555,13 +559,16 @@ public class FollowingEngineTests extends ESTestCase {
public void testProcessOnceOnPrimary() throws Exception {
final Settings settings = Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0)
.put("index.version.created", Version.CURRENT).put("index.xpack.ccr.following_index", true).build();
.put("index.version.created", Version.CURRENT).put("index.xpack.ccr.following_index", true)
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build();
final IndexMetaData indexMetaData = IndexMetaData.builder(index.getName()).settings(settings).build();
final IndexSettings indexSettings = new IndexSettings(indexMetaData, settings);
final CheckedFunction<String, ParsedDocument, IOException> nestedDocFactory = EngineTestCase.nestedParsedDocFactory();
int numOps = between(10, 100);
List<Engine.Operation> operations = new ArrayList<>(numOps);
for (int i = 0; i < numOps; i++) {
ParsedDocument doc = EngineTestCase.createParsedDoc(Integer.toString(between(1, 100)), null);
String docId = Integer.toString(between(1, 100));
ParsedDocument doc = randomBoolean() ? EngineTestCase.createParsedDoc(docId, null) : nestedDocFactory.apply(docId);
if (randomBoolean()) {
operations.add(new Engine.Index(EngineTestCase.newUid(doc), doc, i, primaryTerm.get(), 1L,
VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, threadPool.relativeTimeInMillis(), -1, true));
@ -571,24 +578,39 @@ public class FollowingEngineTests extends ESTestCase {
}
}
Randomness.shuffle(operations);
final long oldTerm = randomLongBetween(1, Integer.MAX_VALUE);
primaryTerm.set(oldTerm);
try (Store store = createStore(shardId, indexSettings, newDirectory())) {
final EngineConfig engineConfig = engineConfig(shardId, indexSettings, threadPool, store, logger, xContentRegistry());
try (FollowingEngine followingEngine = createEngine(store, engineConfig)) {
followingEngine.advanceMaxSeqNoOfUpdatesOrDeletes(operations.size() - 1L);
final long oldTerm = randomLongBetween(1, Integer.MAX_VALUE);
final Map<Long,Long> operationWithTerms = new HashMap<>();
for (Engine.Operation op : operations) {
Engine.Result result = applyOperation(followingEngine, op, oldTerm, randomFrom(Engine.Operation.Origin.values()));
long term = randomLongBetween(1, oldTerm);
Engine.Result result = applyOperation(followingEngine, op, term, randomFrom(Engine.Operation.Origin.values()));
assertThat(result.getResultType(), equalTo(Engine.Result.Type.SUCCESS));
operationWithTerms.put(op.seqNo(), term);
if (rarely()) {
followingEngine.refresh("test");
}
}
// Primary should reject duplicates
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), followingEngine.getLocalCheckpoint()));
final long newTerm = randomLongBetween(oldTerm + 1, Long.MAX_VALUE);
for (Engine.Operation op : operations) {
Engine.Result result = applyOperation(followingEngine, op, newTerm, Engine.Operation.Origin.PRIMARY);
assertThat(result.getResultType(), equalTo(Engine.Result.Type.FAILURE));
assertThat(result.getFailure(), instanceOf(AlreadyProcessedFollowingEngineException.class));
AlreadyProcessedFollowingEngineException failure = (AlreadyProcessedFollowingEngineException) result.getFailure();
if (op.seqNo() <= globalCheckpoint.get()) {
assertThat("should not look-up term for operations at most the global checkpoint",
failure.getExistingPrimaryTerm().isPresent(), equalTo(false));
} else {
assertThat(failure.getExistingPrimaryTerm().getAsLong(), equalTo(operationWithTerms.get(op.seqNo())));
}
}
for (DocIdSeqNoAndTerm docId : getDocIds(followingEngine, true)) {
assertThat(docId.getPrimaryTerm(), equalTo(oldTerm));
assertThat(docId.getPrimaryTerm(), equalTo(operationWithTerms.get(docId.getSeqNo())));
}
// Replica should accept duplicates
primaryTerm.set(newTerm);
@ -600,7 +622,7 @@ public class FollowingEngineTests extends ESTestCase {
assertThat(result.getResultType(), equalTo(Engine.Result.Type.SUCCESS));
}
for (DocIdSeqNoAndTerm docId : getDocIds(followingEngine, true)) {
assertThat(docId.getPrimaryTerm(), equalTo(oldTerm));
assertThat(docId.getPrimaryTerm(), equalTo(operationWithTerms.get(docId.getSeqNo())));
}
}
}