Add a new Engine implementation for replicas with segment replication enabled. (#3240)

* Change the fastForwardProcessedSeqNo method in LocalCheckpointTracker to fast-forward the persisted checkpoint.

This change renames fastForwardProcessedSeqNo to fastForwardPersistedSeqNo for use in
Segment Replication, so that a Segment Replication engine can match the logic of InternalEngine,
where the seqNo is incremented with each operation but only persisted in the tracker on a flush.
With Segment Replication we bump the processed checkpoint with each operation received
(index/delete/noOp), and invoke this method when we receive a new set of segments to bump the
persisted seqNo.
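
That flow can be sketched as follows. This is an illustrative sketch only, not code from the change: the wrapper class and method names are ours, while the LocalCheckpointTracker calls are the ones this commit touches (note that the final version of the commit keeps the fastForwardProcessedSeqNo name, per a later message below).

import org.opensearch.index.seqno.LocalCheckpointTracker;
import org.opensearch.index.seqno.SequenceNumbers;

class ReplicaCheckpointFlowSketch {
    // Start from the last commit point, as the replica engine added below does.
    private final LocalCheckpointTracker tracker = new LocalCheckpointTracker(
        SequenceNumbers.NO_OPS_PERFORMED,
        SequenceNumbers.NO_OPS_PERFORMED
    );

    // Each replicated index/delete/noOp advances only the max seqNo once the
    // operation is written to the translog; nothing is marked processed here.
    void onOperation(long seqNo) {
        tracker.advanceMaxSeqNo(seqNo);
    }

    // When a new set of segments arrives from the primary, fast-forward the
    // checkpoint to the one computed on the primary.
    void onNewSegments(long primaryCheckpoint) {
        tracker.fastForwardProcessedSeqNo(primaryCheckpoint);
    }
}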

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Extract translog-specific engine methods into an abstract class.

This change extracts translog-specific methods into an abstract engine class so that other
engine implementations can reuse the translog logic.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Add a separate Engine implementation for replicas with segment replication enabled.

This change adds a new engine intended to be used on replicas with segment replication enabled.
This engine does not wire up an IndexWriter, but still writes all operations to a translog.
The engine uses a new ReaderManager that refreshes from an externally provided SegmentInfos.
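
As a hedged sketch of the intended call pattern (the listener class and method names here are hypothetical; updateSegments is the real entry point added in this change):

import java.io.IOException;
import org.apache.lucene.index.SegmentInfos;
import org.opensearch.index.engine.NRTReplicationEngine;

final class SegmentCopyListenerSketch {
    private final NRTReplicationEngine engine;

    SegmentCopyListenerSketch(NRTReplicationEngine engine) {
        this.engine = engine;
    }

    // Called after new segment files have been copied from the primary:
    // updateSegments refreshes the reader over the incoming SegmentInfos and
    // fast-forwards the local checkpoint to the primary's checkpoint.
    void onNewCheckpoint(SegmentInfos infos, long primaryCheckpoint) throws IOException {
        engine.updateSegments(infos, primaryCheckpoint);
    }
}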

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Fix spotless checks.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Fix :server:compileInternalClusterTestJava compilation.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Fix failing test naming convention check.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* PR feedback.

- Removed isReadOnlyReplica from the overloaded constructor and added feature flag checks.
- Updated the log message in NRTReplicationReaderManager.
- Cleaned up store ref counting in NRTReplicationEngine.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Fix spotless check.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Remove TranslogAwareEngine and build translog in NRTReplicationEngine.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Fix formatting.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Add missing translog methods to NRTEngine.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Remove persistent seqNo check from fastForwardProcessedSeqNo.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* PR feedback.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Add test specific to translog trimming.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Javadoc check.

Signed-off-by: Marc Handalian <handalm@amazon.com>

* Add failEngine calls to translog methods in NRTReplicationEngine.
Roll translog generation on the replica when a new commit point is received.

Signed-off-by: Marc Handalian <handalm@amazon.com>
Marc Handalian, 2022-05-24 10:18:18 -07:00, committed by GitHub
parent fd5a38de12
commit a0030dfb47
16 changed files with 1008 additions and 71 deletions

File: Engine.java

@@ -169,6 +169,12 @@ public abstract class Engine implements Closeable {
protected abstract SegmentInfos getLastCommittedSegmentInfos();
/**
* Return the latest active SegmentInfos from the engine.
* @return {@link SegmentInfos}
*/
protected abstract SegmentInfos getLatestSegmentInfos();
public MergeStats getMergeStats() {
return new MergeStats();
}
@@ -176,6 +182,17 @@ public abstract class Engine implements Closeable {
/** returns the history uuid for the engine */
public abstract String getHistoryUUID();
/**
* Reads the current stored history ID from commit data.
*/
String loadHistoryUUID(Map<String, String> commitData) {
final String uuid = commitData.get(HISTORY_UUID_KEY);
if (uuid == null) {
throw new IllegalStateException("commit doesn't contain history uuid");
}
return uuid;
}
/** Returns how many bytes we are currently moving from heap to disk */
public abstract long getWritingBytes();

File: EngineConfig.java

@@ -97,6 +97,7 @@ public final class EngineConfig {
private final CircuitBreakerService circuitBreakerService;
private final LongSupplier globalCheckpointSupplier;
private final Supplier<RetentionLeases> retentionLeasesSupplier;
private final boolean isReadOnlyReplica;
/**
* A supplier of the outstanding retention leases. This is used during merged operations to determine which operations that have been
@@ -228,6 +229,66 @@ public final class EngineConfig {
LongSupplier primaryTermSupplier,
TombstoneDocSupplier tombstoneDocSupplier
) {
this(
shardId,
threadPool,
indexSettings,
warmer,
store,
mergePolicy,
analyzer,
similarity,
codecService,
eventListener,
queryCache,
queryCachingPolicy,
translogConfig,
translogDeletionPolicyFactory,
flushMergesAfter,
externalRefreshListener,
internalRefreshListener,
indexSort,
circuitBreakerService,
globalCheckpointSupplier,
retentionLeasesSupplier,
primaryTermSupplier,
tombstoneDocSupplier,
false
);
}
/**
* Creates a new {@link org.opensearch.index.engine.EngineConfig}
*/
EngineConfig(
ShardId shardId,
ThreadPool threadPool,
IndexSettings indexSettings,
Engine.Warmer warmer,
Store store,
MergePolicy mergePolicy,
Analyzer analyzer,
Similarity similarity,
CodecService codecService,
Engine.EventListener eventListener,
QueryCache queryCache,
QueryCachingPolicy queryCachingPolicy,
TranslogConfig translogConfig,
TranslogDeletionPolicyFactory translogDeletionPolicyFactory,
TimeValue flushMergesAfter,
List<ReferenceManager.RefreshListener> externalRefreshListener,
List<ReferenceManager.RefreshListener> internalRefreshListener,
Sort indexSort,
CircuitBreakerService circuitBreakerService,
LongSupplier globalCheckpointSupplier,
Supplier<RetentionLeases> retentionLeasesSupplier,
LongSupplier primaryTermSupplier,
TombstoneDocSupplier tombstoneDocSupplier,
boolean isReadOnlyReplica
) {
if (isReadOnlyReplica && indexSettings.isSegRepEnabled() == false) {
throw new IllegalArgumentException("Shard can only be wired as a read only replica with Segment Replication enabled");
}
this.shardId = shardId;
this.indexSettings = indexSettings;
this.threadPool = threadPool;
@@ -266,6 +327,7 @@ public final class EngineConfig {
this.retentionLeasesSupplier = Objects.requireNonNull(retentionLeasesSupplier);
this.primaryTermSupplier = primaryTermSupplier;
this.tombstoneDocSupplier = tombstoneDocSupplier;
this.isReadOnlyReplica = isReadOnlyReplica;
}
/**
@@ -460,6 +522,16 @@ public final class EngineConfig {
return primaryTermSupplier;
}
/**
* Returns whether this shard should be wired as a read-only replica.
* This is used for Segment Replication, where the engine implementation used depends on
* whether the shard is a primary or a replica.
* @return true if this engine should be wired as read only.
*/
public boolean isReadOnlyReplica() {
return indexSettings.isSegRepEnabled() && isReadOnlyReplica;
}
/**
* A supplier supplies tombstone documents which will be used in soft-update methods.
* The returned document consists only _uid, _seqno, _term and _version fields; other metadata fields are excluded.

File: EngineConfigFactory.java

@@ -146,7 +146,8 @@ public class EngineConfigFactory {
LongSupplier globalCheckpointSupplier,
Supplier<RetentionLeases> retentionLeasesSupplier,
LongSupplier primaryTermSupplier,
EngineConfig.TombstoneDocSupplier tombstoneDocSupplier
EngineConfig.TombstoneDocSupplier tombstoneDocSupplier,
boolean isReadOnlyReplica
) {
CodecService codecServiceToUse = codecService;
if (codecService == null && this.codecServiceFactory != null) {
@@ -176,7 +177,8 @@ public class EngineConfigFactory {
globalCheckpointSupplier,
retentionLeasesSupplier,
primaryTermSupplier,
tombstoneDocSupplier
tombstoneDocSupplier,
isReadOnlyReplica
);
}

File: InternalEngine.java

@@ -49,6 +49,7 @@ import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.ShuffleForcedMergePolicy;
import org.apache.lucene.index.SoftDeletesRetentionMergePolicy;
import org.apache.lucene.index.StandardDirectoryReader;
import org.apache.lucene.index.Term;
import org.apache.lucene.sandbox.index.MergeOnFlushMergePolicy;
import org.apache.lucene.search.BooleanClause;
@@ -648,17 +649,6 @@ public class InternalEngine extends Engine {
return indexWriter.getFlushingBytes() + versionMap.getRefreshingBytes();
}
/**
* Reads the current stored history ID from the IW commit data.
*/
private String loadHistoryUUID(Map<String, String> commitData) {
final String uuid = commitData.get(HISTORY_UUID_KEY);
if (uuid == null) {
throw new IllegalStateException("commit doesn't contain history uuid");
}
return uuid;
}
private ExternalReaderManager createReaderManager(RefreshWarmerListener externalRefreshListener) throws EngineException {
boolean success = false;
OpenSearchReaderManager internalReaderManager = null;
@@ -2298,6 +2288,23 @@ public class InternalEngine extends Engine {
return lastCommittedSegmentInfos;
}
@Override
public SegmentInfos getLatestSegmentInfos() {
OpenSearchDirectoryReader reader = null;
try {
reader = internalReaderManager.acquire();
return ((StandardDirectoryReader) reader.getDelegate()).getSegmentInfos();
} catch (IOException e) {
throw new EngineException(shardId, e.getMessage(), e);
} finally {
try {
internalReaderManager.release(reader);
} catch (IOException e) {
throw new EngineException(shardId, e.getMessage(), e);
}
}
}
@Override
protected final void writerSegmentStats(SegmentsStats stats) {
stats.addVersionMapMemoryInBytes(versionMap.ramBytesUsed());

File: NRTReplicationEngine.java

@@ -0,0 +1,482 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.index.engine;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SoftDeletesDirectoryReaderWrapper;
import org.apache.lucene.search.ReferenceManager;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.common.lucene.Lucene;
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.ReleasableLock;
import org.opensearch.core.internal.io.IOUtils;
import org.opensearch.index.seqno.LocalCheckpointTracker;
import org.opensearch.index.seqno.SeqNoStats;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.translog.DefaultTranslogDeletionPolicy;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogConfig;
import org.opensearch.index.translog.TranslogDeletionPolicy;
import org.opensearch.index.translog.TranslogStats;
import org.opensearch.search.suggest.completion.CompletionStats;
import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.function.BiFunction;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
import java.util.stream.Stream;
/**
* This is an {@link Engine} implementation intended for replica shards when Segment Replication
* is enabled. This Engine does not create an IndexWriter; rather, it refreshes a {@link NRTReplicationReaderManager}
* with new segments when they are received from an external source.
*
* @opensearch.internal
*/
public class NRTReplicationEngine extends Engine {
private volatile SegmentInfos lastCommittedSegmentInfos;
private final NRTReplicationReaderManager readerManager;
private final CompletionStatsCache completionStatsCache;
private final LocalCheckpointTracker localCheckpointTracker;
private final Translog translog;
public NRTReplicationEngine(EngineConfig engineConfig) {
super(engineConfig);
store.incRef();
NRTReplicationReaderManager readerManager = null;
try {
lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
readerManager = new NRTReplicationReaderManager(OpenSearchDirectoryReader.wrap(getDirectoryReader(), shardId));
final SequenceNumbers.CommitInfo commitInfo = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(
this.lastCommittedSegmentInfos.getUserData().entrySet()
);
this.localCheckpointTracker = new LocalCheckpointTracker(commitInfo.maxSeqNo, commitInfo.localCheckpoint);
this.completionStatsCache = new CompletionStatsCache(() -> acquireSearcher("completion_stats"));
this.readerManager = readerManager;
this.readerManager.addListener(completionStatsCache);
this.translog = openTranslog(
engineConfig,
getTranslogDeletionPolicy(engineConfig),
engineConfig.getGlobalCheckpointSupplier(),
localCheckpointTracker::markSeqNoAsPersisted
);
} catch (IOException e) {
IOUtils.closeWhileHandlingException(store::decRef, readerManager);
throw new EngineCreationFailureException(shardId, "failed to create engine", e);
}
}
public synchronized void updateSegments(final SegmentInfos infos, long seqNo) throws IOException {
// Update the current infos reference on the Engine's reader.
readerManager.updateSegments(infos);
// only update the "lastCommitted" infos reference and roll the translog generation if the incoming segments have a higher
// generation. We can still refresh with incoming SegmentInfos that are not part of a commit point.
if (infos.getGeneration() > lastCommittedSegmentInfos.getGeneration()) {
this.lastCommittedSegmentInfos = infos;
rollTranslogGeneration();
}
localCheckpointTracker.fastForwardProcessedSeqNo(seqNo);
}
@Override
public String getHistoryUUID() {
return loadHistoryUUID(lastCommittedSegmentInfos.userData);
}
@Override
public long getWritingBytes() {
return 0;
}
@Override
public CompletionStats completionStats(String... fieldNamePatterns) {
return completionStatsCache.get(fieldNamePatterns);
}
@Override
public long getIndexThrottleTimeInMillis() {
return 0;
}
@Override
public boolean isThrottled() {
return false;
}
@Override
public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws EngineException {
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
translog.trimOperations(belowTerm, aboveSeqNo);
} catch (Exception e) {
try {
failEngine("translog operations trimming failed", e);
} catch (Exception inner) {
e.addSuppressed(inner);
}
throw new EngineException(shardId, "failed to trim translog operations", e);
}
}
@Override
public IndexResult index(Index index) throws IOException {
ensureOpen();
IndexResult indexResult = new IndexResult(index.version(), index.primaryTerm(), index.seqNo(), false);
final Translog.Location location = translog.add(new Translog.Index(index, indexResult));
indexResult.setTranslogLocation(location);
indexResult.setTook(System.nanoTime() - index.startTime());
indexResult.freeze();
localCheckpointTracker.advanceMaxSeqNo(index.seqNo());
return indexResult;
}
@Override
public DeleteResult delete(Delete delete) throws IOException {
ensureOpen();
DeleteResult deleteResult = new DeleteResult(delete.version(), delete.primaryTerm(), delete.seqNo(), true);
final Translog.Location location = translog.add(new Translog.Delete(delete, deleteResult));
deleteResult.setTranslogLocation(location);
deleteResult.setTook(System.nanoTime() - delete.startTime());
deleteResult.freeze();
localCheckpointTracker.advanceMaxSeqNo(delete.seqNo());
return deleteResult;
}
@Override
public NoOpResult noOp(NoOp noOp) throws IOException {
ensureOpen();
NoOpResult noOpResult = new NoOpResult(noOp.primaryTerm(), noOp.seqNo());
final Translog.Location location = translog.add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason()));
noOpResult.setTranslogLocation(location);
noOpResult.setTook(System.nanoTime() - noOp.startTime());
noOpResult.freeze();
localCheckpointTracker.advanceMaxSeqNo(noOp.seqNo());
return noOpResult;
}
@Override
public GetResult get(Get get, BiFunction<String, SearcherScope, Searcher> searcherFactory) throws EngineException {
return getFromSearcher(get, searcherFactory, SearcherScope.EXTERNAL);
}
@Override
protected ReferenceManager<OpenSearchDirectoryReader> getReferenceManager(SearcherScope scope) {
return readerManager;
}
@Override
public boolean isTranslogSyncNeeded() {
return translog.syncNeeded();
}
@Override
public boolean ensureTranslogSynced(Stream<Translog.Location> locations) throws IOException {
boolean synced = translog.ensureSynced(locations);
if (synced) {
translog.trimUnreferencedReaders();
}
return synced;
}
@Override
public void syncTranslog() throws IOException {
translog.sync();
translog.trimUnreferencedReaders();
}
@Override
public Closeable acquireHistoryRetentionLock() {
throw new UnsupportedOperationException("Not implemented");
}
@Override
public Translog.Snapshot newChangesSnapshot(
String source,
long fromSeqNo,
long toSeqNo,
boolean requiredFullRange,
boolean accurateCount
) throws IOException {
throw new UnsupportedOperationException("Not implemented");
}
@Override
public int countNumberOfHistoryOperations(String source, long fromSeqNo, long toSeqNumber) throws IOException {
return 0;
}
@Override
public boolean hasCompleteOperationHistory(String reason, long startingSeqNo) {
return false;
}
@Override
public long getMinRetainedSeqNo() {
return localCheckpointTracker.getProcessedCheckpoint();
}
@Override
public TranslogStats getTranslogStats() {
return translog.stats();
}
@Override
public Translog.Location getTranslogLastWriteLocation() {
return translog.getLastWriteLocation();
}
@Override
public long getPersistedLocalCheckpoint() {
return localCheckpointTracker.getPersistedCheckpoint();
}
public long getProcessedLocalCheckpoint() {
return localCheckpointTracker.getProcessedCheckpoint();
}
@Override
public SeqNoStats getSeqNoStats(long globalCheckpoint) {
return localCheckpointTracker.getStats(globalCheckpoint);
}
@Override
public long getLastSyncedGlobalCheckpoint() {
return translog.getLastSyncedGlobalCheckpoint();
}
@Override
public long getIndexBufferRAMBytesUsed() {
return 0;
}
@Override
public List<Segment> segments(boolean verbose) {
return Arrays.asList(getSegmentInfo(getLatestSegmentInfos(), verbose));
}
@Override
public void refresh(String source) throws EngineException {}
@Override
public boolean maybeRefresh(String source) throws EngineException {
return false;
}
@Override
public void writeIndexingBuffer() throws EngineException {}
@Override
public boolean shouldPeriodicallyFlush() {
return false;
}
@Override
public void flush(boolean force, boolean waitIfOngoing) throws EngineException {}
@Override
public void trimUnreferencedTranslogFiles() throws EngineException {
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
translog.trimUnreferencedReaders();
} catch (Exception e) {
try {
failEngine("translog trimming failed", e);
} catch (Exception inner) {
e.addSuppressed(inner);
}
throw new EngineException(shardId, "failed to trim translog", e);
}
}
@Override
public boolean shouldRollTranslogGeneration() {
return translog.shouldRollGeneration();
}
@Override
public void rollTranslogGeneration() throws EngineException {
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
translog.rollGeneration();
translog.trimUnreferencedReaders();
} catch (Exception e) {
try {
failEngine("translog trimming failed", e);
} catch (Exception inner) {
e.addSuppressed(inner);
}
throw new EngineException(shardId, "failed to roll translog", e);
}
}
@Override
public void forceMerge(
boolean flush,
int maxNumSegments,
boolean onlyExpungeDeletes,
boolean upgrade,
boolean upgradeOnlyAncientSegments,
String forceMergeUUID
) throws EngineException, IOException {}
@Override
public GatedCloseable<IndexCommit> acquireLastIndexCommit(boolean flushFirst) throws EngineException {
try {
final IndexCommit indexCommit = Lucene.getIndexCommit(lastCommittedSegmentInfos, store.directory());
return new GatedCloseable<>(indexCommit, () -> {});
} catch (IOException e) {
throw new EngineException(shardId, "Unable to build latest IndexCommit", e);
}
}
@Override
public GatedCloseable<IndexCommit> acquireSafeIndexCommit() throws EngineException {
return acquireLastIndexCommit(false);
}
@Override
public SafeCommitInfo getSafeCommitInfo() {
return new SafeCommitInfo(localCheckpointTracker.getProcessedCheckpoint(), lastCommittedSegmentInfos.totalMaxDoc());
}
@Override
protected final void closeNoLock(String reason, CountDownLatch closedLatch) {
if (isClosed.compareAndSet(false, true)) {
assert rwl.isWriteLockedByCurrentThread() || failEngineLock.isHeldByCurrentThread()
: "Either the write lock must be held or the engine must currently be failing itself";
try {
IOUtils.close(readerManager, translog, store::decRef);
} catch (Exception e) {
logger.warn("failed to close engine", e);
} finally {
logger.debug("engine closed [{}]", reason);
closedLatch.countDown();
}
}
}
@Override
public void activateThrottling() {}
@Override
public void deactivateThrottling() {}
@Override
public int restoreLocalHistoryFromTranslog(TranslogRecoveryRunner translogRecoveryRunner) throws IOException {
return 0;
}
@Override
public int fillSeqNoGaps(long primaryTerm) throws IOException {
return 0;
}
@Override
public Engine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException {
throw new UnsupportedOperationException("Read only replicas do not have an IndexWriter and cannot recover from a translog.");
}
@Override
public void skipTranslogRecovery() {
// Do nothing.
}
@Override
public void maybePruneDeletes() {}
@Override
public void updateMaxUnsafeAutoIdTimestamp(long newTimestamp) {}
@Override
public long getMaxSeqNoOfUpdatesOrDeletes() {
return localCheckpointTracker.getMaxSeqNo();
}
@Override
public void advanceMaxSeqNoOfUpdatesOrDeletes(long maxSeqNoOfUpdatesOnPrimary) {}
public Translog getTranslog() {
return translog;
}
@Override
public void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize, long softDeletesRetentionOps) {
final TranslogDeletionPolicy translogDeletionPolicy = translog.getDeletionPolicy();
translogDeletionPolicy.setRetentionAgeInMillis(translogRetentionAge.millis());
translogDeletionPolicy.setRetentionSizeInBytes(translogRetentionSize.getBytes());
}
@Override
protected SegmentInfos getLastCommittedSegmentInfos() {
return lastCommittedSegmentInfos;
}
@Override
protected SegmentInfos getLatestSegmentInfos() {
return readerManager.getSegmentInfos();
}
protected LocalCheckpointTracker getLocalCheckpointTracker() {
return localCheckpointTracker;
}
private DirectoryReader getDirectoryReader() throws IOException {
// for segment replication: replicas should create the reader from store, we don't want an open IW on replicas.
return new SoftDeletesDirectoryReaderWrapper(DirectoryReader.open(store.directory()), Lucene.SOFT_DELETES_FIELD);
}
private Translog openTranslog(
EngineConfig engineConfig,
TranslogDeletionPolicy translogDeletionPolicy,
LongSupplier globalCheckpointSupplier,
LongConsumer persistedSequenceNumberConsumer
) throws IOException {
final TranslogConfig translogConfig = engineConfig.getTranslogConfig();
final Map<String, String> userData = lastCommittedSegmentInfos.getUserData();
final String translogUUID = Objects.requireNonNull(userData.get(Translog.TRANSLOG_UUID_KEY));
// We expect that this shard already exists, so it must already have an existing translog else something is badly wrong!
return new Translog(
translogConfig,
translogUUID,
translogDeletionPolicy,
globalCheckpointSupplier,
engineConfig.getPrimaryTermSupplier(),
persistedSequenceNumberConsumer
);
}
private TranslogDeletionPolicy getTranslogDeletionPolicy(EngineConfig engineConfig) {
TranslogDeletionPolicy customTranslogDeletionPolicy = null;
if (engineConfig.getCustomTranslogDeletionPolicyFactory() != null) {
customTranslogDeletionPolicy = engineConfig.getCustomTranslogDeletionPolicyFactory()
.create(engineConfig.getIndexSettings(), engineConfig.retentionLeasesSupplier());
}
return Objects.requireNonNullElseGet(
customTranslogDeletionPolicy,
() -> new DefaultTranslogDeletionPolicy(
engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(),
engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis(),
engineConfig.getIndexSettings().getTranslogRetentionTotalFiles()
)
);
}
}

File: NRTReplicationEngineFactory.java

@@ -0,0 +1,25 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.index.engine;
/**
* Engine Factory implementation used with Segment Replication that wires up replica shards with an {@link NRTReplicationEngine}
* and primaries with an {@link InternalEngine}.
*
* @opensearch.internal
*/
public class NRTReplicationEngineFactory implements EngineFactory {
@Override
public Engine newReadWriteEngine(EngineConfig config) {
if (config.isReadOnlyReplica()) {
return new NRTReplicationEngine(config);
}
return new InternalEngine(config);
}
}

File: NRTReplicationReaderManager.java

@@ -0,0 +1,92 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.index.engine;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SoftDeletesDirectoryReaderWrapper;
import org.apache.lucene.index.StandardDirectoryReader;
import org.opensearch.common.lucene.Lucene;
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
/**
* This is an extension of {@link OpenSearchReaderManager} for use with {@link NRTReplicationEngine}.
* The manager holds a reference to the latest {@link SegmentInfos} object that is used to refresh a reader.
*
* @opensearch.internal
*/
public class NRTReplicationReaderManager extends OpenSearchReaderManager {
private final static Logger logger = LogManager.getLogger(NRTReplicationReaderManager.class);
private volatile SegmentInfos currentInfos;
/**
* Creates and returns a new NRTReplicationReaderManager from the given
* already-opened {@link OpenSearchDirectoryReader}, stealing
* the incoming reference.
*
* @param reader the {@link OpenSearchDirectoryReader} to use for future reopens
*/
NRTReplicationReaderManager(OpenSearchDirectoryReader reader) {
super(reader);
currentInfos = unwrapStandardReader(reader).getSegmentInfos();
}
@Override
protected OpenSearchDirectoryReader refreshIfNeeded(OpenSearchDirectoryReader referenceToRefresh) throws IOException {
Objects.requireNonNull(referenceToRefresh);
final List<LeafReader> subs = new ArrayList<>();
final StandardDirectoryReader standardDirectoryReader = unwrapStandardReader(referenceToRefresh);
for (LeafReaderContext ctx : standardDirectoryReader.leaves()) {
subs.add(ctx.reader());
}
DirectoryReader innerReader = StandardDirectoryReader.open(referenceToRefresh.directory(), currentInfos, subs, null);
final DirectoryReader softDeletesDirectoryReaderWrapper = new SoftDeletesDirectoryReaderWrapper(
innerReader,
Lucene.SOFT_DELETES_FIELD
);
logger.trace(
() -> new ParameterizedMessage("updated to SegmentInfosVersion={} reader={}", currentInfos.getVersion(), innerReader)
);
return OpenSearchDirectoryReader.wrap(softDeletesDirectoryReaderWrapper, referenceToRefresh.shardId());
}
/**
* Update this reader's segments and refresh.
*
* @param infos {@link SegmentInfos} infos
* @throws IOException - When Refresh fails with an IOException.
*/
public synchronized void updateSegments(SegmentInfos infos) throws IOException {
currentInfos = infos;
maybeRefresh();
}
public SegmentInfos getSegmentInfos() {
return currentInfos;
}
private StandardDirectoryReader unwrapStandardReader(OpenSearchDirectoryReader reader) {
final DirectoryReader delegate = reader.getDelegate();
if (delegate instanceof SoftDeletesDirectoryReaderWrapper) {
return (StandardDirectoryReader) ((SoftDeletesDirectoryReaderWrapper) delegate).getDelegate();
}
return (StandardDirectoryReader) delegate;
}
}

File: ReadOnlyEngine.java

@@ -270,6 +270,11 @@ public class ReadOnlyEngine extends Engine {
return lastCommittedSegmentInfos;
}
@Override
protected SegmentInfos getLatestSegmentInfos() {
return lastCommittedSegmentInfos;
}
@Override
public String getHistoryUUID() {
return lastCommittedSegmentInfos.userData.get(Engine.HISTORY_UUID_KEY);

File: LocalCheckpointTracker.java

@@ -156,7 +156,7 @@ public class LocalCheckpointTracker {
public synchronized void fastForwardProcessedSeqNo(final long seqNo) {
advanceMaxSeqNo(seqNo);
final long currentProcessedCheckpoint = processedCheckpoint.get();
if (shouldUpdateSeqNo(seqNo, currentProcessedCheckpoint, persistedCheckpoint) == false) {
if (seqNo <= currentProcessedCheckpoint) {
return;
}
processedCheckpoint.compareAndSet(currentProcessedCheckpoint, seqNo);

File: IndexShard.java

@@ -3160,7 +3160,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
globalCheckpointSupplier,
replicationTracker::getRetentionLeases,
() -> getOperationPrimaryTerm(),
tombstoneDocSupplier()
tombstoneDocSupplier(),
indexSettings.isSegRepEnabled() && shardRouting.primary() == false
);
}

File: IndicesService.java

@@ -109,6 +109,7 @@ import org.opensearch.index.engine.EngineConfig;
import org.opensearch.index.engine.EngineConfigFactory;
import org.opensearch.index.engine.EngineFactory;
import org.opensearch.index.engine.InternalEngineFactory;
import org.opensearch.index.engine.NRTReplicationEngineFactory;
import org.opensearch.index.engine.NoOpEngine;
import org.opensearch.index.fielddata.IndexFieldDataCache;
import org.opensearch.index.flush.FlushStats;
@@ -764,6 +765,9 @@ public class IndicesService extends AbstractLifecycleComponent
.filter(maybe -> Objects.requireNonNull(maybe).isPresent())
.collect(Collectors.toList());
if (engineFactories.isEmpty()) {
if (idxSettings.isSegRepEnabled()) {
return new NRTReplicationEngineFactory();
}
return new InternalEngineFactory();
} else if (engineFactories.size() == 1) {
assert engineFactories.get(0).isPresent();

File: EngineConfigFactoryTests.java

@@ -65,7 +65,8 @@ public class EngineConfigFactoryTests extends OpenSearchTestCase {
null,
() -> new RetentionLeases(0, 0, Collections.emptyList()),
null,
null
null,
false
);
assertNotNull(config.getCodec());
@@ -141,7 +142,8 @@
null,
() -> new RetentionLeases(0, 0, Collections.emptyList()),
null,
null
null,
false
);
assertNotNull(config.getCodec());
}

File: NRTReplicationEngineTests.java

@@ -0,0 +1,239 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.index.engine;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.SegmentInfos;
import org.hamcrest.MatcherAssert;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.common.lucene.Lucene;
import org.opensearch.common.lucene.search.Queries;
import org.opensearch.index.mapper.ParsedDocument;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.TestTranslog;
import org.opensearch.index.translog.Translog;
import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.opensearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
public class NRTReplicationEngineTests extends EngineTestCase {
public void testCreateEngine() throws IOException {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
try (
final Store nrtEngineStore = createStore();
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore);
) {
final SegmentInfos latestSegmentInfos = nrtEngine.getLatestSegmentInfos();
final SegmentInfos lastCommittedSegmentInfos = nrtEngine.getLastCommittedSegmentInfos();
assertEquals(latestSegmentInfos.version, lastCommittedSegmentInfos.version);
assertEquals(latestSegmentInfos.getGeneration(), lastCommittedSegmentInfos.getGeneration());
assertEquals(latestSegmentInfos.getUserData(), lastCommittedSegmentInfos.getUserData());
assertEquals(latestSegmentInfos.files(true), lastCommittedSegmentInfos.files(true));
assertTrue(nrtEngine.segments(true).isEmpty());
try (final GatedCloseable<IndexCommit> indexCommitGatedCloseable = nrtEngine.acquireLastIndexCommit(false)) {
final IndexCommit indexCommit = indexCommitGatedCloseable.get();
assertEquals(indexCommit.getUserData(), lastCommittedSegmentInfos.getUserData());
assertTrue(indexCommit.getFileNames().containsAll(lastCommittedSegmentInfos.files(true)));
}
}
}
public void testEngineWritesOpsToTranslog() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
try (
final Store nrtEngineStore = createStore();
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore);
) {
List<Engine.Operation> operations = generateHistoryOnReplica(
between(1, 500),
randomBoolean(),
randomBoolean(),
randomBoolean()
);
for (Engine.Operation op : operations) {
applyOperation(engine, op);
applyOperation(nrtEngine, op);
}
assertEquals(nrtEngine.getTranslogLastWriteLocation(), engine.getTranslogLastWriteLocation());
assertEquals(nrtEngine.getLastSyncedGlobalCheckpoint(), engine.getLastSyncedGlobalCheckpoint());
// we don't index into nrtEngine, so get the doc ids from the regular engine.
final List<DocIdSeqNoAndSource> docs = getDocIds(engine, true);
// recover a new engine from the nrtEngine's xlog.
nrtEngine.syncTranslog();
try (InternalEngine engine = new InternalEngine(nrtEngine.config())) {
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertEquals(getDocIds(engine, true), docs);
}
assertEngineCleanedUp(nrtEngine, nrtEngine.getTranslog());
}
}
public void testUpdateSegments() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
try (
final Store nrtEngineStore = createStore();
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore);
) {
// add docs to the primary engine.
List<Engine.Operation> operations = generateHistoryOnReplica(between(1, 500), randomBoolean(), randomBoolean(), randomBoolean())
.stream()
.filter(op -> op.operationType().equals(Engine.Operation.TYPE.INDEX))
.collect(Collectors.toList());
for (Engine.Operation op : operations) {
applyOperation(engine, op);
applyOperation(nrtEngine, op);
}
engine.refresh("test");
nrtEngine.updateSegments(engine.getLatestSegmentInfos(), engine.getProcessedLocalCheckpoint());
assertMatchingSegmentsAndCheckpoints(nrtEngine);
// assert a doc from the operations exists.
final ParsedDocument parsedDoc = createParsedDoc(operations.stream().findFirst().get().id(), null);
try (Engine.GetResult getResult = engine.get(newGet(true, parsedDoc), engine::acquireSearcher)) {
assertThat(getResult.exists(), equalTo(true));
assertThat(getResult.docIdAndVersion(), notNullValue());
}
try (Engine.GetResult getResult = nrtEngine.get(newGet(true, parsedDoc), nrtEngine::acquireSearcher)) {
assertThat(getResult.exists(), equalTo(true));
assertThat(getResult.docIdAndVersion(), notNullValue());
}
// Flush the primary and update the NRTEngine with the latest committed infos.
engine.flush();
nrtEngine.syncTranslog(); // to advance persisted checkpoint
Set<Long> seqNos = operations.stream().map(Engine.Operation::seqNo).collect(Collectors.toSet());
try (Translog.Snapshot snapshot = nrtEngine.getTranslog().newSnapshot()) {
assertThat(snapshot.totalOperations(), equalTo(operations.size()));
assertThat(
TestTranslog.drainSnapshot(snapshot, false).stream().map(Translog.Operation::seqNo).collect(Collectors.toSet()),
equalTo(seqNos)
);
}
nrtEngine.updateSegments(engine.getLastCommittedSegmentInfos(), engine.getProcessedLocalCheckpoint());
assertMatchingSegmentsAndCheckpoints(nrtEngine);
assertEquals(
nrtEngine.getTranslog().getGeneration().translogFileGeneration,
engine.getTranslog().getGeneration().translogFileGeneration
);
try (Translog.Snapshot snapshot = nrtEngine.getTranslog().newSnapshot()) {
assertThat(snapshot.totalOperations(), equalTo(operations.size()));
assertThat(
TestTranslog.drainSnapshot(snapshot, false).stream().map(Translog.Operation::seqNo).collect(Collectors.toSet()),
equalTo(seqNos)
);
}
// Ensure the same hit count between engines.
int expectedDocCount;
try (final Engine.Searcher test = engine.acquireSearcher("test")) {
expectedDocCount = test.count(Queries.newMatchAllQuery());
assertSearcherHits(nrtEngine, expectedDocCount);
}
assertEngineCleanedUp(nrtEngine, nrtEngine.getTranslog());
}
}
public void testTrimTranslogOps() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
try (
final Store nrtEngineStore = createStore();
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore);
) {
List<Engine.Operation> operations = generateHistoryOnReplica(
between(1, 100),
randomBoolean(),
randomBoolean(),
randomBoolean()
);
applyOperations(nrtEngine, operations);
Set<Long> seqNos = operations.stream().map(Engine.Operation::seqNo).collect(Collectors.toSet());
try (Translog.Snapshot snapshot = nrtEngine.getTranslog().newSnapshot()) {
assertThat(snapshot.totalOperations(), equalTo(operations.size()));
assertThat(
TestTranslog.drainSnapshot(snapshot, false).stream().map(Translog.Operation::seqNo).collect(Collectors.toSet()),
equalTo(seqNos)
);
}
nrtEngine.rollTranslogGeneration();
nrtEngine.trimOperationsFromTranslog(primaryTerm.get(), NO_OPS_PERFORMED);
try (Translog.Snapshot snapshot = getTranslog(engine).newSnapshot()) {
assertThat(snapshot.totalOperations(), equalTo(0));
assertNull(snapshot.next());
}
}
}
private void assertMatchingSegmentsAndCheckpoints(NRTReplicationEngine nrtEngine) throws IOException {
assertEquals(engine.getPersistedLocalCheckpoint(), nrtEngine.getPersistedLocalCheckpoint());
assertEquals(engine.getProcessedLocalCheckpoint(), nrtEngine.getProcessedLocalCheckpoint());
assertEquals(engine.getLocalCheckpointTracker().getMaxSeqNo(), nrtEngine.getLocalCheckpointTracker().getMaxSeqNo());
assertEquals(engine.getLatestSegmentInfos().files(true), nrtEngine.getLatestSegmentInfos().files(true));
assertEquals(engine.getLatestSegmentInfos().getUserData(), nrtEngine.getLatestSegmentInfos().getUserData());
assertEquals(engine.getLatestSegmentInfos().getVersion(), nrtEngine.getLatestSegmentInfos().getVersion());
assertEquals(engine.segments(true), nrtEngine.segments(true));
}
private void assertSearcherHits(Engine engine, int hits) {
try (final Engine.Searcher test = engine.acquireSearcher("test")) {
MatcherAssert.assertThat(test, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(hits));
}
}
private NRTReplicationEngine buildNrtReplicaEngine(AtomicLong globalCheckpoint, Store store) throws IOException {
Lucene.cleanLuceneIndex(store.directory());
final Path translogDir = createTempDir();
final EngineConfig replicaConfig = config(
defaultSettings,
store,
translogDir,
NoMergePolicy.INSTANCE,
null,
null,
globalCheckpoint::get
);
if (Lucene.indexExists(store.directory()) == false) {
store.createEmpty(replicaConfig.getIndexSettings().getIndexVersionCreated().luceneVersion);
final String translogUuid = Translog.createEmptyTranslog(
replicaConfig.getTranslogConfig().getTranslogPath(),
SequenceNumbers.NO_OPS_PERFORMED,
shardId,
primaryTerm.get()
);
store.associateIndexWithNewTranslog(translogUuid);
}
return new NRTReplicationEngine(replicaConfig);
}
}

File: LocalCheckpointTrackerTests.java

@@ -332,59 +332,22 @@ public class LocalCheckpointTrackerTests extends OpenSearchTestCase {
assertThat(tracker.hasProcessed(seqNo), equalTo(seqNo <= localCheckpoint || seqNos.contains(seqNo)));
}
public void testFastForwardProcessedNoPersistentUpdate() {
public void testFastForwardProcessedSeqNo() {
// base case with no persistent checkpoint update
long seqNo1;
assertThat(tracker.getProcessedCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED));
seqNo1 = tracker.generateSeqNo();
assertThat(seqNo1, equalTo(0L));
tracker.fastForwardProcessedSeqNo(seqNo1);
assertThat(tracker.getProcessedCheckpoint(), equalTo(-1L));
}
public void testFastForwardProcessedPersistentUpdate() {
// base case with persistent checkpoint update
long seqNo1;
assertThat(tracker.getProcessedCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED));
seqNo1 = tracker.generateSeqNo();
assertThat(seqNo1, equalTo(0L));
tracker.markSeqNoAsPersisted(seqNo1);
assertThat(tracker.getPersistedCheckpoint(), equalTo(0L));
tracker.fastForwardProcessedSeqNo(seqNo1);
assertThat(tracker.getProcessedCheckpoint(), equalTo(0L));
assertThat(tracker.hasProcessed(0L), equalTo(true));
assertThat(tracker.hasProcessed(atLeast(1)), equalTo(false));
assertThat(tracker.getProcessedCheckpoint(), equalTo(seqNo1));
// idempotent case
tracker.fastForwardProcessedSeqNo(seqNo1);
assertThat(tracker.getProcessedCheckpoint(), equalTo(0L));
assertThat(tracker.hasProcessed(0L), equalTo(true));
assertThat(tracker.hasProcessed(atLeast(1)), equalTo(false));
}
public void testFastForwardProcessedPersistentUpdate2() {
long seqNo1, seqNo2;
assertThat(tracker.getProcessedCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED));
seqNo1 = tracker.generateSeqNo();
seqNo2 = tracker.generateSeqNo();
assertThat(seqNo1, equalTo(0L));
assertThat(seqNo2, equalTo(1L));
tracker.markSeqNoAsPersisted(seqNo1);
tracker.markSeqNoAsPersisted(seqNo2);
assertThat(tracker.getProcessedCheckpoint(), equalTo(-1L));
assertThat(tracker.getPersistedCheckpoint(), equalTo(1L));
tracker.fastForwardProcessedSeqNo(seqNo2);
assertThat(tracker.getProcessedCheckpoint(), equalTo(1L));
assertThat(tracker.hasProcessed(seqNo1), equalTo(true));
assertThat(tracker.hasProcessed(seqNo2), equalTo(true));
tracker.fastForwardProcessedSeqNo(seqNo1);
assertThat(tracker.getProcessedCheckpoint(), equalTo(1L));
assertThat(tracker.hasProcessed(between(0, 1)), equalTo(true));
assertThat(tracker.hasProcessed(atLeast(2)), equalTo(false));
assertThat(tracker.getMaxSeqNo(), equalTo(1L));
tracker.fastForwardProcessedSeqNo(-1);
assertThat(tracker.getProcessedCheckpoint(), equalTo(0L));
assertThat(tracker.hasProcessed(0L), equalTo(true));
}
}

File: IndexShardTests.java

@@ -101,6 +101,8 @@ import org.opensearch.index.engine.EngineConfigFactory;
import org.opensearch.index.engine.EngineTestCase;
import org.opensearch.index.engine.InternalEngine;
import org.opensearch.index.engine.InternalEngineFactory;
import org.opensearch.index.engine.NRTReplicationEngineFactory;
import org.opensearch.index.engine.NRTReplicationEngine;
import org.opensearch.index.engine.ReadOnlyEngine;
import org.opensearch.index.fielddata.FieldDataStats;
import org.opensearch.index.fielddata.IndexFieldData;
@@ -136,6 +138,7 @@ import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.recovery.RecoveryTarget;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.repositories.IndexId;
import org.opensearch.snapshots.Snapshot;
import org.opensearch.snapshots.SnapshotId;
@@ -4167,14 +4170,14 @@ public class IndexShardTests extends IndexShardTestCase {
@Override
public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo)
throws IOException {
InternalEngine internalEngine = super.recoverFromTranslog(translogRecoveryRunner, recoverUpToSeqNo);
InternalEngine engine = super.recoverFromTranslog(translogRecoveryRunner, recoverUpToSeqNo);
readyToSnapshotLatch.countDown();
try {
snapshotDoneLatch.await();
} catch (InterruptedException e) {
throw new AssertionError(e);
}
return internalEngine;
return engine;
}
});
@@ -4447,6 +4450,27 @@
closeShards(readonlyShard);
}
public void testReadOnlyReplicaEngineConfig() throws IOException {
Settings primarySettings = Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.build();
final IndexShard primaryShard = newStartedShard(false, primarySettings, new NRTReplicationEngineFactory());
assertFalse(primaryShard.getEngine().config().isReadOnlyReplica());
assertEquals(primaryShard.getEngine().getClass(), InternalEngine.class);
Settings replicaSettings = Settings.builder()
.put(primarySettings)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build();
final IndexShard replicaShard = newStartedShard(false, replicaSettings, new NRTReplicationEngineFactory());
assertTrue(replicaShard.getEngine().config().isReadOnlyReplica());
assertEquals(replicaShard.getEngine().getClass(), NRTReplicationEngine.class);
closeShards(primaryShard, replicaShard);
}
public void testCloseShardWhileEngineIsWarming() throws Exception {
CountDownLatch warmerStarted = new CountDownLatch(1);
CountDownLatch warmerBlocking = new CountDownLatch(1);

File: EngineTestCase.java

@@ -328,24 +328,26 @@ public abstract class EngineTestCase extends OpenSearchTestCase {
super.tearDown();
try {
if (engine != null && engine.isClosed.get() == false) {
engine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs();
assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine);
assertNoInFlightDocuments(engine);
assertMaxSeqNoInCommitUserData(engine);
assertAtMostOneLuceneDocumentPerSequenceNumber(engine);
assertEngineCleanedUp(engine, engine.getTranslog());
}
if (replicaEngine != null && replicaEngine.isClosed.get() == false) {
replicaEngine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs();
assertConsistentHistoryBetweenTranslogAndLuceneIndex(replicaEngine);
assertNoInFlightDocuments(replicaEngine);
assertMaxSeqNoInCommitUserData(replicaEngine);
assertAtMostOneLuceneDocumentPerSequenceNumber(replicaEngine);
assertEngineCleanedUp(replicaEngine, replicaEngine.getTranslog());
}
} finally {
IOUtils.close(replicaEngine, storeReplica, engine, store, () -> terminate(threadPool));
}
}
protected void assertEngineCleanedUp(Engine engine, Translog translog) throws Exception {
if (engine.isClosed.get() == false) {
translog.getDeletionPolicy().assertNoOpenTranslogRefs();
assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine);
assertNoInFlightDocuments(engine);
assertMaxSeqNoInCommitUserData(engine);
assertAtMostOneLuceneDocumentPerSequenceNumber(engine);
}
}
protected static ParseContext.Document testDocumentWithTextField() {
return testDocumentWithTextField("test");
}