Refactor internal engine

This commit is a minor refactoring of internal engine to move hooks for
generating sequence numbers into the engine itself. As such, we refactor
tests that relied on this hook to use the new hook, and remove the hook
from the sequence number service itself.

Relates #27082
This commit is contained in:
Jason Tedor 2017-10-30 13:10:20 -04:00 committed by GitHub
parent 792641a6e3
commit a566942219
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 416 additions and 254 deletions

View File

@ -572,7 +572,11 @@ public abstract class Engine implements Closeable {
return new CommitStats(getLastCommittedSegmentInfos());
}
/** get the sequence number service */
/**
* The sequence number service for this engine.
*
* @return the sequence number service
*/
public abstract SequenceNumbersService seqNoService();
/**

View File

@ -145,6 +145,12 @@ public class InternalEngine extends Engine {
private final String historyUUID;
public InternalEngine(EngineConfig engineConfig) {
this(engineConfig, InternalEngine::sequenceNumberService);
}
InternalEngine(
final EngineConfig engineConfig,
final BiFunction<EngineConfig, SeqNoStats, SequenceNumbersService> seqNoServiceSupplier) {
super(engineConfig);
openMode = engineConfig.getOpenMode();
if (engineConfig.isAutoGeneratedIDsOptimizationEnabled() == false) {
@ -152,11 +158,11 @@ public class InternalEngine extends Engine {
}
this.uidField = engineConfig.getIndexSettings().isSingleType() ? IdFieldMapper.NAME : UidFieldMapper.NAME;
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(
engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(),
engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis()
engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(),
engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis()
);
this.deletionPolicy = new CombinedDeletionPolicy(
new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy()), translogDeletionPolicy, openMode);
new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy()), translogDeletionPolicy, openMode);
store.incRef();
IndexWriter writer = null;
Translog translog = null;
@ -184,20 +190,20 @@ public class InternalEngine extends Engine {
case CREATE_INDEX_AND_TRANSLOG:
writer = createWriter(true);
seqNoStats = new SeqNoStats(
SequenceNumbers.NO_OPS_PERFORMED,
SequenceNumbers.NO_OPS_PERFORMED,
SequenceNumbers.UNASSIGNED_SEQ_NO);
SequenceNumbers.NO_OPS_PERFORMED,
SequenceNumbers.NO_OPS_PERFORMED,
SequenceNumbers.UNASSIGNED_SEQ_NO);
break;
default:
throw new IllegalArgumentException(openMode.toString());
}
logger.trace("recovered [{}]", seqNoStats);
seqNoService = sequenceNumberService(shardId, allocationId, engineConfig.getIndexSettings(), seqNoStats);
seqNoService = seqNoServiceSupplier.apply(engineConfig, seqNoStats);
updateMaxUnsafeAutoIdTimestampFromWriter(writer);
historyUUID = loadOrGenerateHistoryUUID(writer, engineConfig.getForceNewHistoryUUID());
Objects.requireNonNull(historyUUID, "history uuid should not be null");
indexWriter = writer;
translog = openTranslog(engineConfig, writer, translogDeletionPolicy, () -> seqNoService().getGlobalCheckpoint());
translog = openTranslog(engineConfig, writer, translogDeletionPolicy, () -> seqNoService.getGlobalCheckpoint());
assert translog.getGeneration() != null;
this.translog = translog;
updateWriterOnOpen();
@ -240,12 +246,12 @@ public class InternalEngine extends Engine {
public void restoreLocalCheckpointFromTranslog() throws IOException {
try (ReleasableLock ignored = writeLock.acquire()) {
ensureOpen();
final long localCheckpoint = seqNoService().getLocalCheckpoint();
final long localCheckpoint = seqNoService.getLocalCheckpoint();
try (Translog.Snapshot snapshot = getTranslog().newSnapshotFromMinSeqNo(localCheckpoint + 1)) {
Translog.Operation operation;
while ((operation = snapshot.next()) != null) {
if (operation.seqNo() > localCheckpoint) {
seqNoService().markSeqNoAsCompleted(operation.seqNo());
seqNoService.markSeqNoAsCompleted(operation.seqNo());
}
}
}
@ -256,17 +262,17 @@ public class InternalEngine extends Engine {
public int fillSeqNoGaps(long primaryTerm) throws IOException {
try (ReleasableLock ignored = writeLock.acquire()) {
ensureOpen();
final long localCheckpoint = seqNoService().getLocalCheckpoint();
final long maxSeqNo = seqNoService().getMaxSeqNo();
final long localCheckpoint = seqNoService.getLocalCheckpoint();
final long maxSeqNo = seqNoService.getMaxSeqNo();
int numNoOpsAdded = 0;
for (
long seqNo = localCheckpoint + 1;
seqNo <= maxSeqNo;
seqNo = seqNoService().getLocalCheckpoint() + 1 /* the local checkpoint might have advanced so we leap-frog */) {
seqNo = seqNoService.getLocalCheckpoint() + 1 /* the local checkpoint might have advanced so we leap-frog */) {
innerNoOp(new NoOp(seqNo, primaryTerm, Operation.Origin.PRIMARY, System.nanoTime(), "filling gaps"));
numNoOpsAdded++;
assert seqNo <= seqNoService().getLocalCheckpoint()
: "local checkpoint did not advance; was [" + seqNo + "], now [" + seqNoService().getLocalCheckpoint() + "]";
assert seqNo <= seqNoService.getLocalCheckpoint()
: "local checkpoint did not advance; was [" + seqNo + "], now [" + seqNoService.getLocalCheckpoint() + "]";
}
return numNoOpsAdded;
@ -284,15 +290,13 @@ public class InternalEngine extends Engine {
maxUnsafeAutoIdTimestamp.set(Math.max(maxUnsafeAutoIdTimestamp.get(), commitMaxUnsafeAutoIdTimestamp));
}
private static SequenceNumbersService sequenceNumberService(
final ShardId shardId,
final String allocationId,
final IndexSettings indexSettings,
static SequenceNumbersService sequenceNumberService(
final EngineConfig engineConfig,
final SeqNoStats seqNoStats) {
return new SequenceNumbersService(
shardId,
allocationId,
indexSettings,
engineConfig.getShardId(),
engineConfig.getAllocationId(),
engineConfig.getIndexSettings(),
seqNoStats.getMaxSeqNo(),
seqNoStats.getLocalCheckpoint(),
seqNoStats.getGlobalCheckpoint());
@ -621,8 +625,7 @@ public class InternalEngine extends Engine {
assert seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO : "old op recovering but it already has a seq no.;" +
" index version: " + engineConfig.getIndexSettings().getIndexVersionCreated() + ", seqNo: " + seqNo;
} else if (origin == Operation.Origin.PRIMARY) {
// sequence number should not be set when operation origin is primary
assert seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO : "primary ops should never have an assigned seq no.; seqNo: " + seqNo;
assert assertOriginPrimarySequenceNumber(seqNo);
} else if (engineConfig.getIndexSettings().getIndexVersionCreated().onOrAfter(Version.V_6_0_0_alpha1)) {
// sequence number should be set when operation origin is not primary
assert seqNo >= 0 : "recovery or replica ops should have an assigned seq no.; origin: " + origin;
@ -630,6 +633,13 @@ public class InternalEngine extends Engine {
return true;
}
protected boolean assertOriginPrimarySequenceNumber(final long seqNo) {
// sequence number should not be set when operation origin is primary
assert seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO
: "primary operations must never have an assigned sequence number but was [" + seqNo + "]";
return true;
}
private boolean assertSequenceNumberBeforeIndexing(final Engine.Operation.Origin origin, final long seqNo) {
if (engineConfig.getIndexSettings().getIndexVersionCreated().onOrAfter(Version.V_6_0_0_alpha1) ||
origin == Operation.Origin.PRIMARY) {
@ -639,6 +649,20 @@ public class InternalEngine extends Engine {
return true;
}
private long generateSeqNoForOperation(final Operation operation) {
assert operation.origin() == Operation.Origin.PRIMARY;
return doGenerateSeqNoForOperation(operation);
}
/**
* Generate the sequence number for the specified operation.
*
* @param operation the operation
* @return the sequence number
*/
protected long doGenerateSeqNoForOperation(final Operation operation) {
return seqNoService.generateSeqNo();
}
@Override
public IndexResult index(Index index) throws IOException {
@ -708,7 +732,7 @@ public class InternalEngine extends Engine {
indexResult.setTranslogLocation(location);
}
if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
seqNoService().markSeqNoAsCompleted(indexResult.getSeqNo());
seqNoService.markSeqNoAsCompleted(indexResult.getSeqNo());
}
indexResult.setTook(System.nanoTime() - index.startTime());
indexResult.freeze();
@ -728,14 +752,12 @@ public class InternalEngine extends Engine {
final IndexingStrategy plan;
if (canOptimizeAddDocument(index) && mayHaveBeenIndexedBefore(index) == false) {
// no need to deal with out of order delivery - we never saw this one
assert index.version() == 1L :
"can optimize on replicas but incoming version is [" + index.version() + "]";
assert index.version() == 1L : "can optimize on replicas but incoming version is [" + index.version() + "]";
plan = IndexingStrategy.optimizedAppendOnly(index.seqNo());
} else {
// drop out of order operations
assert index.versionType().versionTypeForReplicationAndRecovery() == index.versionType() :
"resolving out of order delivery based on versioning but version type isn't fit for it. got ["
+ index.versionType() + "]";
"resolving out of order delivery based on versioning but version type isn't fit for it. got [" + index.versionType() + "]";
// unlike the primary, replicas don't really care to about creation status of documents
// this allows to ignore the case where a document was found in the live version maps in
// a delete state and return false for the created flag in favor of code simplicity
@ -770,15 +792,14 @@ public class InternalEngine extends Engine {
}
private IndexingStrategy planIndexingAsPrimary(Index index) throws IOException {
assert index.origin() == Operation.Origin.PRIMARY :
"planing as primary but origin isn't. got " + index.origin();
assert index.origin() == Operation.Origin.PRIMARY : "planing as primary but origin isn't. got " + index.origin();
final IndexingStrategy plan;
// resolve an external operation into an internal one which is safe to replay
if (canOptimizeAddDocument(index)) {
if (mayHaveBeenIndexedBefore(index)) {
plan = IndexingStrategy.overrideExistingAsIfNotThere(seqNoService().generateSeqNo(), 1L);
plan = IndexingStrategy.overrideExistingAsIfNotThere(generateSeqNoForOperation(index), 1L);
} else {
plan = IndexingStrategy.optimizedAppendOnly(seqNoService().generateSeqNo());
plan = IndexingStrategy.optimizedAppendOnly(generateSeqNoForOperation(index));
}
} else {
// resolves incoming version
@ -799,7 +820,7 @@ public class InternalEngine extends Engine {
plan = IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion);
} else {
plan = IndexingStrategy.processNormally(currentNotFoundOrDeleted,
seqNoService().generateSeqNo(),
generateSeqNoForOperation(index),
index.versionType().updateVersion(currentVersion, index.version())
);
}
@ -1008,7 +1029,7 @@ public class InternalEngine extends Engine {
deleteResult.setTranslogLocation(location);
}
if (deleteResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
seqNoService().markSeqNoAsCompleted(deleteResult.getSeqNo());
seqNoService.markSeqNoAsCompleted(deleteResult.getSeqNo());
}
deleteResult.setTook(System.nanoTime() - delete.startTime());
deleteResult.freeze();
@ -1025,8 +1046,7 @@ public class InternalEngine extends Engine {
}
private DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws IOException {
assert delete.origin() != Operation.Origin.PRIMARY : "planing as primary but got "
+ delete.origin();
assert delete.origin() != Operation.Origin.PRIMARY : "planing as primary but got " + delete.origin();
// drop out of order operations
assert delete.versionType().versionTypeForReplicationAndRecovery() == delete.versionType() :
"resolving out of order delivery based on versioning but version type isn't fit for it. got ["
@ -1064,8 +1084,7 @@ public class InternalEngine extends Engine {
}
private DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOException {
assert delete.origin() == Operation.Origin.PRIMARY : "planing as primary but got "
+ delete.origin();
assert delete.origin() == Operation.Origin.PRIMARY : "planing as primary but got " + delete.origin();
// resolve operation from external to internal
final VersionValue versionValue = resolveDocVersion(delete);
assert incrementVersionLookup();
@ -1083,9 +1102,10 @@ public class InternalEngine extends Engine {
final VersionConflictEngineException e = new VersionConflictEngineException(shardId, delete, currentVersion, currentlyDeleted);
plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, currentlyDeleted);
} else {
plan = DeletionStrategy.processNormally(currentlyDeleted,
seqNoService().generateSeqNo(),
delete.versionType().updateVersion(currentVersion, delete.version()));
plan = DeletionStrategy.processNormally(
currentlyDeleted,
generateSeqNoForOperation(delete),
delete.versionType().updateVersion(currentVersion, delete.version()));
}
return plan;
}
@ -1186,7 +1206,7 @@ public class InternalEngine extends Engine {
return noOpResult;
} finally {
if (seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
seqNoService().markSeqNoAsCompleted(seqNo);
seqNoService.markSeqNoAsCompleted(seqNo);
}
}
}
@ -1921,7 +1941,7 @@ public class InternalEngine extends Engine {
protected void commitIndexWriter(final IndexWriter writer, final Translog translog, @Nullable final String syncId) throws IOException {
ensureCanFlush();
try {
final long localCheckpoint = seqNoService().getLocalCheckpoint();
final long localCheckpoint = seqNoService.getLocalCheckpoint();
final Translog.TranslogGeneration translogGeneration = translog.getMinGenerationForSeqNo(localCheckpoint + 1);
final String translogFileGeneration = Long.toString(translogGeneration.translogFileGeneration);
final String translogUUID = translogGeneration.translogUUID;
@ -1944,7 +1964,7 @@ public class InternalEngine extends Engine {
if (syncId != null) {
commitData.put(Engine.SYNC_COMMIT_ID, syncId);
}
commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(seqNoService().getMaxSeqNo()));
commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(seqNoService.getMaxSeqNo()));
commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get()));
commitData.put(HISTORY_UUID_KEY, historyUUID);
logger.trace("committing writer with commit data [{}]", commitData);
@ -2008,8 +2028,7 @@ public class InternalEngine extends Engine {
return mergeScheduler.stats();
}
@Override
public SequenceNumbersService seqNoService() {
public final SequenceNumbersService seqNoService() {
return seqNoService;
}

View File

@ -66,7 +66,7 @@ public class SequenceNumbersService extends AbstractIndexShardComponent {
*
* @return the next assigned sequence number
*/
public long generateSeqNo() {
public final long generateSeqNo() {
return localCheckpointTracker.generateSeqNo();
}

View File

@ -27,7 +27,6 @@ import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.filter.RegexFilter;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.codecs.lucene50.Lucene50StoredFieldsFormat;
import org.apache.lucene.document.Field;
@ -95,46 +94,36 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.analysis.AnalyzerScope;
import org.elasticsearch.index.analysis.IndexAnalyzers;
import org.elasticsearch.index.analysis.NamedAnalyzer;
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.engine.Engine.Searcher;
import org.elasticsearch.index.fieldvisitor.FieldsVisitor;
import org.elasticsearch.index.mapper.ContentPath;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.DocumentMapperForType;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.Mapper.BuilderContext;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.MetadataFieldMapper;
import org.elasticsearch.index.mapper.ParseContext;
import org.elasticsearch.index.mapper.ParseContext.Document;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.RootObjectMapper;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.SourceFieldMapper;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.IndexSearcherWrapper;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardUtils;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.DirectoryUtils;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.indices.IndicesModule;
import org.elasticsearch.indices.mapper.MapperRegistry;
import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.IndexSettingsModule;
@ -171,9 +160,9 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.function.ToLongBiFunction;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
@ -184,7 +173,6 @@ import static org.elasticsearch.index.engine.Engine.Operation.Origin.LOCAL_TRANS
import static org.elasticsearch.index.engine.Engine.Operation.Origin.PEER_RECOVERY;
import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY;
import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA;
import static org.elasticsearch.index.mapper.SourceToParse.source;
import static org.elasticsearch.index.translog.TranslogDeletionPolicyTests.createTranslogDeletionPolicy;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.Matchers.equalTo;
@ -356,11 +344,19 @@ public class InternalEngineTests extends ESTestCase {
return createEngine(defaultSettings, store, translogPath, newMergePolicy(), null);
}
protected InternalEngine createEngine(Store store, Path translogPath,
Function<EngineConfig, SequenceNumbersService> sequenceNumbersServiceSupplier) throws IOException {
protected InternalEngine createEngine(Store store,
Path translogPath,
BiFunction<EngineConfig, SeqNoStats, SequenceNumbersService> sequenceNumbersServiceSupplier) throws IOException {
return createEngine(defaultSettings, store, translogPath, newMergePolicy(), null, sequenceNumbersServiceSupplier);
}
protected InternalEngine createEngine(Store store,
Path translogPath,
BiFunction<EngineConfig, SeqNoStats, SequenceNumbersService> sequenceNumbersServiceSupplier,
ToLongBiFunction<Engine, Engine.Operation> seqNoForOperation) throws IOException {
return createEngine(defaultSettings, store, translogPath, newMergePolicy(), null, sequenceNumbersServiceSupplier, seqNoForOperation, null);
}
protected InternalEngine createEngine(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy) throws IOException {
return createEngine(indexSettings, store, translogPath, mergePolicy, null);
@ -377,8 +373,19 @@ public class InternalEngineTests extends ESTestCase {
Path translogPath,
MergePolicy mergePolicy,
@Nullable IndexWriterFactory indexWriterFactory,
@Nullable Function<EngineConfig, SequenceNumbersService> sequenceNumbersServiceSupplier) throws IOException {
return createEngine(indexSettings, store, translogPath, mergePolicy, indexWriterFactory, sequenceNumbersServiceSupplier, null);
@Nullable BiFunction<EngineConfig, SeqNoStats, SequenceNumbersService> sequenceNumbersServiceSupplier) throws IOException {
return createEngine(indexSettings, store, translogPath, mergePolicy, indexWriterFactory, sequenceNumbersServiceSupplier, null, null);
}
protected InternalEngine createEngine(
IndexSettings indexSettings,
Store store,
Path translogPath,
MergePolicy mergePolicy,
@Nullable IndexWriterFactory indexWriterFactory,
@Nullable BiFunction<EngineConfig, SeqNoStats, SequenceNumbersService> sequenceNumbersServiceSupplier,
@Nullable ToLongBiFunction<Engine, Engine.Operation> seqNoForOperation) throws IOException {
return createEngine(indexSettings, store, translogPath, mergePolicy, indexWriterFactory, sequenceNumbersServiceSupplier, seqNoForOperation, null);
}
protected InternalEngine createEngine(
@ -387,10 +394,11 @@ public class InternalEngineTests extends ESTestCase {
Path translogPath,
MergePolicy mergePolicy,
@Nullable IndexWriterFactory indexWriterFactory,
@Nullable Function<EngineConfig, SequenceNumbersService> sequenceNumbersServiceSupplier,
@Nullable BiFunction<EngineConfig, SeqNoStats, SequenceNumbersService> sequenceNumbersServiceSupplier,
@Nullable ToLongBiFunction<Engine, Engine.Operation> seqNoForOperation,
@Nullable Sort indexSort) throws IOException {
EngineConfig config = config(indexSettings, store, translogPath, mergePolicy, null, indexSort);
InternalEngine internalEngine = createInternalEngine(indexWriterFactory, sequenceNumbersServiceSupplier, config);
InternalEngine internalEngine = createInternalEngine(indexWriterFactory, sequenceNumbersServiceSupplier, seqNoForOperation, config);
if (config.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
internalEngine.recoverFromTranslog();
}
@ -404,21 +412,39 @@ public class InternalEngineTests extends ESTestCase {
}
public static InternalEngine createInternalEngine(@Nullable final IndexWriterFactory indexWriterFactory,
@Nullable final Function<EngineConfig, SequenceNumbersService> sequenceNumbersServiceSupplier,
@Nullable final BiFunction<EngineConfig, SeqNoStats, SequenceNumbersService> sequenceNumbersServiceSupplier,
@Nullable final ToLongBiFunction<Engine, Engine.Operation> seqNoForOperation,
final EngineConfig config) {
return new InternalEngine(config) {
@Override
IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException {
return (indexWriterFactory != null) ?
indexWriterFactory.createWriter(directory, iwc) :
super.createWriter(directory, iwc);
}
if (sequenceNumbersServiceSupplier == null) {
return new InternalEngine(config) {
@Override
IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException {
return (indexWriterFactory != null) ?
indexWriterFactory.createWriter(directory, iwc) :
super.createWriter(directory, iwc);
}
@Override
public SequenceNumbersService seqNoService() {
return (sequenceNumbersServiceSupplier != null) ? sequenceNumbersServiceSupplier.apply(config) : super.seqNoService();
protected long doGenerateSeqNoForOperation(final Operation operation) {
return seqNoForOperation != null ? seqNoForOperation.applyAsLong(this, operation) : super.doGenerateSeqNoForOperation(operation);
}
};
} else {
return new InternalEngine(config, sequenceNumbersServiceSupplier) {
@Override
IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException {
return (indexWriterFactory != null) ?
indexWriterFactory.createWriter(directory, iwc) :
super.createWriter(directory, iwc);
}
@Override
protected long doGenerateSeqNoForOperation(final Operation operation) {
return seqNoForOperation != null ? seqNoForOperation.applyAsLong(this, operation) : super.doGenerateSeqNoForOperation(operation);
}
};
}
}
public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy,
@ -672,8 +698,8 @@ public class InternalEngineTests extends ESTestCase {
public void testSegmentsWithIndexSort() throws Exception {
Sort indexSort = new Sort(new SortedSetSortField("_type", false));
try (Store store = createStore();
Engine engine = createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE,
null, null, indexSort)) {
Engine engine =
createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE, null, null, null, indexSort)) {
List<Segment> segments = engine.segments(true);
assertThat(segments.isEmpty(), equalTo(true));
@ -729,13 +755,28 @@ public class InternalEngineTests extends ESTestCase {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
try (
Store store = createStore();
InternalEngine engine = createEngine(store, createTempDir(), (config) -> new SequenceNumbersService(
config.getShardId(),
config.getAllocationId(),
config.getIndexSettings(),
maxSeqNo.get(),
localCheckpoint.get(),
globalCheckpoint.get())
InternalEngine engine = createEngine(store, createTempDir(), (config, seqNoStats) -> new SequenceNumbersService(
config.getShardId(),
config.getAllocationId(),
config.getIndexSettings(),
seqNoStats.getMaxSeqNo(),
seqNoStats.getLocalCheckpoint(),
seqNoStats.getGlobalCheckpoint()) {
@Override
public long getMaxSeqNo() {
return maxSeqNo.get();
}
@Override
public long getLocalCheckpoint() {
return localCheckpoint.get();
}
@Override
public long getGlobalCheckpoint() {
return globalCheckpoint.get();
}
}
)) {
CommitStats stats1 = engine.commitStats();
assertThat(stats1.getGeneration(), greaterThan(0L));
@ -902,20 +943,11 @@ public class InternalEngineTests extends ESTestCase {
Store store = createStore();
final AtomicInteger counter = new AtomicInteger();
try {
initialEngine = createEngine(store, createTempDir(), (config) ->
new SequenceNumbersService(
config.getShardId(),
config.getAllocationId(),
config.getIndexSettings(),
SequenceNumbers.NO_OPS_PERFORMED,
SequenceNumbers.NO_OPS_PERFORMED,
SequenceNumbers.UNASSIGNED_SEQ_NO) {
@Override
public long generateSeqNo() {
return seqNos.get(counter.getAndIncrement());
}
}
);
initialEngine = createEngine(
store,
createTempDir(),
InternalEngine::sequenceNumberService,
(engine, operation) -> seqNos.get(counter.getAndIncrement()));
for (int i = 0; i < docs; i++) {
final String id = Integer.toString(i);
final ParsedDocument doc = testParsedDocument(id, null, testDocumentWithTextField(), SOURCE, null);
@ -2711,7 +2743,7 @@ public class InternalEngineTests extends ESTestCase {
assertVisibleCount(engine, numDocs, false);
parser = (TranslogHandler) engine.config().getTranslogRecoveryRunner();
assertEquals(numDocs, parser.appliedOperations.get());
assertEquals(numDocs, parser.appliedOperations());
if (parser.mappingUpdate != null) {
assertEquals(1, parser.getRecoveredTypes().size());
assertTrue(parser.getRecoveredTypes().containsKey("test"));
@ -2723,7 +2755,7 @@ public class InternalEngineTests extends ESTestCase {
engine = createEngine(store, primaryTranslogDir);
assertVisibleCount(engine, numDocs, false);
parser = (TranslogHandler) engine.config().getTranslogRecoveryRunner();
assertEquals(0, parser.appliedOperations.get());
assertEquals(0, parser.appliedOperations());
final boolean flush = randomBoolean();
int randomId = randomIntBetween(numDocs + 1, numDocs + 10);
@ -2753,7 +2785,7 @@ public class InternalEngineTests extends ESTestCase {
assertThat(topDocs.totalHits, equalTo(numDocs + 1L));
}
parser = (TranslogHandler) engine.config().getTranslogRecoveryRunner();
assertEquals(flush ? 1 : 2, parser.appliedOperations.get());
assertEquals(flush ? 1 : 2, parser.appliedOperations());
engine.delete(new Engine.Delete("test", Integer.toString(randomId), newUid(doc)));
if (randomBoolean()) {
engine.refresh("test");
@ -2767,97 +2799,6 @@ public class InternalEngineTests extends ESTestCase {
}
}
public static class TranslogHandler implements EngineConfig.TranslogRecoveryRunner {
private final MapperService mapperService;
public Mapping mappingUpdate = null;
private final Map<String, Mapping> recoveredTypes = new HashMap<>();
private final AtomicLong appliedOperations = new AtomicLong();
public TranslogHandler(NamedXContentRegistry xContentRegistry, IndexSettings indexSettings) {
NamedAnalyzer defaultAnalyzer = new NamedAnalyzer("default", AnalyzerScope.INDEX, new StandardAnalyzer());
IndexAnalyzers indexAnalyzers = new IndexAnalyzers(indexSettings, defaultAnalyzer, defaultAnalyzer, defaultAnalyzer, Collections.emptyMap(), Collections.emptyMap());
SimilarityService similarityService = new SimilarityService(indexSettings, null, Collections.emptyMap());
MapperRegistry mapperRegistry = new IndicesModule(Collections.emptyList()).getMapperRegistry();
mapperService = new MapperService(indexSettings, indexAnalyzers, xContentRegistry, similarityService, mapperRegistry,
() -> null);
}
private DocumentMapperForType docMapper(String type) {
RootObjectMapper.Builder rootBuilder = new RootObjectMapper.Builder(type);
DocumentMapper.Builder b = new DocumentMapper.Builder(rootBuilder, mapperService);
return new DocumentMapperForType(b.build(mapperService), mappingUpdate);
}
private void applyOperation(Engine engine, Engine.Operation operation) throws IOException {
switch (operation.operationType()) {
case INDEX:
Engine.Index engineIndex = (Engine.Index) operation;
Mapping update = engineIndex.parsedDoc().dynamicMappingsUpdate();
if (engineIndex.parsedDoc().dynamicMappingsUpdate() != null) {
recoveredTypes.compute(engineIndex.type(), (k, mapping) -> mapping == null ? update : mapping.merge(update, false));
}
engine.index(engineIndex);
break;
case DELETE:
engine.delete((Engine.Delete) operation);
break;
case NO_OP:
engine.noOp((Engine.NoOp) operation);
break;
default:
throw new IllegalStateException("No operation defined for [" + operation + "]");
}
}
/**
* Returns the recovered types modifying the mapping during the recovery
*/
public Map<String, Mapping> getRecoveredTypes() {
return recoveredTypes;
}
@Override
public int run(Engine engine, Translog.Snapshot snapshot) throws IOException {
int opsRecovered = 0;
Translog.Operation operation;
while ((operation = snapshot.next()) != null) {
applyOperation(engine, convertToEngineOp(operation, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY));
opsRecovered++;
appliedOperations.incrementAndGet();
}
return opsRecovered;
}
private Engine.Operation convertToEngineOp(Translog.Operation operation, Engine.Operation.Origin origin) {
switch (operation.opType()) {
case INDEX:
final Translog.Index index = (Translog.Index) operation;
final String indexName = mapperService.index().getName();
final Engine.Index engineIndex = IndexShard.prepareIndex(docMapper(index.type()),
mapperService.getIndexSettings().getIndexVersionCreated(),
source(indexName, index.type(), index.id(), index.source(), XContentFactory.xContentType(index.source()))
.routing(index.routing()).parent(index.parent()), index.seqNo(), index.primaryTerm(),
index.version(), index.versionType().versionTypeForReplicationAndRecovery(), origin,
index.getAutoGeneratedIdTimestamp(), true);
return engineIndex;
case DELETE:
final Translog.Delete delete = (Translog.Delete) operation;
final Engine.Delete engineDelete = new Engine.Delete(delete.type(), delete.id(), delete.uid(), delete.seqNo(),
delete.primaryTerm(), delete.version(), delete.versionType().versionTypeForReplicationAndRecovery(),
origin, System.nanoTime());
return engineDelete;
case NO_OP:
final Translog.NoOp noOp = (Translog.NoOp) operation;
final Engine.NoOp engineNoOp =
new Engine.NoOp(noOp.seqNo(), noOp.primaryTerm(), origin, System.nanoTime(), noOp.reason());
return engineNoOp;
default:
throw new IllegalStateException("No operation defined for [" + operation + "]");
}
}
}
public void testRecoverFromForeignTranslog() throws IOException {
final int numDocs = randomIntBetween(1, 10);
for (int i = 0; i < numDocs; i++) {
@ -3786,47 +3727,38 @@ public class InternalEngineTests extends ESTestCase {
}
/**
* A sequence number service that will generate a sequence number and if {@code stall} is set to {@code true} will wait on the barrier
* and the referenced latch before returning. If the local checkpoint should advance (because {@code stall} is {@code false}), then the
* value of {@code expectedLocalCheckpoint} is set accordingly.
* A sequence number generator that will generate a sequence number and if {@code stall} is set to true will wait on the barrier and the
* referenced latch before returning. If the local checkpoint should advance (because {@code stall} is false, then the value of
* {@code expectedLocalCheckpoint} is set accordingly.
*
* @param latchReference to latch the thread for the purpose of stalling
* @param barrier to signal the thread has generated a new sequence number
* @param stall whether or not the thread should stall
* @param expectedLocalCheckpoint the expected local checkpoint after generating a new sequence
* number
* @return a sequence number service
* @return a sequence number generator
*/
private SequenceNumbersService getStallingSeqNoService(
private ToLongBiFunction<Engine, Engine.Operation> getStallingSeqNoGenerator(
final AtomicReference<CountDownLatch> latchReference,
final CyclicBarrier barrier,
final AtomicBoolean stall,
final AtomicLong expectedLocalCheckpoint) {
return new SequenceNumbersService(
shardId,
allocationId.getId(),
defaultSettings,
SequenceNumbers.NO_OPS_PERFORMED,
SequenceNumbers.NO_OPS_PERFORMED,
SequenceNumbers.UNASSIGNED_SEQ_NO) {
@Override
public long generateSeqNo() {
final long seqNo = super.generateSeqNo();
final CountDownLatch latch = latchReference.get();
if (stall.get()) {
try {
barrier.await();
latch.await();
} catch (BrokenBarrierException | InterruptedException e) {
throw new RuntimeException(e);
}
} else {
if (expectedLocalCheckpoint.get() + 1 == seqNo) {
expectedLocalCheckpoint.set(seqNo);
}
return (engine, operation) -> {
final long seqNo = engine.seqNoService().generateSeqNo();
final CountDownLatch latch = latchReference.get();
if (stall.get()) {
try {
barrier.await();
latch.await();
} catch (BrokenBarrierException | InterruptedException e) {
throw new RuntimeException(e);
}
} else {
if (expectedLocalCheckpoint.get() + 1 == seqNo) {
expectedLocalCheckpoint.set(seqNo);
}
return seqNo;
}
return seqNo;
};
}
@ -3840,8 +3772,8 @@ public class InternalEngineTests extends ESTestCase {
final AtomicBoolean stall = new AtomicBoolean();
final AtomicLong expectedLocalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
final List<Thread> threads = new ArrayList<>();
final SequenceNumbersService seqNoService = getStallingSeqNoService(latchReference, barrier, stall, expectedLocalCheckpoint);
initialEngine = createEngine(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null, (config) -> seqNoService);
initialEngine =
createEngine(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null, InternalEngine::sequenceNumberService, getStallingSeqNoGenerator(latchReference, barrier, stall, expectedLocalCheckpoint));
final InternalEngine finalInitialEngine = initialEngine;
for (int i = 0; i < docs; i++) {
final String id = Integer.toString(i);
@ -4015,17 +3947,17 @@ public class InternalEngineTests extends ESTestCase {
final int localCheckpoint = randomIntBetween(0, maxSeqNo);
final int globalCheckpoint = randomIntBetween(0, localCheckpoint);
try {
final SequenceNumbersService seqNoService =
new SequenceNumbersService(shardId, allocationId.getId(), defaultSettings, maxSeqNo, localCheckpoint, globalCheckpoint) {
@Override
public long generateSeqNo() {
throw new UnsupportedOperationException();
}
};
noOpEngine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG)) {
final BiFunction<EngineConfig, SeqNoStats, SequenceNumbersService> supplier = (engineConfig, ignored) -> new SequenceNumbersService(
engineConfig.getShardId(),
engineConfig.getAllocationId(),
engineConfig.getIndexSettings(),
maxSeqNo,
localCheckpoint,
globalCheckpoint);
noOpEngine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG), supplier) {
@Override
public SequenceNumbersService seqNoService() {
return seqNoService;
protected long doGenerateSeqNoForOperation(Operation operation) {
throw new UnsupportedOperationException();
}
};
noOpEngine.recoverFromTranslog();
@ -4070,8 +4002,8 @@ public class InternalEngineTests extends ESTestCase {
final AtomicBoolean stall = new AtomicBoolean();
final AtomicLong expectedLocalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
final Map<Thread, CountDownLatch> threads = new LinkedHashMap<>();
final SequenceNumbersService seqNoService = getStallingSeqNoService(latchReference, barrier, stall, expectedLocalCheckpoint);
actualEngine = createEngine(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null, (config) -> seqNoService);
actualEngine =
createEngine(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null, InternalEngine::sequenceNumberService, getStallingSeqNoGenerator(latchReference, barrier, stall, expectedLocalCheckpoint));
final InternalEngine finalActualEngine = actualEngine;
final Translog translog = finalActualEngine.getTranslog();
final long generation = finalActualEngine.getTranslog().currentFileGeneration();
@ -4160,26 +4092,20 @@ public class InternalEngineTests extends ESTestCase {
InternalEngine actualEngine = null;
try {
final Set<Long> completedSeqNos = new HashSet<>();
final SequenceNumbersService seqNoService =
new SequenceNumbersService(
shardId,
allocationId.getId(),
defaultSettings,
SequenceNumbers.NO_OPS_PERFORMED,
SequenceNumbers.NO_OPS_PERFORMED,
SequenceNumbers.UNASSIGNED_SEQ_NO) {
final BiFunction<EngineConfig, SeqNoStats, SequenceNumbersService> supplier = (engineConfig, seqNoStats) -> new SequenceNumbersService(
engineConfig.getShardId(),
engineConfig.getAllocationId(),
engineConfig.getIndexSettings(),
seqNoStats.getMaxSeqNo(),
seqNoStats.getLocalCheckpoint(),
seqNoStats.getGlobalCheckpoint()) {
@Override
public void markSeqNoAsCompleted(long seqNo) {
super.markSeqNoAsCompleted(seqNo);
completedSeqNos.add(seqNo);
}
};
actualEngine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG)) {
@Override
public SequenceNumbersService seqNoService() {
return seqNoService;
}
};
actualEngine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG), supplier);
final int operations = randomIntBetween(0, 1024);
final Set<Long> expectedCompletedSeqNos = new HashSet<>();
for (int i = 0; i < operations; i++) {
@ -4347,4 +4273,71 @@ public class InternalEngineTests extends ESTestCase {
assertSameReader(getSearcher, searchSearcher);
}
}
public void testSeqNoGenerator() throws IOException {
engine.close();
final long seqNo = randomIntBetween(Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED), Integer.MAX_VALUE);
final BiFunction<EngineConfig, SeqNoStats, SequenceNumbersService> seqNoService = (config, seqNoStats) -> new SequenceNumbersService(
config.getShardId(),
config.getAllocationId(),
config.getIndexSettings(),
SequenceNumbers.NO_OPS_PERFORMED,
SequenceNumbers.NO_OPS_PERFORMED,
SequenceNumbers.UNASSIGNED_SEQ_NO);
final AtomicLong seqNoGenerator = new AtomicLong(seqNo);
try (Engine e = createEngine(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null, seqNoService, (engine, operation) -> seqNoGenerator.getAndIncrement())) {
final String id = "id";
final Field uidField = new Field("_id", id, IdFieldMapper.Defaults.FIELD_TYPE);
final String type = "type";
final Field versionField = new NumericDocValuesField("_version", 0);
final SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID();
final ParseContext.Document document = new ParseContext.Document();
document.add(uidField);
document.add(versionField);
document.add(seqID.seqNo);
document.add(seqID.seqNoDocValue);
document.add(seqID.primaryTerm);
final BytesReference source = new BytesArray(new byte[]{1});
final ParsedDocument parsedDocument = new ParsedDocument(
versionField,
seqID,
id,
type,
"routing",
Collections.singletonList(document),
source,
XContentType.JSON,
null);
final Engine.Index index = new Engine.Index(
new Term("_id", parsedDocument.id()),
parsedDocument,
SequenceNumbers.UNASSIGNED_SEQ_NO,
(long) randomIntBetween(1, 8),
Versions.MATCH_ANY,
VersionType.INTERNAL,
Engine.Operation.Origin.PRIMARY,
System.currentTimeMillis(),
System.currentTimeMillis(),
randomBoolean());
final Engine.IndexResult indexResult = e.index(index);
assertThat(indexResult.getSeqNo(), equalTo(seqNo));
assertThat(seqNoGenerator.get(), equalTo(seqNo + 1));
final Engine.Delete delete = new Engine.Delete(
type,
id,
new Term("_id", parsedDocument.id()),
SequenceNumbers.UNASSIGNED_SEQ_NO,
(long) randomIntBetween(1, 8),
Versions.MATCH_ANY,
VersionType.INTERNAL,
Engine.Operation.Origin.PRIMARY,
System.currentTimeMillis());
final Engine.DeleteResult deleteResult = e.delete(delete);
assertThat(deleteResult.getSeqNo(), equalTo(seqNo + 1));
assertThat(seqNoGenerator.get(), equalTo(seqNo + 2));
}
}
}

View File

@ -316,7 +316,7 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase
assert documentFailureMessage != null;
throw new IOException(documentFailureMessage);
}
}, null, config);
}, null, null, config);
}
}

View File

@ -637,6 +637,7 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
}
},
null,
null,
config);
}

View File

@ -0,0 +1,145 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.engine;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.analysis.AnalyzerScope;
import org.elasticsearch.index.analysis.IndexAnalyzers;
import org.elasticsearch.index.analysis.NamedAnalyzer;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.DocumentMapperForType;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.RootObjectMapper;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesModule;
import org.elasticsearch.indices.mapper.MapperRegistry;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static org.elasticsearch.index.mapper.SourceToParse.source;
public class TranslogHandler implements EngineConfig.TranslogRecoveryRunner {
private final MapperService mapperService;
public Mapping mappingUpdate = null;
private final Map<String, Mapping> recoveredTypes = new HashMap<>();
private final AtomicLong appliedOperations = new AtomicLong();
long appliedOperations() {
return appliedOperations.get();
}
public TranslogHandler(NamedXContentRegistry xContentRegistry, IndexSettings indexSettings) {
NamedAnalyzer defaultAnalyzer = new NamedAnalyzer("default", AnalyzerScope.INDEX, new StandardAnalyzer());
IndexAnalyzers indexAnalyzers =
new IndexAnalyzers(indexSettings, defaultAnalyzer, defaultAnalyzer, defaultAnalyzer, emptyMap(), emptyMap());
SimilarityService similarityService = new SimilarityService(indexSettings, null, emptyMap());
MapperRegistry mapperRegistry = new IndicesModule(emptyList()).getMapperRegistry();
mapperService = new MapperService(indexSettings, indexAnalyzers, xContentRegistry, similarityService, mapperRegistry,
() -> null);
}
private DocumentMapperForType docMapper(String type) {
RootObjectMapper.Builder rootBuilder = new RootObjectMapper.Builder(type);
DocumentMapper.Builder b = new DocumentMapper.Builder(rootBuilder, mapperService);
return new DocumentMapperForType(b.build(mapperService), mappingUpdate);
}
private void applyOperation(Engine engine, Engine.Operation operation) throws IOException {
switch (operation.operationType()) {
case INDEX:
Engine.Index engineIndex = (Engine.Index) operation;
Mapping update = engineIndex.parsedDoc().dynamicMappingsUpdate();
if (engineIndex.parsedDoc().dynamicMappingsUpdate() != null) {
recoveredTypes.compute(engineIndex.type(), (k, mapping) -> mapping == null ? update : mapping.merge(update, false));
}
engine.index(engineIndex);
break;
case DELETE:
engine.delete((Engine.Delete) operation);
break;
case NO_OP:
engine.noOp((Engine.NoOp) operation);
break;
default:
throw new IllegalStateException("No operation defined for [" + operation + "]");
}
}
/**
* Returns the recovered types modifying the mapping during the recovery
*/
public Map<String, Mapping> getRecoveredTypes() {
return recoveredTypes;
}
@Override
public int run(Engine engine, Translog.Snapshot snapshot) throws IOException {
int opsRecovered = 0;
Translog.Operation operation;
while ((operation = snapshot.next()) != null) {
applyOperation(engine, convertToEngineOp(operation, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY));
opsRecovered++;
appliedOperations.incrementAndGet();
}
return opsRecovered;
}
private Engine.Operation convertToEngineOp(Translog.Operation operation, Engine.Operation.Origin origin) {
switch (operation.opType()) {
case INDEX:
final Translog.Index index = (Translog.Index) operation;
final String indexName = mapperService.index().getName();
final Engine.Index engineIndex = IndexShard.prepareIndex(docMapper(index.type()),
mapperService.getIndexSettings().getIndexVersionCreated(),
source(indexName, index.type(), index.id(), index.source(), XContentFactory.xContentType(index.source()))
.routing(index.routing()).parent(index.parent()), index.seqNo(), index.primaryTerm(),
index.version(), index.versionType().versionTypeForReplicationAndRecovery(), origin,
index.getAutoGeneratedIdTimestamp(), true);
return engineIndex;
case DELETE:
final Translog.Delete delete = (Translog.Delete) operation;
final Engine.Delete engineDelete = new Engine.Delete(delete.type(), delete.id(), delete.uid(), delete.seqNo(),
delete.primaryTerm(), delete.version(), delete.versionType().versionTypeForReplicationAndRecovery(),
origin, System.nanoTime());
return engineDelete;
case NO_OP:
final Translog.NoOp noOp = (Translog.NoOp) operation;
final Engine.NoOp engineNoOp =
new Engine.NoOp(noOp.seqNo(), noOp.primaryTerm(), origin, System.nanoTime(), noOp.reason());
return engineNoOp;
default:
throw new IllegalStateException("No operation defined for [" + operation + "]");
}
}
}