Initialize sequence numbers on a shrunken index
Bringing together shards in a shrunken index means that we need to address the start of history for the shrunken index. The problem here is that sequence numbers before the maximum of the maximum sequence numbers on the source shards can collide in the target shards in the shrunken index. To address this, we set the maximum sequence number and the local checkpoint on the target shards to this maximum of the maximum sequence numbers. This enables correct document-level semantics for documents indexed before the shrink, and history on the shrunken index will effectively start from here. Relates #25321
This commit is contained in:
parent
4bbb7e828b
commit
cc67d027de
|
@ -1325,7 +1325,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, ToXContent {
|
|||
* @param sourceIndexMetadata the metadata of the source index
|
||||
* @param targetNumberOfShards the total number of shards in the target index
|
||||
* @return the routing factor for and shrunk index with the given number of target shards.
|
||||
* @throws IllegalArgumentException if the number of source shards is greater than the number of target shards or if the source shards
|
||||
* @throws IllegalArgumentException if the number of source shards is less than the number of target shards or if the source shards
|
||||
* are not divisible by the number of target shards.
|
||||
*/
|
||||
public static int getRoutingFactor(IndexMetaData sourceIndexMetadata, int targetNumberOfShards) {
|
||||
|
|
|
@ -60,6 +60,10 @@ final class LocalShardSnapshot implements Closeable {
|
|||
return shard.indexSettings().getIndex();
|
||||
}
|
||||
|
||||
long maxSeqNo() {
|
||||
return shard.getEngine().seqNoService().getMaxSeqNo();
|
||||
}
|
||||
|
||||
Directory getSnapshotDirectory() {
|
||||
/* this directory will not be used for anything else but reading / copying files to another directory
|
||||
* we prevent all write operations on this directory with UOE - nobody should close it either. */
|
||||
|
|
|
@ -41,6 +41,7 @@ import org.elasticsearch.common.unit.TimeValue;
|
|||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.engine.EngineException;
|
||||
import org.elasticsearch.index.mapper.MapperService;
|
||||
import org.elasticsearch.index.seqno.SequenceNumbers;
|
||||
import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException;
|
||||
import org.elasticsearch.index.store.Store;
|
||||
import org.elasticsearch.indices.recovery.RecoveryState;
|
||||
|
@ -49,6 +50,8 @@ import org.elasticsearch.repositories.Repository;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
@ -115,9 +118,9 @@ final class StoreRecovery {
|
|||
logger.debug("starting recovery from local shards {}", shards);
|
||||
try {
|
||||
final Directory directory = indexShard.store().directory(); // don't close this directory!!
|
||||
addIndices(indexShard.recoveryState().getIndex(), directory, indexSort,
|
||||
shards.stream().map(s -> s.getSnapshotDirectory())
|
||||
.collect(Collectors.toList()).toArray(new Directory[shards.size()]));
|
||||
final Directory[] sources = shards.stream().map(LocalShardSnapshot::getSnapshotDirectory).toArray(Directory[]::new);
|
||||
final long maxSeqNo = shards.stream().mapToLong(LocalShardSnapshot::maxSeqNo).max().getAsLong();
|
||||
addIndices(indexShard.recoveryState().getIndex(), directory, indexSort, sources, maxSeqNo);
|
||||
internalRecoverFromStore(indexShard);
|
||||
// just trigger a merge to do housekeeping on the
|
||||
// copied segments - we will also see them in stats etc.
|
||||
|
@ -131,8 +134,13 @@ final class StoreRecovery {
|
|||
return false;
|
||||
}
|
||||
|
||||
void addIndices(RecoveryState.Index indexRecoveryStats, Directory target, Sort indexSort, Directory... sources) throws IOException {
|
||||
target = new org.apache.lucene.store.HardlinkCopyDirectoryWrapper(target);
|
||||
void addIndices(
|
||||
final RecoveryState.Index indexRecoveryStats,
|
||||
final Directory target,
|
||||
final Sort indexSort,
|
||||
final Directory[] sources,
|
||||
final long maxSeqNo) throws IOException {
|
||||
final Directory hardLinkOrCopyTarget = new org.apache.lucene.store.HardlinkCopyDirectoryWrapper(target);
|
||||
IndexWriterConfig iwc = new IndexWriterConfig(null)
|
||||
.setCommitOnClose(false)
|
||||
// we don't want merges to happen here - we call maybe merge on the engine
|
||||
|
@ -143,8 +151,19 @@ final class StoreRecovery {
|
|||
if (indexSort != null) {
|
||||
iwc.setIndexSort(indexSort);
|
||||
}
|
||||
try (IndexWriter writer = new IndexWriter(new StatsDirectoryWrapper(target, indexRecoveryStats), iwc)) {
|
||||
try (IndexWriter writer = new IndexWriter(new StatsDirectoryWrapper(hardLinkOrCopyTarget, indexRecoveryStats), iwc)) {
|
||||
writer.addIndexes(sources);
|
||||
/*
|
||||
* We set the maximum sequence number and the local checkpoint on the target to the maximum of the maximum sequence numbers on
|
||||
* the source shards. This ensures that history after this maximum sequence number can advance and we have correct
|
||||
* document-level semantics.
|
||||
*/
|
||||
writer.setLiveCommitData(() -> {
|
||||
final HashMap<String, String> liveCommitData = new HashMap<>(2);
|
||||
liveCommitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo));
|
||||
liveCommitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(maxSeqNo));
|
||||
return liveCommitData.entrySet().iterator();
|
||||
});
|
||||
writer.commit();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
|
||||
package org.elasticsearch.action.admin.indices.create;
|
||||
|
||||
import org.apache.lucene.index.CorruptIndexException;
|
||||
import org.apache.lucene.search.Sort;
|
||||
import org.apache.lucene.search.SortField;
|
||||
import org.apache.lucene.search.SortedSetSelector;
|
||||
|
@ -29,6 +28,8 @@ import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteResponse;
|
|||
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
|
||||
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
|
||||
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
|
||||
import org.elasticsearch.action.admin.indices.stats.ShardStats;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.support.ActiveShardCount;
|
||||
import org.elasticsearch.client.Client;
|
||||
|
@ -36,10 +37,8 @@ import org.elasticsearch.cluster.ClusterInfoService;
|
|||
import org.elasticsearch.cluster.InternalClusterInfoService;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
|
||||
import org.elasticsearch.cluster.routing.Murmur3HashFunction;
|
||||
import org.elasticsearch.cluster.routing.RoutingTable;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.UnassignedInfo;
|
||||
import org.elasticsearch.common.Priority;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
|
@ -48,8 +47,8 @@ import org.elasticsearch.common.xcontent.XContentType;
|
|||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.IndexService;
|
||||
import org.elasticsearch.index.query.TermsQueryBuilder;
|
||||
import org.elasticsearch.index.seqno.SeqNoStats;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
|
@ -58,15 +57,11 @@ import org.elasticsearch.test.VersionUtils;
|
|||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
|
||||
import static org.hamcrest.CoreMatchers.not;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
|
@ -233,7 +228,8 @@ public class ShrinkIndexIT extends ESIntegTestCase {
|
|||
.put("number_of_shards", randomIntBetween(2, 7))
|
||||
.put("index.version.created", version)
|
||||
).get();
|
||||
for (int i = 0; i < 20; i++) {
|
||||
final int docs = randomIntBetween(0, 128);
|
||||
for (int i = 0; i < docs; i++) {
|
||||
client().prepareIndex("source", "type")
|
||||
.setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}", XContentType.JSON).get();
|
||||
}
|
||||
|
@ -252,13 +248,26 @@ public class ShrinkIndexIT extends ESIntegTestCase {
|
|||
.put("index.routing.allocation.require._name", mergeNode)
|
||||
.put("index.blocks.write", true)).get();
|
||||
ensureGreen();
|
||||
|
||||
final IndicesStatsResponse sourceStats = client().admin().indices().prepareStats("source").get();
|
||||
final long maxSeqNo =
|
||||
Arrays.stream(sourceStats.getShards()).map(ShardStats::getSeqNoStats).mapToLong(SeqNoStats::getMaxSeqNo).max().getAsLong();
|
||||
// now merge source into a single shard index
|
||||
|
||||
final boolean createWithReplicas = randomBoolean();
|
||||
assertAcked(client().admin().indices().prepareShrinkIndex("source", "target")
|
||||
.setSettings(Settings.builder().put("index.number_of_replicas", createWithReplicas ? 1 : 0).build()).get());
|
||||
ensureGreen();
|
||||
assertHitCount(client().prepareSearch("target").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);
|
||||
|
||||
final IndicesStatsResponse targetStats = client().admin().indices().prepareStats("target").get();
|
||||
for (final ShardStats shardStats : targetStats.getShards()) {
|
||||
final SeqNoStats seqNoStats = shardStats.getSeqNoStats();
|
||||
assertThat(seqNoStats.getMaxSeqNo(), equalTo(maxSeqNo));
|
||||
assertThat(seqNoStats.getLocalCheckpoint(), equalTo(maxSeqNo));
|
||||
}
|
||||
|
||||
final int size = docs > 0 ? 2 * docs : 1;
|
||||
assertHitCount(client().prepareSearch("target").setSize(size).setQuery(new TermsQueryBuilder("foo", "bar")).get(), docs);
|
||||
|
||||
if (createWithReplicas == false) {
|
||||
// bump replicas
|
||||
|
@ -266,16 +275,16 @@ public class ShrinkIndexIT extends ESIntegTestCase {
|
|||
.setSettings(Settings.builder()
|
||||
.put("index.number_of_replicas", 1)).get();
|
||||
ensureGreen();
|
||||
assertHitCount(client().prepareSearch("target").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);
|
||||
assertHitCount(client().prepareSearch("target").setSize(size).setQuery(new TermsQueryBuilder("foo", "bar")).get(), docs);
|
||||
}
|
||||
|
||||
for (int i = 20; i < 40; i++) {
|
||||
for (int i = docs; i < 2 * docs; i++) {
|
||||
client().prepareIndex("target", "type")
|
||||
.setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}", XContentType.JSON).get();
|
||||
}
|
||||
flushAndRefresh();
|
||||
assertHitCount(client().prepareSearch("target").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 40);
|
||||
assertHitCount(client().prepareSearch("source").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);
|
||||
assertHitCount(client().prepareSearch("target").setSize(2 * size).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 2 * docs);
|
||||
assertHitCount(client().prepareSearch("source").setSize(size).setQuery(new TermsQueryBuilder("foo", "bar")).get(), docs);
|
||||
GetSettingsResponse target = client().admin().indices().prepareGetSettings("target").get();
|
||||
assertEquals(version, target.getIndexToSettings().get("target").getAsVersion("index.version.created", null));
|
||||
}
|
||||
|
|
|
@ -37,6 +37,7 @@ import org.apache.lucene.store.Directory;
|
|||
import org.apache.lucene.store.IOContext;
|
||||
import org.apache.lucene.store.IndexOutput;
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.elasticsearch.index.seqno.SequenceNumbers;
|
||||
import org.elasticsearch.indices.recovery.RecoveryState;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
|
@ -46,8 +47,11 @@ import java.nio.file.Path;
|
|||
import java.nio.file.attribute.BasicFileAttributes;
|
||||
import java.security.AccessControlException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Map;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.equalTo;
|
||||
|
||||
public class StoreRecoveryTests extends ESTestCase {
|
||||
|
||||
public void testAddIndices() throws IOException {
|
||||
|
@ -82,7 +86,8 @@ public class StoreRecoveryTests extends ESTestCase {
|
|||
StoreRecovery storeRecovery = new StoreRecovery(new ShardId("foo", "bar", 1), logger);
|
||||
RecoveryState.Index indexStats = new RecoveryState.Index();
|
||||
Directory target = newFSDirectory(createTempDir());
|
||||
storeRecovery.addIndices(indexStats, target, indexSort, dirs);
|
||||
final long maxSeqNo = randomNonNegativeLong();
|
||||
storeRecovery.addIndices(indexStats, target, indexSort, dirs, maxSeqNo);
|
||||
int numFiles = 0;
|
||||
Predicate<String> filesFilter = (f) -> f.startsWith("segments") == false && f.equals("write.lock") == false
|
||||
&& f.startsWith("extra") == false;
|
||||
|
@ -99,6 +104,9 @@ public class StoreRecoveryTests extends ESTestCase {
|
|||
}
|
||||
DirectoryReader reader = DirectoryReader.open(target);
|
||||
SegmentInfos segmentCommitInfos = SegmentInfos.readLatestCommit(target);
|
||||
final Map<String, String> userData = segmentCommitInfos.getUserData();
|
||||
assertThat(userData.get(SequenceNumbers.MAX_SEQ_NO), equalTo(Long.toString(maxSeqNo)));
|
||||
assertThat(userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY), equalTo(Long.toString(maxSeqNo)));
|
||||
for (SegmentCommitInfo info : segmentCommitInfos) { // check that we didn't merge
|
||||
assertEquals("all sources must be flush",
|
||||
info.info.getDiagnostics().get("source"), "flush");
|
||||
|
|
Loading…
Reference in New Issue