Use follower primary term when applying operations (#31113)

The primary shard copy on the following has authority of the replication
operations that occur on the following side in cross-cluster
replication. Yet today we are using the primary term directly from the
operations on the leader side. Instead we should be replacing the
primary term on the following side with the primary term of the primary
on the following side. This commit does this by copying the translog
operations with the corrected primary term. This ensures that we use
this primary term while applying the operations on the primary, and when
replicating them across to the replica (where the replica request was
carrying the primary term of the primary shard copy on the follower).
This commit is contained in:
Jason Tedor 2018-06-06 11:03:57 -04:00 committed by GitHub
parent d230548401
commit bf1152fcc6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 200 additions and 39 deletions

View File

@ -653,7 +653,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
long autoGeneratedTimeStamp, boolean isRetry, Engine.Operation.Origin origin, long autoGeneratedTimeStamp, boolean isRetry, Engine.Operation.Origin origin,
SourceToParse sourceToParse) throws IOException { SourceToParse sourceToParse) throws IOException {
assert opPrimaryTerm <= this.primaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this.primaryTerm + "]"; assert opPrimaryTerm <= this.primaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this.primaryTerm + "]";
assert versionType.validateVersionForWrites(version); assert versionType.validateVersionForWrites(version) : "version [" + version + "], version type [" + versionType + "]";
ensureWriteAllowed(origin); ensureWriteAllowed(origin);
Engine.Index operation; Engine.Index operation;
try { try {
@ -741,7 +741,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
private Engine.DeleteResult applyDeleteOperation(long seqNo, long opPrimaryTerm, long version, String type, String id, private Engine.DeleteResult applyDeleteOperation(long seqNo, long opPrimaryTerm, long version, String type, String id,
VersionType versionType, Engine.Operation.Origin origin) throws IOException { VersionType versionType, Engine.Operation.Origin origin) throws IOException {
assert opPrimaryTerm <= this.primaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this.primaryTerm + "]"; assert opPrimaryTerm <= this.primaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this.primaryTerm + "]";
assert versionType.validateVersionForWrites(version); assert versionType.validateVersionForWrites(version) : "version [" + version + "], version type [" + versionType + "]";
ensureWriteAllowed(origin); ensureWriteAllowed(origin);
// When there is a single type, the unique identifier is only composed of the _id, // When there is a single type, the unique identifier is only composed of the _id,
// so there is no way to differenciate foo#1 from bar#1. This is especially an issue // so there is no way to differenciate foo#1 from bar#1. This is especially an issue

View File

@ -18,13 +18,8 @@
*/ */
package org.elasticsearch.index.shard; package org.elasticsearch.index.shard;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexNotFoundException; import org.apache.lucene.index.IndexNotFoundException;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.store.Directory; import org.apache.lucene.store.Directory;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest;
@ -57,10 +52,8 @@ import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.EngineTestCase; import org.elasticsearch.index.engine.EngineTestCase;
import org.elasticsearch.index.engine.InternalEngineFactory; import org.elasticsearch.index.engine.InternalEngineFactory;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.seqno.ReplicationTracker; import org.elasticsearch.index.seqno.ReplicationTracker;
import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.similarity.SimilarityService;
@ -155,37 +148,60 @@ public abstract class IndexShardTestCase extends ESTestCase {
} }
/** /**
* creates a new initializing shard. The shard will have its own unique data path. * Creates a new initializing shard. The shard will have its own unique data path.
* *
* @param primary indicates whether to a primary shard (ready to recover from an empty store) or a replica * @param primary indicates whether to a primary shard (ready to recover from an empty store) or a replica (ready to recover from
* (ready to recover from another shard) * another shard)
*/ */
protected IndexShard newShard(boolean primary) throws IOException { protected IndexShard newShard(boolean primary) throws IOException {
ShardRouting shardRouting = TestShardRouting.newShardRouting(new ShardId("index", "_na_", 0), randomAlphaOfLength(10), primary, return newShard(primary, Settings.EMPTY, new InternalEngineFactory());
ShardRoutingState.INITIALIZING,
primary ? RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE);
return newShard(shardRouting);
} }
/** /**
* creates a new initializing shard. The shard will have its own unique data path. * Creates a new initializing shard. The shard will have its own unique data path.
*
* @param primary indicates whether to a primary shard (ready to recover from an empty store) or a replica (ready to recover from
* another shard)
* @param settings the settings to use for this shard
* @param engineFactory the engine factory to use for this shard
*/
protected IndexShard newShard(boolean primary, Settings settings, EngineFactory engineFactory) throws IOException {
final RecoverySource recoverySource =
primary ? RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE;
final ShardRouting shardRouting =
TestShardRouting.newShardRouting(
new ShardId("index", "_na_", 0), randomAlphaOfLength(10), primary, ShardRoutingState.INITIALIZING, recoverySource);
return newShard(shardRouting, settings, engineFactory);
}
protected IndexShard newShard(ShardRouting shardRouting, final IndexingOperationListener... listeners) throws IOException {
return newShard(shardRouting, Settings.EMPTY, new InternalEngineFactory(), listeners);
}
/**
* Creates a new initializing shard. The shard will have its own unique data path.
* *
* @param shardRouting the {@link ShardRouting} to use for this shard * @param shardRouting the {@link ShardRouting} to use for this shard
* @param settings the settings to use for this shard
* @param engineFactory the engine factory to use for this shard
* @param listeners an optional set of listeners to add to the shard * @param listeners an optional set of listeners to add to the shard
*/ */
protected IndexShard newShard( protected IndexShard newShard(
final ShardRouting shardRouting, final ShardRouting shardRouting,
final Settings settings,
final EngineFactory engineFactory,
final IndexingOperationListener... listeners) throws IOException { final IndexingOperationListener... listeners) throws IOException {
assert shardRouting.initializing() : shardRouting; assert shardRouting.initializing() : shardRouting;
Settings settings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) Settings indexSettings = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(settings)
.build(); .build();
IndexMetaData.Builder metaData = IndexMetaData.builder(shardRouting.getIndexName()) IndexMetaData.Builder metaData = IndexMetaData.builder(shardRouting.getIndexName())
.settings(settings) .settings(indexSettings)
.primaryTerm(0, primaryTerm) .primaryTerm(0, primaryTerm)
.putMapping("_doc", "{ \"properties\": {} }"); .putMapping("_doc", "{ \"properties\": {} }");
return newShard(shardRouting, metaData.build(), listeners); return newShard(shardRouting, metaData.build(), engineFactory, listeners);
} }
/** /**
@ -200,7 +216,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
ShardRouting shardRouting = TestShardRouting.newShardRouting(shardId, randomAlphaOfLength(5), primary, ShardRouting shardRouting = TestShardRouting.newShardRouting(shardId, randomAlphaOfLength(5), primary,
ShardRoutingState.INITIALIZING, ShardRoutingState.INITIALIZING,
primary ? RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE); primary ? RecoverySource.StoreRecoverySource.EMPTY_STORE_INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE);
return newShard(shardRouting, listeners); return newShard(shardRouting, Settings.EMPTY, new InternalEngineFactory(), listeners);
} }
/** /**
@ -240,9 +256,10 @@ public abstract class IndexShardTestCase extends ESTestCase {
* @param indexMetaData indexMetaData for the shard, including any mapping * @param indexMetaData indexMetaData for the shard, including any mapping
* @param listeners an optional set of listeners to add to the shard * @param listeners an optional set of listeners to add to the shard
*/ */
protected IndexShard newShard(ShardRouting routing, IndexMetaData indexMetaData, IndexingOperationListener... listeners) protected IndexShard newShard(
ShardRouting routing, IndexMetaData indexMetaData, EngineFactory engineFactory, IndexingOperationListener... listeners)
throws IOException { throws IOException {
return newShard(routing, indexMetaData, null, new InternalEngineFactory(), () -> {}, listeners); return newShard(routing, indexMetaData, null, engineFactory, () -> {}, listeners);
} }
/** /**
@ -342,19 +359,31 @@ public abstract class IndexShardTestCase extends ESTestCase {
} }
/** /**
* creates a new empyu shard and starts it. The shard will be either a replica or a primary. * Creates a new empty shard and starts it. The shard will randomly be a replica or a primary.
*/ */
protected IndexShard newStartedShard() throws IOException { protected IndexShard newStartedShard() throws IOException {
return newStartedShard(randomBoolean()); return newStartedShard(randomBoolean());
} }
/** /**
* creates a new empty shard and starts it. * Creates a new empty shard and starts it.
* *
* @param primary controls whether the shard will be a primary or a replica. * @param primary controls whether the shard will be a primary or a replica.
*/ */
protected IndexShard newStartedShard(boolean primary) throws IOException { protected IndexShard newStartedShard(final boolean primary) throws IOException {
IndexShard shard = newShard(primary); return newStartedShard(primary, Settings.EMPTY, new InternalEngineFactory());
}
/**
* Creates a new empty shard with the specified settings and engine factory and starts it.
*
* @param primary controls whether the shard will be a primary or a replica.
* @param settings the settings to use for this shard
* @param engineFactory the engine factory to use for this shard
*/
protected IndexShard newStartedShard(
final boolean primary, final Settings settings, final EngineFactory engineFactory) throws IOException {
IndexShard shard = newShard(primary, settings, engineFactory);
if (primary) { if (primary) {
recoverShardFromStore(shard); recoverShardFromStore(shard);
} else { } else {

View File

@ -3,23 +3,28 @@
* or more contributor license agreements. Licensed under the Elastic License; * or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License. * you may not use this file except in compliance with the Elastic License.
*/ */
package org.elasticsearch.xpack.ccr.action.bulk; package org.elasticsearch.xpack.ccr.action.bulk;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.replication.TransportWriteAction; import org.elasticsearch.action.support.replication.TransportWriteAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays;
public class TransportBulkShardOperationsAction public class TransportBulkShardOperationsAction
extends TransportWriteAction<BulkShardOperationsRequest, BulkShardOperationsRequest, BulkShardOperationsResponse> { extends TransportWriteAction<BulkShardOperationsRequest, BulkShardOperationsRequest, BulkShardOperationsResponse> {
@ -52,27 +57,72 @@ public class TransportBulkShardOperationsAction
@Override @Override
protected WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperationsResponse> shardOperationOnPrimary( protected WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperationsResponse> shardOperationOnPrimary(
final BulkShardOperationsRequest request, final IndexShard primary) throws Exception { final BulkShardOperationsRequest request, final IndexShard primary) throws Exception {
final Translog.Location location = applyTranslogOperations(request, primary, Engine.Operation.Origin.PRIMARY); return shardOperationOnPrimary(request.shardId(), request.getOperations(), primary, logger);
return new WritePrimaryResult<>(request, new BulkShardOperationsResponse(), location, null, primary, logger); }
static WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperationsResponse> shardOperationOnPrimary(
final ShardId shardId,
final Translog.Operation[] sourceOperations,
final IndexShard primary,
final Logger logger) throws IOException {
final Translog.Operation[] targetOperations = Arrays.stream(sourceOperations).map(operation -> {
final Translog.Operation operationWithPrimaryTerm;
switch (operation.opType()) {
case INDEX:
final Translog.Index index = (Translog.Index) operation;
operationWithPrimaryTerm = new Translog.Index(
index.type(),
index.id(),
index.seqNo(),
primary.getPrimaryTerm(),
index.version(),
index.versionType(),
BytesReference.toBytes(index.source()),
index.routing(),
index.getAutoGeneratedIdTimestamp());
break;
case DELETE:
final Translog.Delete delete = (Translog.Delete) operation;
operationWithPrimaryTerm = new Translog.Delete(
delete.type(),
delete.id(),
delete.uid(),
delete.seqNo(),
primary.getPrimaryTerm(),
delete.version(),
delete.versionType());
break;
case NO_OP:
final Translog.NoOp noOp = (Translog.NoOp) operation;
operationWithPrimaryTerm = new Translog.NoOp(noOp.seqNo(), primary.getPrimaryTerm(), noOp.reason());
break;
default:
throw new IllegalStateException("unexpected operation type [" + operation.opType() + "]");
}
return operationWithPrimaryTerm;
}).toArray(Translog.Operation[]::new);
final Translog.Location location = applyTranslogOperations(targetOperations, primary, Engine.Operation.Origin.PRIMARY);
final BulkShardOperationsRequest replicaRequest = new BulkShardOperationsRequest(shardId, targetOperations);
return new WritePrimaryResult<>(replicaRequest, new BulkShardOperationsResponse(), location, null, primary, logger);
} }
@Override @Override
protected WriteReplicaResult<BulkShardOperationsRequest> shardOperationOnReplica( protected WriteReplicaResult<BulkShardOperationsRequest> shardOperationOnReplica(
final BulkShardOperationsRequest request, final IndexShard replica) throws Exception { final BulkShardOperationsRequest request, final IndexShard replica) throws Exception {
final Translog.Location location = applyTranslogOperations(request, replica, Engine.Operation.Origin.REPLICA); final Translog.Location location = applyTranslogOperations(request.getOperations(), replica, Engine.Operation.Origin.REPLICA);
return new WriteReplicaResult<>(request, location, null, replica, logger); return new WriteReplicaResult<>(request, location, null, replica, logger);
} }
private Translog.Location applyTranslogOperations( private static Translog.Location applyTranslogOperations(
final BulkShardOperationsRequest request, final IndexShard shard, final Engine.Operation.Origin origin) throws IOException { final Translog.Operation[] operations, final IndexShard shard, final Engine.Operation.Origin origin) throws IOException {
Translog.Location location = null; Translog.Location location = null;
for (final Translog.Operation operation : request.getOperations()) { for (final Translog.Operation operation : operations) {
final Engine.Result result = shard.applyTranslogOperation(operation, origin); final Engine.Result result = shard.applyTranslogOperation(operation, origin);
assert result.getSeqNo() == operation.seqNo(); assert result.getSeqNo() == operation.seqNo();
assert result.getResultType() == Engine.Result.Type.SUCCESS; assert result.getResultType() == Engine.Result.Type.SUCCESS;
location = locationToSync(location, result.getTranslogLocation()); location = locationToSync(location, result.getTranslogLocation());
} }
assert request.getOperations().length == 0 || location != null; assert operations.length == 0 || location != null;
return location; return location;
} }

View File

@ -0,0 +1,82 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.action.bulk;
import org.apache.lucene.index.Term;
import org.elasticsearch.action.support.replication.TransportWriteAction;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.xpack.ccr.CcrSettings;
import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import static org.hamcrest.Matchers.equalTo;
public class BulkShardOperationsTests extends IndexShardTestCase {
private static final byte[] SOURCE = "{}".getBytes(StandardCharsets.UTF_8);
// test that we use the primary term on the follower when applying operations from the leader
public void testPrimaryTermFromFollower() throws IOException {
final Settings settings = Settings.builder().put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true).build();
final IndexShard followerPrimary = newStartedShard(true, settings, new FollowingEngineFactory());
// we use this primary on the operations yet we expect the applied operations to have the primary term of the follower
final long primaryTerm = randomLongBetween(1, Integer.MAX_VALUE);
final Translog.Operation[] operations = new Translog.Operation[randomIntBetween(0, 127)];
for (int i = 0; i < operations.length; i++) {
final String id = Integer.toString(i);
final long seqNo = i;
final Translog.Operation.Type type =
randomValueOtherThan(Translog.Operation.Type.CREATE, () -> randomFrom(Translog.Operation.Type.values()));
switch (type) {
case INDEX:
operations[i] = new Translog.Index("_doc", id, seqNo, primaryTerm, 0, VersionType.INTERNAL, SOURCE, null, -1);
break;
case DELETE:
operations[i] =
new Translog.Delete("_doc", id, new Term("_id", Uid.encodeId(id)), seqNo, primaryTerm, 0, VersionType.INTERNAL);
break;
case NO_OP:
operations[i] = new Translog.NoOp(seqNo, primaryTerm, "test");
break;
default:
throw new IllegalStateException("unexpected operation type [" + type + "]");
}
}
final TransportWriteAction.WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperationsResponse> result =
TransportBulkShardOperationsAction.shardOperationOnPrimary(followerPrimary.shardId(), operations, followerPrimary, logger);
try (Translog.Snapshot snapshot = followerPrimary.newTranslogSnapshotFromMinSeqNo(0)) {
assertThat(snapshot.totalOperations(), equalTo(operations.length));
Translog.Operation operation;
while ((operation = snapshot.next()) != null) {
assertThat(operation.primaryTerm(), equalTo(followerPrimary.getPrimaryTerm()));
}
}
for (final Translog.Operation operation : result.replicaRequest().getOperations()) {
assertThat(operation.primaryTerm(), equalTo(followerPrimary.getPrimaryTerm()));
}
closeShards(followerPrimary);
}
}