From f5d4550e93e4ae84f42b9512a3a3f7344cd62be3 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Mon, 26 Mar 2018 14:08:03 +0200 Subject: [PATCH] Fold EngineDiskUtils into Store, for better lock semantics (#29156) #28245 has introduced the utility class`EngineDiskUtils` with a set of methods to prepare/change translog and lucene commit points. That util class bundled everything that's needed to create and empty shard, bootstrap a shard from a lucene index that was just restored etc. In order to safely do these manipulations, the util methods acquired the IndexWriter's lock. That would sometime fail due to concurrent shard store fetching or other short activities that require the files not to be changed while they read from them. Since there is no way to wait on the index writer lock, the `Store` class has other locks to make sure that once we try to acquire the IW lock, it will succeed. To side step this waiting problem, this PR folds `EngineDiskUtils` into `Store`. Sadly this comes with a price - the store class doesn't and shouldn't know about the translog. As such the logic is slightly less tight and callers have to do the translog manipulations on their own. --- docs/reference/indices/flush.asciidoc | 2 +- .../org/elasticsearch/index/IndexService.java | 2 +- .../index/engine/EngineDiskUtils.java | 144 ------------ .../index/shard/StoreRecovery.java | 21 +- .../org/elasticsearch/index/store/Store.java | 104 ++++++++- .../indices/recovery/RecoveryTarget.java | 8 +- .../index/engine/EngineDiskUtilsTests.java | 207 ------------------ .../index/engine/InternalEngineTests.java | 98 ++++++++- .../index/shard/IndexShardTests.java | 12 +- .../index/shard/RefreshListenersTests.java | 9 +- .../elasticsearch/index/store/StoreTests.java | 58 ++++- .../recovery/RecoverySourceHandlerTests.java | 2 +- .../index/engine/EngineTestCase.java | 12 +- .../index/shard/IndexShardTestCase.java | 2 +- 14 files changed, 293 insertions(+), 388 deletions(-) delete mode 100644 server/src/main/java/org/elasticsearch/index/engine/EngineDiskUtils.java delete mode 100644 server/src/test/java/org/elasticsearch/index/engine/EngineDiskUtilsTests.java diff --git a/docs/reference/indices/flush.asciidoc b/docs/reference/indices/flush.asciidoc index 91fac0908ef..87b1e90a4d5 100644 --- a/docs/reference/indices/flush.asciidoc +++ b/docs/reference/indices/flush.asciidoc @@ -93,7 +93,7 @@ which returns something similar to: { "commit" : { "id" : "3M3zkw2GHMo2Y4h4/KFKCg==", - "generation" : 3, + "generation" : 4, "user_data" : { "translog_uuid" : "hnOG3xFcTDeoI_kvvvOdNA", "history_uuid" : "XP7KDJGiS1a2fHYiFL5TXQ", diff --git a/server/src/main/java/org/elasticsearch/index/IndexService.java b/server/src/main/java/org/elasticsearch/index/IndexService.java index 4fe50b983df..64760629bfd 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexService.java +++ b/server/src/main/java/org/elasticsearch/index/IndexService.java @@ -24,7 +24,6 @@ import org.apache.lucene.index.IndexReader; import org.apache.lucene.search.Sort; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.util.Accountable; -import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -40,6 +39,7 @@ import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.ShardLock; import org.elasticsearch.env.ShardLockObtainFailedException; diff --git a/server/src/main/java/org/elasticsearch/index/engine/EngineDiskUtils.java b/server/src/main/java/org/elasticsearch/index/engine/EngineDiskUtils.java deleted file mode 100644 index f7f3aa8e9fe..00000000000 --- a/server/src/main/java/org/elasticsearch/index/engine/EngineDiskUtils.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.index.engine; - -import org.apache.lucene.index.DirectoryReader; -import org.apache.lucene.index.IndexCommit; -import org.apache.lucene.index.IndexWriter; -import org.apache.lucene.index.IndexWriterConfig; -import org.apache.lucene.index.NoMergePolicy; -import org.apache.lucene.store.Directory; -import org.elasticsearch.Assertions; -import org.elasticsearch.common.UUIDs; -import org.elasticsearch.index.seqno.SequenceNumbers; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.store.Store; -import org.elasticsearch.index.translog.Translog; - -import java.io.IOException; -import java.nio.file.Path; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - - -/** - * This class contains utility methods for mutating the shard lucene index and translog as a preparation to be opened. - */ -public abstract class EngineDiskUtils { - - /** - * creates an empty lucene index and a corresponding empty translog. Any existing data will be deleted. - */ - public static void createEmpty(final Directory dir, final Path translogPath, final ShardId shardId) throws IOException { - try (IndexWriter writer = newIndexWriter(true, dir)) { - final String translogUuid = Translog.createEmptyTranslog(translogPath, SequenceNumbers.NO_OPS_PERFORMED, shardId); - final Map map = new HashMap<>(); - map.put(Translog.TRANSLOG_GENERATION_KEY, "1"); - map.put(Translog.TRANSLOG_UUID_KEY, translogUuid); - map.put(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID()); - map.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(SequenceNumbers.NO_OPS_PERFORMED)); - map.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(SequenceNumbers.NO_OPS_PERFORMED)); - map.put(InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, "-1"); - updateCommitData(writer, map); - } - } - - - /** - * Converts an existing lucene index and marks it with a new history uuid. Also creates a new empty translog file. - * This is used to make sure no existing shard will recovery from this index using ops based recovery. - */ - public static void bootstrapNewHistoryFromLuceneIndex(final Directory dir, final Path translogPath, final ShardId shardId) - throws IOException { - try (IndexWriter writer = newIndexWriter(false, dir)) { - final Map userData = getUserData(writer); - final long maxSeqNo = Long.parseLong(userData.get(SequenceNumbers.MAX_SEQ_NO)); - final String translogUuid = Translog.createEmptyTranslog(translogPath, maxSeqNo, shardId); - final Map map = new HashMap<>(); - map.put(Translog.TRANSLOG_GENERATION_KEY, "1"); - map.put(Translog.TRANSLOG_UUID_KEY, translogUuid); - map.put(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID()); - map.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(maxSeqNo)); - updateCommitData(writer, map); - } - } - - /** - * Creates a new empty translog and associates it with an existing lucene index. - */ - public static void createNewTranslog(final Directory dir, final Path translogPath, long initialGlobalCheckpoint, final ShardId shardId) - throws IOException { - if (Assertions.ENABLED) { - final List existingCommits = DirectoryReader.listCommits(dir); - assert existingCommits.size() == 1 : "creating a translog translog should have one commit, commits[" + existingCommits + "]"; - SequenceNumbers.CommitInfo commitInfo = Store.loadSeqNoInfo(existingCommits.get(0)); - assert commitInfo.localCheckpoint >= initialGlobalCheckpoint : - "trying to create a shard whose local checkpoint [" + commitInfo.localCheckpoint + "] is < global checkpoint [" - + initialGlobalCheckpoint + "]"; - } - - try (IndexWriter writer = newIndexWriter(false, dir)) { - final String translogUuid = Translog.createEmptyTranslog(translogPath, initialGlobalCheckpoint, shardId); - final Map map = new HashMap<>(); - map.put(Translog.TRANSLOG_GENERATION_KEY, "1"); - map.put(Translog.TRANSLOG_UUID_KEY, translogUuid); - updateCommitData(writer, map); - } - } - - - /** - * Checks that the Lucene index contains a history uuid marker. If not, a new one is generated and committed. - */ - public static void ensureIndexHasHistoryUUID(final Directory dir) throws IOException { - try (IndexWriter writer = newIndexWriter(false, dir)) { - final Map userData = getUserData(writer); - if (userData.containsKey(Engine.HISTORY_UUID_KEY) == false) { - updateCommitData(writer, Collections.singletonMap(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID())); - } - } - } - - private static void updateCommitData(IndexWriter writer, Map keysToUpdate) throws IOException { - final Map userData = getUserData(writer); - userData.putAll(keysToUpdate); - writer.setLiveCommitData(userData.entrySet()); - writer.commit(); - } - - private static Map getUserData(IndexWriter writer) { - final Map userData = new HashMap<>(); - writer.getLiveCommitData().forEach(e -> userData.put(e.getKey(), e.getValue())); - return userData; - } - - private static IndexWriter newIndexWriter(final boolean create, final Directory dir) throws IOException { - IndexWriterConfig iwc = new IndexWriterConfig(null) - .setCommitOnClose(false) - // we don't want merges to happen here - we call maybe merge on the engine - // later once we stared it up otherwise we would need to wait for it here - // we also don't specify a codec here and merges should use the engines for this index - .setMergePolicy(NoMergePolicy.INSTANCE) - .setOpenMode(create ? IndexWriterConfig.OpenMode.CREATE : IndexWriterConfig.OpenMode.APPEND); - return new IndexWriter(dir, iwc); - } -} diff --git a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java index 224ae60a420..3654aeba2bf 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java @@ -40,13 +40,13 @@ import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.Index; -import org.elasticsearch.index.engine.EngineDiskUtils; import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.engine.InternalEngine; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException; import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.Repository; @@ -390,7 +390,11 @@ final class StoreRecovery { recoveryState.getIndex().updateVersion(version); if (recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS) { assert indexShouldExists; - EngineDiskUtils.bootstrapNewHistoryFromLuceneIndex(store.directory(), indexShard.shardPath().resolveTranslog(), shardId); + store.bootstrapNewHistory(); + final SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo(); + final long maxSeqNo = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.MAX_SEQ_NO)); + final String translogUUID = Translog.createEmptyTranslog(indexShard.shardPath().resolveTranslog(), maxSeqNo, shardId); + store.associateIndexWithNewTranslog(translogUUID); } else if (indexShouldExists) { // since we recover from local, just fill the files and size try { @@ -402,7 +406,10 @@ final class StoreRecovery { logger.debug("failed to list file details", e); } } else { - EngineDiskUtils.createEmpty(store.directory(), indexShard.shardPath().resolveTranslog(), shardId); + store.createEmpty(); + final String translogUUID = Translog.createEmptyTranslog(indexShard.shardPath().resolveTranslog(), + SequenceNumbers.NO_OPS_PERFORMED, shardId); + store.associateIndexWithNewTranslog(translogUUID); } indexShard.openEngineAndRecoverFromTranslog(); indexShard.getEngine().fillSeqNoGaps(indexShard.getPrimaryTerm()); @@ -445,8 +452,12 @@ final class StoreRecovery { } final IndexId indexId = repository.getRepositoryData().resolveIndexId(indexName); repository.restoreShard(indexShard, restoreSource.snapshot().getSnapshotId(), restoreSource.version(), indexId, snapshotShardId, indexShard.recoveryState()); - EngineDiskUtils.bootstrapNewHistoryFromLuceneIndex(indexShard.store().directory(), indexShard.shardPath().resolveTranslog(), - shardId); + final Store store = indexShard.store(); + store.bootstrapNewHistory(); + final SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo(); + final long maxSeqNo = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.MAX_SEQ_NO)); + final String translogUUID = Translog.createEmptyTranslog(indexShard.shardPath().resolveTranslog(), maxSeqNo, shardId); + store.associateIndexWithNewTranslog(translogUUID); assert indexShard.shardRouting.primary() : "only primary shards can recover from store"; indexShard.openEngineAndRecoverFromTranslog(); indexShard.getEngine().fillSeqNoGaps(indexShard.getPrimaryTerm()); diff --git a/server/src/main/java/org/elasticsearch/index/store/Store.java b/server/src/main/java/org/elasticsearch/index/store/Store.java index e560a0b040b..297790890c1 100644 --- a/server/src/main/java/org/elasticsearch/index/store/Store.java +++ b/server/src/main/java/org/elasticsearch/index/store/Store.java @@ -30,6 +30,8 @@ import org.apache.lucene.index.IndexFormatTooNewException; import org.apache.lucene.index.IndexFormatTooOldException; import org.apache.lucene.index.IndexNotFoundException; import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.index.SegmentCommitInfo; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.store.AlreadyClosedException; @@ -46,7 +48,6 @@ import org.apache.lucene.store.SimpleFSDirectory; import org.apache.lucene.util.ArrayUtil; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRefBuilder; -import org.elasticsearch.core.internal.io.IOUtils; import org.apache.lucene.util.Version; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; @@ -69,11 +70,13 @@ import org.elasticsearch.common.util.SingleObjectCache; import org.elasticsearch.common.util.concurrent.AbstractRefCounted; import org.elasticsearch.common.util.concurrent.RefCounted; import org.elasticsearch.common.util.iterable.Iterables; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.ShardLock; import org.elasticsearch.env.ShardLockObtainFailedException; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.engine.InternalEngine; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.AbstractIndexShardComponent; import org.elasticsearch.index.shard.IndexShard; @@ -155,7 +158,8 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref this(shardId, indexSettings, directoryService, shardLock, OnClose.EMPTY); } - public Store(ShardId shardId, IndexSettings indexSettings, DirectoryService directoryService, ShardLock shardLock, OnClose onClose) throws IOException { + public Store(ShardId shardId, IndexSettings indexSettings, DirectoryService directoryService, ShardLock shardLock, + OnClose onClose) throws IOException { super(shardId, indexSettings); final Settings settings = indexSettings.getSettings(); this.directory = new StoreDirectory(directoryService.newDirectory(), Loggers.getLogger("index.store.deletes", settings, shardId)); @@ -1454,4 +1458,100 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref } } + /** + * creates an empty lucene index and a corresponding empty translog. Any existing data will be deleted. + */ + public void createEmpty() throws IOException { + metadataLock.writeLock().lock(); + try (IndexWriter writer = newIndexWriter(IndexWriterConfig.OpenMode.CREATE, directory)) { + final Map map = new HashMap<>(); + map.put(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID()); + map.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(SequenceNumbers.NO_OPS_PERFORMED)); + map.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(SequenceNumbers.NO_OPS_PERFORMED)); + map.put(InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, "-1"); + updateCommitData(writer, map); + } finally { + metadataLock.writeLock().unlock(); + } + } + + + /** + * Marks an existing lucene index with a new history uuid. + * This is used to make sure no existing shard will recovery from this index using ops based recovery. + */ + public void bootstrapNewHistory() throws IOException { + metadataLock.writeLock().lock(); + try (IndexWriter writer = newIndexWriter(IndexWriterConfig.OpenMode.APPEND, directory)) { + final Map userData = getUserData(writer); + final long maxSeqNo = Long.parseLong(userData.get(SequenceNumbers.MAX_SEQ_NO)); + final Map map = new HashMap<>(); + map.put(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID()); + map.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(maxSeqNo)); + updateCommitData(writer, map); + } finally { + metadataLock.writeLock().unlock(); + } + } + + /** + * Force bakes the given translog generation as recovery information in the lucene index. This is + * used when recovering from a snapshot or peer file based recovery where a new empty translog is + * created and the existing lucene index needs should be changed to use it. + */ + public void associateIndexWithNewTranslog(final String translogUUID) throws IOException { + metadataLock.writeLock().lock(); + try (IndexWriter writer = newIndexWriter(IndexWriterConfig.OpenMode.APPEND, directory)) { + if (translogUUID.equals(getUserData(writer).get(Translog.TRANSLOG_UUID_KEY))) { + throw new IllegalArgumentException("a new translog uuid can't be equal to existing one. got [" + translogUUID + "]"); + } + final Map map = new HashMap<>(); + map.put(Translog.TRANSLOG_GENERATION_KEY, "1"); + map.put(Translog.TRANSLOG_UUID_KEY, translogUUID); + updateCommitData(writer, map); + } finally { + metadataLock.writeLock().unlock(); + } + } + + + /** + * Checks that the Lucene index contains a history uuid marker. If not, a new one is generated and committed. + */ + public void ensureIndexHasHistoryUUID() throws IOException { + metadataLock.writeLock().lock(); + try (IndexWriter writer = newIndexWriter(IndexWriterConfig.OpenMode.APPEND, directory)) { + final Map userData = getUserData(writer); + if (userData.containsKey(Engine.HISTORY_UUID_KEY) == false) { + updateCommitData(writer, Collections.singletonMap(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID())); + } + } finally { + metadataLock.writeLock().unlock(); + } + } + + private void updateCommitData(IndexWriter writer, Map keysToUpdate) throws IOException { + final Map userData = getUserData(writer); + userData.putAll(keysToUpdate); + writer.setLiveCommitData(userData.entrySet()); + writer.commit(); + } + + private Map getUserData(IndexWriter writer) { + final Map userData = new HashMap<>(); + writer.getLiveCommitData().forEach(e -> userData.put(e.getKey(), e.getValue())); + return userData; + } + + private IndexWriter newIndexWriter(IndexWriterConfig.OpenMode openMode, final Directory dir) throws IOException { + IndexWriterConfig iwc = new IndexWriterConfig(null) + .setCommitOnClose(false) + // we don't want merges to happen here - we call maybe merge on the engine + // later once we stared it up otherwise we would need to wait for it here + // we also don't specify a codec here and merges should use the engines for this index + .setMergePolicy(NoMergePolicy.INSTANCE) + .setOpenMode(openMode); + return new IndexWriter(dir, iwc); + } + } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index b28e992d9fd..91d3332f8e6 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -40,7 +40,6 @@ import org.elasticsearch.common.util.CancellableThreads; import org.elasticsearch.common.util.concurrent.AbstractRefCounted; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.index.engine.Engine; -import org.elasticsearch.index.engine.EngineDiskUtils; import org.elasticsearch.index.mapper.MapperException; import org.elasticsearch.index.seqno.ReplicationTracker; import org.elasticsearch.index.seqno.SequenceNumbers; @@ -439,11 +438,12 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget try { store.cleanupAndVerify("recovery CleanFilesRequestHandler", sourceMetaData); if (indexShard.indexSettings().getIndexVersionCreated().before(Version.V_6_0_0_rc1)) { - EngineDiskUtils.ensureIndexHasHistoryUUID(store.directory()); + store.ensureIndexHasHistoryUUID(); } // TODO: Assign the global checkpoint to the max_seqno of the safe commit if the index version >= 6.2 - EngineDiskUtils.createNewTranslog(store.directory(), indexShard.shardPath().resolveTranslog(), - SequenceNumbers.UNASSIGNED_SEQ_NO, shardId); + final String translogUUID = + Translog.createEmptyTranslog(indexShard.shardPath().resolveTranslog(), SequenceNumbers.UNASSIGNED_SEQ_NO, shardId); + store.associateIndexWithNewTranslog(translogUUID); } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) { // this is a fatal exception at this stage. // this means we transferred files from the remote that have not be checksummed and they are diff --git a/server/src/test/java/org/elasticsearch/index/engine/EngineDiskUtilsTests.java b/server/src/test/java/org/elasticsearch/index/engine/EngineDiskUtilsTests.java deleted file mode 100644 index aca94708af9..00000000000 --- a/server/src/test/java/org/elasticsearch/index/engine/EngineDiskUtilsTests.java +++ /dev/null @@ -1,207 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.index.engine; - -import org.apache.lucene.index.IndexWriter; -import org.apache.lucene.index.IndexWriterConfig; -import org.apache.lucene.index.NoMergePolicy; -import org.apache.lucene.search.IndexSearcher; -import org.elasticsearch.Version; -import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.common.lucene.uid.Versions; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.index.IndexSettings; -import org.elasticsearch.index.VersionType; -import org.elasticsearch.index.codec.CodecService; -import org.elasticsearch.index.mapper.ParsedDocument; -import org.elasticsearch.index.seqno.SequenceNumbers; -import org.elasticsearch.index.store.Store; -import org.elasticsearch.index.translog.Translog; -import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; -import org.elasticsearch.test.IndexSettingsModule; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; - -import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.not; -import static org.hamcrest.Matchers.notNullValue; - -public class EngineDiskUtilsTests extends EngineTestCase { - - - public void testHistoryUUIDIsSetIfMissing() throws IOException { - final int numDocs = randomIntBetween(0, 3); - for (int i = 0; i < numDocs; i++) { - ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null); - Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, - Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false); - Engine.IndexResult index = engine.index(firstIndexRequest); - assertThat(index.getVersion(), equalTo(1L)); - } - assertVisibleCount(engine, numDocs); - engine.close(); - - IndexWriterConfig iwc = new IndexWriterConfig(null) - .setCommitOnClose(false) - // we don't want merges to happen here - we call maybe merge on the engine - // later once we stared it up otherwise we would need to wait for it here - // we also don't specify a codec here and merges should use the engines for this index - .setMergePolicy(NoMergePolicy.INSTANCE) - .setOpenMode(IndexWriterConfig.OpenMode.APPEND); - try (IndexWriter writer = new IndexWriter(store.directory(), iwc)) { - Map newCommitData = new HashMap<>(); - for (Map.Entry entry : writer.getLiveCommitData()) { - if (entry.getKey().equals(Engine.HISTORY_UUID_KEY) == false) { - newCommitData.put(entry.getKey(), entry.getValue()); - } - } - writer.setLiveCommitData(newCommitData.entrySet()); - writer.commit(); - } - - EngineDiskUtils.ensureIndexHasHistoryUUID(store.directory()); - - final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test", Settings.builder() - .put(defaultSettings.getSettings()) - .put(IndexMetaData.SETTING_VERSION_CREATED, Version.V_6_0_0_beta1) - .build()); - - EngineConfig config = engine.config(); - EngineConfig newConfig = new EngineConfig( - shardId, allocationId.getId(), - threadPool, indexSettings, null, store, newMergePolicy(), config.getAnalyzer(), config.getSimilarity(), - new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(), - IndexSearcher.getDefaultQueryCachingPolicy(), config.getTranslogConfig(), TimeValue.timeValueMinutes(5), - config.getExternalRefreshListener(), config.getInternalRefreshListener(), null, config.getTranslogRecoveryRunner(), - new NoneCircuitBreakerService(), () -> SequenceNumbers.NO_OPS_PERFORMED); - engine = new InternalEngine(newConfig); - engine.recoverFromTranslog(); - assertVisibleCount(engine, numDocs, false); - assertThat(engine.getHistoryUUID(), notNullValue()); - } - - public void testCurrentTranslogIDisCommitted() throws IOException { - final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); - try (Store store = createStore()) { - EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get); - - // create - { - EngineDiskUtils.createEmpty(store.directory(), config.getTranslogConfig().getTranslogPath(), shardId); - ParsedDocument doc = testParsedDocument(Integer.toString(0), null, testDocument(), new BytesArray("{}"), null); - Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, - Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false); - - try (InternalEngine engine = createEngine(config)) { - engine.index(firstIndexRequest); - globalCheckpoint.set(engine.getLocalCheckpointTracker().getCheckpoint()); - expectThrows(IllegalStateException.class, () -> engine.recoverFromTranslog()); - Map userData = engine.getLastCommittedSegmentInfos().getUserData(); - assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY)); - assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); - } - } - // open and recover tlog - { - for (int i = 0; i < 2; i++) { - try (InternalEngine engine = new InternalEngine(config)) { - assertTrue(engine.isRecovering()); - Map userData = engine.getLastCommittedSegmentInfos().getUserData(); - if (i == 0) { - assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY)); - } else { - // creating an empty index will create the first translog gen and commit it - // opening the empty index will make the second translog file but not commit it - // opening the engine again (i=0) will make the third translog file, which then be committed - assertEquals("3", userData.get(Translog.TRANSLOG_GENERATION_KEY)); - } - assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); - engine.recoverFromTranslog(); - userData = engine.getLastCommittedSegmentInfos().getUserData(); - assertEquals("3", userData.get(Translog.TRANSLOG_GENERATION_KEY)); - assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); - } - } - } - // open index with new tlog - { - EngineDiskUtils.createNewTranslog(store.directory(), config.getTranslogConfig().getTranslogPath(), - SequenceNumbers.NO_OPS_PERFORMED, shardId); - try (InternalEngine engine = new InternalEngine(config)) { - Map userData = engine.getLastCommittedSegmentInfos().getUserData(); - assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY)); - assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); - engine.recoverFromTranslog(); - assertEquals(2, engine.getTranslog().currentFileGeneration()); - assertEquals(0L, engine.getTranslog().stats().getUncommittedOperations()); - } - } - - // open and recover tlog with empty tlog - { - for (int i = 0; i < 2; i++) { - try (InternalEngine engine = new InternalEngine(config)) { - Map userData = engine.getLastCommittedSegmentInfos().getUserData(); - assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY)); - assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); - engine.recoverFromTranslog(); - userData = engine.getLastCommittedSegmentInfos().getUserData(); - assertEquals("no changes - nothing to commit", "1", userData.get(Translog.TRANSLOG_GENERATION_KEY)); - assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); - } - } - } - } - } - - public void testHistoryUUIDCanBeForced() throws IOException { - final int numDocs = randomIntBetween(0, 3); - for (int i = 0; i < numDocs; i++) { - ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null); - Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, - Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false); - Engine.IndexResult index = engine.index(firstIndexRequest); - assertThat(index.getVersion(), equalTo(1L)); - } - assertVisibleCount(engine, numDocs); - final String oldHistoryUUID = engine.getHistoryUUID(); - engine.close(); - EngineConfig config = engine.config(); - EngineDiskUtils.bootstrapNewHistoryFromLuceneIndex(store.directory(), config.getTranslogConfig().getTranslogPath(), shardId); - - EngineConfig newConfig = new EngineConfig( - shardId, allocationId.getId(), - threadPool, config.getIndexSettings(), null, store, newMergePolicy(), config.getAnalyzer(), config.getSimilarity(), - new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(), - IndexSearcher.getDefaultQueryCachingPolicy(), config.getTranslogConfig(), TimeValue.timeValueMinutes(5), - config.getExternalRefreshListener(), config.getInternalRefreshListener(), null, config.getTranslogRecoveryRunner(), - new NoneCircuitBreakerService(), () -> SequenceNumbers.NO_OPS_PERFORMED); - engine = new InternalEngine(newConfig); - engine.recoverFromTranslog(); - assertVisibleCount(engine, 0, false); - assertThat(engine.getHistoryUUID(), not(equalTo(oldHistoryUUID))); - } -} diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index bc4ecbee4d6..c48a9ee8a2d 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -65,7 +65,6 @@ import org.apache.lucene.store.MockDirectoryWrapper; import org.apache.lucene.util.Bits; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.FixedBitSet; -import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.action.index.IndexRequest; @@ -91,6 +90,7 @@ import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.codec.CodecService; @@ -1141,8 +1141,9 @@ public class InternalEngineTests extends EngineTestCase { engine.flushAndClose(); } if (randomBoolean()) { - EngineDiskUtils.createNewTranslog(store.directory(), config.getTranslogConfig().getTranslogPath(), + final String translogUUID = Translog.createEmptyTranslog(config.getTranslogConfig().getTranslogPath(), SequenceNumbers.UNASSIGNED_SEQ_NO, shardId); + store.associateIndexWithNewTranslog(translogUUID); } engine = new InternalEngine(config); engine.recoverFromTranslog(); @@ -2354,6 +2355,84 @@ public class InternalEngineTests extends EngineTestCase { assertEquals(currentIndexWriterConfig.getCodec().getName(), codecService.codec(codecName).getName()); } + public void testCurrentTranslogIDisCommitted() throws IOException { + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + try (Store store = createStore()) { + EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get); + + // create + { + store.createEmpty(); + final String translogUUID = + Translog.createEmptyTranslog(config.getTranslogConfig().getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED, shardId); + store.associateIndexWithNewTranslog(translogUUID); + ParsedDocument doc = testParsedDocument(Integer.toString(0), null, testDocument(), new BytesArray("{}"), null); + Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 0, + Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false); + + try (InternalEngine engine = createEngine(config)) { + engine.index(firstIndexRequest); + globalCheckpoint.set(engine.getLocalCheckpointTracker().getCheckpoint()); + expectThrows(IllegalStateException.class, () -> engine.recoverFromTranslog()); + Map userData = engine.getLastCommittedSegmentInfos().getUserData(); + assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY)); + assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); + } + } + // open and recover tlog + { + for (int i = 0; i < 2; i++) { + try (InternalEngine engine = new InternalEngine(config)) { + assertTrue(engine.isRecovering()); + Map userData = engine.getLastCommittedSegmentInfos().getUserData(); + if (i == 0) { + assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY)); + } else { + // creating an empty index will create the first translog gen and commit it + // opening the empty index will make the second translog file but not commit it + // opening the engine again (i=0) will make the third translog file, which then be committed + assertEquals("3", userData.get(Translog.TRANSLOG_GENERATION_KEY)); + } + assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); + engine.recoverFromTranslog(); + userData = engine.getLastCommittedSegmentInfos().getUserData(); + assertEquals("3", userData.get(Translog.TRANSLOG_GENERATION_KEY)); + assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); + } + } + } + // open index with new tlog + { + final String translogUUID = + Translog.createEmptyTranslog(config.getTranslogConfig().getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED, shardId); + store.associateIndexWithNewTranslog(translogUUID); + try (InternalEngine engine = new InternalEngine(config)) { + Map userData = engine.getLastCommittedSegmentInfos().getUserData(); + assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY)); + assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); + engine.recoverFromTranslog(); + assertEquals(2, engine.getTranslog().currentFileGeneration()); + assertEquals(0L, engine.getTranslog().stats().getUncommittedOperations()); + } + } + + // open and recover tlog with empty tlog + { + for (int i = 0; i < 2; i++) { + try (InternalEngine engine = new InternalEngine(config)) { + Map userData = engine.getLastCommittedSegmentInfos().getUserData(); + assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY)); + assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); + engine.recoverFromTranslog(); + userData = engine.getLastCommittedSegmentInfos().getUserData(); + assertEquals("no changes - nothing to commit", "1", userData.get(Translog.TRANSLOG_GENERATION_KEY)); + assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); + } + } + } + } + } + public void testMissingTranslog() throws IOException { // test that we can force start the engine , even if the translog is missing. engine.close(); @@ -2369,7 +2448,8 @@ public class InternalEngineTests extends EngineTestCase { // expected } // when a new translog is created it should be ok - EngineDiskUtils.createNewTranslog(store.directory(), primaryTranslogDir, SequenceNumbers.UNASSIGNED_SEQ_NO, shardId); + final String translogUUID = Translog.createEmptyTranslog(primaryTranslogDir, SequenceNumbers.UNASSIGNED_SEQ_NO, shardId); + store.associateIndexWithNewTranslog(translogUUID); EngineConfig config = config(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null); engine = new InternalEngine(config); } @@ -2432,7 +2512,9 @@ public class InternalEngineTests extends EngineTestCase { final Path translogPath = createTempDir(); final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); final LongSupplier globalCheckpointSupplier = () -> globalCheckpoint.get(); - EngineDiskUtils.createEmpty(store.directory(), translogPath, shardId); + store.createEmpty(); + final String translogUUID = Translog.createEmptyTranslog(translogPath, globalCheckpoint.get(), shardId); + store.associateIndexWithNewTranslog(translogUUID); try (InternalEngine engine = new InternalEngine(config(indexSettings, store, translogPath, newMergePolicy(), null, null, globalCheckpointSupplier)) { @@ -3223,7 +3305,8 @@ public class InternalEngineTests extends EngineTestCase { } try (Store store = createStore(newFSDirectory(storeDir))) { if (randomBoolean() || true) { - EngineDiskUtils.createNewTranslog(store.directory(), translogDir, SequenceNumbers.NO_OPS_PERFORMED, shardId); + final String translogUUID = Translog.createEmptyTranslog(translogDir, SequenceNumbers.NO_OPS_PERFORMED, shardId); + store.associateIndexWithNewTranslog(translogUUID); } try (Engine engine = new InternalEngine(configSupplier.apply(store))) { assertEquals(maxTimestamp12, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp()); @@ -4025,10 +4108,12 @@ public class InternalEngineTests extends EngineTestCase { final Path translogPath = createTempDir(); store = createStore(); final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + store.createEmpty(); + final String translogUUID = Translog.createEmptyTranslog(translogPath, globalCheckpoint.get(), shardId); + store.associateIndexWithNewTranslog(translogUUID); final EngineConfig engineConfig = config(indexSettings, store, translogPath, NoMergePolicy.INSTANCE, null, null, () -> globalCheckpoint.get()); - EngineDiskUtils.createEmpty(store.directory(), translogPath, shardId); try (Engine engine = new InternalEngine(engineConfig) { @Override protected void commitIndexWriter(IndexWriter writer, Translog translog, String syncId) throws IOException { @@ -4042,7 +4127,6 @@ public class InternalEngineTests extends EngineTestCase { }) { engine.recoverFromTranslog(); int numDocs = scaledRandomIntBetween(10, 100); - final String translogUUID = engine.getTranslog().getTranslogUUID(); for (int docId = 0; docId < numDocs; docId++) { ParseContext.Document document = testDocumentWithTextField(); document.add(new Field(SourceFieldMapper.NAME, BytesReference.toBytes(B_1), SourceFieldMapper.Defaults.FIELD_TYPE)); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index f05fdc60c5c..822294a9c19 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -29,7 +29,6 @@ import org.apache.lucene.search.TopDocs; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.IOContext; import org.apache.lucene.util.Constants; -import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.flush.FlushRequest; @@ -70,6 +69,7 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; @@ -1059,27 +1059,27 @@ public class IndexShardTests extends IndexShardTestCase { DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); Store.MetadataSnapshot snapshot = newShard.snapshotStoreMetadata(); - assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_2")); + assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_3")); newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null)); snapshot = newShard.snapshotStoreMetadata(); - assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_2")); + assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_3")); assertTrue(newShard.recoverFromStore()); snapshot = newShard.snapshotStoreMetadata(); - assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_2")); + assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_3")); IndexShardTestCase.updateRoutingEntry(newShard, newShard.routingEntry().moveToStarted()); snapshot = newShard.snapshotStoreMetadata(); - assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_2")); + assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_3")); newShard.close("test", false); snapshot = newShard.snapshotStoreMetadata(); - assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_2")); + assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_3")); closeShards(newShard); } diff --git a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index 1f9c5ae6df3..0609477dda8 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -26,7 +26,6 @@ import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.Term; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.store.Directory; -import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesArray; @@ -37,12 +36,12 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.codec.CodecService; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineConfig; -import org.elasticsearch.index.engine.EngineDiskUtils; import org.elasticsearch.index.engine.InternalEngine; import org.elasticsearch.index.fieldvisitor.SingleFieldsVisitor; import org.elasticsearch.index.mapper.IdFieldMapper; @@ -52,6 +51,7 @@ import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.store.DirectoryService; import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogConfig; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.DummyShardLock; @@ -121,7 +121,10 @@ public class RefreshListenersTests extends ESTestCase { // we don't need to notify anybody in this test } }; - EngineDiskUtils.createEmpty(store.directory(), translogConfig.getTranslogPath(), shardId); + store.createEmpty(); + final String translogUUID = + Translog.createEmptyTranslog(translogConfig.getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED, shardId); + store.associateIndexWithNewTranslog(translogUUID); EngineConfig config = new EngineConfig(shardId, allocationId, threadPool, indexSettings, null, store, newMergePolicy(), iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), eventListener, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, diff --git a/server/src/test/java/org/elasticsearch/index/store/StoreTests.java b/server/src/test/java/org/elasticsearch/index/store/StoreTests.java index 392227396de..9352d978e6e 100644 --- a/server/src/test/java/org/elasticsearch/index/store/StoreTests.java +++ b/server/src/test/java/org/elasticsearch/index/store/StoreTests.java @@ -48,7 +48,6 @@ import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.RAMDirectory; import org.apache.lucene.util.BytesRef; -import org.elasticsearch.core.internal.io.IOUtils; import org.apache.lucene.util.TestUtil; import org.apache.lucene.util.Version; import org.elasticsearch.ExceptionsHelper; @@ -59,6 +58,7 @@ import org.elasticsearch.common.io.stream.OutputStreamStreamOutput; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.ShardLock; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; @@ -93,7 +93,9 @@ import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.endsWith; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; public class StoreTests extends ESTestCase { @@ -761,7 +763,8 @@ public class StoreTests extends ESTestCase { Settings settings = Settings.builder() .put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT) .put(Store.INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(), TimeValue.timeValueMinutes(0)).build(); - Store store = new Store(shardId, IndexSettingsModule.newIndexSettings("index", settings), directoryService, new DummyShardLock(shardId)); + Store store = new Store(shardId, IndexSettingsModule.newIndexSettings("index", settings), directoryService, + new DummyShardLock(shardId)); long initialStoreSize = 0; for (String extraFiles : store.directory().listAll()) { assertTrue("expected extraFS file but got: " + extraFiles, extraFiles.startsWith("extra")); @@ -1071,4 +1074,55 @@ public class StoreTests extends ESTestCase { store.close(); } + public void testEnsureIndexHasHistoryUUID() throws IOException { + final ShardId shardId = new ShardId("index", "_na_", 1); + DirectoryService directoryService = new LuceneManagedDirectoryService(random()); + try (Store store = new Store(shardId, INDEX_SETTINGS, directoryService, new DummyShardLock(shardId))) { + + store.createEmpty(); + + // remove the history uuid + IndexWriterConfig iwc = new IndexWriterConfig(null) + .setCommitOnClose(false) + // we don't want merges to happen here - we call maybe merge on the engine + // later once we stared it up otherwise we would need to wait for it here + // we also don't specify a codec here and merges should use the engines for this index + .setMergePolicy(NoMergePolicy.INSTANCE) + .setOpenMode(IndexWriterConfig.OpenMode.APPEND); + try (IndexWriter writer = new IndexWriter(store.directory(), iwc)) { + Map newCommitData = new HashMap<>(); + for (Map.Entry entry : writer.getLiveCommitData()) { + if (entry.getKey().equals(Engine.HISTORY_UUID_KEY) == false) { + newCommitData.put(entry.getKey(), entry.getValue()); + } + } + writer.setLiveCommitData(newCommitData.entrySet()); + writer.commit(); + } + + store.ensureIndexHasHistoryUUID(); + + SegmentInfos segmentInfos = Lucene.readSegmentInfos(store.directory()); + assertThat(segmentInfos.getUserData(), hasKey(Engine.HISTORY_UUID_KEY)); + } + } + + public void testHistoryUUIDCanBeForced() throws IOException { + final ShardId shardId = new ShardId("index", "_na_", 1); + DirectoryService directoryService = new LuceneManagedDirectoryService(random()); + try (Store store = new Store(shardId, INDEX_SETTINGS, directoryService, new DummyShardLock(shardId))) { + + store.createEmpty(); + + SegmentInfos segmentInfos = Lucene.readSegmentInfos(store.directory()); + assertThat(segmentInfos.getUserData(), hasKey(Engine.HISTORY_UUID_KEY)); + final String oldHistoryUUID = segmentInfos.getUserData().get(Engine.HISTORY_UUID_KEY); + + store.bootstrapNewHistory(); + + segmentInfos = Lucene.readSegmentInfos(store.directory()); + assertThat(segmentInfos.getUserData(), hasKey(Engine.HISTORY_UUID_KEY)); + assertThat(segmentInfos.getUserData().get(Engine.HISTORY_UUID_KEY), not(equalTo(oldHistoryUUID))); + } + } } diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index d1dbaf6bc89..661a1f06354 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -32,7 +32,6 @@ import org.apache.lucene.index.Term; import org.apache.lucene.store.BaseDirectoryWrapper; import org.apache.lucene.store.Directory; import org.apache.lucene.store.IOContext; -import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; @@ -48,6 +47,7 @@ import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.SegmentsStats; diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index c75e469f7af..f0e46cf0223 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -37,8 +37,6 @@ import org.apache.lucene.search.Sort; import org.apache.lucene.search.TotalHitCountCollector; import org.apache.lucene.store.Directory; import org.apache.lucene.util.BytesRef; -import org.elasticsearch.core.internal.io.IOUtils; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -51,6 +49,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; @@ -363,9 +362,14 @@ public abstract class EngineTestCase extends ESTestCase { @Nullable BiFunction localCheckpointTrackerSupplier, @Nullable ToLongBiFunction seqNoForOperation, EngineConfig config) throws IOException { - final Directory directory = config.getStore().directory(); + final Store store = config.getStore(); + final Directory directory = store.directory(); if (Lucene.indexExists(directory) == false) { - EngineDiskUtils.createEmpty(directory, config.getTranslogConfig().getTranslogPath(), config.getShardId()); + store.createEmpty(); + final String translogUuid = + Translog.createEmptyTranslog(config.getTranslogConfig().getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED, shardId); + store.associateIndexWithNewTranslog(translogUuid); + } InternalEngine internalEngine = createInternalEngine(indexWriterFactory, localCheckpointTrackerSupplier, seqNoForOperation, config); internalEngine.recoverFromTranslog(); diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index b5ea5fd4c0e..6d6cc36d78b 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -25,7 +25,6 @@ import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.store.Directory; import org.apache.lucene.util.Bits; import org.apache.lucene.util.BytesRef; -import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.action.admin.indices.flush.FlushRequest; @@ -46,6 +45,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings;