Merge branch 'master' into ccr
* master: Refactor internal engine [Docs] #26541: add warning regarding the limit on the number of fields that can be queried at once in the multi_match query. [Docs] Fix note in bucket_selector [Docs] Fix indentation of examples (#27168) [Docs] Clarify `span_not` query behavior for non-overlapping matches (#27150) [Docs] Remove first person "I" from getting started (#27155)
This commit is contained in:
commit
3cc1a88f7f
|
@ -572,7 +572,11 @@ public abstract class Engine implements Closeable {
|
|||
return new CommitStats(getLastCommittedSegmentInfos());
|
||||
}
|
||||
|
||||
/** get the sequence number service */
|
||||
/**
|
||||
* The sequence number service for this engine.
|
||||
*
|
||||
* @return the sequence number service
|
||||
*/
|
||||
public abstract SequenceNumbersService seqNoService();
|
||||
|
||||
/**
|
||||
|
|
|
@ -145,6 +145,12 @@ public class InternalEngine extends Engine {
|
|||
private final String historyUUID;
|
||||
|
||||
public InternalEngine(EngineConfig engineConfig) {
|
||||
this(engineConfig, InternalEngine::sequenceNumberService);
|
||||
}
|
||||
|
||||
InternalEngine(
|
||||
final EngineConfig engineConfig,
|
||||
final BiFunction<EngineConfig, SeqNoStats, SequenceNumbersService> seqNoServiceSupplier) {
|
||||
super(engineConfig);
|
||||
openMode = engineConfig.getOpenMode();
|
||||
if (engineConfig.isAutoGeneratedIDsOptimizationEnabled() == false) {
|
||||
|
@ -152,11 +158,11 @@ public class InternalEngine extends Engine {
|
|||
}
|
||||
this.uidField = engineConfig.getIndexSettings().isSingleType() ? IdFieldMapper.NAME : UidFieldMapper.NAME;
|
||||
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(
|
||||
engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(),
|
||||
engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis()
|
||||
engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(),
|
||||
engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis()
|
||||
);
|
||||
this.deletionPolicy = new CombinedDeletionPolicy(
|
||||
new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy()), translogDeletionPolicy, openMode);
|
||||
new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy()), translogDeletionPolicy, openMode);
|
||||
store.incRef();
|
||||
IndexWriter writer = null;
|
||||
Translog translog = null;
|
||||
|
@ -184,20 +190,20 @@ public class InternalEngine extends Engine {
|
|||
case CREATE_INDEX_AND_TRANSLOG:
|
||||
writer = createWriter(true);
|
||||
seqNoStats = new SeqNoStats(
|
||||
SequenceNumbers.NO_OPS_PERFORMED,
|
||||
SequenceNumbers.NO_OPS_PERFORMED,
|
||||
SequenceNumbers.UNASSIGNED_SEQ_NO);
|
||||
SequenceNumbers.NO_OPS_PERFORMED,
|
||||
SequenceNumbers.NO_OPS_PERFORMED,
|
||||
SequenceNumbers.UNASSIGNED_SEQ_NO);
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException(openMode.toString());
|
||||
}
|
||||
logger.trace("recovered [{}]", seqNoStats);
|
||||
seqNoService = sequenceNumberService(shardId, allocationId, engineConfig.getIndexSettings(), seqNoStats);
|
||||
seqNoService = seqNoServiceSupplier.apply(engineConfig, seqNoStats);
|
||||
updateMaxUnsafeAutoIdTimestampFromWriter(writer);
|
||||
historyUUID = loadOrGenerateHistoryUUID(writer, engineConfig.getForceNewHistoryUUID());
|
||||
Objects.requireNonNull(historyUUID, "history uuid should not be null");
|
||||
indexWriter = writer;
|
||||
translog = openTranslog(engineConfig, writer, translogDeletionPolicy, () -> seqNoService().getGlobalCheckpoint());
|
||||
translog = openTranslog(engineConfig, writer, translogDeletionPolicy, () -> seqNoService.getGlobalCheckpoint());
|
||||
assert translog.getGeneration() != null;
|
||||
this.translog = translog;
|
||||
updateWriterOnOpen();
|
||||
|
@ -240,12 +246,12 @@ public class InternalEngine extends Engine {
|
|||
public void restoreLocalCheckpointFromTranslog() throws IOException {
|
||||
try (ReleasableLock ignored = writeLock.acquire()) {
|
||||
ensureOpen();
|
||||
final long localCheckpoint = seqNoService().getLocalCheckpoint();
|
||||
final long localCheckpoint = seqNoService.getLocalCheckpoint();
|
||||
try (Translog.Snapshot snapshot = getTranslog().newSnapshotFrom(localCheckpoint + 1)) {
|
||||
Translog.Operation operation;
|
||||
while ((operation = snapshot.next()) != null) {
|
||||
if (operation.seqNo() > localCheckpoint) {
|
||||
seqNoService().markSeqNoAsCompleted(operation.seqNo());
|
||||
seqNoService.markSeqNoAsCompleted(operation.seqNo());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -256,17 +262,17 @@ public class InternalEngine extends Engine {
|
|||
public int fillSeqNoGaps(long primaryTerm) throws IOException {
|
||||
try (ReleasableLock ignored = writeLock.acquire()) {
|
||||
ensureOpen();
|
||||
final long localCheckpoint = seqNoService().getLocalCheckpoint();
|
||||
final long maxSeqNo = seqNoService().getMaxSeqNo();
|
||||
final long localCheckpoint = seqNoService.getLocalCheckpoint();
|
||||
final long maxSeqNo = seqNoService.getMaxSeqNo();
|
||||
int numNoOpsAdded = 0;
|
||||
for (
|
||||
long seqNo = localCheckpoint + 1;
|
||||
seqNo <= maxSeqNo;
|
||||
seqNo = seqNoService().getLocalCheckpoint() + 1 /* the local checkpoint might have advanced so we leap-frog */) {
|
||||
seqNo = seqNoService.getLocalCheckpoint() + 1 /* the local checkpoint might have advanced so we leap-frog */) {
|
||||
innerNoOp(new NoOp(seqNo, primaryTerm, Operation.Origin.PRIMARY, System.nanoTime(), "filling gaps"));
|
||||
numNoOpsAdded++;
|
||||
assert seqNo <= seqNoService().getLocalCheckpoint()
|
||||
: "local checkpoint did not advance; was [" + seqNo + "], now [" + seqNoService().getLocalCheckpoint() + "]";
|
||||
assert seqNo <= seqNoService.getLocalCheckpoint()
|
||||
: "local checkpoint did not advance; was [" + seqNo + "], now [" + seqNoService.getLocalCheckpoint() + "]";
|
||||
|
||||
}
|
||||
return numNoOpsAdded;
|
||||
|
@ -284,15 +290,13 @@ public class InternalEngine extends Engine {
|
|||
maxUnsafeAutoIdTimestamp.set(Math.max(maxUnsafeAutoIdTimestamp.get(), commitMaxUnsafeAutoIdTimestamp));
|
||||
}
|
||||
|
||||
private static SequenceNumbersService sequenceNumberService(
|
||||
final ShardId shardId,
|
||||
final String allocationId,
|
||||
final IndexSettings indexSettings,
|
||||
static SequenceNumbersService sequenceNumberService(
|
||||
final EngineConfig engineConfig,
|
||||
final SeqNoStats seqNoStats) {
|
||||
return new SequenceNumbersService(
|
||||
shardId,
|
||||
allocationId,
|
||||
indexSettings,
|
||||
engineConfig.getShardId(),
|
||||
engineConfig.getAllocationId(),
|
||||
engineConfig.getIndexSettings(),
|
||||
seqNoStats.getMaxSeqNo(),
|
||||
seqNoStats.getLocalCheckpoint(),
|
||||
seqNoStats.getGlobalCheckpoint());
|
||||
|
@ -621,8 +625,7 @@ public class InternalEngine extends Engine {
|
|||
assert seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO : "old op recovering but it already has a seq no.;" +
|
||||
" index version: " + engineConfig.getIndexSettings().getIndexVersionCreated() + ", seqNo: " + seqNo;
|
||||
} else if (origin == Operation.Origin.PRIMARY) {
|
||||
// sequence number should not be set when operation origin is primary
|
||||
assert seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO : "primary ops should never have an assigned seq no.; seqNo: " + seqNo;
|
||||
assert assertOriginPrimarySequenceNumber(seqNo);
|
||||
} else if (engineConfig.getIndexSettings().getIndexVersionCreated().onOrAfter(Version.V_6_0_0_alpha1)) {
|
||||
// sequence number should be set when operation origin is not primary
|
||||
assert seqNo >= 0 : "recovery or replica ops should have an assigned seq no.; origin: " + origin;
|
||||
|
@ -630,6 +633,13 @@ public class InternalEngine extends Engine {
|
|||
return true;
|
||||
}
|
||||
|
||||
protected boolean assertOriginPrimarySequenceNumber(final long seqNo) {
|
||||
// sequence number should not be set when operation origin is primary
|
||||
assert seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO
|
||||
: "primary operations must never have an assigned sequence number but was [" + seqNo + "]";
|
||||
return true;
|
||||
}
|
||||
|
||||
private boolean assertSequenceNumberBeforeIndexing(final Engine.Operation.Origin origin, final long seqNo) {
|
||||
if (engineConfig.getIndexSettings().getIndexVersionCreated().onOrAfter(Version.V_6_0_0_alpha1) ||
|
||||
origin == Operation.Origin.PRIMARY) {
|
||||
|
@ -639,6 +649,20 @@ public class InternalEngine extends Engine {
|
|||
return true;
|
||||
}
|
||||
|
||||
private long generateSeqNoForOperation(final Operation operation) {
|
||||
assert operation.origin() == Operation.Origin.PRIMARY;
|
||||
return doGenerateSeqNoForOperation(operation);
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate the sequence number for the specified operation.
|
||||
*
|
||||
* @param operation the operation
|
||||
* @return the sequence number
|
||||
*/
|
||||
protected long doGenerateSeqNoForOperation(final Operation operation) {
|
||||
return seqNoService.generateSeqNo();
|
||||
}
|
||||
|
||||
@Override
|
||||
public IndexResult index(Index index) throws IOException {
|
||||
|
@ -708,7 +732,7 @@ public class InternalEngine extends Engine {
|
|||
indexResult.setTranslogLocation(location);
|
||||
}
|
||||
if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
|
||||
seqNoService().markSeqNoAsCompleted(indexResult.getSeqNo());
|
||||
seqNoService.markSeqNoAsCompleted(indexResult.getSeqNo());
|
||||
}
|
||||
indexResult.setTook(System.nanoTime() - index.startTime());
|
||||
indexResult.freeze();
|
||||
|
@ -728,14 +752,12 @@ public class InternalEngine extends Engine {
|
|||
final IndexingStrategy plan;
|
||||
if (canOptimizeAddDocument(index) && mayHaveBeenIndexedBefore(index) == false) {
|
||||
// no need to deal with out of order delivery - we never saw this one
|
||||
assert index.version() == 1L :
|
||||
"can optimize on replicas but incoming version is [" + index.version() + "]";
|
||||
assert index.version() == 1L : "can optimize on replicas but incoming version is [" + index.version() + "]";
|
||||
plan = IndexingStrategy.optimizedAppendOnly(index.seqNo());
|
||||
} else {
|
||||
// drop out of order operations
|
||||
assert index.versionType().versionTypeForReplicationAndRecovery() == index.versionType() :
|
||||
"resolving out of order delivery based on versioning but version type isn't fit for it. got ["
|
||||
+ index.versionType() + "]";
|
||||
"resolving out of order delivery based on versioning but version type isn't fit for it. got [" + index.versionType() + "]";
|
||||
// unlike the primary, replicas don't really care to about creation status of documents
|
||||
// this allows to ignore the case where a document was found in the live version maps in
|
||||
// a delete state and return false for the created flag in favor of code simplicity
|
||||
|
@ -770,15 +792,14 @@ public class InternalEngine extends Engine {
|
|||
}
|
||||
|
||||
private IndexingStrategy planIndexingAsPrimary(Index index) throws IOException {
|
||||
assert index.origin() == Operation.Origin.PRIMARY :
|
||||
"planing as primary but origin isn't. got " + index.origin();
|
||||
assert index.origin() == Operation.Origin.PRIMARY : "planing as primary but origin isn't. got " + index.origin();
|
||||
final IndexingStrategy plan;
|
||||
// resolve an external operation into an internal one which is safe to replay
|
||||
if (canOptimizeAddDocument(index)) {
|
||||
if (mayHaveBeenIndexedBefore(index)) {
|
||||
plan = IndexingStrategy.overrideExistingAsIfNotThere(seqNoService().generateSeqNo(), 1L);
|
||||
plan = IndexingStrategy.overrideExistingAsIfNotThere(generateSeqNoForOperation(index), 1L);
|
||||
} else {
|
||||
plan = IndexingStrategy.optimizedAppendOnly(seqNoService().generateSeqNo());
|
||||
plan = IndexingStrategy.optimizedAppendOnly(generateSeqNoForOperation(index));
|
||||
}
|
||||
} else {
|
||||
// resolves incoming version
|
||||
|
@ -799,7 +820,7 @@ public class InternalEngine extends Engine {
|
|||
plan = IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion);
|
||||
} else {
|
||||
plan = IndexingStrategy.processNormally(currentNotFoundOrDeleted,
|
||||
seqNoService().generateSeqNo(),
|
||||
generateSeqNoForOperation(index),
|
||||
index.versionType().updateVersion(currentVersion, index.version())
|
||||
);
|
||||
}
|
||||
|
@ -1008,7 +1029,7 @@ public class InternalEngine extends Engine {
|
|||
deleteResult.setTranslogLocation(location);
|
||||
}
|
||||
if (deleteResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
|
||||
seqNoService().markSeqNoAsCompleted(deleteResult.getSeqNo());
|
||||
seqNoService.markSeqNoAsCompleted(deleteResult.getSeqNo());
|
||||
}
|
||||
deleteResult.setTook(System.nanoTime() - delete.startTime());
|
||||
deleteResult.freeze();
|
||||
|
@ -1025,8 +1046,7 @@ public class InternalEngine extends Engine {
|
|||
}
|
||||
|
||||
private DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws IOException {
|
||||
assert delete.origin() != Operation.Origin.PRIMARY : "planing as primary but got "
|
||||
+ delete.origin();
|
||||
assert delete.origin() != Operation.Origin.PRIMARY : "planing as primary but got " + delete.origin();
|
||||
// drop out of order operations
|
||||
assert delete.versionType().versionTypeForReplicationAndRecovery() == delete.versionType() :
|
||||
"resolving out of order delivery based on versioning but version type isn't fit for it. got ["
|
||||
|
@ -1064,8 +1084,7 @@ public class InternalEngine extends Engine {
|
|||
}
|
||||
|
||||
private DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOException {
|
||||
assert delete.origin() == Operation.Origin.PRIMARY : "planing as primary but got "
|
||||
+ delete.origin();
|
||||
assert delete.origin() == Operation.Origin.PRIMARY : "planing as primary but got " + delete.origin();
|
||||
// resolve operation from external to internal
|
||||
final VersionValue versionValue = resolveDocVersion(delete);
|
||||
assert incrementVersionLookup();
|
||||
|
@ -1083,9 +1102,10 @@ public class InternalEngine extends Engine {
|
|||
final VersionConflictEngineException e = new VersionConflictEngineException(shardId, delete, currentVersion, currentlyDeleted);
|
||||
plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, currentlyDeleted);
|
||||
} else {
|
||||
plan = DeletionStrategy.processNormally(currentlyDeleted,
|
||||
seqNoService().generateSeqNo(),
|
||||
delete.versionType().updateVersion(currentVersion, delete.version()));
|
||||
plan = DeletionStrategy.processNormally(
|
||||
currentlyDeleted,
|
||||
generateSeqNoForOperation(delete),
|
||||
delete.versionType().updateVersion(currentVersion, delete.version()));
|
||||
}
|
||||
return plan;
|
||||
}
|
||||
|
@ -1186,7 +1206,7 @@ public class InternalEngine extends Engine {
|
|||
return noOpResult;
|
||||
} finally {
|
||||
if (seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
|
||||
seqNoService().markSeqNoAsCompleted(seqNo);
|
||||
seqNoService.markSeqNoAsCompleted(seqNo);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1921,7 +1941,7 @@ public class InternalEngine extends Engine {
|
|||
protected void commitIndexWriter(final IndexWriter writer, final Translog translog, @Nullable final String syncId) throws IOException {
|
||||
ensureCanFlush();
|
||||
try {
|
||||
final long localCheckpoint = seqNoService().getLocalCheckpoint();
|
||||
final long localCheckpoint = seqNoService.getLocalCheckpoint();
|
||||
final Translog.TranslogGeneration translogGeneration = translog.getMinGenerationForSeqNo(localCheckpoint + 1);
|
||||
final String translogFileGeneration = Long.toString(translogGeneration.translogFileGeneration);
|
||||
final String translogUUID = translogGeneration.translogUUID;
|
||||
|
@ -1944,7 +1964,7 @@ public class InternalEngine extends Engine {
|
|||
if (syncId != null) {
|
||||
commitData.put(Engine.SYNC_COMMIT_ID, syncId);
|
||||
}
|
||||
commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(seqNoService().getMaxSeqNo()));
|
||||
commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(seqNoService.getMaxSeqNo()));
|
||||
commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get()));
|
||||
commitData.put(HISTORY_UUID_KEY, historyUUID);
|
||||
logger.trace("committing writer with commit data [{}]", commitData);
|
||||
|
@ -2008,8 +2028,7 @@ public class InternalEngine extends Engine {
|
|||
return mergeScheduler.stats();
|
||||
}
|
||||
|
||||
@Override
|
||||
public SequenceNumbersService seqNoService() {
|
||||
public final SequenceNumbersService seqNoService() {
|
||||
return seqNoService;
|
||||
}
|
||||
|
||||
|
|
|
@ -66,7 +66,7 @@ public class SequenceNumbersService extends AbstractIndexShardComponent {
|
|||
*
|
||||
* @return the next assigned sequence number
|
||||
*/
|
||||
public long generateSeqNo() {
|
||||
public final long generateSeqNo() {
|
||||
return localCheckpointTracker.generateSeqNo();
|
||||
}
|
||||
|
||||
|
|
|
@ -27,7 +27,6 @@ import org.apache.logging.log4j.core.LogEvent;
|
|||
import org.apache.logging.log4j.core.appender.AbstractAppender;
|
||||
import org.apache.logging.log4j.core.filter.RegexFilter;
|
||||
import org.apache.lucene.analysis.Analyzer;
|
||||
import org.apache.lucene.analysis.standard.StandardAnalyzer;
|
||||
import org.apache.lucene.codecs.Codec;
|
||||
import org.apache.lucene.codecs.lucene50.Lucene50StoredFieldsFormat;
|
||||
import org.apache.lucene.document.Field;
|
||||
|
@ -95,46 +94,36 @@ import org.elasticsearch.common.unit.TimeValue;
|
|||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.index.VersionType;
|
||||
import org.elasticsearch.index.analysis.AnalyzerScope;
|
||||
import org.elasticsearch.index.analysis.IndexAnalyzers;
|
||||
import org.elasticsearch.index.analysis.NamedAnalyzer;
|
||||
import org.elasticsearch.index.codec.CodecService;
|
||||
import org.elasticsearch.index.engine.Engine.Searcher;
|
||||
import org.elasticsearch.index.fieldvisitor.FieldsVisitor;
|
||||
import org.elasticsearch.index.mapper.ContentPath;
|
||||
import org.elasticsearch.index.mapper.DocumentMapper;
|
||||
import org.elasticsearch.index.mapper.DocumentMapperForType;
|
||||
import org.elasticsearch.index.mapper.IdFieldMapper;
|
||||
import org.elasticsearch.index.mapper.Mapper.BuilderContext;
|
||||
import org.elasticsearch.index.mapper.MapperService;
|
||||
import org.elasticsearch.index.mapper.Mapping;
|
||||
import org.elasticsearch.index.mapper.MetadataFieldMapper;
|
||||
import org.elasticsearch.index.mapper.ParseContext;
|
||||
import org.elasticsearch.index.mapper.ParseContext.Document;
|
||||
import org.elasticsearch.index.mapper.ParsedDocument;
|
||||
import org.elasticsearch.index.mapper.RootObjectMapper;
|
||||
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
|
||||
import org.elasticsearch.index.mapper.SourceFieldMapper;
|
||||
import org.elasticsearch.index.mapper.Uid;
|
||||
import org.elasticsearch.index.seqno.SeqNoStats;
|
||||
import org.elasticsearch.index.seqno.SequenceNumbers;
|
||||
import org.elasticsearch.index.seqno.SequenceNumbersService;
|
||||
import org.elasticsearch.index.shard.IndexSearcherWrapper;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.shard.ShardUtils;
|
||||
import org.elasticsearch.index.similarity.SimilarityService;
|
||||
import org.elasticsearch.index.store.DirectoryService;
|
||||
import org.elasticsearch.index.store.DirectoryUtils;
|
||||
import org.elasticsearch.index.store.Store;
|
||||
import org.elasticsearch.index.translog.Translog;
|
||||
import org.elasticsearch.index.translog.TranslogConfig;
|
||||
import org.elasticsearch.indices.IndicesModule;
|
||||
import org.elasticsearch.indices.mapper.MapperRegistry;
|
||||
import org.elasticsearch.test.DummyShardLock;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.IndexSettingsModule;
|
||||
|
@ -171,9 +160,9 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.LongSupplier;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.function.ToLongBiFunction;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.LongStream;
|
||||
|
||||
|
@ -184,7 +173,6 @@ import static org.elasticsearch.index.engine.Engine.Operation.Origin.LOCAL_TRANS
|
|||
import static org.elasticsearch.index.engine.Engine.Operation.Origin.PEER_RECOVERY;
|
||||
import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY;
|
||||
import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA;
|
||||
import static org.elasticsearch.index.mapper.SourceToParse.source;
|
||||
import static org.elasticsearch.index.translog.TranslogDeletionPolicyTests.createTranslogDeletionPolicy;
|
||||
import static org.hamcrest.CoreMatchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
@ -356,11 +344,19 @@ public class InternalEngineTests extends ESTestCase {
|
|||
return createEngine(defaultSettings, store, translogPath, newMergePolicy(), null);
|
||||
}
|
||||
|
||||
protected InternalEngine createEngine(Store store, Path translogPath,
|
||||
Function<EngineConfig, SequenceNumbersService> sequenceNumbersServiceSupplier) throws IOException {
|
||||
protected InternalEngine createEngine(Store store,
|
||||
Path translogPath,
|
||||
BiFunction<EngineConfig, SeqNoStats, SequenceNumbersService> sequenceNumbersServiceSupplier) throws IOException {
|
||||
return createEngine(defaultSettings, store, translogPath, newMergePolicy(), null, sequenceNumbersServiceSupplier);
|
||||
}
|
||||
|
||||
protected InternalEngine createEngine(Store store,
|
||||
Path translogPath,
|
||||
BiFunction<EngineConfig, SeqNoStats, SequenceNumbersService> sequenceNumbersServiceSupplier,
|
||||
ToLongBiFunction<Engine, Engine.Operation> seqNoForOperation) throws IOException {
|
||||
return createEngine(defaultSettings, store, translogPath, newMergePolicy(), null, sequenceNumbersServiceSupplier, seqNoForOperation, null);
|
||||
}
|
||||
|
||||
protected InternalEngine createEngine(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy) throws IOException {
|
||||
return createEngine(indexSettings, store, translogPath, mergePolicy, null);
|
||||
|
||||
|
@ -377,8 +373,19 @@ public class InternalEngineTests extends ESTestCase {
|
|||
Path translogPath,
|
||||
MergePolicy mergePolicy,
|
||||
@Nullable IndexWriterFactory indexWriterFactory,
|
||||
@Nullable Function<EngineConfig, SequenceNumbersService> sequenceNumbersServiceSupplier) throws IOException {
|
||||
return createEngine(indexSettings, store, translogPath, mergePolicy, indexWriterFactory, sequenceNumbersServiceSupplier, null);
|
||||
@Nullable BiFunction<EngineConfig, SeqNoStats, SequenceNumbersService> sequenceNumbersServiceSupplier) throws IOException {
|
||||
return createEngine(indexSettings, store, translogPath, mergePolicy, indexWriterFactory, sequenceNumbersServiceSupplier, null, null);
|
||||
}
|
||||
|
||||
protected InternalEngine createEngine(
|
||||
IndexSettings indexSettings,
|
||||
Store store,
|
||||
Path translogPath,
|
||||
MergePolicy mergePolicy,
|
||||
@Nullable IndexWriterFactory indexWriterFactory,
|
||||
@Nullable BiFunction<EngineConfig, SeqNoStats, SequenceNumbersService> sequenceNumbersServiceSupplier,
|
||||
@Nullable ToLongBiFunction<Engine, Engine.Operation> seqNoForOperation) throws IOException {
|
||||
return createEngine(indexSettings, store, translogPath, mergePolicy, indexWriterFactory, sequenceNumbersServiceSupplier, seqNoForOperation, null);
|
||||
}
|
||||
|
||||
protected InternalEngine createEngine(
|
||||
|
@ -387,10 +394,11 @@ public class InternalEngineTests extends ESTestCase {
|
|||
Path translogPath,
|
||||
MergePolicy mergePolicy,
|
||||
@Nullable IndexWriterFactory indexWriterFactory,
|
||||
@Nullable Function<EngineConfig, SequenceNumbersService> sequenceNumbersServiceSupplier,
|
||||
@Nullable BiFunction<EngineConfig, SeqNoStats, SequenceNumbersService> sequenceNumbersServiceSupplier,
|
||||
@Nullable ToLongBiFunction<Engine, Engine.Operation> seqNoForOperation,
|
||||
@Nullable Sort indexSort) throws IOException {
|
||||
EngineConfig config = config(indexSettings, store, translogPath, mergePolicy, null, indexSort);
|
||||
InternalEngine internalEngine = createInternalEngine(indexWriterFactory, sequenceNumbersServiceSupplier, config);
|
||||
InternalEngine internalEngine = createInternalEngine(indexWriterFactory, sequenceNumbersServiceSupplier, seqNoForOperation, config);
|
||||
if (config.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
|
||||
internalEngine.recoverFromTranslog();
|
||||
}
|
||||
|
@ -404,21 +412,39 @@ public class InternalEngineTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public static InternalEngine createInternalEngine(@Nullable final IndexWriterFactory indexWriterFactory,
|
||||
@Nullable final Function<EngineConfig, SequenceNumbersService> sequenceNumbersServiceSupplier,
|
||||
@Nullable final BiFunction<EngineConfig, SeqNoStats, SequenceNumbersService> sequenceNumbersServiceSupplier,
|
||||
@Nullable final ToLongBiFunction<Engine, Engine.Operation> seqNoForOperation,
|
||||
final EngineConfig config) {
|
||||
return new InternalEngine(config) {
|
||||
@Override
|
||||
IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException {
|
||||
return (indexWriterFactory != null) ?
|
||||
indexWriterFactory.createWriter(directory, iwc) :
|
||||
super.createWriter(directory, iwc);
|
||||
}
|
||||
if (sequenceNumbersServiceSupplier == null) {
|
||||
return new InternalEngine(config) {
|
||||
@Override
|
||||
IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException {
|
||||
return (indexWriterFactory != null) ?
|
||||
indexWriterFactory.createWriter(directory, iwc) :
|
||||
super.createWriter(directory, iwc);
|
||||
}
|
||||
|
||||
@Override
|
||||
public SequenceNumbersService seqNoService() {
|
||||
return (sequenceNumbersServiceSupplier != null) ? sequenceNumbersServiceSupplier.apply(config) : super.seqNoService();
|
||||
protected long doGenerateSeqNoForOperation(final Operation operation) {
|
||||
return seqNoForOperation != null ? seqNoForOperation.applyAsLong(this, operation) : super.doGenerateSeqNoForOperation(operation);
|
||||
}
|
||||
};
|
||||
} else {
|
||||
return new InternalEngine(config, sequenceNumbersServiceSupplier) {
|
||||
@Override
|
||||
IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException {
|
||||
return (indexWriterFactory != null) ?
|
||||
indexWriterFactory.createWriter(directory, iwc) :
|
||||
super.createWriter(directory, iwc);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected long doGenerateSeqNoForOperation(final Operation operation) {
|
||||
return seqNoForOperation != null ? seqNoForOperation.applyAsLong(this, operation) : super.doGenerateSeqNoForOperation(operation);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy,
|
||||
|
@ -672,8 +698,8 @@ public class InternalEngineTests extends ESTestCase {
|
|||
public void testSegmentsWithIndexSort() throws Exception {
|
||||
Sort indexSort = new Sort(new SortedSetSortField("_type", false));
|
||||
try (Store store = createStore();
|
||||
Engine engine = createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE,
|
||||
null, null, indexSort)) {
|
||||
Engine engine =
|
||||
createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE, null, null, null, indexSort)) {
|
||||
List<Segment> segments = engine.segments(true);
|
||||
assertThat(segments.isEmpty(), equalTo(true));
|
||||
|
||||
|
@ -729,13 +755,28 @@ public class InternalEngineTests extends ESTestCase {
|
|||
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
|
||||
try (
|
||||
Store store = createStore();
|
||||
InternalEngine engine = createEngine(store, createTempDir(), (config) -> new SequenceNumbersService(
|
||||
config.getShardId(),
|
||||
config.getAllocationId(),
|
||||
config.getIndexSettings(),
|
||||
maxSeqNo.get(),
|
||||
localCheckpoint.get(),
|
||||
globalCheckpoint.get())
|
||||
InternalEngine engine = createEngine(store, createTempDir(), (config, seqNoStats) -> new SequenceNumbersService(
|
||||
config.getShardId(),
|
||||
config.getAllocationId(),
|
||||
config.getIndexSettings(),
|
||||
seqNoStats.getMaxSeqNo(),
|
||||
seqNoStats.getLocalCheckpoint(),
|
||||
seqNoStats.getGlobalCheckpoint()) {
|
||||
@Override
|
||||
public long getMaxSeqNo() {
|
||||
return maxSeqNo.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getLocalCheckpoint() {
|
||||
return localCheckpoint.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getGlobalCheckpoint() {
|
||||
return globalCheckpoint.get();
|
||||
}
|
||||
}
|
||||
)) {
|
||||
CommitStats stats1 = engine.commitStats();
|
||||
assertThat(stats1.getGeneration(), greaterThan(0L));
|
||||
|
@ -902,20 +943,11 @@ public class InternalEngineTests extends ESTestCase {
|
|||
Store store = createStore();
|
||||
final AtomicInteger counter = new AtomicInteger();
|
||||
try {
|
||||
initialEngine = createEngine(store, createTempDir(), (config) ->
|
||||
new SequenceNumbersService(
|
||||
config.getShardId(),
|
||||
config.getAllocationId(),
|
||||
config.getIndexSettings(),
|
||||
SequenceNumbers.NO_OPS_PERFORMED,
|
||||
SequenceNumbers.NO_OPS_PERFORMED,
|
||||
SequenceNumbers.UNASSIGNED_SEQ_NO) {
|
||||
@Override
|
||||
public long generateSeqNo() {
|
||||
return seqNos.get(counter.getAndIncrement());
|
||||
}
|
||||
}
|
||||
);
|
||||
initialEngine = createEngine(
|
||||
store,
|
||||
createTempDir(),
|
||||
InternalEngine::sequenceNumberService,
|
||||
(engine, operation) -> seqNos.get(counter.getAndIncrement()));
|
||||
for (int i = 0; i < docs; i++) {
|
||||
final String id = Integer.toString(i);
|
||||
final ParsedDocument doc = testParsedDocument(id, null, testDocumentWithTextField(), SOURCE, null);
|
||||
|
@ -2711,7 +2743,7 @@ public class InternalEngineTests extends ESTestCase {
|
|||
|
||||
assertVisibleCount(engine, numDocs, false);
|
||||
parser = (TranslogHandler) engine.config().getTranslogRecoveryRunner();
|
||||
assertEquals(numDocs, parser.appliedOperations.get());
|
||||
assertEquals(numDocs, parser.appliedOperations());
|
||||
if (parser.mappingUpdate != null) {
|
||||
assertEquals(1, parser.getRecoveredTypes().size());
|
||||
assertTrue(parser.getRecoveredTypes().containsKey("test"));
|
||||
|
@ -2723,7 +2755,7 @@ public class InternalEngineTests extends ESTestCase {
|
|||
engine = createEngine(store, primaryTranslogDir);
|
||||
assertVisibleCount(engine, numDocs, false);
|
||||
parser = (TranslogHandler) engine.config().getTranslogRecoveryRunner();
|
||||
assertEquals(0, parser.appliedOperations.get());
|
||||
assertEquals(0, parser.appliedOperations());
|
||||
|
||||
final boolean flush = randomBoolean();
|
||||
int randomId = randomIntBetween(numDocs + 1, numDocs + 10);
|
||||
|
@ -2753,7 +2785,7 @@ public class InternalEngineTests extends ESTestCase {
|
|||
assertThat(topDocs.totalHits, equalTo(numDocs + 1L));
|
||||
}
|
||||
parser = (TranslogHandler) engine.config().getTranslogRecoveryRunner();
|
||||
assertEquals(flush ? 1 : 2, parser.appliedOperations.get());
|
||||
assertEquals(flush ? 1 : 2, parser.appliedOperations());
|
||||
engine.delete(new Engine.Delete("test", Integer.toString(randomId), newUid(doc)));
|
||||
if (randomBoolean()) {
|
||||
engine.refresh("test");
|
||||
|
@ -2767,97 +2799,6 @@ public class InternalEngineTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public static class TranslogHandler implements EngineConfig.TranslogRecoveryRunner {
|
||||
|
||||
private final MapperService mapperService;
|
||||
public Mapping mappingUpdate = null;
|
||||
private final Map<String, Mapping> recoveredTypes = new HashMap<>();
|
||||
private final AtomicLong appliedOperations = new AtomicLong();
|
||||
|
||||
public TranslogHandler(NamedXContentRegistry xContentRegistry, IndexSettings indexSettings) {
|
||||
NamedAnalyzer defaultAnalyzer = new NamedAnalyzer("default", AnalyzerScope.INDEX, new StandardAnalyzer());
|
||||
IndexAnalyzers indexAnalyzers = new IndexAnalyzers(indexSettings, defaultAnalyzer, defaultAnalyzer, defaultAnalyzer, Collections.emptyMap(), Collections.emptyMap());
|
||||
SimilarityService similarityService = new SimilarityService(indexSettings, null, Collections.emptyMap());
|
||||
MapperRegistry mapperRegistry = new IndicesModule(Collections.emptyList()).getMapperRegistry();
|
||||
mapperService = new MapperService(indexSettings, indexAnalyzers, xContentRegistry, similarityService, mapperRegistry,
|
||||
() -> null);
|
||||
}
|
||||
|
||||
private DocumentMapperForType docMapper(String type) {
|
||||
RootObjectMapper.Builder rootBuilder = new RootObjectMapper.Builder(type);
|
||||
DocumentMapper.Builder b = new DocumentMapper.Builder(rootBuilder, mapperService);
|
||||
return new DocumentMapperForType(b.build(mapperService), mappingUpdate);
|
||||
}
|
||||
|
||||
private void applyOperation(Engine engine, Engine.Operation operation) throws IOException {
|
||||
switch (operation.operationType()) {
|
||||
case INDEX:
|
||||
Engine.Index engineIndex = (Engine.Index) operation;
|
||||
Mapping update = engineIndex.parsedDoc().dynamicMappingsUpdate();
|
||||
if (engineIndex.parsedDoc().dynamicMappingsUpdate() != null) {
|
||||
recoveredTypes.compute(engineIndex.type(), (k, mapping) -> mapping == null ? update : mapping.merge(update, false));
|
||||
}
|
||||
engine.index(engineIndex);
|
||||
break;
|
||||
case DELETE:
|
||||
engine.delete((Engine.Delete) operation);
|
||||
break;
|
||||
case NO_OP:
|
||||
engine.noOp((Engine.NoOp) operation);
|
||||
break;
|
||||
default:
|
||||
throw new IllegalStateException("No operation defined for [" + operation + "]");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the recovered types modifying the mapping during the recovery
|
||||
*/
|
||||
public Map<String, Mapping> getRecoveredTypes() {
|
||||
return recoveredTypes;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int run(Engine engine, Translog.Snapshot snapshot) throws IOException {
|
||||
int opsRecovered = 0;
|
||||
Translog.Operation operation;
|
||||
while ((operation = snapshot.next()) != null) {
|
||||
applyOperation(engine, convertToEngineOp(operation, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY));
|
||||
opsRecovered++;
|
||||
appliedOperations.incrementAndGet();
|
||||
}
|
||||
return opsRecovered;
|
||||
}
|
||||
|
||||
private Engine.Operation convertToEngineOp(Translog.Operation operation, Engine.Operation.Origin origin) {
|
||||
switch (operation.opType()) {
|
||||
case INDEX:
|
||||
final Translog.Index index = (Translog.Index) operation;
|
||||
final String indexName = mapperService.index().getName();
|
||||
final Engine.Index engineIndex = IndexShard.prepareIndex(docMapper(index.type()),
|
||||
mapperService.getIndexSettings().getIndexVersionCreated(),
|
||||
source(indexName, index.type(), index.id(), index.source(), XContentFactory.xContentType(index.source()))
|
||||
.routing(index.routing()).parent(index.parent()), index.seqNo(), index.primaryTerm(),
|
||||
index.version(), index.versionType().versionTypeForReplicationAndRecovery(), origin,
|
||||
index.getAutoGeneratedIdTimestamp(), true);
|
||||
return engineIndex;
|
||||
case DELETE:
|
||||
final Translog.Delete delete = (Translog.Delete) operation;
|
||||
final Engine.Delete engineDelete = new Engine.Delete(delete.type(), delete.id(), delete.uid(), delete.seqNo(),
|
||||
delete.primaryTerm(), delete.version(), delete.versionType().versionTypeForReplicationAndRecovery(),
|
||||
origin, System.nanoTime());
|
||||
return engineDelete;
|
||||
case NO_OP:
|
||||
final Translog.NoOp noOp = (Translog.NoOp) operation;
|
||||
final Engine.NoOp engineNoOp =
|
||||
new Engine.NoOp(noOp.seqNo(), noOp.primaryTerm(), origin, System.nanoTime(), noOp.reason());
|
||||
return engineNoOp;
|
||||
default:
|
||||
throw new IllegalStateException("No operation defined for [" + operation + "]");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void testRecoverFromForeignTranslog() throws IOException {
|
||||
final int numDocs = randomIntBetween(1, 10);
|
||||
for (int i = 0; i < numDocs; i++) {
|
||||
|
@ -3786,47 +3727,38 @@ public class InternalEngineTests extends ESTestCase {
|
|||
}
|
||||
|
||||
/**
|
||||
* A sequence number service that will generate a sequence number and if {@code stall} is set to {@code true} will wait on the barrier
|
||||
* and the referenced latch before returning. If the local checkpoint should advance (because {@code stall} is {@code false}), then the
|
||||
* value of {@code expectedLocalCheckpoint} is set accordingly.
|
||||
* A sequence number generator that will generate a sequence number and if {@code stall} is set to true will wait on the barrier and the
|
||||
* referenced latch before returning. If the local checkpoint should advance (because {@code stall} is false), then the value of
|
||||
* {@code expectedLocalCheckpoint} is set accordingly.
|
||||
*
|
||||
* @param latchReference to latch the thread for the purpose of stalling
|
||||
* @param barrier to signal the thread has generated a new sequence number
|
||||
* @param stall whether or not the thread should stall
|
||||
* @param expectedLocalCheckpoint the expected local checkpoint after generating a new sequence
|
||||
* number
|
||||
* @return a sequence number service
|
||||
* @return a sequence number generator
|
||||
*/
|
||||
private SequenceNumbersService getStallingSeqNoService(
|
||||
private ToLongBiFunction<Engine, Engine.Operation> getStallingSeqNoGenerator(
|
||||
final AtomicReference<CountDownLatch> latchReference,
|
||||
final CyclicBarrier barrier,
|
||||
final AtomicBoolean stall,
|
||||
final AtomicLong expectedLocalCheckpoint) {
|
||||
return new SequenceNumbersService(
|
||||
shardId,
|
||||
allocationId.getId(),
|
||||
defaultSettings,
|
||||
SequenceNumbers.NO_OPS_PERFORMED,
|
||||
SequenceNumbers.NO_OPS_PERFORMED,
|
||||
SequenceNumbers.UNASSIGNED_SEQ_NO) {
|
||||
@Override
|
||||
public long generateSeqNo() {
|
||||
final long seqNo = super.generateSeqNo();
|
||||
final CountDownLatch latch = latchReference.get();
|
||||
if (stall.get()) {
|
||||
try {
|
||||
barrier.await();
|
||||
latch.await();
|
||||
} catch (BrokenBarrierException | InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
} else {
|
||||
if (expectedLocalCheckpoint.get() + 1 == seqNo) {
|
||||
expectedLocalCheckpoint.set(seqNo);
|
||||
}
|
||||
return (engine, operation) -> {
|
||||
final long seqNo = engine.seqNoService().generateSeqNo();
|
||||
final CountDownLatch latch = latchReference.get();
|
||||
if (stall.get()) {
|
||||
try {
|
||||
barrier.await();
|
||||
latch.await();
|
||||
} catch (BrokenBarrierException | InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
} else {
|
||||
if (expectedLocalCheckpoint.get() + 1 == seqNo) {
|
||||
expectedLocalCheckpoint.set(seqNo);
|
||||
}
|
||||
return seqNo;
|
||||
}
|
||||
return seqNo;
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -3840,8 +3772,8 @@ public class InternalEngineTests extends ESTestCase {
|
|||
final AtomicBoolean stall = new AtomicBoolean();
|
||||
final AtomicLong expectedLocalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
|
||||
final List<Thread> threads = new ArrayList<>();
|
||||
final SequenceNumbersService seqNoService = getStallingSeqNoService(latchReference, barrier, stall, expectedLocalCheckpoint);
|
||||
initialEngine = createEngine(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null, (config) -> seqNoService);
|
||||
initialEngine =
|
||||
createEngine(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null, InternalEngine::sequenceNumberService, getStallingSeqNoGenerator(latchReference, barrier, stall, expectedLocalCheckpoint));
|
||||
final InternalEngine finalInitialEngine = initialEngine;
|
||||
for (int i = 0; i < docs; i++) {
|
||||
final String id = Integer.toString(i);
|
||||
|
@ -4015,17 +3947,17 @@ public class InternalEngineTests extends ESTestCase {
|
|||
final int localCheckpoint = randomIntBetween(0, maxSeqNo);
|
||||
final int globalCheckpoint = randomIntBetween(0, localCheckpoint);
|
||||
try {
|
||||
final SequenceNumbersService seqNoService =
|
||||
new SequenceNumbersService(shardId, allocationId.getId(), defaultSettings, maxSeqNo, localCheckpoint, globalCheckpoint) {
|
||||
@Override
|
||||
public long generateSeqNo() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
};
|
||||
noOpEngine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG)) {
|
||||
final BiFunction<EngineConfig, SeqNoStats, SequenceNumbersService> supplier = (engineConfig, ignored) -> new SequenceNumbersService(
|
||||
engineConfig.getShardId(),
|
||||
engineConfig.getAllocationId(),
|
||||
engineConfig.getIndexSettings(),
|
||||
maxSeqNo,
|
||||
localCheckpoint,
|
||||
globalCheckpoint);
|
||||
noOpEngine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG), supplier) {
|
||||
@Override
|
||||
public SequenceNumbersService seqNoService() {
|
||||
return seqNoService;
|
||||
protected long doGenerateSeqNoForOperation(Operation operation) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
};
|
||||
noOpEngine.recoverFromTranslog();
|
||||
|
@ -4070,8 +4002,8 @@ public class InternalEngineTests extends ESTestCase {
|
|||
final AtomicBoolean stall = new AtomicBoolean();
|
||||
final AtomicLong expectedLocalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
|
||||
final Map<Thread, CountDownLatch> threads = new LinkedHashMap<>();
|
||||
final SequenceNumbersService seqNoService = getStallingSeqNoService(latchReference, barrier, stall, expectedLocalCheckpoint);
|
||||
actualEngine = createEngine(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null, (config) -> seqNoService);
|
||||
actualEngine =
|
||||
createEngine(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null, InternalEngine::sequenceNumberService, getStallingSeqNoGenerator(latchReference, barrier, stall, expectedLocalCheckpoint));
|
||||
final InternalEngine finalActualEngine = actualEngine;
|
||||
final Translog translog = finalActualEngine.getTranslog();
|
||||
final long generation = finalActualEngine.getTranslog().currentFileGeneration();
|
||||
|
@ -4160,26 +4092,20 @@ public class InternalEngineTests extends ESTestCase {
|
|||
InternalEngine actualEngine = null;
|
||||
try {
|
||||
final Set<Long> completedSeqNos = new HashSet<>();
|
||||
final SequenceNumbersService seqNoService =
|
||||
new SequenceNumbersService(
|
||||
shardId,
|
||||
allocationId.getId(),
|
||||
defaultSettings,
|
||||
SequenceNumbers.NO_OPS_PERFORMED,
|
||||
SequenceNumbers.NO_OPS_PERFORMED,
|
||||
SequenceNumbers.UNASSIGNED_SEQ_NO) {
|
||||
final BiFunction<EngineConfig, SeqNoStats, SequenceNumbersService> supplier = (engineConfig, seqNoStats) -> new SequenceNumbersService(
|
||||
engineConfig.getShardId(),
|
||||
engineConfig.getAllocationId(),
|
||||
engineConfig.getIndexSettings(),
|
||||
seqNoStats.getMaxSeqNo(),
|
||||
seqNoStats.getLocalCheckpoint(),
|
||||
seqNoStats.getGlobalCheckpoint()) {
|
||||
@Override
|
||||
public void markSeqNoAsCompleted(long seqNo) {
|
||||
super.markSeqNoAsCompleted(seqNo);
|
||||
completedSeqNos.add(seqNo);
|
||||
}
|
||||
};
|
||||
actualEngine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG)) {
|
||||
@Override
|
||||
public SequenceNumbersService seqNoService() {
|
||||
return seqNoService;
|
||||
}
|
||||
};
|
||||
actualEngine = new InternalEngine(copy(engine.config(), EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG), supplier);
|
||||
final int operations = randomIntBetween(0, 1024);
|
||||
final Set<Long> expectedCompletedSeqNos = new HashSet<>();
|
||||
for (int i = 0; i < operations; i++) {
|
||||
|
@ -4347,4 +4273,71 @@ public class InternalEngineTests extends ESTestCase {
|
|||
assertSameReader(getSearcher, searchSearcher);
|
||||
}
|
||||
}
|
||||
|
||||
public void testSeqNoGenerator() throws IOException {
|
||||
engine.close();
|
||||
final long seqNo = randomIntBetween(Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED), Integer.MAX_VALUE);
|
||||
final BiFunction<EngineConfig, SeqNoStats, SequenceNumbersService> seqNoService = (config, seqNoStats) -> new SequenceNumbersService(
|
||||
config.getShardId(),
|
||||
config.getAllocationId(),
|
||||
config.getIndexSettings(),
|
||||
SequenceNumbers.NO_OPS_PERFORMED,
|
||||
SequenceNumbers.NO_OPS_PERFORMED,
|
||||
SequenceNumbers.UNASSIGNED_SEQ_NO);
|
||||
final AtomicLong seqNoGenerator = new AtomicLong(seqNo);
|
||||
try (Engine e = createEngine(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null, seqNoService, (engine, operation) -> seqNoGenerator.getAndIncrement())) {
|
||||
final String id = "id";
|
||||
final Field uidField = new Field("_id", id, IdFieldMapper.Defaults.FIELD_TYPE);
|
||||
final String type = "type";
|
||||
final Field versionField = new NumericDocValuesField("_version", 0);
|
||||
final SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID();
|
||||
final ParseContext.Document document = new ParseContext.Document();
|
||||
document.add(uidField);
|
||||
document.add(versionField);
|
||||
document.add(seqID.seqNo);
|
||||
document.add(seqID.seqNoDocValue);
|
||||
document.add(seqID.primaryTerm);
|
||||
final BytesReference source = new BytesArray(new byte[]{1});
|
||||
final ParsedDocument parsedDocument = new ParsedDocument(
|
||||
versionField,
|
||||
seqID,
|
||||
id,
|
||||
type,
|
||||
"routing",
|
||||
Collections.singletonList(document),
|
||||
source,
|
||||
XContentType.JSON,
|
||||
null);
|
||||
|
||||
final Engine.Index index = new Engine.Index(
|
||||
new Term("_id", parsedDocument.id()),
|
||||
parsedDocument,
|
||||
SequenceNumbers.UNASSIGNED_SEQ_NO,
|
||||
(long) randomIntBetween(1, 8),
|
||||
Versions.MATCH_ANY,
|
||||
VersionType.INTERNAL,
|
||||
Engine.Operation.Origin.PRIMARY,
|
||||
System.currentTimeMillis(),
|
||||
System.currentTimeMillis(),
|
||||
randomBoolean());
|
||||
final Engine.IndexResult indexResult = e.index(index);
|
||||
assertThat(indexResult.getSeqNo(), equalTo(seqNo));
|
||||
assertThat(seqNoGenerator.get(), equalTo(seqNo + 1));
|
||||
|
||||
final Engine.Delete delete = new Engine.Delete(
|
||||
type,
|
||||
id,
|
||||
new Term("_id", parsedDocument.id()),
|
||||
SequenceNumbers.UNASSIGNED_SEQ_NO,
|
||||
(long) randomIntBetween(1, 8),
|
||||
Versions.MATCH_ANY,
|
||||
VersionType.INTERNAL,
|
||||
Engine.Operation.Origin.PRIMARY,
|
||||
System.currentTimeMillis());
|
||||
final Engine.DeleteResult deleteResult = e.delete(delete);
|
||||
assertThat(deleteResult.getSeqNo(), equalTo(seqNo + 1));
|
||||
assertThat(seqNoGenerator.get(), equalTo(seqNo + 2));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -316,7 +316,7 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase
|
|||
assert documentFailureMessage != null;
|
||||
throw new IOException(documentFailureMessage);
|
||||
}
|
||||
}, null, config);
|
||||
}, null, null, config);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -638,6 +638,7 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
|
|||
}
|
||||
},
|
||||
null,
|
||||
null,
|
||||
config);
|
||||
}
|
||||
|
||||
|
|
|
@ -6,7 +6,7 @@ in the parent multi-bucket aggregation. The specified metric must be numeric and
|
|||
If the script language is `expression` then a numeric return value is permitted. In this case 0.0 will be evaluated as `false`
|
||||
and all other values will evaluate to true.
|
||||
|
||||
Note: The bucket_selector aggregation, like all pipeline aggregations, executions after all other sibling aggregations. This means that
|
||||
NOTE: The bucket_selector aggregation, like all pipeline aggregations, executes after all other sibling aggregations. This means that
|
||||
using the bucket_selector aggregation to filter the returned buckets in the response does not save on execution time running the aggregations.
|
||||
|
||||
==== Syntax
|
||||
|
|
|
@ -13,7 +13,7 @@ Here are a few sample use-cases that Elasticsearch could be used for:
|
|||
* You run a price alerting platform which allows price-savvy customers to specify a rule like "I am interested in buying a specific electronic gadget and I want to be notified if the price of gadget falls below $X from any vendor within the next month". In this case you can scrape vendor prices, push them into Elasticsearch and use its reverse-search (Percolator) capability to match price movements against customer queries and eventually push the alerts out to the customer once matches are found.
|
||||
* You have analytics/business-intelligence needs and want to quickly investigate, analyze, visualize, and ask ad-hoc questions on a lot of data (think millions or billions of records). In this case, you can use Elasticsearch to store your data and then use Kibana (part of the Elasticsearch/Logstash/Kibana stack) to build custom dashboards that can visualize aspects of your data that are important to you. Additionally, you can use the Elasticsearch aggregations functionality to perform complex business intelligence queries against your data.
|
||||
|
||||
For the rest of this tutorial, I will guide you through the process of getting Elasticsearch up and running, taking a peek inside it, and performing basic operations like indexing, searching, and modifying your data. At the end of this tutorial, you should have a good idea of what Elasticsearch is, how it works, and hopefully be inspired to see how you can use it to either build sophisticated search applications or to mine intelligence from your data.
|
||||
For the rest of this tutorial, you will be guided through the process of getting Elasticsearch up and running, taking a peek inside it, and performing basic operations like indexing, searching, and modifying your data. At the end of this tutorial, you should have a good idea of what Elasticsearch is, how it works, and hopefully be inspired to see how you can use it to either build sophisticated search applications or to mine intelligence from your data.
|
||||
--
|
||||
|
||||
== Basic Concepts
|
||||
|
@ -660,7 +660,7 @@ Now that we've gotten a glimpse of the basics, let's try to work on a more reali
|
|||
--------------------------------------------------
|
||||
// NOTCONSOLE
|
||||
|
||||
For the curious, I generated this data from http://www.json-generator.com/[`www.json-generator.com/`] so please ignore the actual values and semantics of the data as these are all randomly generated.
|
||||
For the curious, this data was generated using http://www.json-generator.com/[`www.json-generator.com/`], so please ignore the actual values and semantics of the data as these are all randomly generated.
|
||||
|
||||
[float]
|
||||
=== Loading the Sample Dataset
|
||||
|
@ -1284,4 +1284,4 @@ There are many other aggregations capabilities that we won't go into detail here
|
|||
|
||||
== Conclusion
|
||||
|
||||
Elasticsearch is both a simple and complex product. We've so far learned the basics of what it is, how to look inside of it, and how to work with it using some of the REST APIs. I hope that this tutorial has given you a better understanding of what Elasticsearch is and more importantly, inspired you to further experiment with the rest of its great features!
|
||||
Elasticsearch is both a simple and complex product. We've so far learned the basics of what it is, how to look inside of it, and how to work with it using some of the REST APIs. Hopefully this tutorial has given you a better understanding of what Elasticsearch is and more importantly, inspired you to further experiment with the rest of its great features!
|
||||
|
|
|
@ -76,7 +76,7 @@ GET /_search
|
|||
"common": {
|
||||
"body": {
|
||||
"query": "this is bonsai cool",
|
||||
"cutoff_frequency": 0.001
|
||||
"cutoff_frequency": 0.001
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -100,8 +100,8 @@ GET /_search
|
|||
"common": {
|
||||
"body": {
|
||||
"query": "nelly the elephant as a cartoon",
|
||||
"cutoff_frequency": 0.001,
|
||||
"low_freq_operator": "and"
|
||||
"cutoff_frequency": 0.001,
|
||||
"low_freq_operator": "and"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -200,11 +200,11 @@ GET /_search
|
|||
"common": {
|
||||
"body": {
|
||||
"query": "nelly the elephant not as a cartoon",
|
||||
"cutoff_frequency": 0.001,
|
||||
"minimum_should_match": {
|
||||
"low_freq" : 2,
|
||||
"high_freq" : 3
|
||||
}
|
||||
"cutoff_frequency": 0.001,
|
||||
"minimum_should_match": {
|
||||
"low_freq" : 2,
|
||||
"high_freq" : 3
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -261,11 +261,11 @@ GET /_search
|
|||
"common": {
|
||||
"body": {
|
||||
"query": "how not to be",
|
||||
"cutoff_frequency": 0.001,
|
||||
"minimum_should_match": {
|
||||
"low_freq" : 2,
|
||||
"high_freq" : 3
|
||||
}
|
||||
"cutoff_frequency": 0.001,
|
||||
"minimum_should_match": {
|
||||
"low_freq" : 2,
|
||||
"high_freq" : 3
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -58,6 +58,8 @@ GET /_search
|
|||
|
||||
<1> The `subject` field is three times as important as the `message` field.
|
||||
|
||||
WARNING: There is a limit of no more than 1024 fields being queried at once.
|
||||
|
||||
[[multi-match-types]]
|
||||
[float]
|
||||
==== Types of `multi_match` query:
|
||||
|
|
|
@ -1,7 +1,9 @@
|
|||
[[query-dsl-span-not-query]]
|
||||
=== Span Not Query
|
||||
|
||||
Removes matches which overlap with another span query. The span not
|
||||
Removes matches which overlap with another span query or which are
|
||||
within x tokens before (controlled by the parameter `pre`) or y tokens
|
||||
after (controlled by the parameter `post`) another SpanQuery. The span not
|
||||
query maps to Lucene `SpanNotQuery`. Here is an example:
|
||||
|
||||
[source,js]
|
||||
|
@ -39,7 +41,7 @@ In the above example all documents with the term hoya are filtered except the on
|
|||
Other top level options:
|
||||
|
||||
[horizontal]
|
||||
`pre`:: If set the amount of tokens before the include span can't have overlap with the exclude span.
|
||||
`post`:: If set the amount of tokens after the include span can't have overlap with the exclude span.
|
||||
`pre`:: If set the amount of tokens before the include span can't have overlap with the exclude span. Defaults to 0.
|
||||
`post`:: If set the amount of tokens after the include span can't have overlap with the exclude span. Defaults to 0.
|
||||
`dist`:: If set the amount of tokens from within the include span can't have overlap with the exclude span. Equivalent
|
||||
of setting both `pre` and `post`.
|
||||
|
|
|
@ -0,0 +1,145 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.engine;
|
||||
|
||||
import org.apache.lucene.analysis.standard.StandardAnalyzer;
|
||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.index.analysis.AnalyzerScope;
|
||||
import org.elasticsearch.index.analysis.IndexAnalyzers;
|
||||
import org.elasticsearch.index.analysis.NamedAnalyzer;
|
||||
import org.elasticsearch.index.mapper.DocumentMapper;
|
||||
import org.elasticsearch.index.mapper.DocumentMapperForType;
|
||||
import org.elasticsearch.index.mapper.MapperService;
|
||||
import org.elasticsearch.index.mapper.Mapping;
|
||||
import org.elasticsearch.index.mapper.RootObjectMapper;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.similarity.SimilarityService;
|
||||
import org.elasticsearch.index.translog.Translog;
|
||||
import org.elasticsearch.indices.IndicesModule;
|
||||
import org.elasticsearch.indices.mapper.MapperRegistry;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import static java.util.Collections.emptyList;
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static org.elasticsearch.index.mapper.SourceToParse.source;
|
||||
|
||||
/**
 * A test helper that replays translog operations against an {@link Engine}, standing in for the
 * translog-recovery machinery used during local shard recovery. It keeps count of how many
 * operations were applied and records the mapping updates observed while replaying.
 */
public class TranslogHandler implements EngineConfig.TranslogRecoveryRunner {

    // Builds document mappers for the types encountered while converting index operations.
    private final MapperService mapperService;
    // Mapping update handed to every DocumentMapperForType created by docMapper(...);
    // tests may assign this to simulate a dynamic mapping update during recovery.
    public Mapping mappingUpdate = null;
    // Types whose mappings were updated while replaying, keyed by type name.
    private final Map<String, Mapping> recoveredTypes = new HashMap<>();

    // Total number of translog operations applied across all run(...) invocations.
    private final AtomicLong appliedOperations = new AtomicLong();

    /**
     * The number of translog operations applied by this handler so far.
     *
     * @return the applied operation count
     */
    long appliedOperations() {
        return appliedOperations.get();
    }

    /**
     * Creates a handler backed by a minimal mapper service: a single standard analyzer used for
     * default/search/search-quote analysis and only the built-in mappers registered.
     *
     * @param xContentRegistry the registry for parsing mappings
     * @param indexSettings    the settings of the index being recovered
     */
    public TranslogHandler(NamedXContentRegistry xContentRegistry, IndexSettings indexSettings) {
        NamedAnalyzer defaultAnalyzer = new NamedAnalyzer("default", AnalyzerScope.INDEX, new StandardAnalyzer());
        IndexAnalyzers indexAnalyzers =
                new IndexAnalyzers(indexSettings, defaultAnalyzer, defaultAnalyzer, defaultAnalyzer, emptyMap(), emptyMap());
        SimilarityService similarityService = new SimilarityService(indexSettings, null, emptyMap());
        MapperRegistry mapperRegistry = new IndicesModule(emptyList()).getMapperRegistry();
        mapperService = new MapperService(indexSettings, indexAnalyzers, xContentRegistry, similarityService, mapperRegistry,
                () -> null);
    }

    // Builds a fresh (empty) document mapper for the given type, paired with the configured
    // mappingUpdate (may be null).
    private DocumentMapperForType docMapper(String type) {
        RootObjectMapper.Builder rootBuilder = new RootObjectMapper.Builder(type);
        DocumentMapper.Builder b = new DocumentMapper.Builder(rootBuilder, mapperService);
        return new DocumentMapperForType(b.build(mapperService), mappingUpdate);
    }

    // Dispatches a single converted operation to the engine. For index operations, any dynamic
    // mapping update carried by the parsed document is merged into recoveredTypes first.
    private void applyOperation(Engine engine, Engine.Operation operation) throws IOException {
        switch (operation.operationType()) {
            case INDEX:
                Engine.Index engineIndex = (Engine.Index) operation;
                Mapping update = engineIndex.parsedDoc().dynamicMappingsUpdate();
                if (engineIndex.parsedDoc().dynamicMappingsUpdate() != null) {
                    // merge with any previously recorded update for the same type
                    recoveredTypes.compute(engineIndex.type(), (k, mapping) -> mapping == null ? update : mapping.merge(update, false));
                }
                engine.index(engineIndex);
                break;
            case DELETE:
                engine.delete((Engine.Delete) operation);
                break;
            case NO_OP:
                engine.noOp((Engine.NoOp) operation);
                break;
            default:
                throw new IllegalStateException("No operation defined for [" + operation + "]");
        }
    }

    /**
     * Returns the recovered types modifying the mapping during the recovery
     */
    public Map<String, Mapping> getRecoveredTypes() {
        return recoveredTypes;
    }

    /**
     * Replays every operation in the given translog snapshot against the engine, tagging each one
     * with the {@code LOCAL_TRANSLOG_RECOVERY} origin.
     *
     * @return the number of operations recovered from the snapshot
     */
    @Override
    public int run(Engine engine, Translog.Snapshot snapshot) throws IOException {
        int opsRecovered = 0;
        Translog.Operation operation;
        while ((operation = snapshot.next()) != null) {
            applyOperation(engine, convertToEngineOp(operation, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY));
            opsRecovered++;
            appliedOperations.incrementAndGet();
        }
        return opsRecovered;
    }

    // Translates a raw translog operation into the corresponding engine operation, preserving
    // sequence number, primary term, and version, and mapping the version type to its
    // replication/recovery equivalent.
    private Engine.Operation convertToEngineOp(Translog.Operation operation, Engine.Operation.Origin origin) {
        switch (operation.opType()) {
            case INDEX:
                final Translog.Index index = (Translog.Index) operation;
                final String indexName = mapperService.index().getName();
                final Engine.Index engineIndex = IndexShard.prepareIndex(docMapper(index.type()),
                        mapperService.getIndexSettings().getIndexVersionCreated(),
                        source(indexName, index.type(), index.id(), index.source(), XContentFactory.xContentType(index.source()))
                                .routing(index.routing()).parent(index.parent()), index.seqNo(), index.primaryTerm(),
                        index.version(), index.versionType().versionTypeForReplicationAndRecovery(), origin,
                        index.getAutoGeneratedIdTimestamp(), true);
                return engineIndex;
            case DELETE:
                final Translog.Delete delete = (Translog.Delete) operation;
                final Engine.Delete engineDelete = new Engine.Delete(delete.type(), delete.id(), delete.uid(), delete.seqNo(),
                        delete.primaryTerm(), delete.version(), delete.versionType().versionTypeForReplicationAndRecovery(),
                        origin, System.nanoTime());
                return engineDelete;
            case NO_OP:
                final Translog.NoOp noOp = (Translog.NoOp) operation;
                final Engine.NoOp engineNoOp =
                        new Engine.NoOp(noOp.seqNo(), noOp.primaryTerm(), origin, System.nanoTime(), noOp.reason());
                return engineNoOp;
            default:
                throw new IllegalStateException("No operation defined for [" + operation + "]");
        }
    }

}
|
Loading…
Reference in New Issue