diff --git a/core/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java b/core/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java index ebf08874c04..f108300b95b 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java +++ b/core/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java @@ -28,6 +28,7 @@ import org.apache.lucene.store.NoLockFactory; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.index.Index; import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.engine.InternalEngine; import org.elasticsearch.index.store.Store; import java.io.Closeable; @@ -64,6 +65,10 @@ final class LocalShardSnapshot implements Closeable { return shard.getEngine().seqNoService().getMaxSeqNo(); } + long maxUnsafeAutoIdTimestamp() { + return Long.parseLong(shard.getEngine().commitStats().getUserData().get(InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID)); + } + Directory getSnapshotDirectory() { /* this directory will not be used for anything else but reading / copying files to another directory * we prevent all write operations on this directory with UOE - nobody should close it either. */ diff --git a/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java index 76f35952257..078e8b06d6e 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java +++ b/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java @@ -40,6 +40,7 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.Index; import org.elasticsearch.index.engine.EngineException; +import org.elasticsearch.index.engine.InternalEngine; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException; @@ -50,7 +51,6 @@ import org.elasticsearch.repositories.Repository; import java.io.IOException; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Set; @@ -120,7 +120,9 @@ final class StoreRecovery { final Directory directory = indexShard.store().directory(); // don't close this directory!! final Directory[] sources = shards.stream().map(LocalShardSnapshot::getSnapshotDirectory).toArray(Directory[]::new); final long maxSeqNo = shards.stream().mapToLong(LocalShardSnapshot::maxSeqNo).max().getAsLong(); - addIndices(indexShard.recoveryState().getIndex(), directory, indexSort, sources, maxSeqNo); + final long maxUnsafeAutoIdTimestamp = + shards.stream().mapToLong(LocalShardSnapshot::maxUnsafeAutoIdTimestamp).max().getAsLong(); + addIndices(indexShard.recoveryState().getIndex(), directory, indexSort, sources, maxSeqNo, maxUnsafeAutoIdTimestamp); internalRecoverFromStore(indexShard); // just trigger a merge to do housekeeping on the // copied segments - we will also see them in stats etc. @@ -139,7 +141,8 @@ final class StoreRecovery { final Directory target, final Sort indexSort, final Directory[] sources, - final long maxSeqNo) throws IOException { + final long maxSeqNo, + final long maxUnsafeAutoIdTimestamp) throws IOException { final Directory hardLinkOrCopyTarget = new org.apache.lucene.store.HardlinkCopyDirectoryWrapper(target); IndexWriterConfig iwc = new IndexWriterConfig(null) .setCommitOnClose(false) @@ -162,6 +165,7 @@ final class StoreRecovery { final HashMap liveCommitData = new HashMap<>(2); liveCommitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo)); liveCommitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(maxSeqNo)); + liveCommitData.put(InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp)); return liveCommitData.entrySet().iterator(); }); writer.commit(); diff --git a/core/src/test/java/org/elasticsearch/action/admin/indices/create/ShrinkIndexIT.java b/core/src/test/java/org/elasticsearch/action/admin/indices/create/ShrinkIndexIT.java index e79b30ae64f..a5c4cbb8a28 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/indices/create/ShrinkIndexIT.java +++ b/core/src/test/java/org/elasticsearch/action/admin/indices/create/ShrinkIndexIT.java @@ -28,6 +28,7 @@ import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteResponse; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse; +import org.elasticsearch.action.admin.indices.stats.CommonStats; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.index.IndexRequest; @@ -46,6 +47,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.engine.SegmentsStats; import org.elasticsearch.index.query.TermsQueryBuilder; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.shard.IndexShard; @@ -233,8 +235,8 @@ public class ShrinkIndexIT extends ESIntegTestCase { client().prepareIndex("source", "type") .setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}", XContentType.JSON).get(); } - ImmutableOpenMap dataNodes = client().admin().cluster().prepareState().get().getState().nodes() - .getDataNodes(); + ImmutableOpenMap dataNodes = + client().admin().cluster().prepareState().get().getState().nodes().getDataNodes(); assertTrue("at least 2 nodes but was: " + dataNodes.size(), dataNodes.size() >= 2); DiscoveryNode[] discoveryNodes = dataNodes.values().toArray(DiscoveryNode.class); String mergeNode = discoveryNodes[0].getName(); @@ -249,9 +251,16 @@ public class ShrinkIndexIT extends ESIntegTestCase { .put("index.blocks.write", true)).get(); ensureGreen(); - final IndicesStatsResponse sourceStats = client().admin().indices().prepareStats("source").get(); + final IndicesStatsResponse sourceStats = client().admin().indices().prepareStats("source").setSegments(true).get(); final long maxSeqNo = Arrays.stream(sourceStats.getShards()).map(ShardStats::getSeqNoStats).mapToLong(SeqNoStats::getMaxSeqNo).max().getAsLong(); + final long maxUnsafeAutoIdTimestamp = + Arrays.stream(sourceStats.getShards()) + .map(ShardStats::getStats) + .map(CommonStats::getSegments) + .mapToLong(SegmentsStats::getMaxUnsafeAutoIdTimestamp) + .max() + .getAsLong(); // now merge source into a single shard index final boolean createWithReplicas = randomBoolean(); @@ -264,6 +273,7 @@ public class ShrinkIndexIT extends ESIntegTestCase { final SeqNoStats seqNoStats = shardStats.getSeqNoStats(); assertThat(seqNoStats.getMaxSeqNo(), equalTo(maxSeqNo)); assertThat(seqNoStats.getLocalCheckpoint(), equalTo(maxSeqNo)); + assertThat(shardStats.getStats().getSegments().getMaxUnsafeAutoIdTimestamp(), equalTo(maxUnsafeAutoIdTimestamp)); } final int size = docs > 0 ? 2 * docs : 1; diff --git a/core/src/test/java/org/elasticsearch/index/shard/StoreRecoveryTests.java b/core/src/test/java/org/elasticsearch/index/shard/StoreRecoveryTests.java index 985479b1ad6..8d3ac8433d1 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/StoreRecoveryTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/StoreRecoveryTests.java @@ -21,7 +21,6 @@ package org.elasticsearch.index.shard; import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.document.Field; import org.apache.lucene.document.SortedNumericDocValuesField; -import org.apache.lucene.document.SortedSetDocValuesField; import org.apache.lucene.document.StringField; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexWriter; @@ -32,11 +31,11 @@ import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.search.Sort; import org.apache.lucene.search.SortField; import org.apache.lucene.search.SortedNumericSortField; -import org.apache.lucene.search.SortedSetSortField; import org.apache.lucene.store.Directory; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexOutput; import org.apache.lucene.util.IOUtils; +import org.elasticsearch.index.engine.InternalEngine; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.test.ESTestCase; @@ -87,7 +86,8 @@ public class StoreRecoveryTests extends ESTestCase { RecoveryState.Index indexStats = new RecoveryState.Index(); Directory target = newFSDirectory(createTempDir()); final long maxSeqNo = randomNonNegativeLong(); - storeRecovery.addIndices(indexStats, target, indexSort, dirs, maxSeqNo); + final long maxUnsafeAutoIdTimestamp = randomNonNegativeLong(); + storeRecovery.addIndices(indexStats, target, indexSort, dirs, maxSeqNo, maxUnsafeAutoIdTimestamp); int numFiles = 0; Predicate filesFilter = (f) -> f.startsWith("segments") == false && f.equals("write.lock") == false && f.startsWith("extra") == false; @@ -107,6 +107,7 @@ public class StoreRecoveryTests extends ESTestCase { final Map userData = segmentCommitInfos.getUserData(); assertThat(userData.get(SequenceNumbers.MAX_SEQ_NO), equalTo(Long.toString(maxSeqNo))); assertThat(userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY), equalTo(Long.toString(maxSeqNo))); + assertThat(userData.get(InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID), equalTo(Long.toString(maxUnsafeAutoIdTimestamp))); for (SegmentCommitInfo info : segmentCommitInfos) { // check that we didn't merge assertEquals("all sources must be flush", info.info.getDiagnostics().get("source"), "flush");