diff --git a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java index 66911ab80c7..fbc87f2279b 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +++ b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java @@ -71,6 +71,7 @@ public final class EngineConfig { private final List refreshListeners; @Nullable private final Sort indexSort; + private final boolean forceNewHistoryUUID; private final TranslogRecoveryRunner translogRecoveryRunner; /** @@ -115,8 +116,9 @@ public final class EngineConfig { MergePolicy mergePolicy, Analyzer analyzer, Similarity similarity, CodecService codecService, Engine.EventListener eventListener, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy, - TranslogConfig translogConfig, TimeValue flushMergesAfter, List refreshListeners, - Sort indexSort, TranslogRecoveryRunner translogRecoveryRunner) { + boolean forceNewHistoryUUID, TranslogConfig translogConfig, TimeValue flushMergesAfter, + List refreshListeners, Sort indexSort, + TranslogRecoveryRunner translogRecoveryRunner) { if (openMode == null) { throw new IllegalArgumentException("openMode must not be null"); } @@ -141,6 +143,7 @@ public final class EngineConfig { this.translogConfig = translogConfig; this.flushMergesAfter = flushMergesAfter; this.openMode = openMode; + this.forceNewHistoryUUID = forceNewHistoryUUID; this.refreshListeners = refreshListeners; this.indexSort = indexSort; this.translogRecoveryRunner = translogRecoveryRunner; @@ -300,6 +303,15 @@ public final class EngineConfig { return openMode; } + + /** + * Returns true if a new history uuid must be generated. If false, a new uuid will only be generated if no existing + * one is found. + */ + public boolean getForceNewHistoryUUID() { + return forceNewHistoryUUID; + } + @FunctionalInterface public interface TranslogRecoveryRunner { int run(Engine engine, Translog.Snapshot snapshot) throws IOException; diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index d7cf3e16069..3655a2096dd 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -177,23 +177,15 @@ public class InternalEngine extends Engine { switch (openMode) { case OPEN_INDEX_AND_TRANSLOG: writer = createWriter(false); - String existingHistoryUUID = loadHistoryUUIDFromCommit(writer); - if (existingHistoryUUID == null) { - historyUUID = UUIDs.randomBase64UUID(); - } else { - historyUUID = existingHistoryUUID; - } final long globalCheckpoint = Translog.readGlobalCheckpoint(engineConfig.getTranslogConfig().getTranslogPath()); seqNoStats = store.loadSeqNoStats(globalCheckpoint); break; case OPEN_INDEX_CREATE_TRANSLOG: writer = createWriter(false); - historyUUID = loadHistoryUUIDFromCommit(writer); seqNoStats = store.loadSeqNoStats(SequenceNumbers.UNASSIGNED_SEQ_NO); break; case CREATE_INDEX_AND_TRANSLOG: writer = createWriter(true); - historyUUID = UUIDs.randomBase64UUID(); seqNoStats = new SeqNoStats( SequenceNumbers.NO_OPS_PERFORMED, SequenceNumbers.NO_OPS_PERFORMED, @@ -205,9 +197,13 @@ public class InternalEngine extends Engine { logger.trace("recovered [{}]", seqNoStats); seqNoService = sequenceNumberService(shardId, allocationId, engineConfig.getIndexSettings(), seqNoStats); updateMaxUnsafeAutoIdTimestampFromWriter(writer); + historyUUID = loadOrGenerateHistoryUUID(writer, engineConfig.getForceNewHistoryUUID()); + Objects.requireNonNull(historyUUID, "history uuid should not be null"); indexWriter = writer; translog = openTranslog(engineConfig, writer, translogDeletionPolicy, () -> seqNoService().getGlobalCheckpoint()); assert translog.getGeneration() != null; + this.translog = translog; + updateWriterOnOpen(); } catch (IOException | TranslogCorruptedException e) { throw new EngineCreationFailureException(shardId, "failed to create engine", e); } catch (AssertionError e) { @@ -219,8 +215,6 @@ public class InternalEngine extends Engine { throw e; } } - - this.translog = translog; manager = createSearcherManager(); this.searcherManager = manager; this.versionMap.setManager(searcherManager); @@ -375,24 +369,32 @@ public class InternalEngine extends Engine { throw new IndexFormatTooOldException("translog", "translog has no generation nor a UUID - this might be an index from a previous version consider upgrading to N-1 first"); } } - final Translog translog = new Translog(translogConfig, translogUUID, translogDeletionPolicy, globalCheckpointSupplier); - if (translogUUID == null) { - assert openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG : "OpenMode must not be " - + EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG; - boolean success = false; - try { - commitIndexWriter(writer, translog, openMode == EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG - ? commitDataAsMap(writer).get(SYNC_COMMIT_ID) : null); - success = true; - } finally { - if (success == false) { - IOUtils.closeWhileHandlingException(translog); - } - } - } - return translog; + return new Translog(translogConfig, translogUUID, translogDeletionPolicy, globalCheckpointSupplier); } + /** If needed, updates the metadata in the index writer to match the potentially new translog and history uuid */ + private void updateWriterOnOpen() throws IOException { + Objects.requireNonNull(historyUUID); + final Map commitUserData = commitDataAsMap(indexWriter); + boolean needsCommit = false; + if (historyUUID.equals(commitUserData.get(HISTORY_UUID_KEY)) == false) { + needsCommit = true; + } else { + assert config().getForceNewHistoryUUID() == false : "config forced a new history uuid but it didn't change"; + assert openMode != EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG : "new index but it already has an existing history uuid"; + } + if (translog.getTranslogUUID().equals(commitUserData.get(Translog.TRANSLOG_UUID_KEY)) == false) { + needsCommit = true; + } else { + assert openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG : "translog uuid didn't change but open mode is " + openMode; + } + if (needsCommit) { + commitIndexWriter(indexWriter, translog, openMode == EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG + ? commitUserData.get(SYNC_COMMIT_ID) : null); + } + } + + @Override public Translog getTranslog() { ensureOpen(); @@ -424,14 +426,17 @@ public class InternalEngine extends Engine { } /** - * Reads the current stored history ID from the IW commit data. If the id is not found, returns null. + * Reads the current stored history ID from the IW commit data. Generates a new UUID if not found or if generation is forced. */ - @Nullable - private String loadHistoryUUIDFromCommit(final IndexWriter writer) throws IOException { + private String loadOrGenerateHistoryUUID(final IndexWriter writer, boolean forceNew) throws IOException { String uuid = commitDataAsMap(writer).get(HISTORY_UUID_KEY); - if (uuid == null) { - assert config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_0_0_rc1) : - "index was created after 6_0_0_rc1 but has no history uuid"; + if (uuid == null || forceNew) { + assert + forceNew || // recovery from a local store creates an index that doesn't have yet a history_uuid + openMode == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG || + config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_0_0_rc1) : + "existing index was created after 6_0_0_rc1 but has no history uuid"; + uuid = UUIDs.randomBase64UUID(); } return uuid; } @@ -1923,9 +1928,7 @@ public class InternalEngine extends Engine { } commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(seqNoService().getMaxSeqNo())); commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get())); - if (historyUUID != null) { - commitData.put(HISTORY_UUID_KEY, historyUUID); - } + commitData.put(HISTORY_UUID_KEY, historyUUID); logger.trace("committing writer with commit data [{}]", commitData); return commitData.entrySet().iterator(); }); diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 24ad4cdb1b8..f4a771a3b3f 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2099,10 +2099,24 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl private EngineConfig newEngineConfig(EngineConfig.OpenMode openMode) { Sort indexSort = indexSortSupplier.get(); + final boolean forceNewHistoryUUID; + switch (shardRouting.recoverySource().getType()) { + case EXISTING_STORE: + case PEER: + forceNewHistoryUUID = false; + break; + case EMPTY_STORE: + case SNAPSHOT: + case LOCAL_SHARDS: + forceNewHistoryUUID = true; + break; + default: + throw new AssertionError("unknown recovery type: [" + shardRouting.recoverySource().getType() + "]"); + } return new EngineConfig(openMode, shardId, shardRouting.allocationId().getId(), threadPool, indexSettings, warmer, store, indexSettings.getMergePolicy(), mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, - indexCache.query(), cachingPolicy, translogConfig, + indexCache.query(), cachingPolicy, forceNewHistoryUUID, translogConfig, IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()), Arrays.asList(refreshListeners, new RefreshMetricUpdater(refreshMetric)), indexSort, this::runTranslogRecovery); diff --git a/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java index 63b7bc08055..e5053fc7882 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java +++ b/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java @@ -35,12 +35,10 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource; -import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.Index; -import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.engine.InternalEngine; import org.elasticsearch.index.mapper.MapperService; @@ -164,11 +162,10 @@ final class StoreRecovery { * document-level semantics. */ writer.setLiveCommitData(() -> { - final HashMap liveCommitData = new HashMap<>(4); + final HashMap liveCommitData = new HashMap<>(3); liveCommitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo)); liveCommitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(maxSeqNo)); liveCommitData.put(InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp)); - liveCommitData.put(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID()); return liveCommitData.entrySet().iterator(); }); writer.commit(); diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 8b78227ca31..5971cd38774 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -20,7 +20,6 @@ package org.elasticsearch.index.engine; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; - import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -270,8 +269,8 @@ public class InternalEngineTests extends ESTestCase { return new EngineConfig(openMode, config.getShardId(), config.getAllocationId(), config.getThreadPool(), config.getIndexSettings(), config.getWarmer(), config.getStore(), config.getMergePolicy(), analyzer, config.getSimilarity(), new CodecService(null, logger), config.getEventListener(), config.getQueryCache(), config.getQueryCachingPolicy(), - config.getTranslogConfig(), config.getFlushMergesAfter(), config.getRefreshListeners(), config.getIndexSort(), - config.getTranslogRecoveryRunner()); + config.getForceNewHistoryUUID(), config.getTranslogConfig(), config.getFlushMergesAfter(), config.getRefreshListeners(), + config.getIndexSort(), config.getTranslogRecoveryRunner()); } @Override @@ -452,7 +451,7 @@ public class InternalEngineTests extends ESTestCase { refreshListener == null ? emptyList() : Collections.singletonList(refreshListener); EngineConfig config = new EngineConfig(openMode, shardId, allocationId.getId(), threadPool, indexSettings, null, store, mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener, - IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, + IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), false, translogConfig, TimeValue.timeValueMinutes(5), refreshListenerList, indexSort, handler); return config; @@ -2796,8 +2795,8 @@ public class InternalEngineTests extends ESTestCase { EngineConfig brokenConfig = new EngineConfig(EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, shardId, allocationId.getId(), threadPool, config.getIndexSettings(), null, store, newMergePolicy(), config.getAnalyzer(), config.getSimilarity(), new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(), - IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5), config.getRefreshListeners(), - null, config.getTranslogRecoveryRunner()); + IndexSearcher.getDefaultQueryCachingPolicy(), false, translogConfig, TimeValue.timeValueMinutes(5), + config.getRefreshListeners(), null, config.getTranslogRecoveryRunner()); try { InternalEngine internalEngine = new InternalEngine(brokenConfig); @@ -2809,7 +2808,7 @@ public class InternalEngineTests extends ESTestCase { assertVisibleCount(engine, numDocs, false); } - public void testRecoverFromStoreSetsHistoryUUIDIfNeeded() throws IOException { + public void testHistoryUUIDIsSetIfMissing() throws IOException { final int numDocs = randomIntBetween(0, 3); for (int i = 0; i < numDocs; i++) { ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null); @@ -2842,11 +2841,56 @@ public class InternalEngineTests extends ESTestCase { .put(defaultSettings.getSettings()) .put(IndexMetaData.SETTING_VERSION_CREATED, Version.V_6_0_0_beta1) .build()); - engine = createEngine(indexSettings, store, primaryTranslogDir, newMergePolicy(), null); - assertVisibleCount(engine, numDocs, false); + + EngineConfig config = engine.config(); + + EngineConfig newConfig = new EngineConfig( + randomBoolean() ? EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG : EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG, + shardId, allocationId.getId(), + threadPool, indexSettings, null, store, newMergePolicy(), config.getAnalyzer(), config.getSimilarity(), + new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(), + IndexSearcher.getDefaultQueryCachingPolicy(), false, config.getTranslogConfig(), TimeValue.timeValueMinutes(5), + config.getRefreshListeners(), null, config.getTranslogRecoveryRunner()); + engine = new InternalEngine(newConfig); + if (newConfig.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { + engine.recoverFromTranslog(); + assertVisibleCount(engine, numDocs, false); + } else { + assertVisibleCount(engine, 0, false); + } assertThat(engine.getHistoryUUID(), notNullValue()); } + public void testHistoryUUIDCanBeForced() throws IOException { + final int numDocs = randomIntBetween(0, 3); + for (int i = 0; i < numDocs; i++) { + ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null); + Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false); + Engine.IndexResult index = engine.index(firstIndexRequest); + assertThat(index.getVersion(), equalTo(1L)); + } + assertVisibleCount(engine, numDocs); + final String oldHistoryUUID = engine.getHistoryUUID(); + engine.close(); + EngineConfig config = engine.config(); + + EngineConfig newConfig = new EngineConfig( + randomBoolean() ? EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG : EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG, + shardId, allocationId.getId(), + threadPool, config.getIndexSettings(), null, store, newMergePolicy(), config.getAnalyzer(), config.getSimilarity(), + new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(), + IndexSearcher.getDefaultQueryCachingPolicy(), true, config.getTranslogConfig(), TimeValue.timeValueMinutes(5), + config.getRefreshListeners(), null, config.getTranslogRecoveryRunner()); + engine = new InternalEngine(newConfig); + if (newConfig.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { + engine.recoverFromTranslog(); + assertVisibleCount(engine, numDocs, false); + } else { + assertVisibleCount(engine, 0, false); + } + assertThat(engine.getHistoryUUID(), not(equalTo(oldHistoryUUID))); + } + public void testShardNotAvailableExceptionWhenEngineClosedConcurrently() throws IOException, InterruptedException { AtomicReference exception = new AtomicReference<>(); String operation = randomFrom("optimize", "refresh", "flush"); diff --git a/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index 01893a99ae4..1f24d0b079d 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -119,7 +119,7 @@ public class RefreshListenersTests extends ESTestCase { }; EngineConfig config = new EngineConfig(EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG, shardId, allocationId, threadPool, indexSettings, null, store, newMergePolicy(), iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), - eventListener, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, + eventListener, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), false, translogConfig, TimeValue.timeValueMinutes(5), Collections.singletonList(listeners), null, null); engine = new InternalEngine(config); listeners.setTranslog(engine.getTranslog()); diff --git a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java index 601ca1b8210..e2edb33fafa 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java +++ b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java @@ -36,6 +36,7 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.cluster.storedscripts.GetStoredScriptResponse; import org.elasticsearch.action.admin.indices.flush.FlushResponse; import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse; +import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.ingest.DeletePipelineRequest; @@ -66,6 +67,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.InvalidIndexNameException; @@ -119,6 +121,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThan; +import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.startsWith; @@ -170,8 +173,23 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas flushResponseFuture = client.admin().indices().prepareFlush(indices).execute(); } } + + final String[] indicesToSnapshot = {"test-idx-*", "-test-idx-3"}; + + logger.info("--> capturing history UUIDs"); + final Map historyUUIDs = new HashMap<>(); + for (ShardStats shardStats: client().admin().indices().prepareStats(indicesToSnapshot).clear().get().getShards()) { + String historyUUID = shardStats.getCommitStats().getUserData().get(Engine.HISTORY_UUID_KEY); + ShardId shardId = shardStats.getShardRouting().shardId(); + if (historyUUIDs.containsKey(shardId)) { + assertThat(shardStats.getShardRouting() + " has a different history uuid", historyUUID, equalTo(historyUUIDs.get(shardId))); + } else { + historyUUIDs.put(shardId, historyUUID); + } + } + logger.info("--> snapshot"); - CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test-idx-*", "-test-idx-3").get(); + CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices(indicesToSnapshot).get(); assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0)); assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())); @@ -211,6 +229,13 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas assertHitCount(client.prepareSearch("test-idx-3").setSize(0).get(), 50L); } + for (ShardStats shardStats: client().admin().indices().prepareStats(indicesToSnapshot).clear().get().getShards()) { + String historyUUID = shardStats.getCommitStats().getUserData().get(Engine.HISTORY_UUID_KEY); + ShardId shardId = shardStats.getShardRouting().shardId(); + assertThat(shardStats.getShardRouting() + " doesn't have a history uuid", historyUUID, notNullValue()); + assertThat(shardStats.getShardRouting() + " doesn't have a new history", historyUUID, not(equalTo(historyUUIDs.get(shardId)))); + } + // Test restore after index deletion logger.info("--> delete indices"); cluster().wipeIndices("test-idx-1", "test-idx-2"); @@ -226,6 +251,13 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas assertThat(clusterState.getMetaData().hasIndex("test-idx-1"), equalTo(true)); assertThat(clusterState.getMetaData().hasIndex("test-idx-2"), equalTo(false)); + for (ShardStats shardStats: client().admin().indices().prepareStats(indicesToSnapshot).clear().get().getShards()) { + String historyUUID = shardStats.getCommitStats().getUserData().get(Engine.HISTORY_UUID_KEY); + ShardId shardId = shardStats.getShardRouting().shardId(); + assertThat(shardStats.getShardRouting() + " doesn't have a history uuid", historyUUID, notNullValue()); + assertThat(shardStats.getShardRouting() + " doesn't have a new history", historyUUID, not(equalTo(historyUUIDs.get(shardId)))); + } + if (flushResponseFuture != null) { // Finish flush flushResponseFuture.actionGet(); diff --git a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java new file mode 100644 index 00000000000..26a192cce9e --- /dev/null +++ b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/RecoveryIT.java @@ -0,0 +1,119 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.upgrades; + +import org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; +import org.elasticsearch.client.Response; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.rest.ESRestTestCase; +import org.elasticsearch.test.rest.yaml.ObjectPath; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.notNullValue; + +public class RecoveryIT extends ESRestTestCase { + + @Override + protected boolean preserveIndicesUponCompletion() { + return true; + } + + @Override + protected boolean preserveReposUponCompletion() { + return true; + } + + private enum CLUSTER_TYPE { + OLD, + MIXED, + UPGRADED; + + public static CLUSTER_TYPE parse(String value) { + switch (value) { + case "old_cluster": + return OLD; + case "mixed_cluster": + return MIXED; + case "upgraded_cluster": + return UPGRADED; + default: + throw new AssertionError("unknown cluster type: " + value); + } + } + } + + private final CLUSTER_TYPE clusterType = CLUSTER_TYPE.parse(System.getProperty("tests.rest.suite")); + + private void assertOK(Response response) { + assertThat(response.getStatusLine().getStatusCode(), anyOf(equalTo(200), equalTo(201))); + } + + private void ensureGreen() throws IOException { + Map params = new HashMap<>(); + params.put("wait_for_status", "green"); + params.put("wait_for_no_relocating_shards", "true"); + assertOK(client().performRequest("GET", "_cluster/health", params)); + } + + private void createIndex(String name, Settings settings) throws IOException { + assertOK(client().performRequest("PUT", name, Collections.emptyMap(), + new StringEntity("{ \"settings\": " + Strings.toString(settings) + " }", ContentType.APPLICATION_JSON))); + } + + + public void testHistoryUUIDIsGenerated() throws Exception { + final String index = "index_history_uuid"; + if (clusterType == CLUSTER_TYPE.OLD) { + Settings.Builder settings = Settings.builder() + .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1); + createIndex(index, settings.build()); + } else if (clusterType == CLUSTER_TYPE.UPGRADED) { + ensureGreen(); + Response response = client().performRequest("GET", index + "/_stats", Collections.singletonMap("level", "shards")); + assertOK(response); + ObjectPath objectPath = ObjectPath.createFromResponse(response); + List shardStats = objectPath.evaluate("indices." + index + ".shards.0"); + assertThat(shardStats, hasSize(2)); + String expectHistoryUUID = null; + for (int shard = 0; shard < 2; shard++) { + String nodeID = objectPath.evaluate("indices." + index + ".shards.0." + shard + ".routing.node"); + String historyUUID = objectPath.evaluate("indices." + index + ".shards.0." + shard + ".commit.user_data.history_uuid"); + assertThat("no history uuid found for shard on " + nodeID, historyUUID, notNullValue()); + if (expectHistoryUUID == null) { + expectHistoryUUID = historyUUID; + } else { + assertThat("different history uuid found for shard on " + nodeID, historyUUID, equalTo(expectHistoryUUID)); + } + } + } + } + +}