From badb2be06682a14afbd898145368b0b30e151182 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Fri, 21 Apr 2017 17:31:50 +0200 Subject: [PATCH] Peer Recovery: remove maxUnsafeAutoIdTimestamp hand off (#24243) With #24149 , it is now stored in the Lucene commit and is implicitly transferred in the file phase of the recovery. --- .../index/engine/EngineConfig.java | 14 +-- .../index/engine/InternalEngine.java | 9 +- .../elasticsearch/index/shard/IndexShard.java | 38 +++++-- .../index/shard/StoreRecovery.java | 5 +- .../recovery/PeerRecoveryTargetService.java | 2 +- ...ryPrepareForTranslogOperationsRequest.java | 17 ++- .../recovery/RecoverySourceHandler.java | 6 +- .../indices/recovery/RecoveryTarget.java | 8 +- .../recovery/RecoveryTargetHandler.java | 4 +- .../recovery/RemoteRecoveryTargetHandler.java | 4 +- .../index/engine/InternalEngineTests.java | 107 ++++++++---------- .../RecoveryDuringReplicationTests.java | 6 +- .../index/shard/IndexShardTests.java | 4 +- .../index/shard/RefreshListenersTests.java | 3 +- .../recovery/RecoverySourceHandlerTests.java | 4 +- 15 files changed, 108 insertions(+), 123 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java index 7852d2c2db0..d22a93273c7 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +++ b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java @@ -27,7 +27,6 @@ import org.apache.lucene.search.QueryCachingPolicy; import org.apache.lucene.search.ReferenceManager; import org.apache.lucene.search.Sort; import org.apache.lucene.search.similarities.Similarity; -import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; @@ -67,7 +66,6 @@ public final class EngineConfig { private final Engine.EventListener eventListener; private final QueryCache queryCache; private final QueryCachingPolicy queryCachingPolicy; - private final long maxUnsafeAutoIdTimestamp; @Nullable private final ReferenceManager.RefreshListener refreshListeners; @Nullable @@ -116,7 +114,7 @@ public final class EngineConfig { Similarity similarity, CodecService codecService, Engine.EventListener eventListener, TranslogRecoveryPerformer translogRecoveryPerformer, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy, TranslogConfig translogConfig, TimeValue flushMergesAfter, ReferenceManager.RefreshListener refreshListeners, - long maxUnsafeAutoIdTimestamp, Sort indexSort) { + Sort indexSort) { if (openMode == null) { throw new IllegalArgumentException("openMode must not be null"); } @@ -143,9 +141,6 @@ public final class EngineConfig { this.flushMergesAfter = flushMergesAfter; this.openMode = openMode; this.refreshListeners = refreshListeners; - assert maxUnsafeAutoIdTimestamp >= IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP : - "maxUnsafeAutoIdTimestamp must be >= -1 but was " + maxUnsafeAutoIdTimestamp; - this.maxUnsafeAutoIdTimestamp = maxUnsafeAutoIdTimestamp; this.indexSort = indexSort; } @@ -333,11 +328,10 @@ public final class EngineConfig { } /** - * Returns the max timestamp that is used to de-optimize documents with auto-generated IDs in the engine. - * This is used to ensure we don't add duplicate documents when we assume an append only case based on auto-generated IDs + * returns true if the engine is allowed to optimize indexing operations with an auto-generated ID */ - public long getMaxUnsafeAutoIdTimestamp() { - return indexSettings.getValue(INDEX_OPTIMIZE_AUTO_GENERATED_IDS) ? maxUnsafeAutoIdTimestamp : Long.MAX_VALUE; + public boolean isAutoGeneratedIDsOptimizationEnabled() { + return indexSettings.getValue(INDEX_OPTIMIZE_AUTO_GENERATED_IDS); } /** diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 3e5d3453cac..8e1eb059caa 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -128,7 +128,7 @@ public class InternalEngine extends Engine { private final AtomicInteger throttleRequestCount = new AtomicInteger(); private final EngineConfig.OpenMode openMode; private final AtomicBoolean pendingTranslogRecovery = new AtomicBoolean(false); - private static final String MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID = "max_unsafe_auto_id_timestamp"; + public static final String MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID = "max_unsafe_auto_id_timestamp"; private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1); private final CounterMetric numVersionLookups = new CounterMetric(); private final CounterMetric numIndexVersionsLookups = new CounterMetric(); @@ -136,11 +136,8 @@ public class InternalEngine extends Engine { public InternalEngine(EngineConfig engineConfig) throws EngineException { super(engineConfig); openMode = engineConfig.getOpenMode(); - if (engineConfig.getIndexSettings().getIndexVersionCreated().before(Version.V_5_0_0_beta1)) { - // no optimization for pre 5.0.0.alpha6 since translog might not have all information needed + if (engineConfig.isAutoGeneratedIDsOptimizationEnabled() == false) { maxUnsafeAutoIdTimestamp.set(Long.MAX_VALUE); - } else { - maxUnsafeAutoIdTimestamp.set(engineConfig.getMaxUnsafeAutoIdTimestamp()); } this.versionMap = new LiveVersionMap(); store.incRef(); @@ -1836,7 +1833,7 @@ public class InternalEngine extends Engine { mergeScheduler.refreshConfig(); // config().isEnableGcDeletes() or config.getGcDeletesInMillis() may have changed: maybePruneDeletedTombstones(); - if (engineConfig.getMaxUnsafeAutoIdTimestamp() == Long.MAX_VALUE) { + if (engineConfig.isAutoGeneratedIDsOptimizationEnabled() == false) { // this is an anti-viral settings you can only opt out for the entire index // only if a shard starts up again due to relocation or if the index is closed // the setting will be re-interpreted if it's set to true diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index d1ca4f13a42..1da5e6763bc 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -27,6 +27,7 @@ import org.apache.lucene.index.IndexFormatTooNewException; import org.apache.lucene.index.IndexFormatTooOldException; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy; +import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.SnapshotDeletionPolicy; import org.apache.lucene.index.Term; import org.apache.lucene.search.Query; @@ -38,11 +39,11 @@ import org.apache.lucene.store.Lock; import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.ThreadInterruptedException; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequest; -import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.routing.RecoverySource; @@ -79,6 +80,7 @@ import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.engine.EngineFactory; +import org.elasticsearch.index.engine.InternalEngine; import org.elasticsearch.index.engine.InternalEngineFactory; import org.elasticsearch.index.engine.RefreshFailedEngineException; import org.elasticsearch.index.engine.Segment; @@ -1040,11 +1042,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl translogStats.totalOperations(0); translogStats.totalOperationsOnStart(0); } - internalPerformTranslogRecovery(false, indexExists, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP); + internalPerformTranslogRecovery(false, indexExists); assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState.getStage(); } - private void internalPerformTranslogRecovery(boolean skipTranslogRecovery, boolean indexExists, long maxUnsafeAutoIdTimestamp) throws IOException { + private void internalPerformTranslogRecovery(boolean skipTranslogRecovery, boolean indexExists) throws IOException { if (state != IndexShardState.RECOVERING) { throw new IndexShardNotRecoveringException(shardId, state); } @@ -1073,7 +1075,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl } else { openMode = EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG; } - final EngineConfig config = newEngineConfig(openMode, maxUnsafeAutoIdTimestamp); + + assert indexExists == false || assertMaxUnsafeAutoIdInCommit(); + + final EngineConfig config = newEngineConfig(openMode); // we disable deletes since we allow for operations to be executed against the shard while recovering // but we need to make sure we don't loose deletes until we are done recovering config.setEnableGcDeletes(false); @@ -1087,6 +1092,22 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl } } + private boolean assertMaxUnsafeAutoIdInCommit() throws IOException { + final Map userData = SegmentInfos.readLatestCommit(store.directory()).getUserData(); + if (recoveryState().getRecoverySource().getType() == RecoverySource.Type.PEER) { + // as of 5.5.0, the engine stores the maxUnsafeAutoIdTimestamp in the commit point. + // This should have baked into the commit by the primary we recover from, regardless of the index age. + assert userData.containsKey(InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID) : + "recovery from remote but " + InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID + " is not found in commit"; + } else if (recoveryState().getRecoverySource().getType() == RecoverySource.Type.EXISTING_STORE && + indexSettings.getIndexVersionCreated().onOrAfter(Version.V_5_5_0_UNRELEASED)) { + assert userData.containsKey(InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID) : + "opening index which was created post 5.5.0 but " + InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID + + " is not found in commit"; + } + return true; + } + protected void onNewEngine(Engine newEngine) { refreshListeners.setTranslog(newEngine.getTranslog()); } @@ -1096,9 +1117,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl * the replay of the transaction log which is required in cases where we restore a previous index or recover from * a remote peer. */ - public void skipTranslogRecovery(long maxUnsafeAutoIdTimestamp) throws IOException { + public void skipTranslogRecovery() throws IOException { assert getEngineOrNull() == null : "engine was already created"; - internalPerformTranslogRecovery(true, true, maxUnsafeAutoIdTimestamp); + internalPerformTranslogRecovery(true, true); assert recoveryState.getTranslog().recoveredOperations() == 0; } @@ -1795,14 +1816,13 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl return mapperService.documentMapperWithAutoCreate(type); } - private EngineConfig newEngineConfig(EngineConfig.OpenMode openMode, long maxUnsafeAutoIdTimestamp) { + private EngineConfig newEngineConfig(EngineConfig.OpenMode openMode) { final IndexShardRecoveryPerformer translogRecoveryPerformer = new IndexShardRecoveryPerformer(shardId, mapperService, logger); Sort indexSort = indexSortSupplier.get(); return new EngineConfig(openMode, shardId, threadPool, indexSettings, warmer, store, deletionPolicy, indexSettings.getMergePolicy(), mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig, - IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()), refreshListeners, - maxUnsafeAutoIdTimestamp, indexSort); + IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()), refreshListeners, indexSort); } /** diff --git a/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java index 6cfaca8c45b..032c033f175 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java +++ b/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java @@ -31,7 +31,6 @@ import org.apache.lucene.store.FilterDirectory; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.routing.RecoverySource; @@ -353,7 +352,7 @@ final class StoreRecovery { recoveryState.getIndex().updateVersion(version); if (recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS) { assert indexShouldExists; - indexShard.skipTranslogRecovery(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP); + indexShard.skipTranslogRecovery(); } else { // since we recover from local, just fill the files and size try { @@ -405,7 +404,7 @@ final class StoreRecovery { } final IndexId indexId = repository.getRepositoryData().resolveIndexId(indexName); repository.restoreShard(indexShard, restoreSource.snapshot().getSnapshotId(), restoreSource.version(), indexId, snapshotShardId, indexShard.recoveryState()); - indexShard.skipTranslogRecovery(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP); + indexShard.skipTranslogRecovery(); indexShard.finalizeRecovery(); indexShard.postRecovery("restore done"); } catch (Exception e) { diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index a93cdd51e38..f449f9ffbe4 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -377,7 +377,7 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel) throws Exception { try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId() )) { - recoveryRef.target().prepareForTranslogOperations(request.totalTranslogOps(), request.getMaxUnsafeAutoIdTimestamp()); + recoveryRef.target().prepareForTranslogOperations(request.totalTranslogOps()); } channel.sendResponse(TransportResponse.Empty.INSTANCE); } diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryPrepareForTranslogOperationsRequest.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryPrepareForTranslogOperationsRequest.java index 94425f62799..155aa53e71a 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryPrepareForTranslogOperationsRequest.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryPrepareForTranslogOperationsRequest.java @@ -19,6 +19,7 @@ package org.elasticsearch.indices.recovery; +import org.elasticsearch.Version; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -29,7 +30,6 @@ import java.io.IOException; public class RecoveryPrepareForTranslogOperationsRequest extends TransportRequest { - private long maxUnsafeAutoIdTimestamp = IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP; private long recoveryId; private ShardId shardId; private int totalTranslogOps = RecoveryState.Translog.UNKNOWN; @@ -37,11 +37,10 @@ public class RecoveryPrepareForTranslogOperationsRequest extends TransportReques public RecoveryPrepareForTranslogOperationsRequest() { } - RecoveryPrepareForTranslogOperationsRequest(long recoveryId, ShardId shardId, int totalTranslogOps, long maxUnsafeAutoIdTimestamp) { + RecoveryPrepareForTranslogOperationsRequest(long recoveryId, ShardId shardId, int totalTranslogOps) { this.recoveryId = recoveryId; this.shardId = shardId; this.totalTranslogOps = totalTranslogOps; - this.maxUnsafeAutoIdTimestamp = maxUnsafeAutoIdTimestamp; } public long recoveryId() { @@ -56,17 +55,15 @@ public class RecoveryPrepareForTranslogOperationsRequest extends TransportReques return totalTranslogOps; } - public long getMaxUnsafeAutoIdTimestamp() { - return maxUnsafeAutoIdTimestamp; - } - @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); recoveryId = in.readLong(); shardId = ShardId.readShardId(in); totalTranslogOps = in.readVInt(); - maxUnsafeAutoIdTimestamp = in.readLong(); + if (in.getVersion().before(Version.V_6_0_0_alpha1_UNRELEASED)) { + in.readLong(); // maxUnsafeAutoIdTimestamp + } } @Override @@ -75,6 +72,8 @@ public class RecoveryPrepareForTranslogOperationsRequest extends TransportReques out.writeLong(recoveryId); shardId.writeTo(out); out.writeVInt(totalTranslogOps); - out.writeLong(maxUnsafeAutoIdTimestamp); + if (out.getVersion().before(Version.V_6_0_0_alpha1_UNRELEASED)) { + out.writeLong(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP); // maxUnsafeAutoIdTimestamp + } } } diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index a6aa47492e1..40f9f7f74f8 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -157,7 +157,7 @@ public class RecoverySourceHandler { } try { - prepareTargetForTranslog(translogView.totalOperations(), shard.segmentStats(false).getMaxUnsafeAutoIdTimestamp()); + prepareTargetForTranslog(translogView.totalOperations()); } catch (final Exception e) { throw new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e); } @@ -389,13 +389,13 @@ public class RecoverySourceHandler { } } - void prepareTargetForTranslog(final int totalTranslogOps, final long maxUnsafeAutoIdTimestamp) throws IOException { + void prepareTargetForTranslog(final int totalTranslogOps) throws IOException { StopWatch stopWatch = new StopWatch().start(); logger.trace("recovery [phase1]: prepare remote engine for translog"); final long startEngineStart = stopWatch.totalTime().millis(); // Send a request preparing the new shard's translog to receive operations. This ensures the shard engine is started and disables // garbage collection (not the JVM's GC!) of tombstone deletes. - cancellableThreads.executeIO(() -> recoveryTarget.prepareForTranslogOperations(totalTranslogOps, maxUnsafeAutoIdTimestamp)); + cancellableThreads.executeIO(() -> recoveryTarget.prepareForTranslogOperations(totalTranslogOps)); stopWatch.stop(); response.startTime = stopWatch.totalTime().millis() - startEngineStart; diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index d9886efa07b..b12006bbd3c 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -36,7 +36,6 @@ import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.lucene.Lucene; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.Callback; import org.elasticsearch.common.util.CancellableThreads; import org.elasticsearch.common.util.concurrent.AbstractRefCounted; @@ -49,7 +48,6 @@ import org.elasticsearch.index.store.StoreFileMetaData; import org.elasticsearch.index.translog.Translog; import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Iterator; @@ -58,8 +56,6 @@ import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -360,9 +356,9 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget /*** Implementation of {@link RecoveryTargetHandler } */ @Override - public void prepareForTranslogOperations(int totalTranslogOps, long maxUnsafeAutoIdTimestamp) throws IOException { + public void prepareForTranslogOperations(int totalTranslogOps) throws IOException { state().getTranslog().totalOperations(totalTranslogOps); - indexShard().skipTranslogRecovery(maxUnsafeAutoIdTimestamp); + indexShard().skipTranslogRecovery(); } @Override diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java index 831181c6311..bdace02d218 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java @@ -33,10 +33,8 @@ public interface RecoveryTargetHandler { * Prepares the target to receive translog operations, after all file have been copied * * @param totalTranslogOps total translog operations expected to be sent - * @param maxUnsafeAutoIdTimestamp the max timestamp that is used to de-optimize documents with auto-generated IDs in the engine. - * This is used to ensure we don't add duplicate documents when we assume an append only case based on auto-generated IDs */ - void prepareForTranslogOperations(int totalTranslogOps, long maxUnsafeAutoIdTimestamp) throws IOException; + void prepareForTranslogOperations(int totalTranslogOps) throws IOException; /** * The finalize request refreshes the engine now that new segments are available, enables garbage collection of tombstone files, and diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java index 5fa1ca22c70..959522d297d 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java @@ -78,9 +78,9 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler { } @Override - public void prepareForTranslogOperations(int totalTranslogOps, long maxUnsafeAutoIdTimestamp) throws IOException { + public void prepareForTranslogOperations(int totalTranslogOps) throws IOException { transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.PREPARE_TRANSLOG, - new RecoveryPrepareForTranslogOperationsRequest(recoveryId, shardId, totalTranslogOps, maxUnsafeAutoIdTimestamp), + new RecoveryPrepareForTranslogOperationsRequest(recoveryId, shardId, totalTranslogOps), TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(), EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); } diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index fe9e75f304a..3792872c9e3 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -56,10 +56,10 @@ import org.apache.lucene.index.TieredMergePolicy; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.ReferenceManager; -import org.apache.lucene.search.TermQuery; -import org.apache.lucene.search.TopDocs; import org.apache.lucene.search.Sort; import org.apache.lucene.search.SortedSetSortField; +import org.apache.lucene.search.TermQuery; +import org.apache.lucene.search.TopDocs; import org.apache.lucene.search.TotalHitCountCollector; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.Directory; @@ -262,7 +262,7 @@ public class InternalEngineTests extends ESTestCase { config.getStore(), config.getDeletionPolicy(), config.getMergePolicy(), analyzer, config.getSimilarity(), new CodecService(null, logger), config.getEventListener(), config.getTranslogRecoveryPerformer(), config.getQueryCache(), config.getQueryCachingPolicy(), config.getTranslogConfig(), config.getFlushMergesAfter(), config.getRefreshListeners(), - config.getMaxUnsafeAutoIdTimestamp(), config.getIndexSort()); + config.getIndexSort()); } @Override @@ -371,7 +371,7 @@ public class InternalEngineTests extends ESTestCase { @Nullable IndexWriterFactory indexWriterFactory, @Nullable Supplier sequenceNumbersServiceSupplier, @Nullable Sort indexSort) throws IOException { - EngineConfig config = config(indexSettings, store, translogPath, mergePolicy, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null, indexSort); + EngineConfig config = config(indexSettings, store, translogPath, mergePolicy, null, indexSort); InternalEngine internalEngine = createInternalEngine(indexWriterFactory, sequenceNumbersServiceSupplier, config); if (config.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { internalEngine.recoverFromTranslog(); @@ -404,25 +404,22 @@ public class InternalEngineTests extends ESTestCase { } public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy, - long maxUnsafeAutoIdTimestamp, ReferenceManager.RefreshListener refreshListener) { - return config(indexSettings, store, translogPath, mergePolicy, createSnapshotDeletionPolicy(), - maxUnsafeAutoIdTimestamp, refreshListener, null); - } - - public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy, - long maxUnsafeAutoIdTimestamp, ReferenceManager.RefreshListener refreshListener, Sort indexSort) { - return config(indexSettings, store, translogPath, mergePolicy, createSnapshotDeletionPolicy(), - maxUnsafeAutoIdTimestamp, refreshListener, indexSort); - } - - public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy, - SnapshotDeletionPolicy deletionPolicy, long maxUnsafeAutoIdTimestamp, ReferenceManager.RefreshListener refreshListener) { - return config(indexSettings, store, translogPath, mergePolicy, deletionPolicy, maxUnsafeAutoIdTimestamp, refreshListener, null); + return config(indexSettings, store, translogPath, mergePolicy, createSnapshotDeletionPolicy(), refreshListener, null); } public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy, - SnapshotDeletionPolicy deletionPolicy, long maxUnsafeAutoIdTimestamp, + ReferenceManager.RefreshListener refreshListener, Sort indexSort) { + return config(indexSettings, store, translogPath, mergePolicy, createSnapshotDeletionPolicy(), refreshListener, indexSort); + } + + public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy, + SnapshotDeletionPolicy deletionPolicy, ReferenceManager.RefreshListener refreshListener) { + return config(indexSettings, store, translogPath, mergePolicy, deletionPolicy, refreshListener, null); + } + + public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy, + SnapshotDeletionPolicy deletionPolicy, ReferenceManager.RefreshListener refreshListener, Sort indexSort) { IndexWriterConfig iwc = newIndexWriterConfig(); TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE); @@ -445,8 +442,7 @@ public class InternalEngineTests extends ESTestCase { EngineConfig config = new EngineConfig(openMode, shardId, threadPool, indexSettings, null, store, deletionPolicy, mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener, new TranslogHandler(xContentRegistry(), shardId.getIndexName(), logger), IndexSearcher.getDefaultQueryCache(), - IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5), refreshListener, - maxUnsafeAutoIdTimestamp, indexSort); + IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5), refreshListener, indexSort); return config; } @@ -1170,8 +1166,7 @@ public class InternalEngineTests extends ESTestCase { public void testSyncedFlush() throws IOException { try (Store store = createStore(); - Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), - new LogByteSizeMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null))) { + Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), new LogByteSizeMergePolicy(), null))) { final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20); ParsedDocument doc = testParsedDocument("1", "test", null, testDocumentWithTextField(), B_1, null); engine.index(indexForDoc(doc)); @@ -1198,7 +1193,7 @@ public class InternalEngineTests extends ESTestCase { for (int i = 0; i < iters; i++) { try (Store store = createStore(); InternalEngine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), - new LogDocMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null))) { + new LogDocMergePolicy(), null))) { final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20); Engine.Index doc1 = indexForDoc(testParsedDocument("1", "test", null, testDocumentWithTextField(), B_1, null)); engine.index(doc1); @@ -1317,7 +1312,7 @@ public class InternalEngineTests extends ESTestCase { public void testForceMerge() throws IOException { try (Store store = createStore(); Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), - new LogByteSizeMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null))) { // use log MP here we test some behavior in ESMP + new LogByteSizeMergePolicy(), null))) { // use log MP here we test some behavior in ESMP int numDocs = randomIntBetween(10, 100); for (int i = 0; i < numDocs; i++) { ParsedDocument doc = testParsedDocument(Integer.toString(i), "test", null, testDocument(), B_1, null); @@ -2132,8 +2127,7 @@ public class InternalEngineTests extends ESTestCase { public void testConcurrentWritesAndCommits() throws Exception { try (Store store = createStore(); InternalEngine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(), - new SnapshotDeletionPolicy(NoDeletionPolicy.INSTANCE), - IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null))) { + new SnapshotDeletionPolicy(NoDeletionPolicy.INSTANCE), null))) { final int numIndexingThreads = scaledRandomIntBetween(3, 6); final int numDocsPerThread = randomIntBetween(500, 1000); @@ -2274,7 +2268,7 @@ public class InternalEngineTests extends ESTestCase { public void testEnableGcDeletes() throws Exception { try (Store store = createStore(); - Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null))) { + Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(), null))) { engine.config().setEnableGcDeletes(false); // Add document @@ -2421,7 +2415,8 @@ public class InternalEngineTests extends ESTestCase { // expected } // now it should be OK. - EngineConfig config = copy(config(defaultSettings, store, primaryTranslogDir, newMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null), EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG); + EngineConfig config = copy(config(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null), + EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG); engine = new InternalEngine(config); } @@ -2736,7 +2731,7 @@ public class InternalEngineTests extends ESTestCase { config.getIndexSettings(), null, store, createSnapshotDeletionPolicy(), newMergePolicy(), config.getAnalyzer(), config.getSimilarity(), new CodecService(null, logger), config.getEventListener(), config.getTranslogRecoveryPerformer(), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, - TimeValue.timeValueMinutes(5), config.getRefreshListeners(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null); + TimeValue.timeValueMinutes(5), config.getRefreshListeners(), null); try { InternalEngine internalEngine = new InternalEngine(brokenConfig); @@ -2788,7 +2783,7 @@ public class InternalEngineTests extends ESTestCase { public void testCurrentTranslogIDisCommitted() throws IOException { try (Store store = createStore()) { - EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null); + EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null); // create { @@ -3284,47 +3279,36 @@ public class InternalEngineTests extends ESTestCase { } public void testEngineMaxTimestampIsInitialized() throws IOException { - try (Store store = createStore(); - Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE, - IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null))) { - assertEquals(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp()); - } - - final long timestamp1 = Math.abs(randomLong()); + final long timestamp1 = Math.abs(randomNonNegativeLong()); final Path storeDir = createTempDir(); final Path translogDir = createTempDir(); - try (Store store = createStore(newFSDirectory(storeDir)); - Engine engine = new InternalEngine( - config(defaultSettings, store, translogDir, NoMergePolicy.INSTANCE, timestamp1, null))) { - assertEquals(timestamp1, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp()); - } final long timestamp2 = randomNonNegativeLong(); - final long timestamp3 = randomNonNegativeLong(); final long maxTimestamp12 = Math.max(timestamp1, timestamp2); - final long maxTimestamp123 = Math.max(maxTimestamp12, timestamp3); try (Store store = createStore(newFSDirectory(storeDir)); - Engine engine = new InternalEngine( - copy(config(defaultSettings, store, translogDir, NoMergePolicy.INSTANCE, timestamp2, null), - randomFrom(EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG)))) { - assertEquals(maxTimestamp12, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp()); - if (engine.config().getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { - // recover from translog and commit maxTimestamp12 - engine.recoverFromTranslog(); - // force flush as the were no ops performed - engine.flush(true, false); - } + Engine engine = new InternalEngine(config(defaultSettings, store, translogDir, NoMergePolicy.INSTANCE, null))) { + assertEquals(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp()); final ParsedDocument doc = testParsedDocument("1", "test", null, testDocumentWithTextField(), new BytesArray("{}".getBytes(Charset.defaultCharset())), null); - engine.index(appendOnlyPrimary(doc, true, timestamp3)); - assertEquals(maxTimestamp123, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp()); + engine.index(appendOnlyPrimary(doc, true, timestamp1)); + assertEquals(timestamp1, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp()); + } + try (Store store = createStore(newFSDirectory(storeDir)); + Engine engine = new InternalEngine(config(defaultSettings, store, translogDir, NoMergePolicy.INSTANCE, null))) { + assertEquals(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp()); + engine.recoverFromTranslog(); + assertEquals(timestamp1, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp()); + final ParsedDocument doc = testParsedDocument("1", "test", null, testDocumentWithTextField(), + new BytesArray("{}".getBytes(Charset.defaultCharset())), null); + engine.index(appendOnlyPrimary(doc, true, timestamp2)); + assertEquals(maxTimestamp12, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp()); + engine.flush(); } try (Store store = createStore(newFSDirectory(storeDir)); Engine engine = new InternalEngine( - config(defaultSettings, store, translogDir, NoMergePolicy.INSTANCE, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null))) { + copy(config(defaultSettings, store, translogDir, NoMergePolicy.INSTANCE, null), + randomFrom(EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG)))) { assertEquals(maxTimestamp12, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp()); - engine.recoverFromTranslog(); - assertEquals(maxTimestamp123, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp()); } } @@ -3394,8 +3378,7 @@ public class InternalEngineTests extends ESTestCase { CyclicBarrier join = new CyclicBarrier(2); CountDownLatch start = new CountDownLatch(1); AtomicInteger controller = new AtomicInteger(0); - EngineConfig config = config(defaultSettings, store, translogPath, newMergePolicy(), - IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, new ReferenceManager.RefreshListener() { + EngineConfig config = config(defaultSettings, store, translogPath, newMergePolicy(), new ReferenceManager.RefreshListener() { @Override public void beforeRefresh() throws IOException { } diff --git a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index 139c7f500d8..349258785f0 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -29,11 +29,11 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.engine.InternalEngineTests; -import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; @@ -289,9 +289,9 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC return new RecoveryTarget(indexShard, node, recoveryListener, l -> { }) { @Override - public void prepareForTranslogOperations(int totalTranslogOps, long maxUnsafeAutoIdTimestamp) throws IOException { + public void prepareForTranslogOperations(int totalTranslogOps) throws IOException { preparedForTranslog.set(true); - super.prepareForTranslogOperations(totalTranslogOps, maxUnsafeAutoIdTimestamp); + super.prepareForTranslogOperations(totalTranslogOps); } }; }); diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 629a8af3e0d..eccd958c36e 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -1281,8 +1281,8 @@ public class IndexShardTests extends IndexShardTestCase { new RecoveryTarget(shard, discoveryNode, recoveryListener, aLong -> { }) { @Override - public void prepareForTranslogOperations(int totalTranslogOps, long maxUnsafeAutoIdTimestamp) throws IOException { - super.prepareForTranslogOperations(totalTranslogOps, maxUnsafeAutoIdTimestamp); + public void prepareForTranslogOperations(int totalTranslogOps) throws IOException { + super.prepareForTranslogOperations(totalTranslogOps); // Shard is still inactive since we haven't started recovering yet assertFalse(replica.isActive()); diff --git a/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index 3e5a34c3921..aa3b9b1ee85 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -29,7 +29,6 @@ import org.apache.lucene.index.Term; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.store.Directory; import org.apache.lucene.util.IOUtils; -import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; @@ -123,7 +122,7 @@ public class RefreshListenersTests extends ESTestCase { store, new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy()), newMergePolicy(), iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), eventListener, translogHandler, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, - TimeValue.timeValueMinutes(5), listeners, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null); + TimeValue.timeValueMinutes(5), listeners, null); engine = new InternalEngine(config); listeners.setTranslog(engine.getTranslog()); } diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index 40a92b11e73..e424eb39932 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -393,7 +393,7 @@ public class RecoverySourceHandlerTests extends ESTestCase { } @Override - void prepareTargetForTranslog(final int totalTranslogOps, final long maxUnsafeAutoIdTimestamp) throws IOException { + void prepareTargetForTranslog(final int totalTranslogOps) throws IOException { prepareTargetForTranslogCalled.set(true); } @@ -483,7 +483,7 @@ public class RecoverySourceHandlerTests extends ESTestCase { } @Override - void prepareTargetForTranslog(final int totalTranslogOps, final long maxUnsafeAutoIdTimestamp) throws IOException { + void prepareTargetForTranslog(final int totalTranslogOps) throws IOException { prepareTargetForTranslogCalled.set(true); }