Initialize max unsafe auto ID timestamp on shrink

When shrinking an index, we initialize its max unsafe auto ID timestamp
to the maximum of the max unsafe auto ID timestamps on the source
shards.

Relates #25356
Jason Tedor authored 2017-06-22 11:14:25 -04:00, committed by GitHub
commit 8dcb1f5c7c (parent d963882053)
4 changed files with 29 additions and 9 deletions
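
The rule itself is a one-line reduction: the shrunken index takes the most conservative (largest) timestamp seen on any source shard, where -1 means no unsafe auto-ID indexing was observed. A minimal, self-contained sketch of that reduction; SourceShard and ShrinkTimestampRule are hypothetical stand-ins for illustration, not the actual Elasticsearch types:

    import java.util.Arrays;
    import java.util.List;

    final class SourceShard {
        private final long maxUnsafeAutoIdTimestamp;

        SourceShard(long maxUnsafeAutoIdTimestamp) {
            this.maxUnsafeAutoIdTimestamp = maxUnsafeAutoIdTimestamp;
        }

        long maxUnsafeAutoIdTimestamp() {
            return maxUnsafeAutoIdTimestamp;
        }
    }

    public final class ShrinkTimestampRule {
        public static void main(String[] args) {
            // two source shards never saw an unsafe auto-ID append (-1), one did
            final List<SourceShard> sources = Arrays.asList(
                new SourceShard(-1L), new SourceShard(1498140865000L), new SourceShard(-1L));
            // the target must be at least as conservative as every source
            final long target = sources.stream()
                .mapToLong(SourceShard::maxUnsafeAutoIdTimestamp)
                .max()
                .getAsLong();
            System.out.println(target); // 1498140865000
        }
    }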

core/src/main/java/org/elasticsearch/index/shard/LocalShardSnapshot.java

@@ -28,6 +28,7 @@ import org.apache.lucene.store.NoLockFactory;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.engine.Engine;
+import org.elasticsearch.index.engine.InternalEngine;
 import org.elasticsearch.index.store.Store;
 
 import java.io.Closeable;
@@ -64,6 +65,10 @@ final class LocalShardSnapshot implements Closeable {
         return shard.getEngine().seqNoService().getMaxSeqNo();
     }
 
+    long maxUnsafeAutoIdTimestamp() {
+        return Long.parseLong(shard.getEngine().commitStats().getUserData().get(InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID));
+    }
+
     Directory getSnapshotDirectory() {
         /* this directory will not be used for anything else but reading / copying files to another directory
          * we prevent all write operations on this directory with UOE - nobody should close it either. */
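
The accessor added above works because the engine records the timestamp in the user data of its most recent Lucene commit, which commitStats() exposes; the new method just parses it back out. A hedged sketch of the same round trip against plain Lucene (the string key is shortened for the demo; the real key is the InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID constant):

    import org.apache.lucene.index.DirectoryReader;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.IndexWriterConfig;
    import org.apache.lucene.store.Directory;
    import org.apache.lucene.store.RAMDirectory;

    import java.util.Collections;
    import java.util.Map;

    public final class CommitUserDataRoundTrip {
        public static void main(String[] args) throws Exception {
            try (Directory dir = new RAMDirectory()) {
                try (IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(null))) {
                    // stamp the timestamp into the commit's user data, as the engine does
                    writer.setLiveCommitData(
                        Collections.singletonMap("max_unsafe_auto_id_timestamp", "-1").entrySet());
                    writer.commit();
                }
                // read it back from the latest commit, the path commitStats().getUserData() takes
                final Map<String, String> userData =
                    DirectoryReader.listCommits(dir).get(0).getUserData();
                System.out.println(Long.parseLong(userData.get("max_unsafe_auto_id_timestamp"))); // -1
            }
        }
    }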

core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java

@@ -40,6 +40,7 @@ import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.engine.EngineException;
+import org.elasticsearch.index.engine.InternalEngine;
 import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException;
@@ -50,7 +51,6 @@ import org.elasticsearch.repositories.Repository;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Set;
@@ -120,7 +120,9 @@
                 final Directory directory = indexShard.store().directory(); // don't close this directory!!
                 final Directory[] sources = shards.stream().map(LocalShardSnapshot::getSnapshotDirectory).toArray(Directory[]::new);
                 final long maxSeqNo = shards.stream().mapToLong(LocalShardSnapshot::maxSeqNo).max().getAsLong();
-                addIndices(indexShard.recoveryState().getIndex(), directory, indexSort, sources, maxSeqNo);
+                final long maxUnsafeAutoIdTimestamp =
+                    shards.stream().mapToLong(LocalShardSnapshot::maxUnsafeAutoIdTimestamp).max().getAsLong();
+                addIndices(indexShard.recoveryState().getIndex(), directory, indexSort, sources, maxSeqNo, maxUnsafeAutoIdTimestamp);
                 internalRecoverFromStore(indexShard);
                 // just trigger a merge to do housekeeping on the
                 // copied segments - we will also see them in stats etc.
@ -139,7 +141,8 @@ final class StoreRecovery {
final Directory target,
final Sort indexSort,
final Directory[] sources,
final long maxSeqNo) throws IOException {
final long maxSeqNo,
final long maxUnsafeAutoIdTimestamp) throws IOException {
final Directory hardLinkOrCopyTarget = new org.apache.lucene.store.HardlinkCopyDirectoryWrapper(target);
IndexWriterConfig iwc = new IndexWriterConfig(null)
.setCommitOnClose(false)
@@ -162,6 +165,7 @@
                 final HashMap<String, String> liveCommitData = new HashMap<>(2);
                 liveCommitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo));
                 liveCommitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(maxSeqNo));
+                liveCommitData.put(InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp));
                 return liveCommitData.entrySet().iterator();
             });
             writer.commit();
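
For readers following along: addIndices wires the source shard directories into the target with IndexWriter#addIndexes (through a hard-link-or-copy wrapper in the real code) and stamps the recovery metadata into the very same commit. A self-contained sketch of that pattern against plain Lucene, skipping the hard-link wrapper; the key names here are demo stand-ins for the SequenceNumbers and InternalEngine constants used above:

    import org.apache.lucene.analysis.standard.StandardAnalyzer;
    import org.apache.lucene.document.Document;
    import org.apache.lucene.document.Field;
    import org.apache.lucene.document.StringField;
    import org.apache.lucene.index.DirectoryReader;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.IndexWriterConfig;
    import org.apache.lucene.store.Directory;
    import org.apache.lucene.store.RAMDirectory;

    import java.util.HashMap;

    public final class AddIndexesSketch {

        // combine source directories into the target and stamp metadata into the same commit
        static void combine(Directory target, long maxSeqNo, long maxUnsafeAutoIdTimestamp,
                            Directory... sources) throws Exception {
            final IndexWriterConfig iwc = new IndexWriterConfig(null).setCommitOnClose(false);
            try (IndexWriter writer = new IndexWriter(target, iwc)) {
                writer.addIndexes(sources); // pull in every source's segments
                final HashMap<String, String> liveCommitData = new HashMap<>(3);
                liveCommitData.put("max_seq_no", Long.toString(maxSeqNo));
                liveCommitData.put("local_checkpoint", Long.toString(maxSeqNo));
                liveCommitData.put("max_unsafe_auto_id_timestamp", Long.toString(maxUnsafeAutoIdTimestamp));
                writer.setLiveCommitData(liveCommitData.entrySet());
                writer.commit(); // segments and metadata land in one commit
            }
        }

        public static void main(String[] args) throws Exception {
            final Directory[] sources = {new RAMDirectory(), new RAMDirectory()};
            int id = 0;
            for (Directory source : sources) {
                try (IndexWriter writer =
                         new IndexWriter(source, new IndexWriterConfig(new StandardAnalyzer()))) {
                    final Document doc = new Document();
                    doc.add(new StringField("_id", "doc-" + id++, Field.Store.YES));
                    writer.addDocument(doc);
                    writer.commit();
                }
            }
            final Directory target = new RAMDirectory();
            combine(target, 7L, -1L, sources);
            try (DirectoryReader reader = DirectoryReader.open(target)) {
                System.out.println(reader.maxDoc());                       // 2
                System.out.println(reader.getIndexCommit().getUserData()); // the stamped metadata
            }
        }
    }

One design note: because the metadata rides along in the same commit as the copied segments, a recovered shard can never expose the merged segments without the matching sequence-number and auto-ID bookkeeping.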

core/src/test/java/org/elasticsearch/action/admin/indices/create/ShrinkIndexIT.java

@@ -28,6 +28,7 @@ import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteResponse;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
 import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
+import org.elasticsearch.action.admin.indices.stats.CommonStats;
 import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
 import org.elasticsearch.action.admin.indices.stats.ShardStats;
 import org.elasticsearch.action.index.IndexRequest;
@@ -46,6 +47,7 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexService;
+import org.elasticsearch.index.engine.SegmentsStats;
 import org.elasticsearch.index.query.TermsQueryBuilder;
 import org.elasticsearch.index.seqno.SeqNoStats;
 import org.elasticsearch.index.shard.IndexShard;
@@ -233,8 +235,8 @@ public class ShrinkIndexIT extends ESIntegTestCase {
             client().prepareIndex("source", "type")
                 .setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}", XContentType.JSON).get();
         }
-        ImmutableOpenMap<String, DiscoveryNode> dataNodes = client().admin().cluster().prepareState().get().getState().nodes()
-            .getDataNodes();
+        ImmutableOpenMap<String, DiscoveryNode> dataNodes =
+            client().admin().cluster().prepareState().get().getState().nodes().getDataNodes();
         assertTrue("at least 2 nodes but was: " + dataNodes.size(), dataNodes.size() >= 2);
         DiscoveryNode[] discoveryNodes = dataNodes.values().toArray(DiscoveryNode.class);
         String mergeNode = discoveryNodes[0].getName();
@@ -249,9 +251,16 @@
                 .put("index.blocks.write", true)).get();
         ensureGreen();
 
-        final IndicesStatsResponse sourceStats = client().admin().indices().prepareStats("source").get();
+        final IndicesStatsResponse sourceStats = client().admin().indices().prepareStats("source").setSegments(true).get();
         final long maxSeqNo =
             Arrays.stream(sourceStats.getShards()).map(ShardStats::getSeqNoStats).mapToLong(SeqNoStats::getMaxSeqNo).max().getAsLong();
+        final long maxUnsafeAutoIdTimestamp =
+            Arrays.stream(sourceStats.getShards())
+                .map(ShardStats::getStats)
+                .map(CommonStats::getSegments)
+                .mapToLong(SegmentsStats::getMaxUnsafeAutoIdTimestamp)
+                .max()
+                .getAsLong();
 
         // now merge source into a single shard index
         final boolean createWithReplicas = randomBoolean();
@@ -264,6 +273,7 @@
             final SeqNoStats seqNoStats = shardStats.getSeqNoStats();
             assertThat(seqNoStats.getMaxSeqNo(), equalTo(maxSeqNo));
             assertThat(seqNoStats.getLocalCheckpoint(), equalTo(maxSeqNo));
+            assertThat(shardStats.getStats().getSegments().getMaxUnsafeAutoIdTimestamp(), equalTo(maxUnsafeAutoIdTimestamp));
         }
         final int size = docs > 0 ? 2 * docs : 1;

core/src/test/java/org/elasticsearch/index/shard/StoreRecoveryTests.java

@@ -21,7 +21,6 @@ package org.elasticsearch.index.shard;
 import org.apache.lucene.codecs.CodecUtil;
 import org.apache.lucene.document.Field;
 import org.apache.lucene.document.SortedNumericDocValuesField;
-import org.apache.lucene.document.SortedSetDocValuesField;
 import org.apache.lucene.document.StringField;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexWriter;
@@ -32,11 +31,11 @@ import org.apache.lucene.index.SegmentInfos;
 import org.apache.lucene.search.Sort;
 import org.apache.lucene.search.SortField;
 import org.apache.lucene.search.SortedNumericSortField;
-import org.apache.lucene.search.SortedSetSortField;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.util.IOUtils;
+import org.elasticsearch.index.engine.InternalEngine;
 import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.indices.recovery.RecoveryState;
 import org.elasticsearch.test.ESTestCase;
@@ -87,7 +86,8 @@ public class StoreRecoveryTests extends ESTestCase {
         RecoveryState.Index indexStats = new RecoveryState.Index();
         Directory target = newFSDirectory(createTempDir());
         final long maxSeqNo = randomNonNegativeLong();
-        storeRecovery.addIndices(indexStats, target, indexSort, dirs, maxSeqNo);
+        final long maxUnsafeAutoIdTimestamp = randomNonNegativeLong();
+        storeRecovery.addIndices(indexStats, target, indexSort, dirs, maxSeqNo, maxUnsafeAutoIdTimestamp);
         int numFiles = 0;
         Predicate<String> filesFilter = (f) -> f.startsWith("segments") == false && f.equals("write.lock") == false
             && f.startsWith("extra") == false;
@@ -107,6 +107,7 @@
         final Map<String, String> userData = segmentCommitInfos.getUserData();
         assertThat(userData.get(SequenceNumbers.MAX_SEQ_NO), equalTo(Long.toString(maxSeqNo)));
         assertThat(userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY), equalTo(Long.toString(maxSeqNo)));
+        assertThat(userData.get(InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID), equalTo(Long.toString(maxUnsafeAutoIdTimestamp)));
         for (SegmentCommitInfo info : segmentCommitInfos) { // check that we didn't merge
             assertEquals("all sources must be flush",
                 info.info.getDiagnostics().get("source"), "flush");