From 04385a9ce914125a3eab626c657ef60d026cde1f Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Tue, 19 Sep 2017 15:58:36 +0200 Subject: [PATCH] Restoring from snapshot should force generation of a new history uuid (#26694) Restoring a shard from snapshot throws the primary back in time violating assumptions and bringing the validity of global checkpoints in question. To avoid problems, we should make sure that a shard that was restored will never be the source of an ops based recovery to a shard that existed before the restore. To this end we have introduced the notion of `histroy_uuid` in #26577 and required that both source and target will have the same history to allow ops based recoveries. This PR make sure that a shard gets a new uuid after restore. As suggested by @ywelsch , I derived the creation of a `history_uuid` from the `RecoverySource` of the shard. Store recovery will only generate a uuid if it doesn't already exist (we can make this stricter when we don't need to deal with 5.x indices). Peer recovery follows the same logic (note that this is different than the approach in #26557, I went this way as it means that shards always have a history uuid after being recovered on a 6.x node and will also mean that a rolling restart is enough for old indices to step over to the new seq no model). Local shards and snapshot force the generation of a new translog uuid. Relates #10708 Closes #26544 --- .../index/engine/EngineConfig.java | 16 ++- .../index/engine/InternalEngine.java | 73 +++++------ .../elasticsearch/index/shard/IndexShard.java | 16 ++- .../index/shard/StoreRecovery.java | 5 +- .../index/engine/InternalEngineTests.java | 62 +++++++-- .../index/shard/RefreshListenersTests.java | 2 +- .../SharedClusterSnapshotRestoreIT.java | 34 ++++- .../elasticsearch/upgrades/RecoveryIT.java | 119 ++++++++++++++++++ 8 files changed, 274 insertions(+), 53 deletions(-) create mode 100644 qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java diff --git a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java index 66911ab80c7..fbc87f2279b 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +++ b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java @@ -71,6 +71,7 @@ public final class EngineConfig { private final List refreshListeners; @Nullable private final Sort indexSort; + private final boolean forceNewHistoryUUID; private final TranslogRecoveryRunner translogRecoveryRunner; /** @@ -115,8 +116,9 @@ public final class EngineConfig { MergePolicy mergePolicy, Analyzer analyzer, Similarity similarity, CodecService codecService, Engine.EventListener eventListener, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy, - TranslogConfig translogConfig, TimeValue flushMergesAfter, List refreshListeners, - Sort indexSort, TranslogRecoveryRunner translogRecoveryRunner) { + boolean forceNewHistoryUUID, TranslogConfig translogConfig, TimeValue flushMergesAfter, + List refreshListeners, Sort indexSort, + TranslogRecoveryRunner translogRecoveryRunner) { if (openMode == null) { throw new IllegalArgumentException("openMode must not be null"); } @@ -141,6 +143,7 @@ public final class EngineConfig { this.translogConfig = translogConfig; this.flushMergesAfter = flushMergesAfter; this.openMode = openMode; + this.forceNewHistoryUUID = forceNewHistoryUUID; this.refreshListeners = refreshListeners; this.indexSort = indexSort; this.translogRecoveryRunner = translogRecoveryRunner; @@ -300,6 +303,15 @@ public final class EngineConfig { return openMode; } + + /** + * Returns true if a new history uuid must be generated. If false, a new uuid will only be generated if no existing + * one is found. + */ + public boolean getForceNewHistoryUUID() { + return forceNewHistoryUUID; + } + @FunctionalInterface public interface TranslogRecoveryRunner { int run(Engine engine, Translog.Snapshot snapshot) throws IOException; diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index d7cf3e16069..3655a2096dd 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -177,23 +177,15 @@ public class InternalEngine extends Engine { switch (openMode) { case OPEN_INDEX_AND_TRANSLOG: writer = createWriter(false); - String existingHistoryUUID = loadHistoryUUIDFromCommit(writer); - if (existingHistoryUUID == null) { - historyUUID = UUIDs.randomBase64UUID(); - } else { - historyUUID = existingHistoryUUID; - } final long globalCheckpoint = Translog.readGlobalCheckpoint(engineConfig.getTranslogConfig().getTranslogPath()); seqNoStats = store.loadSeqNoStats(globalCheckpoint); break; case OPEN_INDEX_CREATE_TRANSLOG: writer = createWriter(false); - historyUUID = loadHistoryUUIDFromCommit(writer); seqNoStats = store.loadSeqNoStats(SequenceNumbers.UNASSIGNED_SEQ_NO); break; case CREATE_INDEX_AND_TRANSLOG: writer = createWriter(true); - historyUUID = UUIDs.randomBase64UUID(); seqNoStats = new SeqNoStats( SequenceNumbers.NO_OPS_PERFORMED, SequenceNumbers.NO_OPS_PERFORMED, @@ -205,9 +197,13 @@ public class InternalEngine extends Engine { logger.trace("recovered [{}]", seqNoStats); seqNoService = sequenceNumberService(shardId, allocationId, engineConfig.getIndexSettings(), seqNoStats); updateMaxUnsafeAutoIdTimestampFromWriter(writer); + historyUUID = loadOrGenerateHistoryUUID(writer, engineConfig.getForceNewHistoryUUID()); + Objects.requireNonNull(historyUUID, "history uuid should not be null"); indexWriter = writer; translog = openTranslog(engineConfig, writer, translogDeletionPolicy, () -> seqNoService().getGlobalCheckpoint()); assert translog.getGeneration() != null; + this.translog = translog; + updateWriterOnOpen(); } catch (IOException | TranslogCorruptedException e) { throw new EngineCreationFailureException(shardId, "failed to create engine", e); } catch (AssertionError e) { @@ -219,8 +215,6 @@ public class InternalEngine extends Engine { throw e; } } - - this.translog = translog; manager = createSearcherManager(); this.searcherManager = manager; this.versionMap.setManager(searcherManager); @@ -375,24 +369,32 @@ public class InternalEngine extends Engine { throw new IndexFormatTooOldException("translog", "translog has no generation nor a UUID - this might be an index from a previous version consider upgrading to N-1 first"); } } - final Translog translog = new Translog(translogConfig, translogUUID, translogDeletionPolicy, globalCheckpointSupplier); - if (translogUUID == null) { - assert openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG : "OpenMode must not be " - + EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG; - boolean success = false; - try { - commitIndexWriter(writer, translog, openMode == EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG - ? commitDataAsMap(writer).get(SYNC_COMMIT_ID) : null); - success = true; - } finally { - if (success == false) { - IOUtils.closeWhileHandlingException(translog); - } - } - } - return translog; + return new Translog(translogConfig, translogUUID, translogDeletionPolicy, globalCheckpointSupplier); } + /** If needed, updates the metadata in the index writer to match the potentially new translog and history uuid */ + private void updateWriterOnOpen() throws IOException { + Objects.requireNonNull(historyUUID); + final Map commitUserData = commitDataAsMap(indexWriter); + boolean needsCommit = false; + if (historyUUID.equals(commitUserData.get(HISTORY_UUID_KEY)) == false) { + needsCommit = true; + } else { + assert config().getForceNewHistoryUUID() == false : "config forced a new history uuid but it didn't change"; + assert openMode != EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG : "new index but it already has an existing history uuid"; + } + if (translog.getTranslogUUID().equals(commitUserData.get(Translog.TRANSLOG_UUID_KEY)) == false) { + needsCommit = true; + } else { + assert openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG : "translog uuid didn't change but open mode is " + openMode; + } + if (needsCommit) { + commitIndexWriter(indexWriter, translog, openMode == EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG + ? commitUserData.get(SYNC_COMMIT_ID) : null); + } + } + + @Override public Translog getTranslog() { ensureOpen(); @@ -424,14 +426,17 @@ public class InternalEngine extends Engine { } /** - * Reads the current stored history ID from the IW commit data. If the id is not found, returns null. + * Reads the current stored history ID from the IW commit data. Generates a new UUID if not found or if generation is forced. */ - @Nullable - private String loadHistoryUUIDFromCommit(final IndexWriter writer) throws IOException { + private String loadOrGenerateHistoryUUID(final IndexWriter writer, boolean forceNew) throws IOException { String uuid = commitDataAsMap(writer).get(HISTORY_UUID_KEY); - if (uuid == null) { - assert config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_0_0_rc1) : - "index was created after 6_0_0_rc1 but has no history uuid"; + if (uuid == null || forceNew) { + assert + forceNew || // recovery from a local store creates an index that doesn't have yet a history_uuid + openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG || + config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_0_0_rc1) : + "existing index was created after 6_0_0_rc1 but has no history uuid"; + uuid = UUIDs.randomBase64UUID(); } return uuid; } @@ -1923,9 +1928,7 @@ public class InternalEngine extends Engine { } commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(seqNoService().getMaxSeqNo())); commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get())); - if (historyUUID != null) { - commitData.put(HISTORY_UUID_KEY, historyUUID); - } + commitData.put(HISTORY_UUID_KEY, historyUUID); logger.trace("committing writer with commit data [{}]", commitData); return commitData.entrySet().iterator(); }); diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 24ad4cdb1b8..f4a771a3b3f 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2099,10 +2099,24 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl private EngineConfig newEngineConfig(EngineConfig.OpenMode openMode) { Sort indexSort = indexSortSupplier.get(); + final boolean forceNewHistoryUUID; + switch (shardRouting.recoverySource().getType()) { + case EXISTING_STORE: + case PEER: + forceNewHistoryUUID = false; + break; + case EMPTY_STORE: + case SNAPSHOT: + case LOCAL_SHARDS: + forceNewHistoryUUID = true; + break; + default: + throw new AssertionError("unknown recovery type: [" + shardRouting.recoverySource().getType() + "]"); + } return new EngineConfig(openMode, shardId, shardRouting.allocationId().getId(), threadPool, indexSettings, warmer, store, indexSettings.getMergePolicy(), mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, - indexCache.query(), cachingPolicy, translogConfig, + indexCache.query(), cachingPolicy, forceNewHistoryUUID, translogConfig, IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()), Arrays.asList(refreshListeners, new RefreshMetricUpdater(refreshMetric)), indexSort, this::runTranslogRecovery); diff --git a/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java index 63b7bc08055..e5053fc7882 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java +++ b/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java @@ -35,12 +35,10 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource; -import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.Index; -import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.engine.InternalEngine; import org.elasticsearch.index.mapper.MapperService; @@ -164,11 +162,10 @@ final class StoreRecovery { * document-level semantics. */ writer.setLiveCommitData(() -> { - final HashMap liveCommitData = new HashMap<>(4); + final HashMap liveCommitData = new HashMap<>(3); liveCommitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo)); liveCommitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(maxSeqNo)); liveCommitData.put(InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp)); - liveCommitData.put(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID()); return liveCommitData.entrySet().iterator(); }); writer.commit(); diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 8b78227ca31..5971cd38774 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -20,7 +20,6 @@ package org.elasticsearch.index.engine; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; - import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -270,8 +269,8 @@ public class InternalEngineTests extends ESTestCase { return new EngineConfig(openMode, config.getShardId(), config.getAllocationId(), config.getThreadPool(), config.getIndexSettings(), config.getWarmer(), config.getStore(), config.getMergePolicy(), analyzer, config.getSimilarity(), new CodecService(null, logger), config.getEventListener(), config.getQueryCache(), config.getQueryCachingPolicy(), - config.getTranslogConfig(), config.getFlushMergesAfter(), config.getRefreshListeners(), config.getIndexSort(), - config.getTranslogRecoveryRunner()); + config.getForceNewHistoryUUID(), config.getTranslogConfig(), config.getFlushMergesAfter(), config.getRefreshListeners(), + config.getIndexSort(), config.getTranslogRecoveryRunner()); } @Override @@ -452,7 +451,7 @@ public class InternalEngineTests extends ESTestCase { refreshListener == null ? emptyList() : Collections.singletonList(refreshListener); EngineConfig config = new EngineConfig(openMode, shardId, allocationId.getId(), threadPool, indexSettings, null, store, mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener, - IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, + IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), false, translogConfig, TimeValue.timeValueMinutes(5), refreshListenerList, indexSort, handler); return config; @@ -2796,8 +2795,8 @@ public class InternalEngineTests extends ESTestCase { EngineConfig brokenConfig = new EngineConfig(EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, shardId, allocationId.getId(), threadPool, config.getIndexSettings(), null, store, newMergePolicy(), config.getAnalyzer(), config.getSimilarity(), new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(), - IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5), config.getRefreshListeners(), - null, config.getTranslogRecoveryRunner()); + IndexSearcher.getDefaultQueryCachingPolicy(), false, translogConfig, TimeValue.timeValueMinutes(5), + config.getRefreshListeners(), null, config.getTranslogRecoveryRunner()); try { InternalEngine internalEngine = new InternalEngine(brokenConfig); @@ -2809,7 +2808,7 @@ public class InternalEngineTests extends ESTestCase { assertVisibleCount(engine, numDocs, false); } - public void testRecoverFromStoreSetsHistoryUUIDIfNeeded() throws IOException { + public void testHistoryUUIDIsSetIfMissing() throws IOException { final int numDocs = randomIntBetween(0, 3); for (int i = 0; i < numDocs; i++) { ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null); @@ -2842,11 +2841,56 @@ public class InternalEngineTests extends ESTestCase { .put(defaultSettings.getSettings()) .put(IndexMetaData.SETTING_VERSION_CREATED, Version.V_6_0_0_beta1) .build()); - engine = createEngine(indexSettings, store, primaryTranslogDir, newMergePolicy(), null); - assertVisibleCount(engine, numDocs, false); + + EngineConfig config = engine.config(); + + EngineConfig newConfig = new EngineConfig( + randomBoolean() ? EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG : EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG, + shardId, allocationId.getId(), + threadPool, indexSettings, null, store, newMergePolicy(), config.getAnalyzer(), config.getSimilarity(), + new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(), + IndexSearcher.getDefaultQueryCachingPolicy(), false, config.getTranslogConfig(), TimeValue.timeValueMinutes(5), + config.getRefreshListeners(), null, config.getTranslogRecoveryRunner()); + engine = new InternalEngine(newConfig); + if (newConfig.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { + engine.recoverFromTranslog(); + assertVisibleCount(engine, numDocs, false); + } else { + assertVisibleCount(engine, 0, false); + } assertThat(engine.getHistoryUUID(), notNullValue()); } + public void testHistoryUUIDCanBeForced() throws IOException { + final int numDocs = randomIntBetween(0, 3); + for (int i = 0; i < numDocs; i++) { + ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null); + Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false); + Engine.IndexResult index = engine.index(firstIndexRequest); + assertThat(index.getVersion(), equalTo(1L)); + } + assertVisibleCount(engine, numDocs); + final String oldHistoryUUID = engine.getHistoryUUID(); + engine.close(); + EngineConfig config = engine.config(); + + EngineConfig newConfig = new EngineConfig( + randomBoolean() ? EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG : EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG, + shardId, allocationId.getId(), + threadPool, config.getIndexSettings(), null, store, newMergePolicy(), config.getAnalyzer(), config.getSimilarity(), + new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(), + IndexSearcher.getDefaultQueryCachingPolicy(), true, config.getTranslogConfig(), TimeValue.timeValueMinutes(5), + config.getRefreshListeners(), null, config.getTranslogRecoveryRunner()); + engine = new InternalEngine(newConfig); + if (newConfig.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { + engine.recoverFromTranslog(); + assertVisibleCount(engine, numDocs, false); + } else { + assertVisibleCount(engine, 0, false); + } + assertThat(engine.getHistoryUUID(), not(equalTo(oldHistoryUUID))); + } + public void testShardNotAvailableExceptionWhenEngineClosedConcurrently() throws IOException, InterruptedException { AtomicReference exception = new AtomicReference<>(); String operation = randomFrom("optimize", "refresh", "flush"); diff --git a/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index 01893a99ae4..1f24d0b079d 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -119,7 +119,7 @@ public class RefreshListenersTests extends ESTestCase { }; EngineConfig config = new EngineConfig(EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG, shardId, allocationId, threadPool, indexSettings, null, store, newMergePolicy(), iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), - eventListener, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, + eventListener, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), false, translogConfig, TimeValue.timeValueMinutes(5), Collections.singletonList(listeners), null, null); engine = new InternalEngine(config); listeners.setTranslog(engine.getTranslog()); diff --git a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java index 601ca1b8210..e2edb33fafa 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java +++ b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java @@ -36,6 +36,7 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.cluster.storedscripts.GetStoredScriptResponse; import org.elasticsearch.action.admin.indices.flush.FlushResponse; import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse; +import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.ingest.DeletePipelineRequest; @@ -66,6 +67,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.InvalidIndexNameException; @@ -119,6 +121,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThan; +import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.startsWith; @@ -170,8 +173,23 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas flushResponseFuture = client.admin().indices().prepareFlush(indices).execute(); } } + + final String[] indicesToSnapshot = {"test-idx-*", "-test-idx-3"}; + + logger.info("--> capturing history UUIDs"); + final Map historyUUIDs = new HashMap<>(); + for (ShardStats shardStats: client().admin().indices().prepareStats(indicesToSnapshot).clear().get().getShards()) { + String historyUUID = shardStats.getCommitStats().getUserData().get(Engine.HISTORY_UUID_KEY); + ShardId shardId = shardStats.getShardRouting().shardId(); + if (historyUUIDs.containsKey(shardId)) { + assertThat(shardStats.getShardRouting() + " has a different history uuid", historyUUID, equalTo(historyUUIDs.get(shardId))); + } else { + historyUUIDs.put(shardId, historyUUID); + } + } + logger.info("--> snapshot"); - CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx-*", "-test-idx-3").get(); + CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices(indicesToSnapshot).get(); assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0)); assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())); @@ -211,6 +229,13 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas assertHitCount(client.prepareSearch("test-idx-3").setSize(0).get(), 50L); } + for (ShardStats shardStats: client().admin().indices().prepareStats(indicesToSnapshot).clear().get().getShards()) { + String historyUUID = shardStats.getCommitStats().getUserData().get(Engine.HISTORY_UUID_KEY); + ShardId shardId = shardStats.getShardRouting().shardId(); + assertThat(shardStats.getShardRouting() + " doesn't have a history uuid", historyUUID, notNullValue()); + assertThat(shardStats.getShardRouting() + " doesn't have a new history", historyUUID, not(equalTo(historyUUIDs.get(shardId)))); + } + // Test restore after index deletion logger.info("--> delete indices"); cluster().wipeIndices("test-idx-1", "test-idx-2"); @@ -226,6 +251,13 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas assertThat(clusterState.getMetaData().hasIndex("test-idx-1"), equalTo(true)); assertThat(clusterState.getMetaData().hasIndex("test-idx-2"), equalTo(false)); + for (ShardStats shardStats: client().admin().indices().prepareStats(indicesToSnapshot).clear().get().getShards()) { + String historyUUID = shardStats.getCommitStats().getUserData().get(Engine.HISTORY_UUID_KEY); + ShardId shardId = shardStats.getShardRouting().shardId(); + assertThat(shardStats.getShardRouting() + " doesn't have a history uuid", historyUUID, notNullValue()); + assertThat(shardStats.getShardRouting() + " doesn't have a new history", historyUUID, not(equalTo(historyUUIDs.get(shardId)))); + } + if (flushResponseFuture != null) { // Finish flush flushResponseFuture.actionGet(); diff --git a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java new file mode 100644 index 00000000000..26a192cce9e --- /dev/null +++ b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java @@ -0,0 +1,119 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.upgrades; + +import org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; +import org.elasticsearch.client.Response; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.rest.ESRestTestCase; +import org.elasticsearch.test.rest.yaml.ObjectPath; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.notNullValue; + +public class RecoveryIT extends ESRestTestCase { + + @Override + protected boolean preserveIndicesUponCompletion() { + return true; + } + + @Override + protected boolean preserveReposUponCompletion() { + return true; + } + + private enum CLUSTER_TYPE { + OLD, + MIXED, + UPGRADED; + + public static CLUSTER_TYPE parse(String value) { + switch (value) { + case "old_cluster": + return OLD; + case "mixed_cluster": + return MIXED; + case "upgraded_cluster": + return UPGRADED; + default: + throw new AssertionError("unknown cluster type: " + value); + } + } + } + + private final CLUSTER_TYPE clusterType = CLUSTER_TYPE.parse(System.getProperty("tests.rest.suite")); + + private void assertOK(Response response) { + assertThat(response.getStatusLine().getStatusCode(), anyOf(equalTo(200), equalTo(201))); + } + + private void ensureGreen() throws IOException { + Map params = new HashMap<>(); + params.put("wait_for_status", "green"); + params.put("wait_for_no_relocating_shards", "true"); + assertOK(client().performRequest("GET", "_cluster/health", params)); + } + + private void createIndex(String name, Settings settings) throws IOException { + assertOK(client().performRequest("PUT", name, Collections.emptyMap(), + new StringEntity("{ \"settings\": " + Strings.toString(settings) + " }", ContentType.APPLICATION_JSON))); + } + + + public void testHistoryUUIDIsGenerated() throws Exception { + final String index = "index_history_uuid"; + if (clusterType == CLUSTER_TYPE.OLD) { + Settings.Builder settings = Settings.builder() + .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1); + createIndex(index, settings.build()); + } else if (clusterType == CLUSTER_TYPE.UPGRADED) { + ensureGreen(); + Response response = client().performRequest("GET", index + "/_stats", Collections.singletonMap("level", "shards")); + assertOK(response); + ObjectPath objectPath = ObjectPath.createFromResponse(response); + List shardStats = objectPath.evaluate("indices." + index + ".shards.0"); + assertThat(shardStats, hasSize(2)); + String expectHistoryUUID = null; + for (int shard = 0; shard < 2; shard++) { + String nodeID = objectPath.evaluate("indices." + index + ".shards.0." + shard + ".routing.node"); + String historyUUID = objectPath.evaluate("indices." + index + ".shards.0." + shard + ".commit.user_data.history_uuid"); + assertThat("no history uuid found for shard on " + nodeID, historyUUID, notNullValue()); + if (expectHistoryUUID == null) { + expectHistoryUUID = historyUUID; + } else { + assertThat("different history uuid found for shard on " + nodeID, historyUUID, equalTo(expectHistoryUUID)); + } + } + } + } + +}