CCR: Following primary should process operations once (#34288)

Today we rewrite the operations from the leader with the term of the
following primary because the follower should own its history. The
problem is that a newly promoted primary may re-assign its term to
operations which were replicated to replicas before by the previous
primary. If this happens, some operations with the same seq_no may be
assigned different terms. This is not good for the future optimistic
locking using a combination of seqno and term.

This change ensures that the primary of a follower only processes an
operation if that operation was not processed before. The skipped
operations are guaranteed to be delivered to replicas via either
primary-replica resync or peer-recovery. However, the primary must not
acknowledge until the global checkpoint is at least the highest seqno of
all skipped ops (i.e., they all have been processed on every replica).

Relates #31751
Relates #31113
This commit is contained in:
Nhat Nguyen 2018-10-10 15:39:57 -04:00 committed by GitHub
parent 4270085360
commit 33791ac27c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 481 additions and 50 deletions

View File

@ -1086,7 +1086,7 @@ public class InternalEngine extends Engine {
return new IndexingStrategy(true, false, true, false, seqNoForIndexing, versionForIndexing, null);
}
static IndexingStrategy skipDueToVersionConflict(
public static IndexingStrategy skipDueToVersionConflict(
VersionConflictEngineException e, boolean currentNotFoundOrDeleted, long currentVersion, long term) {
final IndexResult result = new IndexResult(e, currentVersion, term);
return new IndexingStrategy(
@ -1343,7 +1343,7 @@ public class InternalEngine extends Engine {
Optional.empty() : Optional.of(earlyResultOnPreflightError);
}
static DeletionStrategy skipDueToVersionConflict(
public static DeletionStrategy skipDueToVersionConflict(
VersionConflictEngineException e, long currentVersion, long term, boolean currentlyDeleted) {
final long unassignedSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
final DeleteResult deleteResult = new DeleteResult(e, currentVersion, term, unassignedSeqNo, currentlyDeleted == false);

View File

@ -791,15 +791,14 @@ public abstract class EngineTestCase extends ESTestCase {
Bits liveDocs = reader.getLiveDocs();
for (int i = 0; i < reader.maxDoc(); i++) {
if (liveDocs == null || liveDocs.get(i)) {
if (primaryTermDocValues.advanceExact(i) == false) {
// We have to skip non-root docs because its _id field is not stored (indexed only).
continue;
}
final long primaryTerm = primaryTermDocValues.longValue();
Document uuid = reader.document(i, Collections.singleton(IdFieldMapper.NAME));
BytesRef binaryID = uuid.getBinaryValue(IdFieldMapper.NAME);
String id = Uid.decodeId(Arrays.copyOfRange(binaryID.bytes, binaryID.offset, binaryID.offset + binaryID.length));
final long primaryTerm;
if (primaryTermDocValues.advanceExact(i)) {
primaryTerm = primaryTermDocValues.longValue();
} else {
primaryTerm = 0; // non-root documents of a nested document.
}
if (seqNoDocValues.advanceExact(i) == false) {
throw new AssertionError("seqNoDocValues not found for doc[" + i + "] id[" + id + "]");
}

View File

@ -63,6 +63,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.DocIdSeqNoAndTerm;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.InternalEngineFactory;
import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction;
@ -442,13 +443,14 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
public synchronized void close() throws Exception {
if (closed == false) {
closed = true;
for (IndexShard replica : replicas) {
try {
try {
final List<DocIdSeqNoAndTerm> docsOnPrimary = getDocIdAndSeqNos(primary);
for (IndexShard replica : replicas) {
assertThat(replica.getMaxSeenAutoIdTimestamp(), equalTo(primary.getMaxSeenAutoIdTimestamp()));
assertThat(replica.getMaxSeqNoOfUpdatesOrDeletes(), greaterThanOrEqualTo(primary.getMaxSeqNoOfUpdatesOrDeletes()));
} catch (AlreadyClosedException ignored) {
assertThat(getDocIdAndSeqNos(replica), equalTo(docsOnPrimary));
}
}
} catch (AlreadyClosedException ignored) { }
closeShards(this);
} else {
throw new AlreadyClosedException("too bad");

View File

@ -7,6 +7,7 @@
package org.elasticsearch.xpack.ccr.action.bulk;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.replication.TransportWriteAction;
@ -25,10 +26,12 @@ import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ccr.index.engine.AlreadyProcessedFollowingEngineException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.function.Function;
public class TransportBulkShardOperationsAction
extends TransportWriteAction<BulkShardOperationsRequest, BulkShardOperationsRequest, BulkShardOperationsResponse> {
@ -66,7 +69,7 @@ public class TransportBulkShardOperationsAction
}
// public for testing purposes only
public static WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperationsResponse> shardOperationOnPrimary(
public static CcrWritePrimaryResult shardOperationOnPrimary(
final ShardId shardId,
final String historyUUID,
final List<Translog.Operation> sourceOperations,
@ -78,7 +81,7 @@ public class TransportBulkShardOperationsAction
"], actual [" + primary.getHistoryUUID() + "], shard is likely restored from snapshot or force allocated");
}
final List<Translog.Operation> targetOperations = sourceOperations.stream().map(operation -> {
final Function<Translog.Operation, Translog.Operation> rewriteWithTerm = operation -> {
final Translog.Operation operationWithPrimaryTerm;
switch (operation.opType()) {
case INDEX:
@ -111,36 +114,65 @@ public class TransportBulkShardOperationsAction
throw new IllegalStateException("unexpected operation type [" + operation.opType() + "]");
}
return operationWithPrimaryTerm;
}).collect(Collectors.toList());
};
assert maxSeqNoOfUpdatesOrDeletes >= SequenceNumbers.NO_OPS_PERFORMED : "invalid msu [" + maxSeqNoOfUpdatesOrDeletes + "]";
primary.advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNoOfUpdatesOrDeletes);
final Translog.Location location = applyTranslogOperations(targetOperations, primary, Engine.Operation.Origin.PRIMARY);
final List<Translog.Operation> appliedOperations = new ArrayList<>(sourceOperations.size());
Translog.Location location = null;
long waitingForGlobalCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
for (Translog.Operation sourceOp : sourceOperations) {
final Translog.Operation targetOp = rewriteWithTerm.apply(sourceOp);
final Engine.Result result = primary.applyTranslogOperation(targetOp, Engine.Operation.Origin.PRIMARY);
if (result.getResultType() == Engine.Result.Type.SUCCESS) {
assert result.getSeqNo() == targetOp.seqNo();
appliedOperations.add(targetOp);
location = locationToSync(location, result.getTranslogLocation());
} else {
if (result.getFailure() instanceof AlreadyProcessedFollowingEngineException) {
// Skipped operations will be delivered to replicas via primary-replica resync or peer-recovery.
// The primary must not acknowledge this request until the global checkpoint is at least the highest
// seqno of all skipped operations (i.e., all skipped operations have been processed on every replica).
waitingForGlobalCheckpoint = SequenceNumbers.max(waitingForGlobalCheckpoint, targetOp.seqNo());
} else {
assert false : "Only already-processed error should happen; op=[" + targetOp + "] error=[" + result.getFailure() + "]";
throw ExceptionsHelper.convertToElastic(result.getFailure());
}
}
}
assert appliedOperations.size() == sourceOperations.size() || waitingForGlobalCheckpoint != SequenceNumbers.UNASSIGNED_SEQ_NO :
"waiting global checkpoint is not assigned; waiting_gcp=" + waitingForGlobalCheckpoint +
" source_ops=" + sourceOperations.size() + " applied_ops=" + sourceOperations.size();
assert appliedOperations.size() == 0 || location != null;
final BulkShardOperationsRequest replicaRequest = new BulkShardOperationsRequest(
shardId, historyUUID, targetOperations, maxSeqNoOfUpdatesOrDeletes);
return new CcrWritePrimaryResult(replicaRequest, location, primary, logger);
shardId, historyUUID, appliedOperations, maxSeqNoOfUpdatesOrDeletes);
return new CcrWritePrimaryResult(replicaRequest, location, primary, waitingForGlobalCheckpoint, logger);
}
@Override
protected WriteReplicaResult<BulkShardOperationsRequest> shardOperationOnReplica(
final BulkShardOperationsRequest request, final IndexShard replica) throws Exception {
assert replica.getMaxSeqNoOfUpdatesOrDeletes() >= request.getMaxSeqNoOfUpdatesOrDeletes() :
"mus on replica [" + replica + "] < mus of request [" + request.getMaxSeqNoOfUpdatesOrDeletes() + "]";
final Translog.Location location = applyTranslogOperations(request.getOperations(), replica, Engine.Operation.Origin.REPLICA);
return new WriteReplicaResult<>(request, location, null, replica, logger);
return shardOperationOnReplica(request, replica, logger);
}
// public for testing purposes only
public static Translog.Location applyTranslogOperations(
final List<Translog.Operation> operations, final IndexShard shard, final Engine.Operation.Origin origin) throws IOException {
public static WriteReplicaResult<BulkShardOperationsRequest> shardOperationOnReplica(
final BulkShardOperationsRequest request, final IndexShard replica, final Logger logger) throws IOException {
assert replica.getMaxSeqNoOfUpdatesOrDeletes() >= request.getMaxSeqNoOfUpdatesOrDeletes() :
"mus on replica [" + replica + "] < mus of request [" + request.getMaxSeqNoOfUpdatesOrDeletes() + "]";
Translog.Location location = null;
for (final Translog.Operation operation : operations) {
final Engine.Result result = shard.applyTranslogOperation(operation, origin);
for (final Translog.Operation operation : request.getOperations()) {
final Engine.Result result = replica.applyTranslogOperation(operation, Engine.Operation.Origin.REPLICA);
if (result.getResultType() != Engine.Result.Type.SUCCESS) {
assert false : "doc-level failure must not happen on replicas; op[" + operation + "] error[" + result.getFailure() + "]";
throw ExceptionsHelper.convertToElastic(result.getFailure());
}
assert result.getSeqNo() == operation.seqNo();
assert result.getResultType() == Engine.Result.Type.SUCCESS;
location = locationToSync(location, result.getTranslogLocation());
}
assert operations.size() == 0 || location != null;
return location;
assert request.getOperations().size() == 0 || location != null;
return new WriteReplicaResult<>(request, location, null, replica, logger);
}
@Override
@ -151,20 +183,37 @@ public class TransportBulkShardOperationsAction
/**
* Custom write result to include global checkpoint after ops have been replicated.
*/
static class CcrWritePrimaryResult extends WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperationsResponse> {
static final class CcrWritePrimaryResult extends WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperationsResponse> {
final long waitingForGlobalCheckpoint;
CcrWritePrimaryResult(BulkShardOperationsRequest request, Translog.Location location, IndexShard primary, Logger logger) {
CcrWritePrimaryResult(BulkShardOperationsRequest request, Translog.Location location, IndexShard primary,
long waitingForGlobalCheckpoint, Logger logger) {
super(request, new BulkShardOperationsResponse(), location, null, primary, logger);
this.waitingForGlobalCheckpoint = waitingForGlobalCheckpoint;
}
@Override
public synchronized void respond(ActionListener<BulkShardOperationsResponse> listener) {
final BulkShardOperationsResponse response = finalResponseIfSuccessful;
final SeqNoStats seqNoStats = primary.seqNoStats();
// return a fresh global checkpoint after the operations have been replicated for the shard follow task
response.setGlobalCheckpoint(seqNoStats.getGlobalCheckpoint());
response.setMaxSeqNo(seqNoStats.getMaxSeqNo());
listener.onResponse(response);
final ActionListener<BulkShardOperationsResponse> wrappedListener = ActionListener.wrap(response -> {
final SeqNoStats seqNoStats = primary.seqNoStats();
// return a fresh global checkpoint after the operations have been replicated for the shard follow task
response.setGlobalCheckpoint(seqNoStats.getGlobalCheckpoint());
response.setMaxSeqNo(seqNoStats.getMaxSeqNo());
listener.onResponse(response);
}, listener::onFailure);
if (waitingForGlobalCheckpoint != SequenceNumbers.UNASSIGNED_SEQ_NO) {
primary.addGlobalCheckpointListener(waitingForGlobalCheckpoint, (gcp, e) -> {
if (e != null) {
listener.onFailure(e);
} else {
assert waitingForGlobalCheckpoint <= gcp : waitingForGlobalCheckpoint + " > " + gcp;
super.respond(wrappedListener);
}
}, null);
} else {
super.respond(wrappedListener);
}
}
}

View File

@ -0,0 +1,16 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.index.engine;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.shard.ShardId;
public final class AlreadyProcessedFollowingEngineException extends VersionConflictEngineException {
AlreadyProcessedFollowingEngineException(ShardId shardId, long seqNo) {
super(shardId, "operation [{}] was processed before", null, seqNo);
}
}

View File

@ -58,8 +58,21 @@ public final class FollowingEngine extends InternalEngine {
final long maxSeqNoOfUpdatesOrDeletes = getMaxSeqNoOfUpdatesOrDeletes();
assert maxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is not initialized";
if (hasBeenProcessedBefore(index)) {
return IndexingStrategy.processButSkipLucene(false, index.seqNo(), index.version());
if (index.origin() == Operation.Origin.PRIMARY) {
/*
* The existing operation in this engine was probably assigned the term of the previous primary shard which is different
* from the term of the current operation. If the current operation arrives on replicas before the previous operation,
* then the Lucene content between the primary and replicas are not identical (primary terms are different). Since the
* existing operations are guaranteed to be replicated to replicas either via peer-recovery or primary-replica resync,
* we can safely skip this operation here and let the caller know the decision via AlreadyProcessedFollowingEngineException.
* The caller then waits for the global checkpoint to advance at least the seq_no of this operation to make sure that
* the existing operation was replicated to all replicas (see TransportBulkShardOperationsAction#shardOperationOnPrimary).
*/
final AlreadyProcessedFollowingEngineException error = new AlreadyProcessedFollowingEngineException(shardId, index.seqNo());
return IndexingStrategy.skipDueToVersionConflict(error, false, index.version(), index.primaryTerm());
} else {
return IndexingStrategy.processButSkipLucene(false, index.seqNo(), index.version());
}
} else if (maxSeqNoOfUpdatesOrDeletes <= getLocalCheckpoint()) {
assert maxSeqNoOfUpdatesOrDeletes < index.seqNo() : "seq_no[" + index.seqNo() + "] <= msu[" + maxSeqNoOfUpdatesOrDeletes + "]";
numOfOptimizedIndexing.inc();
@ -73,7 +86,19 @@ public final class FollowingEngine extends InternalEngine {
@Override
protected InternalEngine.DeletionStrategy deletionStrategyForOperation(final Delete delete) throws IOException {
preFlight(delete);
return planDeletionAsNonPrimary(delete);
if (delete.origin() == Operation.Origin.PRIMARY && hasBeenProcessedBefore(delete)) {
// See the comment in #indexingStrategyForOperation for the explanation why we can safely skip this operation.
final AlreadyProcessedFollowingEngineException error = new AlreadyProcessedFollowingEngineException(shardId, delete.seqNo());
return DeletionStrategy.skipDueToVersionConflict(error, delete.version(), delete.primaryTerm(), false);
} else {
return planDeletionAsNonPrimary(delete);
}
}
@Override
public NoOpResult noOp(NoOp noOp) {
// TODO: Make sure we process NoOp once.
return super.noOp(noOp);
}
@Override

View File

@ -28,6 +28,8 @@ import org.elasticsearch.analysis.common.CommonAnalysisPlugin;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
@ -47,11 +49,13 @@ import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.MockHttpTransport;
import org.elasticsearch.test.discovery.TestZenDiscovery;
import org.elasticsearch.xpack.ccr.action.ShardChangesAction;
@ -79,6 +83,7 @@ import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static java.util.Collections.singletonMap;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@ -117,6 +122,14 @@ public class ShardChangesIT extends ESIntegTestCase {
return Arrays.asList(LocalStateCcr.class, CommonAnalysisPlugin.class);
}
@Override
protected void beforeIndexDeletion() throws Exception {
super.beforeIndexDeletion();
assertSeqNos();
assertSameDocIdsOnShards();
internalCluster().assertConsistentHistoryBetweenTranslogAndLuceneIndex();
}
@Override
protected boolean ignoreExternalCluster() {
return true;
@ -681,6 +694,56 @@ public class ShardChangesIT extends ESIntegTestCase {
assertThat(client().prepareSearch("index2").get().getHits().getTotalHits(), equalTo(2L));
}
public void testFailOverOnFollower() throws Exception {
int numberOfReplicas = between(1, 2);
internalCluster().startMasterOnlyNode();
internalCluster().startDataOnlyNodes(numberOfReplicas + between(1, 2));
String leaderIndexSettings = getIndexSettings(1, numberOfReplicas,
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(client().admin().indices().prepareCreate("leader-index").setSource(leaderIndexSettings, XContentType.JSON));
ensureGreen("leader-index");
AtomicBoolean stopped = new AtomicBoolean();
Thread[] threads = new Thread[between(1, 8)];
AtomicInteger docID = new AtomicInteger();
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
while (stopped.get() == false) {
try {
if (frequently()) {
String id = Integer.toString(frequently() ? docID.incrementAndGet() : between(0, 10)); // sometimes update
client().prepareIndex("leader-index", "doc", id).setSource("{\"f\":" + id + "}", XContentType.JSON).get();
} else {
String id = Integer.toString(between(0, docID.get()));
client().prepareDelete("leader-index", "doc", id).get();
}
} catch (NodeClosedException ignored) {
}
}
});
threads[i].start();
}
PutFollowAction.Request follow = follow("leader-index", "follower-index");
client().execute(PutFollowAction.INSTANCE, follow).get();
ensureGreen("follower-index");
atLeastDocsIndexed("follower-index", between(20, 60));
final ClusterState clusterState = clusterService().state();
for (ShardRouting shardRouting : clusterState.routingTable().allShards("follower-index")) {
if (shardRouting.primary()) {
DiscoveryNode assignedNode = clusterState.nodes().get(shardRouting.currentNodeId());
internalCluster().restartNode(assignedNode.getName(), new InternalTestCluster.RestartCallback());
break;
}
}
ensureGreen("follower-index");
atLeastDocsIndexed("follower-index", between(80, 150));
stopped.set(true);
for (Thread thread : threads) {
thread.join();
}
assertSameDocCount("leader-index", "follower-index");
unfollowIndex("follower-index");
}
private CheckedRunnable<Exception> assertTask(final int numberOfPrimaryShards, final Map<ShardId, Long> numDocsPerShard) {
return () -> {
final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();

View File

@ -12,19 +12,22 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.replication.TransportWriteAction;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.Engine.Operation.Origin;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.replication.ESIndexLevelReplicationTestCase;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.threadpool.ThreadPool;
@ -37,7 +40,9 @@ import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
@ -45,6 +50,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import java.util.stream.Collectors;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
@ -221,6 +227,57 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
}
}
public void testRetryBulkShardOperations() throws Exception {
try (ReplicationGroup leaderGroup = createGroup(between(0, 1));
ReplicationGroup followerGroup = createFollowGroup(between(1, 3))) {
leaderGroup.startAll();
followerGroup.startAll();
leaderGroup.appendDocs(between(10, 100));
leaderGroup.refresh("test");
for (String deleteId : randomSubsetOf(IndexShardTestCase.getShardDocUIDs(leaderGroup.getPrimary()))) {
BulkItemResponse resp = leaderGroup.delete(new DeleteRequest("test", "type", deleteId));
assertThat(resp.getFailure(), nullValue());
}
leaderGroup.syncGlobalCheckpoint();
IndexShard leadingPrimary = leaderGroup.getPrimary();
// Simulates some bulk requests are completed on the primary and replicated to some (but all) replicas of the follower
// but the primary of the follower crashed before these requests completed.
for (int numBulks = between(1, 5), i = 0; i < numBulks; i++) {
long fromSeqNo = randomLongBetween(0, leadingPrimary.getGlobalCheckpoint());
long toSeqNo = randomLongBetween(fromSeqNo, leadingPrimary.getGlobalCheckpoint());
int numOps = Math.toIntExact(toSeqNo + 1 - fromSeqNo);
Translog.Operation[] ops = ShardChangesAction.getOperations(leadingPrimary, leadingPrimary.getGlobalCheckpoint(),
fromSeqNo, numOps, leadingPrimary.getHistoryUUID(), new ByteSizeValue(Long.MAX_VALUE, ByteSizeUnit.BYTES));
IndexShard followingPrimary = followerGroup.getPrimary();
TransportWriteAction.WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperationsResponse> primaryResult =
TransportBulkShardOperationsAction.shardOperationOnPrimary(followingPrimary.shardId(),
followingPrimary.getHistoryUUID(), Arrays.asList(ops), leadingPrimary.getMaxSeqNoOfUpdatesOrDeletes(),
followingPrimary, logger);
for (IndexShard replica : randomSubsetOf(followerGroup.getReplicas())) {
final PlainActionFuture<Releasable> permitFuture = new PlainActionFuture<>();
replica.acquireReplicaOperationPermit(followingPrimary.getOperationPrimaryTerm(),
followingPrimary.getGlobalCheckpoint(), followingPrimary.getMaxSeqNoOfUpdatesOrDeletes(),
permitFuture, ThreadPool.Names.SAME, primaryResult);
try (Releasable ignored = permitFuture.get()) {
TransportBulkShardOperationsAction.shardOperationOnReplica(primaryResult.replicaRequest(), replica, logger);
}
}
}
// A follow-task retries these requests while the primary-replica resync is happening on the follower.
followerGroup.promoteReplicaToPrimary(randomFrom(followerGroup.getReplicas()));
ShardFollowNodeTask shardFollowTask = createShardFollowTask(leaderGroup, followerGroup);
SeqNoStats followerSeqNoStats = followerGroup.getPrimary().seqNoStats();
shardFollowTask.start(followerGroup.getPrimary().getHistoryUUID(), leadingPrimary.getGlobalCheckpoint(),
leadingPrimary.getMaxSeqNoOfUpdatesOrDeletes(), followerSeqNoStats.getGlobalCheckpoint(), followerSeqNoStats.getMaxSeqNo());
assertBusy(() -> {
assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leadingPrimary.getGlobalCheckpoint()));
assertConsistentHistoryBetweenLeaderAndFollower(leaderGroup, followerGroup);
});
shardFollowTask.markAsCompleted();
}
}
@Override
protected ReplicationGroup createGroup(int replicas, Settings settings) throws IOException {
Settings newSettings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
@ -366,13 +423,29 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
};
}
private void assertConsistentHistoryBetweenLeaderAndFollower(ReplicationGroup leader, ReplicationGroup follower) throws IOException {
int totalOps = leader.getPrimary().estimateNumberOfHistoryOperations("test", 0);
for (IndexShard followingShard : follower) {
assertThat(followingShard.estimateNumberOfHistoryOperations("test", 0), equalTo(totalOps));
private void assertConsistentHistoryBetweenLeaderAndFollower(ReplicationGroup leader, ReplicationGroup follower) throws Exception {
final List<Tuple<String, Long>> docAndSeqNosOnLeader = getDocIdAndSeqNos(leader.getPrimary()).stream()
.map(d -> Tuple.tuple(d.getId(), d.getSeqNo())).collect(Collectors.toList());
final Set<Tuple<Long, Translog.Operation.Type>> operationsOnLeader = new HashSet<>();
try (Translog.Snapshot snapshot = leader.getPrimary().getHistoryOperations("test", 0)) {
Translog.Operation op;
while ((op = snapshot.next()) != null) {
operationsOnLeader.add(Tuple.tuple(op.seqNo(), op.opType()));
}
}
for (IndexShard followingShard : follower) {
assertThat(followingShard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(leader.getPrimary().getMaxSeqNoOfUpdatesOrDeletes()));
List<Tuple<String, Long>> docAndSeqNosOnFollower = getDocIdAndSeqNos(followingShard).stream()
.map(d -> Tuple.tuple(d.getId(), d.getSeqNo())).collect(Collectors.toList());
assertThat(docAndSeqNosOnFollower, equalTo(docAndSeqNosOnLeader));
final Set<Tuple<Long, Translog.Operation.Type>> operationsOnFollower = new HashSet<>();
try (Translog.Snapshot snapshot = followingShard.getHistoryOperations("test", 0)) {
Translog.Operation op;
while ((op = snapshot.next()) != null) {
operationsOnFollower.add(Tuple.tuple(op.seqNo(), op.opType()));
}
}
assertThat(operationsOnFollower, equalTo(operationsOnLeader));
}
}
@ -384,15 +457,24 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
@Override
protected PrimaryResult performOnPrimary(IndexShard primary, BulkShardOperationsRequest request) throws Exception {
TransportWriteAction.WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperationsResponse> result =
TransportBulkShardOperationsAction.shardOperationOnPrimary(primary.shardId(), request.getHistoryUUID(),
final PlainActionFuture<Releasable> permitFuture = new PlainActionFuture<>();
primary.acquirePrimaryOperationPermit(permitFuture, ThreadPool.Names.SAME, request);
final TransportWriteAction.WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperationsResponse> ccrResult;
try (Releasable ignored = permitFuture.get()) {
ccrResult = TransportBulkShardOperationsAction.shardOperationOnPrimary(primary.shardId(), request.getHistoryUUID(),
request.getOperations(), request.getMaxSeqNoOfUpdatesOrDeletes(), primary, logger);
return new PrimaryResult(result.replicaRequest(), result.finalResponseIfSuccessful);
}
return new PrimaryResult(ccrResult.replicaRequest(), ccrResult.finalResponseIfSuccessful) {
@Override
public void respond(ActionListener<BulkShardOperationsResponse> listener) {
ccrResult.respond(listener);
}
};
}
@Override
protected void performOnReplica(BulkShardOperationsRequest request, IndexShard replica) throws Exception {
TransportBulkShardOperationsAction.applyTranslogOperations(request.getOperations(), replica, Origin.REPLICA);
TransportBulkShardOperationsAction.shardOperationOnReplica(request, replica, logger);
}
}

View File

@ -7,8 +7,14 @@
package org.elasticsearch.xpack.ccr.action.bulk;
import org.apache.lucene.index.Term;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.replication.TransportWriteAction;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
@ -20,8 +26,11 @@ import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.hamcrest.Matchers.equalTo;
import static org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction.CcrWritePrimaryResult;
public class BulkShardOperationsTests extends IndexShardTestCase {
@ -78,4 +87,116 @@ public class BulkShardOperationsTests extends IndexShardTestCase {
closeShards(followerPrimary);
}
public void testPrimaryResultWaitForGlobalCheckpoint() throws Exception {
final Settings settings = Settings.builder().put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true).build();
final IndexShard shard = newStartedShard(false, settings, new FollowingEngineFactory());
int numOps = between(1, 100);
for (int i = 0; i < numOps; i++) {
final String id = Integer.toString(i);
final Translog.Operation op;
if (randomBoolean()) {
op = new Translog.Index("_doc", id, i, primaryTerm, 0, SOURCE, null, -1);
} else if (randomBoolean()) {
shard.advanceMaxSeqNoOfUpdatesOrDeletes(i);
op = new Translog.Delete("_doc", id, new Term("_id", Uid.encodeId(id)), i, primaryTerm, 0);
} else {
op = new Translog.NoOp(i, primaryTerm, "test");
}
shard.applyTranslogOperation(op, Engine.Operation.Origin.REPLICA);
}
BulkShardOperationsRequest request = new BulkShardOperationsRequest();
{
PlainActionFuture<BulkShardOperationsResponse> listener = new PlainActionFuture<>();
CcrWritePrimaryResult primaryResult = new CcrWritePrimaryResult(request, null, shard, -2, logger);
primaryResult.respond(listener);
assertThat("should return intermediately if waiting_global_checkpoint is not specified", listener.isDone(), equalTo(true));
assertThat(listener.get().getMaxSeqNo(), equalTo(shard.seqNoStats().getMaxSeqNo()));
}
{
PlainActionFuture<BulkShardOperationsResponse> listener = new PlainActionFuture<>();
long waitingForGlobalCheckpoint = randomLongBetween(shard.getGlobalCheckpoint() + 1, shard.getLocalCheckpoint());
CcrWritePrimaryResult primaryResult = new CcrWritePrimaryResult(request, null, shard, waitingForGlobalCheckpoint, logger);
primaryResult.respond(listener);
assertThat(listener.isDone(), equalTo(false));
expectThrows(ElasticsearchTimeoutException.class, () -> listener.actionGet(TimeValue.timeValueMillis(1)));
shard.updateGlobalCheckpointOnReplica(randomLongBetween(shard.getGlobalCheckpoint(), waitingForGlobalCheckpoint - 1), "test");
expectThrows(ElasticsearchTimeoutException.class, () -> listener.actionGet(TimeValue.timeValueMillis(1)));
shard.updateGlobalCheckpointOnReplica(randomLongBetween(waitingForGlobalCheckpoint, shard.getLocalCheckpoint()), "test");
assertThat(listener.get().getMaxSeqNo(), equalTo(shard.seqNoStats().getMaxSeqNo()));
assertThat(listener.get().getGlobalCheckpoint(), equalTo(shard.getGlobalCheckpoint()));
}
{
PlainActionFuture<BulkShardOperationsResponse> listener = new PlainActionFuture<>();
long waitingForGlobalCheckpoint = randomLongBetween(-1, shard.getGlobalCheckpoint());
CcrWritePrimaryResult primaryResult = new CcrWritePrimaryResult(request, null, shard, waitingForGlobalCheckpoint, logger);
primaryResult.respond(listener);
assertThat(listener.get().getMaxSeqNo(), equalTo(shard.seqNoStats().getMaxSeqNo()));
assertThat(listener.get().getGlobalCheckpoint(), equalTo(shard.getGlobalCheckpoint()));
}
closeShards(shard);
}
public void testPrimaryResultIncludeOnlyAppliedOperations() throws Exception {
final Settings settings = Settings.builder().put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true).build();
final IndexShard primary = newStartedShard(true, settings, new FollowingEngineFactory());
long seqno = 0;
List<Translog.Operation> firstBulk = new ArrayList<>();
List<Translog.Operation> secondBulk = new ArrayList<>();
for (int numOps = between(1, 100), i = 0; i < numOps; i++) {
final String id = Integer.toString(between(1, 100));
final Translog.Operation op;
if (randomBoolean()) {
op = new Translog.Index("_doc", id, seqno++, primaryTerm, 0, SOURCE, null, -1);
} else {
op = new Translog.Delete("_doc", id, new Term("_id", Uid.encodeId(id)), seqno++, primaryTerm, 0);
}
if (randomBoolean()) {
firstBulk.add(op);
} else {
secondBulk.add(op);
}
}
Randomness.shuffle(firstBulk);
Randomness.shuffle(secondBulk);
primary.advanceMaxSeqNoOfUpdatesOrDeletes(seqno);
final CcrWritePrimaryResult fullResult = TransportBulkShardOperationsAction.shardOperationOnPrimary(primary.shardId(),
primary.getHistoryUUID(), firstBulk, seqno, primary, logger);
assertThat(fullResult.replicaRequest().getOperations(),
equalTo(rewriteWithPrimaryTerm(firstBulk, primary.getOperationPrimaryTerm())));
assertThat(fullResult.waitingForGlobalCheckpoint, equalTo(-2L));
// This bulk includes some operations from the first bulk. These operations should not be included in the result.
final List<Translog.Operation> existingOps = randomSubsetOf(firstBulk);
final CcrWritePrimaryResult partialResult = TransportBulkShardOperationsAction.shardOperationOnPrimary(primary.shardId(),
primary.getHistoryUUID(), Stream.concat(existingOps.stream(), secondBulk.stream()).collect(Collectors.toList()),
seqno, primary, logger);
assertThat(partialResult.replicaRequest().getOperations(),
equalTo(rewriteWithPrimaryTerm(secondBulk, primary.getOperationPrimaryTerm())));
assertThat(partialResult.waitingForGlobalCheckpoint,
equalTo(existingOps.stream().mapToLong(Translog.Operation::seqNo).max().orElse(-2L)));
closeShards(primary);
}
private List<Translog.Operation> rewriteWithPrimaryTerm(List<Translog.Operation> sourceOperations, long primaryTerm) {
return sourceOperations.stream().map(op -> {
switch (op.opType()) {
case INDEX:
final Translog.Index index = (Translog.Index) op;
return new Translog.Index(index.type(), index.id(), index.seqNo(), primaryTerm,
index.version(), BytesReference.toBytes(index.source()), index.routing(), index.getAutoGeneratedIdTimestamp());
case DELETE:
final Translog.Delete delete = (Translog.Delete) op;
return new Translog.Delete(delete.type(), delete.id(), delete.uid(), delete.seqNo(), primaryTerm, delete.version());
case NO_OP:
final Translog.NoOp noOp = (Translog.NoOp) op;
return new Translog.NoOp(noOp.seqNo(), primaryTerm, noOp.reason());
default:
throw new IllegalStateException("unexpected operation type [" + op.opType() + "]");
}
}).collect(Collectors.toList());
}
}

View File

@ -23,6 +23,7 @@ import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.engine.DocIdSeqNoAndTerm;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.EngineTestCase;
@ -58,6 +59,7 @@ import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.instanceOf;
public class FollowingEngineTests extends ESTestCase {
@ -298,6 +300,25 @@ public class FollowingEngineTests extends ESTestCase {
return new Engine.Delete(parsedDoc.type(), parsedDoc.id(), EngineTestCase.newUid(parsedDoc), primaryTerm.get());
}
private Engine.Result applyOperation(Engine engine, Engine.Operation op,
long primaryTerm, Engine.Operation.Origin origin) throws IOException {
final VersionType versionType = origin == Engine.Operation.Origin.PRIMARY ? op.versionType() : null;
final Engine.Result result;
if (op instanceof Engine.Index) {
Engine.Index index = (Engine.Index) op;
result = engine.index(new Engine.Index(index.uid(), index.parsedDoc(), index.seqNo(), primaryTerm, index.version(),
versionType, origin, index.startTime(), index.getAutoGeneratedIdTimestamp(), index.isRetry()));
} else if (op instanceof Engine.Delete) {
Engine.Delete delete = (Engine.Delete) op;
result = engine.delete(new Engine.Delete(delete.type(), delete.id(), delete.uid(), delete.seqNo(), primaryTerm,
delete.version(), versionType, origin, delete.startTime()));
} else {
Engine.NoOp noOp = (Engine.NoOp) op;
result = engine.noOp(new Engine.NoOp(noOp.seqNo(), primaryTerm, origin, noOp.startTime(), noOp.reason()));
}
return result;
}
public void testBasicOptimization() throws Exception {
runFollowTest((leader, follower) -> {
long numDocs = between(1, 100);
@ -531,4 +552,57 @@ public class FollowingEngineTests extends ESTestCase {
}
};
}
public void testProcessOnceOnPrimary() throws Exception {
final Settings settings = Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0)
.put("index.version.created", Version.CURRENT).put("index.xpack.ccr.following_index", true).build();
final IndexMetaData indexMetaData = IndexMetaData.builder(index.getName()).settings(settings).build();
final IndexSettings indexSettings = new IndexSettings(indexMetaData, settings);
int numOps = between(10, 100);
List<Engine.Operation> operations = new ArrayList<>(numOps);
for (int i = 0; i < numOps; i++) {
ParsedDocument doc = EngineTestCase.createParsedDoc(Integer.toString(between(1, 100)), null);
if (randomBoolean()) {
operations.add(new Engine.Index(EngineTestCase.newUid(doc), doc, i, primaryTerm.get(), 1L,
VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, threadPool.relativeTimeInMillis(), -1, true));
} else {
operations.add(new Engine.Delete(doc.type(), doc.id(), EngineTestCase.newUid(doc), i, primaryTerm.get(), 1L,
VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, threadPool.relativeTimeInMillis()));
}
}
Randomness.shuffle(operations);
try (Store store = createStore(shardId, indexSettings, newDirectory())) {
final EngineConfig engineConfig = engineConfig(shardId, indexSettings, threadPool, store, logger, xContentRegistry());
try (FollowingEngine followingEngine = createEngine(store, engineConfig)) {
followingEngine.advanceMaxSeqNoOfUpdatesOrDeletes(operations.size() - 1L);
final long oldTerm = randomLongBetween(1, Integer.MAX_VALUE);
for (Engine.Operation op : operations) {
Engine.Result result = applyOperation(followingEngine, op, oldTerm, randomFrom(Engine.Operation.Origin.values()));
assertThat(result.getResultType(), equalTo(Engine.Result.Type.SUCCESS));
}
// Primary should reject duplicates
final long newTerm = randomLongBetween(oldTerm + 1, Long.MAX_VALUE);
for (Engine.Operation op : operations) {
Engine.Result result = applyOperation(followingEngine, op, newTerm, Engine.Operation.Origin.PRIMARY);
assertThat(result.getResultType(), equalTo(Engine.Result.Type.FAILURE));
assertThat(result.getFailure(), instanceOf(AlreadyProcessedFollowingEngineException.class));
}
for (DocIdSeqNoAndTerm docId : getDocIds(followingEngine, true)) {
assertThat(docId.getPrimaryTerm(), equalTo(oldTerm));
}
// Replica should accept duplicates
primaryTerm.set(newTerm);
followingEngine.rollTranslogGeneration();
for (Engine.Operation op : operations) {
Engine.Operation.Origin nonPrimary = randomValueOtherThan(Engine.Operation.Origin.PRIMARY,
() -> randomFrom(Engine.Operation.Origin.values()));
Engine.Result result = applyOperation(followingEngine, op, newTerm, nonPrimary);
assertThat(result.getResultType(), equalTo(Engine.Result.Type.SUCCESS));
}
for (DocIdSeqNoAndTerm docId : getDocIds(followingEngine, true)) {
assertThat(docId.getPrimaryTerm(), equalTo(oldTerm));
}
}
}
}
}