Peer Recovery: remove maxUnsafeAutoIdTimestamp hand off (#24243)

With #24149 , it is now stored in the Lucene commit and is implicitly transferred in the file phase of the recovery.
This commit is contained in:
Boaz Leskes 2017-04-21 17:31:50 +02:00 committed by GitHub
parent 63e5aff5d6
commit badb2be066
15 changed files with 108 additions and 123 deletions

View File

@ -27,7 +27,6 @@ import org.apache.lucene.search.QueryCachingPolicy;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.similarities.Similarity;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
@ -67,7 +66,6 @@ public final class EngineConfig {
private final Engine.EventListener eventListener;
private final QueryCache queryCache;
private final QueryCachingPolicy queryCachingPolicy;
private final long maxUnsafeAutoIdTimestamp;
@Nullable
private final ReferenceManager.RefreshListener refreshListeners;
@Nullable
@ -116,7 +114,7 @@ public final class EngineConfig {
Similarity similarity, CodecService codecService, Engine.EventListener eventListener,
TranslogRecoveryPerformer translogRecoveryPerformer, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy,
TranslogConfig translogConfig, TimeValue flushMergesAfter, ReferenceManager.RefreshListener refreshListeners,
long maxUnsafeAutoIdTimestamp, Sort indexSort) {
Sort indexSort) {
if (openMode == null) {
throw new IllegalArgumentException("openMode must not be null");
}
@ -143,9 +141,6 @@ public final class EngineConfig {
this.flushMergesAfter = flushMergesAfter;
this.openMode = openMode;
this.refreshListeners = refreshListeners;
assert maxUnsafeAutoIdTimestamp >= IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP :
"maxUnsafeAutoIdTimestamp must be >= -1 but was " + maxUnsafeAutoIdTimestamp;
this.maxUnsafeAutoIdTimestamp = maxUnsafeAutoIdTimestamp;
this.indexSort = indexSort;
}
@ -333,11 +328,10 @@ public final class EngineConfig {
}
/**
* Returns the max timestamp that is used to de-optimize documents with auto-generated IDs in the engine.
* This is used to ensure we don't add duplicate documents when we assume an append only case based on auto-generated IDs
* returns true if the engine is allowed to optimize indexing operations with an auto-generated ID
*/
public long getMaxUnsafeAutoIdTimestamp() {
return indexSettings.getValue(INDEX_OPTIMIZE_AUTO_GENERATED_IDS) ? maxUnsafeAutoIdTimestamp : Long.MAX_VALUE;
public boolean isAutoGeneratedIDsOptimizationEnabled() {
return indexSettings.getValue(INDEX_OPTIMIZE_AUTO_GENERATED_IDS);
}
/**

View File

@ -128,7 +128,7 @@ public class InternalEngine extends Engine {
private final AtomicInteger throttleRequestCount = new AtomicInteger();
private final EngineConfig.OpenMode openMode;
private final AtomicBoolean pendingTranslogRecovery = new AtomicBoolean(false);
private static final String MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID = "max_unsafe_auto_id_timestamp";
public static final String MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID = "max_unsafe_auto_id_timestamp";
private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1);
private final CounterMetric numVersionLookups = new CounterMetric();
private final CounterMetric numIndexVersionsLookups = new CounterMetric();
@ -136,11 +136,8 @@ public class InternalEngine extends Engine {
public InternalEngine(EngineConfig engineConfig) throws EngineException {
super(engineConfig);
openMode = engineConfig.getOpenMode();
if (engineConfig.getIndexSettings().getIndexVersionCreated().before(Version.V_5_0_0_beta1)) {
// no optimization for pre 5.0.0.alpha6 since translog might not have all information needed
if (engineConfig.isAutoGeneratedIDsOptimizationEnabled() == false) {
maxUnsafeAutoIdTimestamp.set(Long.MAX_VALUE);
} else {
maxUnsafeAutoIdTimestamp.set(engineConfig.getMaxUnsafeAutoIdTimestamp());
}
this.versionMap = new LiveVersionMap();
store.incRef();
@ -1836,7 +1833,7 @@ public class InternalEngine extends Engine {
mergeScheduler.refreshConfig();
// config().isEnableGcDeletes() or config.getGcDeletesInMillis() may have changed:
maybePruneDeletedTombstones();
if (engineConfig.getMaxUnsafeAutoIdTimestamp() == Long.MAX_VALUE) {
if (engineConfig.isAutoGeneratedIDsOptimizationEnabled() == false) {
// this is an anti-viral settings you can only opt out for the entire index
// only if a shard starts up again due to relocation or if the index is closed
// the setting will be re-interpreted if it's set to true

View File

@ -27,6 +27,7 @@ import org.apache.lucene.index.IndexFormatTooNewException;
import org.apache.lucene.index.IndexFormatTooOldException;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SnapshotDeletionPolicy;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.Query;
@ -38,11 +39,11 @@ import org.apache.lucene.store.Lock;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.ThreadInterruptedException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.routing.RecoverySource;
@ -79,6 +80,7 @@ import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.engine.InternalEngineFactory;
import org.elasticsearch.index.engine.RefreshFailedEngineException;
import org.elasticsearch.index.engine.Segment;
@ -1040,11 +1042,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
translogStats.totalOperations(0);
translogStats.totalOperationsOnStart(0);
}
internalPerformTranslogRecovery(false, indexExists, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP);
internalPerformTranslogRecovery(false, indexExists);
assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState.getStage();
}
private void internalPerformTranslogRecovery(boolean skipTranslogRecovery, boolean indexExists, long maxUnsafeAutoIdTimestamp) throws IOException {
private void internalPerformTranslogRecovery(boolean skipTranslogRecovery, boolean indexExists) throws IOException {
if (state != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, state);
}
@ -1073,7 +1075,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
} else {
openMode = EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG;
}
final EngineConfig config = newEngineConfig(openMode, maxUnsafeAutoIdTimestamp);
assert indexExists == false || assertMaxUnsafeAutoIdInCommit();
final EngineConfig config = newEngineConfig(openMode);
// we disable deletes since we allow for operations to be executed against the shard while recovering
// but we need to make sure we don't loose deletes until we are done recovering
config.setEnableGcDeletes(false);
@ -1087,6 +1092,22 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
}
private boolean assertMaxUnsafeAutoIdInCommit() throws IOException {
final Map<String, String> userData = SegmentInfos.readLatestCommit(store.directory()).getUserData();
if (recoveryState().getRecoverySource().getType() == RecoverySource.Type.PEER) {
// as of 5.5.0, the engine stores the maxUnsafeAutoIdTimestamp in the commit point.
// This should have baked into the commit by the primary we recover from, regardless of the index age.
assert userData.containsKey(InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID) :
"recovery from remote but " + InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID + " is not found in commit";
} else if (recoveryState().getRecoverySource().getType() == RecoverySource.Type.EXISTING_STORE &&
indexSettings.getIndexVersionCreated().onOrAfter(Version.V_5_5_0_UNRELEASED)) {
assert userData.containsKey(InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID) :
"opening index which was created post 5.5.0 but " + InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID
+ " is not found in commit";
}
return true;
}
protected void onNewEngine(Engine newEngine) {
refreshListeners.setTranslog(newEngine.getTranslog());
}
@ -1096,9 +1117,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
* the replay of the transaction log which is required in cases where we restore a previous index or recover from
* a remote peer.
*/
public void skipTranslogRecovery(long maxUnsafeAutoIdTimestamp) throws IOException {
public void skipTranslogRecovery() throws IOException {
assert getEngineOrNull() == null : "engine was already created";
internalPerformTranslogRecovery(true, true, maxUnsafeAutoIdTimestamp);
internalPerformTranslogRecovery(true, true);
assert recoveryState.getTranslog().recoveredOperations() == 0;
}
@ -1795,14 +1816,13 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
return mapperService.documentMapperWithAutoCreate(type);
}
private EngineConfig newEngineConfig(EngineConfig.OpenMode openMode, long maxUnsafeAutoIdTimestamp) {
private EngineConfig newEngineConfig(EngineConfig.OpenMode openMode) {
final IndexShardRecoveryPerformer translogRecoveryPerformer = new IndexShardRecoveryPerformer(shardId, mapperService, logger);
Sort indexSort = indexSortSupplier.get();
return new EngineConfig(openMode, shardId,
threadPool, indexSettings, warmer, store, deletionPolicy, indexSettings.getMergePolicy(),
mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig,
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()), refreshListeners,
maxUnsafeAutoIdTimestamp, indexSort);
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()), refreshListeners, indexSort);
}
/**

View File

@ -31,7 +31,6 @@ import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.routing.RecoverySource;
@ -353,7 +352,7 @@ final class StoreRecovery {
recoveryState.getIndex().updateVersion(version);
if (recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS) {
assert indexShouldExists;
indexShard.skipTranslogRecovery(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP);
indexShard.skipTranslogRecovery();
} else {
// since we recover from local, just fill the files and size
try {
@ -405,7 +404,7 @@ final class StoreRecovery {
}
final IndexId indexId = repository.getRepositoryData().resolveIndexId(indexName);
repository.restoreShard(indexShard, restoreSource.snapshot().getSnapshotId(), restoreSource.version(), indexId, snapshotShardId, indexShard.recoveryState());
indexShard.skipTranslogRecovery(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP);
indexShard.skipTranslogRecovery();
indexShard.finalizeRecovery();
indexShard.postRecovery("restore done");
} catch (Exception e) {

View File

@ -377,7 +377,7 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel) throws Exception {
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId()
)) {
recoveryRef.target().prepareForTranslogOperations(request.totalTranslogOps(), request.getMaxUnsafeAutoIdTimestamp());
recoveryRef.target().prepareForTranslogOperations(request.totalTranslogOps());
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.indices.recovery;
import org.elasticsearch.Version;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -29,7 +30,6 @@ import java.io.IOException;
public class RecoveryPrepareForTranslogOperationsRequest extends TransportRequest {
private long maxUnsafeAutoIdTimestamp = IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP;
private long recoveryId;
private ShardId shardId;
private int totalTranslogOps = RecoveryState.Translog.UNKNOWN;
@ -37,11 +37,10 @@ public class RecoveryPrepareForTranslogOperationsRequest extends TransportReques
public RecoveryPrepareForTranslogOperationsRequest() {
}
RecoveryPrepareForTranslogOperationsRequest(long recoveryId, ShardId shardId, int totalTranslogOps, long maxUnsafeAutoIdTimestamp) {
RecoveryPrepareForTranslogOperationsRequest(long recoveryId, ShardId shardId, int totalTranslogOps) {
this.recoveryId = recoveryId;
this.shardId = shardId;
this.totalTranslogOps = totalTranslogOps;
this.maxUnsafeAutoIdTimestamp = maxUnsafeAutoIdTimestamp;
}
public long recoveryId() {
@ -56,17 +55,15 @@ public class RecoveryPrepareForTranslogOperationsRequest extends TransportReques
return totalTranslogOps;
}
public long getMaxUnsafeAutoIdTimestamp() {
return maxUnsafeAutoIdTimestamp;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
recoveryId = in.readLong();
shardId = ShardId.readShardId(in);
totalTranslogOps = in.readVInt();
maxUnsafeAutoIdTimestamp = in.readLong();
if (in.getVersion().before(Version.V_6_0_0_alpha1_UNRELEASED)) {
in.readLong(); // maxUnsafeAutoIdTimestamp
}
}
@Override
@ -75,6 +72,8 @@ public class RecoveryPrepareForTranslogOperationsRequest extends TransportReques
out.writeLong(recoveryId);
shardId.writeTo(out);
out.writeVInt(totalTranslogOps);
out.writeLong(maxUnsafeAutoIdTimestamp);
if (out.getVersion().before(Version.V_6_0_0_alpha1_UNRELEASED)) {
out.writeLong(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP); // maxUnsafeAutoIdTimestamp
}
}
}

View File

@ -157,7 +157,7 @@ public class RecoverySourceHandler {
}
try {
prepareTargetForTranslog(translogView.totalOperations(), shard.segmentStats(false).getMaxUnsafeAutoIdTimestamp());
prepareTargetForTranslog(translogView.totalOperations());
} catch (final Exception e) {
throw new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e);
}
@ -389,13 +389,13 @@ public class RecoverySourceHandler {
}
}
void prepareTargetForTranslog(final int totalTranslogOps, final long maxUnsafeAutoIdTimestamp) throws IOException {
void prepareTargetForTranslog(final int totalTranslogOps) throws IOException {
StopWatch stopWatch = new StopWatch().start();
logger.trace("recovery [phase1]: prepare remote engine for translog");
final long startEngineStart = stopWatch.totalTime().millis();
// Send a request preparing the new shard's translog to receive operations. This ensures the shard engine is started and disables
// garbage collection (not the JVM's GC!) of tombstone deletes.
cancellableThreads.executeIO(() -> recoveryTarget.prepareForTranslogOperations(totalTranslogOps, maxUnsafeAutoIdTimestamp));
cancellableThreads.executeIO(() -> recoveryTarget.prepareForTranslogOperations(totalTranslogOps));
stopWatch.stop();
response.startTime = stopWatch.totalTime().millis() - startEngineStart;

View File

@ -36,7 +36,6 @@ import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.Callback;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
@ -49,7 +48,6 @@ import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.translog.Translog;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
@ -58,8 +56,6 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@ -360,9 +356,9 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
/*** Implementation of {@link RecoveryTargetHandler } */
@Override
public void prepareForTranslogOperations(int totalTranslogOps, long maxUnsafeAutoIdTimestamp) throws IOException {
public void prepareForTranslogOperations(int totalTranslogOps) throws IOException {
state().getTranslog().totalOperations(totalTranslogOps);
indexShard().skipTranslogRecovery(maxUnsafeAutoIdTimestamp);
indexShard().skipTranslogRecovery();
}
@Override

View File

@ -33,10 +33,8 @@ public interface RecoveryTargetHandler {
* Prepares the target to receive translog operations, after all file have been copied
*
* @param totalTranslogOps total translog operations expected to be sent
* @param maxUnsafeAutoIdTimestamp the max timestamp that is used to de-optimize documents with auto-generated IDs in the engine.
* This is used to ensure we don't add duplicate documents when we assume an append only case based on auto-generated IDs
*/
void prepareForTranslogOperations(int totalTranslogOps, long maxUnsafeAutoIdTimestamp) throws IOException;
void prepareForTranslogOperations(int totalTranslogOps) throws IOException;
/**
* The finalize request refreshes the engine now that new segments are available, enables garbage collection of tombstone files, and

View File

@ -78,9 +78,9 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
}
@Override
public void prepareForTranslogOperations(int totalTranslogOps, long maxUnsafeAutoIdTimestamp) throws IOException {
public void prepareForTranslogOperations(int totalTranslogOps) throws IOException {
transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.PREPARE_TRANSLOG,
new RecoveryPrepareForTranslogOperationsRequest(recoveryId, shardId, totalTranslogOps, maxUnsafeAutoIdTimestamp),
new RecoveryPrepareForTranslogOperationsRequest(recoveryId, shardId, totalTranslogOps),
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(),
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}

View File

@ -56,10 +56,10 @@ import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortedSetSortField;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TotalHitCountCollector;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
@ -262,7 +262,7 @@ public class InternalEngineTests extends ESTestCase {
config.getStore(), config.getDeletionPolicy(), config.getMergePolicy(), analyzer, config.getSimilarity(),
new CodecService(null, logger), config.getEventListener(), config.getTranslogRecoveryPerformer(), config.getQueryCache(),
config.getQueryCachingPolicy(), config.getTranslogConfig(), config.getFlushMergesAfter(), config.getRefreshListeners(),
config.getMaxUnsafeAutoIdTimestamp(), config.getIndexSort());
config.getIndexSort());
}
@Override
@ -371,7 +371,7 @@ public class InternalEngineTests extends ESTestCase {
@Nullable IndexWriterFactory indexWriterFactory,
@Nullable Supplier<SequenceNumbersService> sequenceNumbersServiceSupplier,
@Nullable Sort indexSort) throws IOException {
EngineConfig config = config(indexSettings, store, translogPath, mergePolicy, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null, indexSort);
EngineConfig config = config(indexSettings, store, translogPath, mergePolicy, null, indexSort);
InternalEngine internalEngine = createInternalEngine(indexWriterFactory, sequenceNumbersServiceSupplier, config);
if (config.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
internalEngine.recoverFromTranslog();
@ -404,25 +404,22 @@ public class InternalEngineTests extends ESTestCase {
}
public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy,
long maxUnsafeAutoIdTimestamp, ReferenceManager.RefreshListener refreshListener) {
return config(indexSettings, store, translogPath, mergePolicy, createSnapshotDeletionPolicy(),
maxUnsafeAutoIdTimestamp, refreshListener, null);
}
public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy,
long maxUnsafeAutoIdTimestamp, ReferenceManager.RefreshListener refreshListener, Sort indexSort) {
return config(indexSettings, store, translogPath, mergePolicy, createSnapshotDeletionPolicy(),
maxUnsafeAutoIdTimestamp, refreshListener, indexSort);
}
public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy,
SnapshotDeletionPolicy deletionPolicy, long maxUnsafeAutoIdTimestamp,
ReferenceManager.RefreshListener refreshListener) {
return config(indexSettings, store, translogPath, mergePolicy, deletionPolicy, maxUnsafeAutoIdTimestamp, refreshListener, null);
return config(indexSettings, store, translogPath, mergePolicy, createSnapshotDeletionPolicy(), refreshListener, null);
}
public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy,
SnapshotDeletionPolicy deletionPolicy, long maxUnsafeAutoIdTimestamp,
ReferenceManager.RefreshListener refreshListener, Sort indexSort) {
return config(indexSettings, store, translogPath, mergePolicy, createSnapshotDeletionPolicy(), refreshListener, indexSort);
}
public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy,
SnapshotDeletionPolicy deletionPolicy, ReferenceManager.RefreshListener refreshListener) {
return config(indexSettings, store, translogPath, mergePolicy, deletionPolicy, refreshListener, null);
}
public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy,
SnapshotDeletionPolicy deletionPolicy,
ReferenceManager.RefreshListener refreshListener, Sort indexSort) {
IndexWriterConfig iwc = newIndexWriterConfig();
TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE);
@ -445,8 +442,7 @@ public class InternalEngineTests extends ESTestCase {
EngineConfig config = new EngineConfig(openMode, shardId, threadPool, indexSettings, null, store, deletionPolicy,
mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener,
new TranslogHandler(xContentRegistry(), shardId.getIndexName(), logger), IndexSearcher.getDefaultQueryCache(),
IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5), refreshListener,
maxUnsafeAutoIdTimestamp, indexSort);
IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5), refreshListener, indexSort);
return config;
}
@ -1170,8 +1166,7 @@ public class InternalEngineTests extends ESTestCase {
public void testSyncedFlush() throws IOException {
try (Store store = createStore();
Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(),
new LogByteSizeMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null))) {
Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), new LogByteSizeMergePolicy(), null))) {
final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20);
ParsedDocument doc = testParsedDocument("1", "test", null, testDocumentWithTextField(), B_1, null);
engine.index(indexForDoc(doc));
@ -1198,7 +1193,7 @@ public class InternalEngineTests extends ESTestCase {
for (int i = 0; i < iters; i++) {
try (Store store = createStore();
InternalEngine engine = new InternalEngine(config(defaultSettings, store, createTempDir(),
new LogDocMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null))) {
new LogDocMergePolicy(), null))) {
final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20);
Engine.Index doc1 = indexForDoc(testParsedDocument("1", "test", null, testDocumentWithTextField(), B_1, null));
engine.index(doc1);
@ -1317,7 +1312,7 @@ public class InternalEngineTests extends ESTestCase {
public void testForceMerge() throws IOException {
try (Store store = createStore();
Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(),
new LogByteSizeMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null))) { // use log MP here we test some behavior in ESMP
new LogByteSizeMergePolicy(), null))) { // use log MP here we test some behavior in ESMP
int numDocs = randomIntBetween(10, 100);
for (int i = 0; i < numDocs; i++) {
ParsedDocument doc = testParsedDocument(Integer.toString(i), "test", null, testDocument(), B_1, null);
@ -2132,8 +2127,7 @@ public class InternalEngineTests extends ESTestCase {
public void testConcurrentWritesAndCommits() throws Exception {
try (Store store = createStore();
InternalEngine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(),
new SnapshotDeletionPolicy(NoDeletionPolicy.INSTANCE),
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null))) {
new SnapshotDeletionPolicy(NoDeletionPolicy.INSTANCE), null))) {
final int numIndexingThreads = scaledRandomIntBetween(3, 6);
final int numDocsPerThread = randomIntBetween(500, 1000);
@ -2274,7 +2268,7 @@ public class InternalEngineTests extends ESTestCase {
public void testEnableGcDeletes() throws Exception {
try (Store store = createStore();
Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null))) {
Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(), null))) {
engine.config().setEnableGcDeletes(false);
// Add document
@ -2421,7 +2415,8 @@ public class InternalEngineTests extends ESTestCase {
// expected
}
// now it should be OK.
EngineConfig config = copy(config(defaultSettings, store, primaryTranslogDir, newMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null), EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG);
EngineConfig config = copy(config(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null),
EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG);
engine = new InternalEngine(config);
}
@ -2736,7 +2731,7 @@ public class InternalEngineTests extends ESTestCase {
config.getIndexSettings(), null, store, createSnapshotDeletionPolicy(), newMergePolicy(), config.getAnalyzer(),
config.getSimilarity(), new CodecService(null, logger), config.getEventListener(), config.getTranslogRecoveryPerformer(),
IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig,
TimeValue.timeValueMinutes(5), config.getRefreshListeners(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null);
TimeValue.timeValueMinutes(5), config.getRefreshListeners(), null);
try {
InternalEngine internalEngine = new InternalEngine(brokenConfig);
@ -2788,7 +2783,7 @@ public class InternalEngineTests extends ESTestCase {
public void testCurrentTranslogIDisCommitted() throws IOException {
try (Store store = createStore()) {
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null);
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null);
// create
{
@ -3284,47 +3279,36 @@ public class InternalEngineTests extends ESTestCase {
}
public void testEngineMaxTimestampIsInitialized() throws IOException {
try (Store store = createStore();
Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE,
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null))) {
assertEquals(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp());
}
final long timestamp1 = Math.abs(randomLong());
final long timestamp1 = Math.abs(randomNonNegativeLong());
final Path storeDir = createTempDir();
final Path translogDir = createTempDir();
try (Store store = createStore(newFSDirectory(storeDir));
Engine engine = new InternalEngine(
config(defaultSettings, store, translogDir, NoMergePolicy.INSTANCE, timestamp1, null))) {
assertEquals(timestamp1, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp());
}
final long timestamp2 = randomNonNegativeLong();
final long timestamp3 = randomNonNegativeLong();
final long maxTimestamp12 = Math.max(timestamp1, timestamp2);
final long maxTimestamp123 = Math.max(maxTimestamp12, timestamp3);
try (Store store = createStore(newFSDirectory(storeDir));
Engine engine = new InternalEngine(
copy(config(defaultSettings, store, translogDir, NoMergePolicy.INSTANCE, timestamp2, null),
randomFrom(EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG)))) {
assertEquals(maxTimestamp12, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp());
if (engine.config().getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
// recover from translog and commit maxTimestamp12
engine.recoverFromTranslog();
// force flush as the were no ops performed
engine.flush(true, false);
}
Engine engine = new InternalEngine(config(defaultSettings, store, translogDir, NoMergePolicy.INSTANCE, null))) {
assertEquals(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp());
final ParsedDocument doc = testParsedDocument("1", "test", null, testDocumentWithTextField(),
new BytesArray("{}".getBytes(Charset.defaultCharset())), null);
engine.index(appendOnlyPrimary(doc, true, timestamp3));
assertEquals(maxTimestamp123, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp());
engine.index(appendOnlyPrimary(doc, true, timestamp1));
assertEquals(timestamp1, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp());
}
try (Store store = createStore(newFSDirectory(storeDir));
Engine engine = new InternalEngine(config(defaultSettings, store, translogDir, NoMergePolicy.INSTANCE, null))) {
assertEquals(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp());
engine.recoverFromTranslog();
assertEquals(timestamp1, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp());
final ParsedDocument doc = testParsedDocument("1", "test", null, testDocumentWithTextField(),
new BytesArray("{}".getBytes(Charset.defaultCharset())), null);
engine.index(appendOnlyPrimary(doc, true, timestamp2));
assertEquals(maxTimestamp12, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp());
engine.flush();
}
try (Store store = createStore(newFSDirectory(storeDir));
Engine engine = new InternalEngine(
config(defaultSettings, store, translogDir, NoMergePolicy.INSTANCE, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null))) {
copy(config(defaultSettings, store, translogDir, NoMergePolicy.INSTANCE, null),
randomFrom(EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG)))) {
assertEquals(maxTimestamp12, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp());
engine.recoverFromTranslog();
assertEquals(maxTimestamp123, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp());
}
}
@ -3394,8 +3378,7 @@ public class InternalEngineTests extends ESTestCase {
CyclicBarrier join = new CyclicBarrier(2);
CountDownLatch start = new CountDownLatch(1);
AtomicInteger controller = new AtomicInteger(0);
EngineConfig config = config(defaultSettings, store, translogPath, newMergePolicy(),
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, new ReferenceManager.RefreshListener() {
EngineConfig config = config(defaultSettings, store, translogPath, newMergePolicy(), new ReferenceManager.RefreshListener() {
@Override
public void beforeRefresh() throws IOException {
}

View File

@ -29,11 +29,11 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.InternalEngineTests;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
@ -289,9 +289,9 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC
return new RecoveryTarget(indexShard, node, recoveryListener, l -> {
}) {
@Override
public void prepareForTranslogOperations(int totalTranslogOps, long maxUnsafeAutoIdTimestamp) throws IOException {
public void prepareForTranslogOperations(int totalTranslogOps) throws IOException {
preparedForTranslog.set(true);
super.prepareForTranslogOperations(totalTranslogOps, maxUnsafeAutoIdTimestamp);
super.prepareForTranslogOperations(totalTranslogOps);
}
};
});

View File

@ -1281,8 +1281,8 @@ public class IndexShardTests extends IndexShardTestCase {
new RecoveryTarget(shard, discoveryNode, recoveryListener, aLong -> {
}) {
@Override
public void prepareForTranslogOperations(int totalTranslogOps, long maxUnsafeAutoIdTimestamp) throws IOException {
super.prepareForTranslogOperations(totalTranslogOps, maxUnsafeAutoIdTimestamp);
public void prepareForTranslogOperations(int totalTranslogOps) throws IOException {
super.prepareForTranslogOperations(totalTranslogOps);
// Shard is still inactive since we haven't started recovering yet
assertFalse(replica.isActive());

View File

@ -29,7 +29,6 @@ import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
@ -123,7 +122,7 @@ public class RefreshListenersTests extends ESTestCase {
store, new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy()), newMergePolicy(), iwc.getAnalyzer(),
iwc.getSimilarity(), new CodecService(null, logger), eventListener, translogHandler,
IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig,
TimeValue.timeValueMinutes(5), listeners, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null);
TimeValue.timeValueMinutes(5), listeners, null);
engine = new InternalEngine(config);
listeners.setTranslog(engine.getTranslog());
}

View File

@ -393,7 +393,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
}
@Override
void prepareTargetForTranslog(final int totalTranslogOps, final long maxUnsafeAutoIdTimestamp) throws IOException {
void prepareTargetForTranslog(final int totalTranslogOps) throws IOException {
prepareTargetForTranslogCalled.set(true);
}
@ -483,7 +483,7 @@ public class RecoverySourceHandlerTests extends ESTestCase {
}
@Override
void prepareTargetForTranslog(final int totalTranslogOps, final long maxUnsafeAutoIdTimestamp) throws IOException {
void prepareTargetForTranslog(final int totalTranslogOps) throws IOException {
prepareTargetForTranslogCalled.set(true);
}