diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 5e5b2ed3fec..0bed51e0e24 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -128,6 +128,7 @@ public class InternalEngine extends Engine { private final AtomicInteger throttleRequestCount = new AtomicInteger(); private final EngineConfig.OpenMode openMode; private final AtomicBoolean pendingTranslogRecovery = new AtomicBoolean(false); + private static final String MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID = "max_unsafe_auto_id_timestamp"; private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1); private final CounterMetric numVersionLookups = new CounterMetric(); private final CounterMetric numIndexVersionsLookups = new CounterMetric(); @@ -178,6 +179,7 @@ public class InternalEngine extends Engine { } logger.trace("recovered [{}]", seqNoStats); seqNoService = sequenceNumberService(shardId, engineConfig.getIndexSettings(), seqNoStats); + updateMaxUnsafeAutoIdTimestampFromWriter(writer); // norelease /* * We have no guarantees that all operations above the local checkpoint are in the Lucene commit or the translog. This means @@ -226,6 +228,17 @@ public class InternalEngine extends Engine { logger.trace("created new InternalEngine"); } + private void updateMaxUnsafeAutoIdTimestampFromWriter(IndexWriter writer) { + long commitMaxUnsafeAutoIdTimestamp = Long.MIN_VALUE; + for (Map.Entry entry : writer.getLiveCommitData()) { + if (entry.getKey().equals(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID)) { + commitMaxUnsafeAutoIdTimestamp = Long.parseLong(entry.getValue()); + break; + } + } + maxUnsafeAutoIdTimestamp.set(Math.max(maxUnsafeAutoIdTimestamp.get(), commitMaxUnsafeAutoIdTimestamp)); + } + private static SequenceNumbersService sequenceNumberService( final ShardId shardId, final IndexSettings indexSettings, @@ -500,7 +513,7 @@ public class InternalEngine extends Engine { return true; case LOCAL_TRANSLOG_RECOVERY: assert index.isRetry(); - return false; // even if retry is set we never optimize local recovery + return true; // allow to optimize in order to update the max safe time stamp default: throw new IllegalArgumentException("unknown origin " + index.origin()); } @@ -1770,6 +1783,7 @@ public class InternalEngine extends Engine { commitData.put(Engine.SYNC_COMMIT_ID, syncId); } commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(seqNoService().getMaxSeqNo())); + commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get())); logger.trace("committing writer with commit data [{}]", commitData); return commitData.entrySet().iterator(); }); diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 3f9965c0662..71d754ddfb6 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -3223,11 +3223,40 @@ public class InternalEngineTests extends ESTestCase { } - long maxTimestamp = Math.abs(randomLong()); - try (Store store = createStore(); - Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE, - maxTimestamp, null))) { - assertEquals(maxTimestamp, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp()); + final long timestamp1 = Math.abs(randomLong()); + final Path storeDir = createTempDir(); + final Path translogDir = createTempDir(); + try (Store store = createStore(newFSDirectory(storeDir)); + Engine engine = new InternalEngine( + config(defaultSettings, store, translogDir, NoMergePolicy.INSTANCE, timestamp1, null))) { + assertEquals(timestamp1, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp()); + } + final long timestamp2 = randomNonNegativeLong(); + final long timestamp3 = randomNonNegativeLong(); + final long maxTimestamp12 = Math.max(timestamp1, timestamp2); + final long maxTimestamp123 = Math.max(maxTimestamp12, timestamp3); + try (Store store = createStore(newFSDirectory(storeDir)); + Engine engine = new InternalEngine( + copy(config(defaultSettings, store, translogDir, NoMergePolicy.INSTANCE, timestamp2, null), + randomFrom(EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG)))) { + assertEquals(maxTimestamp12, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp()); + if (engine.config().getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { + // recover from translog and commit maxTimestamp12 + engine.recoverFromTranslog(); + // force flush as the were no ops performed + engine.flush(true, false); + } + final ParsedDocument doc = testParsedDocument("1", "test", null, testDocumentWithTextField(), + new BytesArray("{}".getBytes(Charset.defaultCharset())), null); + engine.index(appendOnlyPrimary(doc, true, timestamp3)); + assertEquals(maxTimestamp123, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp()); + } + try (Store store = createStore(newFSDirectory(storeDir)); + Engine engine = new InternalEngine( + config(defaultSettings, store, translogDir, NoMergePolicy.INSTANCE, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null))) { + assertEquals(maxTimestamp12, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp()); + engine.recoverFromTranslog(); + assertEquals(maxTimestamp123, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp()); } }