Fold EngineDiskUtils into Store, for better lock semantics (#29156)

#28245 has introduced the utility class`EngineDiskUtils` with a set of methods to prepare/change
translog and lucene commit points. That util class bundled everything that's needed to create and
empty shard, bootstrap a shard from a lucene index that was just restored etc. 

In order to safely do these manipulations, the util methods acquired the IndexWriter's lock. That
would sometime fail due to concurrent shard store fetching or other short activities that require the
files not to be changed while they read from them. 

Since there is no way to wait on the index writer lock, the `Store` class has other locks to make
sure that once we try to acquire the IW lock, it will succeed. To side step this waiting problem, this
PR folds `EngineDiskUtils` into `Store`. Sadly this comes with a price - the store class doesn't and
shouldn't know about the translog. As such the logic is slightly less tight and callers have to do the
translog manipulations on their own.
This commit is contained in:
Boaz Leskes 2018-03-26 14:08:03 +02:00 committed by GitHub
parent a9392f6d42
commit f5d4550e93
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 293 additions and 388 deletions

View File

@ -93,7 +93,7 @@ which returns something similar to:
{ {
"commit" : { "commit" : {
"id" : "3M3zkw2GHMo2Y4h4/KFKCg==", "id" : "3M3zkw2GHMo2Y4h4/KFKCg==",
"generation" : 3, "generation" : 4,
"user_data" : { "user_data" : {
"translog_uuid" : "hnOG3xFcTDeoI_kvvvOdNA", "translog_uuid" : "hnOG3xFcTDeoI_kvvvOdNA",
"history_uuid" : "XP7KDJGiS1a2fHYiFL5TXQ", "history_uuid" : "XP7KDJGiS1a2fHYiFL5TXQ",

View File

@ -24,7 +24,6 @@ import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.Sort; import org.apache.lucene.search.Sort;
import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.Accountable; import org.apache.lucene.util.Accountable;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
@ -40,6 +39,7 @@ import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.ShardLock; import org.elasticsearch.env.ShardLock;
import org.elasticsearch.env.ShardLockObtainFailedException; import org.elasticsearch.env.ShardLockObtainFailedException;

View File

@ -1,144 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.engine;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.store.Directory;
import org.elasticsearch.Assertions;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* This class contains utility methods for mutating the shard lucene index and translog as a preparation to be opened.
*/
public abstract class EngineDiskUtils {
/**
* creates an empty lucene index and a corresponding empty translog. Any existing data will be deleted.
*/
public static void createEmpty(final Directory dir, final Path translogPath, final ShardId shardId) throws IOException {
try (IndexWriter writer = newIndexWriter(true, dir)) {
final String translogUuid = Translog.createEmptyTranslog(translogPath, SequenceNumbers.NO_OPS_PERFORMED, shardId);
final Map<String, String> map = new HashMap<>();
map.put(Translog.TRANSLOG_GENERATION_KEY, "1");
map.put(Translog.TRANSLOG_UUID_KEY, translogUuid);
map.put(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID());
map.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(SequenceNumbers.NO_OPS_PERFORMED));
map.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(SequenceNumbers.NO_OPS_PERFORMED));
map.put(InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, "-1");
updateCommitData(writer, map);
}
}
/**
* Converts an existing lucene index and marks it with a new history uuid. Also creates a new empty translog file.
* This is used to make sure no existing shard will recovery from this index using ops based recovery.
*/
public static void bootstrapNewHistoryFromLuceneIndex(final Directory dir, final Path translogPath, final ShardId shardId)
throws IOException {
try (IndexWriter writer = newIndexWriter(false, dir)) {
final Map<String, String> userData = getUserData(writer);
final long maxSeqNo = Long.parseLong(userData.get(SequenceNumbers.MAX_SEQ_NO));
final String translogUuid = Translog.createEmptyTranslog(translogPath, maxSeqNo, shardId);
final Map<String, String> map = new HashMap<>();
map.put(Translog.TRANSLOG_GENERATION_KEY, "1");
map.put(Translog.TRANSLOG_UUID_KEY, translogUuid);
map.put(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID());
map.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(maxSeqNo));
updateCommitData(writer, map);
}
}
/**
* Creates a new empty translog and associates it with an existing lucene index.
*/
public static void createNewTranslog(final Directory dir, final Path translogPath, long initialGlobalCheckpoint, final ShardId shardId)
throws IOException {
if (Assertions.ENABLED) {
final List<IndexCommit> existingCommits = DirectoryReader.listCommits(dir);
assert existingCommits.size() == 1 : "creating a translog translog should have one commit, commits[" + existingCommits + "]";
SequenceNumbers.CommitInfo commitInfo = Store.loadSeqNoInfo(existingCommits.get(0));
assert commitInfo.localCheckpoint >= initialGlobalCheckpoint :
"trying to create a shard whose local checkpoint [" + commitInfo.localCheckpoint + "] is < global checkpoint ["
+ initialGlobalCheckpoint + "]";
}
try (IndexWriter writer = newIndexWriter(false, dir)) {
final String translogUuid = Translog.createEmptyTranslog(translogPath, initialGlobalCheckpoint, shardId);
final Map<String, String> map = new HashMap<>();
map.put(Translog.TRANSLOG_GENERATION_KEY, "1");
map.put(Translog.TRANSLOG_UUID_KEY, translogUuid);
updateCommitData(writer, map);
}
}
/**
* Checks that the Lucene index contains a history uuid marker. If not, a new one is generated and committed.
*/
public static void ensureIndexHasHistoryUUID(final Directory dir) throws IOException {
try (IndexWriter writer = newIndexWriter(false, dir)) {
final Map<String, String> userData = getUserData(writer);
if (userData.containsKey(Engine.HISTORY_UUID_KEY) == false) {
updateCommitData(writer, Collections.singletonMap(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID()));
}
}
}
private static void updateCommitData(IndexWriter writer, Map<String, String> keysToUpdate) throws IOException {
final Map<String, String> userData = getUserData(writer);
userData.putAll(keysToUpdate);
writer.setLiveCommitData(userData.entrySet());
writer.commit();
}
private static Map<String, String> getUserData(IndexWriter writer) {
final Map<String, String> userData = new HashMap<>();
writer.getLiveCommitData().forEach(e -> userData.put(e.getKey(), e.getValue()));
return userData;
}
private static IndexWriter newIndexWriter(final boolean create, final Directory dir) throws IOException {
IndexWriterConfig iwc = new IndexWriterConfig(null)
.setCommitOnClose(false)
// we don't want merges to happen here - we call maybe merge on the engine
// later once we stared it up otherwise we would need to wait for it here
// we also don't specify a codec here and merges should use the engines for this index
.setMergePolicy(NoMergePolicy.INSTANCE)
.setOpenMode(create ? IndexWriterConfig.OpenMode.CREATE : IndexWriterConfig.OpenMode.APPEND);
return new IndexWriter(dir, iwc);
}
}

View File

@ -40,13 +40,13 @@ import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index; import org.elasticsearch.index.Index;
import org.elasticsearch.index.engine.EngineDiskUtils;
import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.engine.InternalEngine; import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException; import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException;
import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.Repository;
@ -390,7 +390,11 @@ final class StoreRecovery {
recoveryState.getIndex().updateVersion(version); recoveryState.getIndex().updateVersion(version);
if (recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS) { if (recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS) {
assert indexShouldExists; assert indexShouldExists;
EngineDiskUtils.bootstrapNewHistoryFromLuceneIndex(store.directory(), indexShard.shardPath().resolveTranslog(), shardId); store.bootstrapNewHistory();
final SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo();
final long maxSeqNo = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.MAX_SEQ_NO));
final String translogUUID = Translog.createEmptyTranslog(indexShard.shardPath().resolveTranslog(), maxSeqNo, shardId);
store.associateIndexWithNewTranslog(translogUUID);
} else if (indexShouldExists) { } else if (indexShouldExists) {
// since we recover from local, just fill the files and size // since we recover from local, just fill the files and size
try { try {
@ -402,7 +406,10 @@ final class StoreRecovery {
logger.debug("failed to list file details", e); logger.debug("failed to list file details", e);
} }
} else { } else {
EngineDiskUtils.createEmpty(store.directory(), indexShard.shardPath().resolveTranslog(), shardId); store.createEmpty();
final String translogUUID = Translog.createEmptyTranslog(indexShard.shardPath().resolveTranslog(),
SequenceNumbers.NO_OPS_PERFORMED, shardId);
store.associateIndexWithNewTranslog(translogUUID);
} }
indexShard.openEngineAndRecoverFromTranslog(); indexShard.openEngineAndRecoverFromTranslog();
indexShard.getEngine().fillSeqNoGaps(indexShard.getPrimaryTerm()); indexShard.getEngine().fillSeqNoGaps(indexShard.getPrimaryTerm());
@ -445,8 +452,12 @@ final class StoreRecovery {
} }
final IndexId indexId = repository.getRepositoryData().resolveIndexId(indexName); final IndexId indexId = repository.getRepositoryData().resolveIndexId(indexName);
repository.restoreShard(indexShard, restoreSource.snapshot().getSnapshotId(), restoreSource.version(), indexId, snapshotShardId, indexShard.recoveryState()); repository.restoreShard(indexShard, restoreSource.snapshot().getSnapshotId(), restoreSource.version(), indexId, snapshotShardId, indexShard.recoveryState());
EngineDiskUtils.bootstrapNewHistoryFromLuceneIndex(indexShard.store().directory(), indexShard.shardPath().resolveTranslog(), final Store store = indexShard.store();
shardId); store.bootstrapNewHistory();
final SegmentInfos segmentInfos = store.readLastCommittedSegmentsInfo();
final long maxSeqNo = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.MAX_SEQ_NO));
final String translogUUID = Translog.createEmptyTranslog(indexShard.shardPath().resolveTranslog(), maxSeqNo, shardId);
store.associateIndexWithNewTranslog(translogUUID);
assert indexShard.shardRouting.primary() : "only primary shards can recover from store"; assert indexShard.shardRouting.primary() : "only primary shards can recover from store";
indexShard.openEngineAndRecoverFromTranslog(); indexShard.openEngineAndRecoverFromTranslog();
indexShard.getEngine().fillSeqNoGaps(indexShard.getPrimaryTerm()); indexShard.getEngine().fillSeqNoGaps(indexShard.getPrimaryTerm());

View File

@ -30,6 +30,8 @@ import org.apache.lucene.index.IndexFormatTooNewException;
import org.apache.lucene.index.IndexFormatTooOldException; import org.apache.lucene.index.IndexFormatTooOldException;
import org.apache.lucene.index.IndexNotFoundException; import org.apache.lucene.index.IndexNotFoundException;
import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.SegmentCommitInfo; import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.AlreadyClosedException;
@ -46,7 +48,6 @@ import org.apache.lucene.store.SimpleFSDirectory;
import org.apache.lucene.util.ArrayUtil; import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder; import org.apache.lucene.util.BytesRefBuilder;
import org.elasticsearch.core.internal.io.IOUtils;
import org.apache.lucene.util.Version; import org.apache.lucene.util.Version;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
@ -69,11 +70,13 @@ import org.elasticsearch.common.util.SingleObjectCache;
import org.elasticsearch.common.util.concurrent.AbstractRefCounted; import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import org.elasticsearch.common.util.concurrent.RefCounted; import org.elasticsearch.common.util.concurrent.RefCounted;
import org.elasticsearch.common.util.iterable.Iterables; import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.ShardLock; import org.elasticsearch.env.ShardLock;
import org.elasticsearch.env.ShardLockObtainFailedException; import org.elasticsearch.env.ShardLockObtainFailedException;
import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.AbstractIndexShardComponent; import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShard;
@ -155,7 +158,8 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
this(shardId, indexSettings, directoryService, shardLock, OnClose.EMPTY); this(shardId, indexSettings, directoryService, shardLock, OnClose.EMPTY);
} }
public Store(ShardId shardId, IndexSettings indexSettings, DirectoryService directoryService, ShardLock shardLock, OnClose onClose) throws IOException { public Store(ShardId shardId, IndexSettings indexSettings, DirectoryService directoryService, ShardLock shardLock,
OnClose onClose) throws IOException {
super(shardId, indexSettings); super(shardId, indexSettings);
final Settings settings = indexSettings.getSettings(); final Settings settings = indexSettings.getSettings();
this.directory = new StoreDirectory(directoryService.newDirectory(), Loggers.getLogger("index.store.deletes", settings, shardId)); this.directory = new StoreDirectory(directoryService.newDirectory(), Loggers.getLogger("index.store.deletes", settings, shardId));
@ -1454,4 +1458,100 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
} }
} }
/**
* creates an empty lucene index and a corresponding empty translog. Any existing data will be deleted.
*/
public void createEmpty() throws IOException {
metadataLock.writeLock().lock();
try (IndexWriter writer = newIndexWriter(IndexWriterConfig.OpenMode.CREATE, directory)) {
final Map<String, String> map = new HashMap<>();
map.put(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID());
map.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(SequenceNumbers.NO_OPS_PERFORMED));
map.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(SequenceNumbers.NO_OPS_PERFORMED));
map.put(InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, "-1");
updateCommitData(writer, map);
} finally {
metadataLock.writeLock().unlock();
}
}
/**
* Marks an existing lucene index with a new history uuid.
* This is used to make sure no existing shard will recovery from this index using ops based recovery.
*/
public void bootstrapNewHistory() throws IOException {
metadataLock.writeLock().lock();
try (IndexWriter writer = newIndexWriter(IndexWriterConfig.OpenMode.APPEND, directory)) {
final Map<String, String> userData = getUserData(writer);
final long maxSeqNo = Long.parseLong(userData.get(SequenceNumbers.MAX_SEQ_NO));
final Map<String, String> map = new HashMap<>();
map.put(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID());
map.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(maxSeqNo));
updateCommitData(writer, map);
} finally {
metadataLock.writeLock().unlock();
}
}
/**
* Force bakes the given translog generation as recovery information in the lucene index. This is
* used when recovering from a snapshot or peer file based recovery where a new empty translog is
* created and the existing lucene index needs should be changed to use it.
*/
public void associateIndexWithNewTranslog(final String translogUUID) throws IOException {
metadataLock.writeLock().lock();
try (IndexWriter writer = newIndexWriter(IndexWriterConfig.OpenMode.APPEND, directory)) {
if (translogUUID.equals(getUserData(writer).get(Translog.TRANSLOG_UUID_KEY))) {
throw new IllegalArgumentException("a new translog uuid can't be equal to existing one. got [" + translogUUID + "]");
}
final Map<String, String> map = new HashMap<>();
map.put(Translog.TRANSLOG_GENERATION_KEY, "1");
map.put(Translog.TRANSLOG_UUID_KEY, translogUUID);
updateCommitData(writer, map);
} finally {
metadataLock.writeLock().unlock();
}
}
/**
* Checks that the Lucene index contains a history uuid marker. If not, a new one is generated and committed.
*/
public void ensureIndexHasHistoryUUID() throws IOException {
metadataLock.writeLock().lock();
try (IndexWriter writer = newIndexWriter(IndexWriterConfig.OpenMode.APPEND, directory)) {
final Map<String, String> userData = getUserData(writer);
if (userData.containsKey(Engine.HISTORY_UUID_KEY) == false) {
updateCommitData(writer, Collections.singletonMap(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID()));
}
} finally {
metadataLock.writeLock().unlock();
}
}
private void updateCommitData(IndexWriter writer, Map<String, String> keysToUpdate) throws IOException {
final Map<String, String> userData = getUserData(writer);
userData.putAll(keysToUpdate);
writer.setLiveCommitData(userData.entrySet());
writer.commit();
}
private Map<String, String> getUserData(IndexWriter writer) {
final Map<String, String> userData = new HashMap<>();
writer.getLiveCommitData().forEach(e -> userData.put(e.getKey(), e.getValue()));
return userData;
}
private IndexWriter newIndexWriter(IndexWriterConfig.OpenMode openMode, final Directory dir) throws IOException {
IndexWriterConfig iwc = new IndexWriterConfig(null)
.setCommitOnClose(false)
// we don't want merges to happen here - we call maybe merge on the engine
// later once we stared it up otherwise we would need to wait for it here
// we also don't specify a codec here and merges should use the engines for this index
.setMergePolicy(NoMergePolicy.INSTANCE)
.setOpenMode(openMode);
return new IndexWriter(dir, iwc);
}
} }

View File

@ -40,7 +40,6 @@ import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.concurrent.AbstractRefCounted; import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineDiskUtils;
import org.elasticsearch.index.mapper.MapperException; import org.elasticsearch.index.mapper.MapperException;
import org.elasticsearch.index.seqno.ReplicationTracker; import org.elasticsearch.index.seqno.ReplicationTracker;
import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.seqno.SequenceNumbers;
@ -439,11 +438,12 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
try { try {
store.cleanupAndVerify("recovery CleanFilesRequestHandler", sourceMetaData); store.cleanupAndVerify("recovery CleanFilesRequestHandler", sourceMetaData);
if (indexShard.indexSettings().getIndexVersionCreated().before(Version.V_6_0_0_rc1)) { if (indexShard.indexSettings().getIndexVersionCreated().before(Version.V_6_0_0_rc1)) {
EngineDiskUtils.ensureIndexHasHistoryUUID(store.directory()); store.ensureIndexHasHistoryUUID();
} }
// TODO: Assign the global checkpoint to the max_seqno of the safe commit if the index version >= 6.2 // TODO: Assign the global checkpoint to the max_seqno of the safe commit if the index version >= 6.2
EngineDiskUtils.createNewTranslog(store.directory(), indexShard.shardPath().resolveTranslog(), final String translogUUID =
SequenceNumbers.UNASSIGNED_SEQ_NO, shardId); Translog.createEmptyTranslog(indexShard.shardPath().resolveTranslog(), SequenceNumbers.UNASSIGNED_SEQ_NO, shardId);
store.associateIndexWithNewTranslog(translogUUID);
} catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) { } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) {
// this is a fatal exception at this stage. // this is a fatal exception at this stage.
// this means we transferred files from the remote that have not be checksummed and they are // this means we transferred files from the remote that have not be checksummed and they are

View File

@ -1,207 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.engine;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.search.IndexSearcher;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.IndexSettingsModule;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
public class EngineDiskUtilsTests extends EngineTestCase {
public void testHistoryUUIDIsSetIfMissing() throws IOException {
final int numDocs = randomIntBetween(0, 3);
for (int i = 0; i < numDocs; i++) {
ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 0,
Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false);
Engine.IndexResult index = engine.index(firstIndexRequest);
assertThat(index.getVersion(), equalTo(1L));
}
assertVisibleCount(engine, numDocs);
engine.close();
IndexWriterConfig iwc = new IndexWriterConfig(null)
.setCommitOnClose(false)
// we don't want merges to happen here - we call maybe merge on the engine
// later once we stared it up otherwise we would need to wait for it here
// we also don't specify a codec here and merges should use the engines for this index
.setMergePolicy(NoMergePolicy.INSTANCE)
.setOpenMode(IndexWriterConfig.OpenMode.APPEND);
try (IndexWriter writer = new IndexWriter(store.directory(), iwc)) {
Map<String, String> newCommitData = new HashMap<>();
for (Map.Entry<String, String> entry : writer.getLiveCommitData()) {
if (entry.getKey().equals(Engine.HISTORY_UUID_KEY) == false) {
newCommitData.put(entry.getKey(), entry.getValue());
}
}
writer.setLiveCommitData(newCommitData.entrySet());
writer.commit();
}
EngineDiskUtils.ensureIndexHasHistoryUUID(store.directory());
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test", Settings.builder()
.put(defaultSettings.getSettings())
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.V_6_0_0_beta1)
.build());
EngineConfig config = engine.config();
EngineConfig newConfig = new EngineConfig(
shardId, allocationId.getId(),
threadPool, indexSettings, null, store, newMergePolicy(), config.getAnalyzer(), config.getSimilarity(),
new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(),
IndexSearcher.getDefaultQueryCachingPolicy(), config.getTranslogConfig(), TimeValue.timeValueMinutes(5),
config.getExternalRefreshListener(), config.getInternalRefreshListener(), null, config.getTranslogRecoveryRunner(),
new NoneCircuitBreakerService(), () -> SequenceNumbers.NO_OPS_PERFORMED);
engine = new InternalEngine(newConfig);
engine.recoverFromTranslog();
assertVisibleCount(engine, numDocs, false);
assertThat(engine.getHistoryUUID(), notNullValue());
}
public void testCurrentTranslogIDisCommitted() throws IOException {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
try (Store store = createStore()) {
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get);
// create
{
EngineDiskUtils.createEmpty(store.directory(), config.getTranslogConfig().getTranslogPath(), shardId);
ParsedDocument doc = testParsedDocument(Integer.toString(0), null, testDocument(), new BytesArray("{}"), null);
Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 0,
Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false);
try (InternalEngine engine = createEngine(config)) {
engine.index(firstIndexRequest);
globalCheckpoint.set(engine.getLocalCheckpointTracker().getCheckpoint());
expectThrows(IllegalStateException.class, () -> engine.recoverFromTranslog());
Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
}
}
// open and recover tlog
{
for (int i = 0; i < 2; i++) {
try (InternalEngine engine = new InternalEngine(config)) {
assertTrue(engine.isRecovering());
Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
if (i == 0) {
assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
} else {
// creating an empty index will create the first translog gen and commit it
// opening the empty index will make the second translog file but not commit it
// opening the engine again (i=0) will make the third translog file, which then be committed
assertEquals("3", userData.get(Translog.TRANSLOG_GENERATION_KEY));
}
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
engine.recoverFromTranslog();
userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("3", userData.get(Translog.TRANSLOG_GENERATION_KEY));
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
}
}
}
// open index with new tlog
{
EngineDiskUtils.createNewTranslog(store.directory(), config.getTranslogConfig().getTranslogPath(),
SequenceNumbers.NO_OPS_PERFORMED, shardId);
try (InternalEngine engine = new InternalEngine(config)) {
Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
engine.recoverFromTranslog();
assertEquals(2, engine.getTranslog().currentFileGeneration());
assertEquals(0L, engine.getTranslog().stats().getUncommittedOperations());
}
}
// open and recover tlog with empty tlog
{
for (int i = 0; i < 2; i++) {
try (InternalEngine engine = new InternalEngine(config)) {
Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
engine.recoverFromTranslog();
userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("no changes - nothing to commit", "1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
}
}
}
}
}
public void testHistoryUUIDCanBeForced() throws IOException {
final int numDocs = randomIntBetween(0, 3);
for (int i = 0; i < numDocs; i++) {
ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 0,
Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false);
Engine.IndexResult index = engine.index(firstIndexRequest);
assertThat(index.getVersion(), equalTo(1L));
}
assertVisibleCount(engine, numDocs);
final String oldHistoryUUID = engine.getHistoryUUID();
engine.close();
EngineConfig config = engine.config();
EngineDiskUtils.bootstrapNewHistoryFromLuceneIndex(store.directory(), config.getTranslogConfig().getTranslogPath(), shardId);
EngineConfig newConfig = new EngineConfig(
shardId, allocationId.getId(),
threadPool, config.getIndexSettings(), null, store, newMergePolicy(), config.getAnalyzer(), config.getSimilarity(),
new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(),
IndexSearcher.getDefaultQueryCachingPolicy(), config.getTranslogConfig(), TimeValue.timeValueMinutes(5),
config.getExternalRefreshListener(), config.getInternalRefreshListener(), null, config.getTranslogRecoveryRunner(),
new NoneCircuitBreakerService(), () -> SequenceNumbers.NO_OPS_PERFORMED);
engine = new InternalEngine(newConfig);
engine.recoverFromTranslog();
assertVisibleCount(engine, 0, false);
assertThat(engine.getHistoryUUID(), not(equalTo(oldHistoryUUID)));
}
}

View File

@ -65,7 +65,6 @@ import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.util.Bits; import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.FixedBitSet; import org.apache.lucene.util.FixedBitSet;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest;
@ -91,6 +90,7 @@ import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType; import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.codec.CodecService; import org.elasticsearch.index.codec.CodecService;
@ -1141,8 +1141,9 @@ public class InternalEngineTests extends EngineTestCase {
engine.flushAndClose(); engine.flushAndClose();
} }
if (randomBoolean()) { if (randomBoolean()) {
EngineDiskUtils.createNewTranslog(store.directory(), config.getTranslogConfig().getTranslogPath(), final String translogUUID = Translog.createEmptyTranslog(config.getTranslogConfig().getTranslogPath(),
SequenceNumbers.UNASSIGNED_SEQ_NO, shardId); SequenceNumbers.UNASSIGNED_SEQ_NO, shardId);
store.associateIndexWithNewTranslog(translogUUID);
} }
engine = new InternalEngine(config); engine = new InternalEngine(config);
engine.recoverFromTranslog(); engine.recoverFromTranslog();
@ -2354,6 +2355,84 @@ public class InternalEngineTests extends EngineTestCase {
assertEquals(currentIndexWriterConfig.getCodec().getName(), codecService.codec(codecName).getName()); assertEquals(currentIndexWriterConfig.getCodec().getName(), codecService.codec(codecName).getName());
} }
public void testCurrentTranslogIDisCommitted() throws IOException {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
try (Store store = createStore()) {
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get);
// create
{
store.createEmpty();
final String translogUUID =
Translog.createEmptyTranslog(config.getTranslogConfig().getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED, shardId);
store.associateIndexWithNewTranslog(translogUUID);
ParsedDocument doc = testParsedDocument(Integer.toString(0), null, testDocument(), new BytesArray("{}"), null);
Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, SequenceNumbers.UNASSIGNED_SEQ_NO, 0,
Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false);
try (InternalEngine engine = createEngine(config)) {
engine.index(firstIndexRequest);
globalCheckpoint.set(engine.getLocalCheckpointTracker().getCheckpoint());
expectThrows(IllegalStateException.class, () -> engine.recoverFromTranslog());
Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
}
}
// open and recover tlog
{
for (int i = 0; i < 2; i++) {
try (InternalEngine engine = new InternalEngine(config)) {
assertTrue(engine.isRecovering());
Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
if (i == 0) {
assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
} else {
// creating an empty index will create the first translog gen and commit it
// opening the empty index will make the second translog file but not commit it
// opening the engine again (i=0) will make the third translog file, which then be committed
assertEquals("3", userData.get(Translog.TRANSLOG_GENERATION_KEY));
}
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
engine.recoverFromTranslog();
userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("3", userData.get(Translog.TRANSLOG_GENERATION_KEY));
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
}
}
}
// open index with new tlog
{
final String translogUUID =
Translog.createEmptyTranslog(config.getTranslogConfig().getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED, shardId);
store.associateIndexWithNewTranslog(translogUUID);
try (InternalEngine engine = new InternalEngine(config)) {
Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
engine.recoverFromTranslog();
assertEquals(2, engine.getTranslog().currentFileGeneration());
assertEquals(0L, engine.getTranslog().stats().getUncommittedOperations());
}
}
// open and recover tlog with empty tlog
{
for (int i = 0; i < 2; i++) {
try (InternalEngine engine = new InternalEngine(config)) {
Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
engine.recoverFromTranslog();
userData = engine.getLastCommittedSegmentInfos().getUserData();
assertEquals("no changes - nothing to commit", "1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
}
}
}
}
}
public void testMissingTranslog() throws IOException { public void testMissingTranslog() throws IOException {
// test that we can force start the engine , even if the translog is missing. // test that we can force start the engine , even if the translog is missing.
engine.close(); engine.close();
@ -2369,7 +2448,8 @@ public class InternalEngineTests extends EngineTestCase {
// expected // expected
} }
// when a new translog is created it should be ok // when a new translog is created it should be ok
EngineDiskUtils.createNewTranslog(store.directory(), primaryTranslogDir, SequenceNumbers.UNASSIGNED_SEQ_NO, shardId); final String translogUUID = Translog.createEmptyTranslog(primaryTranslogDir, SequenceNumbers.UNASSIGNED_SEQ_NO, shardId);
store.associateIndexWithNewTranslog(translogUUID);
EngineConfig config = config(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null); EngineConfig config = config(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null);
engine = new InternalEngine(config); engine = new InternalEngine(config);
} }
@ -2432,7 +2512,9 @@ public class InternalEngineTests extends EngineTestCase {
final Path translogPath = createTempDir(); final Path translogPath = createTempDir();
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
final LongSupplier globalCheckpointSupplier = () -> globalCheckpoint.get(); final LongSupplier globalCheckpointSupplier = () -> globalCheckpoint.get();
EngineDiskUtils.createEmpty(store.directory(), translogPath, shardId); store.createEmpty();
final String translogUUID = Translog.createEmptyTranslog(translogPath, globalCheckpoint.get(), shardId);
store.associateIndexWithNewTranslog(translogUUID);
try (InternalEngine engine = try (InternalEngine engine =
new InternalEngine(config(indexSettings, store, translogPath, newMergePolicy(), null, null, new InternalEngine(config(indexSettings, store, translogPath, newMergePolicy(), null, null,
globalCheckpointSupplier)) { globalCheckpointSupplier)) {
@ -3223,7 +3305,8 @@ public class InternalEngineTests extends EngineTestCase {
} }
try (Store store = createStore(newFSDirectory(storeDir))) { try (Store store = createStore(newFSDirectory(storeDir))) {
if (randomBoolean() || true) { if (randomBoolean() || true) {
EngineDiskUtils.createNewTranslog(store.directory(), translogDir, SequenceNumbers.NO_OPS_PERFORMED, shardId); final String translogUUID = Translog.createEmptyTranslog(translogDir, SequenceNumbers.NO_OPS_PERFORMED, shardId);
store.associateIndexWithNewTranslog(translogUUID);
} }
try (Engine engine = new InternalEngine(configSupplier.apply(store))) { try (Engine engine = new InternalEngine(configSupplier.apply(store))) {
assertEquals(maxTimestamp12, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp()); assertEquals(maxTimestamp12, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp());
@ -4025,10 +4108,12 @@ public class InternalEngineTests extends EngineTestCase {
final Path translogPath = createTempDir(); final Path translogPath = createTempDir();
store = createStore(); store = createStore();
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
store.createEmpty();
final String translogUUID = Translog.createEmptyTranslog(translogPath, globalCheckpoint.get(), shardId);
store.associateIndexWithNewTranslog(translogUUID);
final EngineConfig engineConfig = config(indexSettings, store, translogPath, NoMergePolicy.INSTANCE, null, null, final EngineConfig engineConfig = config(indexSettings, store, translogPath, NoMergePolicy.INSTANCE, null, null,
() -> globalCheckpoint.get()); () -> globalCheckpoint.get());
EngineDiskUtils.createEmpty(store.directory(), translogPath, shardId);
try (Engine engine = new InternalEngine(engineConfig) { try (Engine engine = new InternalEngine(engineConfig) {
@Override @Override
protected void commitIndexWriter(IndexWriter writer, Translog translog, String syncId) throws IOException { protected void commitIndexWriter(IndexWriter writer, Translog translog, String syncId) throws IOException {
@ -4042,7 +4127,6 @@ public class InternalEngineTests extends EngineTestCase {
}) { }) {
engine.recoverFromTranslog(); engine.recoverFromTranslog();
int numDocs = scaledRandomIntBetween(10, 100); int numDocs = scaledRandomIntBetween(10, 100);
final String translogUUID = engine.getTranslog().getTranslogUUID();
for (int docId = 0; docId < numDocs; docId++) { for (int docId = 0; docId < numDocs; docId++) {
ParseContext.Document document = testDocumentWithTextField(); ParseContext.Document document = testDocumentWithTextField();
document.add(new Field(SourceFieldMapper.NAME, BytesReference.toBytes(B_1), SourceFieldMapper.Defaults.FIELD_TYPE)); document.add(new Field(SourceFieldMapper.NAME, BytesReference.toBytes(B_1), SourceFieldMapper.Defaults.FIELD_TYPE));

View File

@ -29,7 +29,6 @@ import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IOContext;
import org.apache.lucene.util.Constants; import org.apache.lucene.util.Constants;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.admin.indices.flush.FlushRequest;
@ -70,6 +69,7 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType; import org.elasticsearch.index.VersionType;
@ -1059,27 +1059,27 @@ public class IndexShardTests extends IndexShardTestCase {
DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
Store.MetadataSnapshot snapshot = newShard.snapshotStoreMetadata(); Store.MetadataSnapshot snapshot = newShard.snapshotStoreMetadata();
assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_2")); assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_3"));
newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null)); newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null));
snapshot = newShard.snapshotStoreMetadata(); snapshot = newShard.snapshotStoreMetadata();
assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_2")); assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_3"));
assertTrue(newShard.recoverFromStore()); assertTrue(newShard.recoverFromStore());
snapshot = newShard.snapshotStoreMetadata(); snapshot = newShard.snapshotStoreMetadata();
assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_2")); assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_3"));
IndexShardTestCase.updateRoutingEntry(newShard, newShard.routingEntry().moveToStarted()); IndexShardTestCase.updateRoutingEntry(newShard, newShard.routingEntry().moveToStarted());
snapshot = newShard.snapshotStoreMetadata(); snapshot = newShard.snapshotStoreMetadata();
assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_2")); assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_3"));
newShard.close("test", false); newShard.close("test", false);
snapshot = newShard.snapshotStoreMetadata(); snapshot = newShard.snapshotStoreMetadata();
assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_2")); assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_3"));
closeShards(newShard); closeShards(newShard);
} }

View File

@ -26,7 +26,6 @@ import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term; import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.store.Directory; import org.apache.lucene.store.Directory;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesArray;
@ -37,12 +36,12 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.Index; import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.codec.CodecService; import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.EngineDiskUtils;
import org.elasticsearch.index.engine.InternalEngine; import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.fieldvisitor.SingleFieldsVisitor; import org.elasticsearch.index.fieldvisitor.SingleFieldsVisitor;
import org.elasticsearch.index.mapper.IdFieldMapper; import org.elasticsearch.index.mapper.IdFieldMapper;
@ -52,6 +51,7 @@ import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.store.DirectoryService; import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig; import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.DummyShardLock; import org.elasticsearch.test.DummyShardLock;
@ -121,7 +121,10 @@ public class RefreshListenersTests extends ESTestCase {
// we don't need to notify anybody in this test // we don't need to notify anybody in this test
} }
}; };
EngineDiskUtils.createEmpty(store.directory(), translogConfig.getTranslogPath(), shardId); store.createEmpty();
final String translogUUID =
Translog.createEmptyTranslog(translogConfig.getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED, shardId);
store.associateIndexWithNewTranslog(translogUUID);
EngineConfig config = new EngineConfig(shardId, allocationId, threadPool, EngineConfig config = new EngineConfig(shardId, allocationId, threadPool,
indexSettings, null, store, newMergePolicy(), iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), indexSettings, null, store, newMergePolicy(), iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger),
eventListener, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, eventListener, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig,

View File

@ -48,7 +48,6 @@ import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.RAMDirectory; import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRef;
import org.elasticsearch.core.internal.io.IOUtils;
import org.apache.lucene.util.TestUtil; import org.apache.lucene.util.TestUtil;
import org.apache.lucene.util.Version; import org.apache.lucene.util.Version;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
@ -59,6 +58,7 @@ import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.ShardLock; import org.elasticsearch.env.ShardLock;
import org.elasticsearch.index.Index; import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexSettings;
@ -93,7 +93,9 @@ import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.endsWith; import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.notNullValue;
public class StoreTests extends ESTestCase { public class StoreTests extends ESTestCase {
@ -761,7 +763,8 @@ public class StoreTests extends ESTestCase {
Settings settings = Settings.builder() Settings settings = Settings.builder()
.put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT) .put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT)
.put(Store.INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(), TimeValue.timeValueMinutes(0)).build(); .put(Store.INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(), TimeValue.timeValueMinutes(0)).build();
Store store = new Store(shardId, IndexSettingsModule.newIndexSettings("index", settings), directoryService, new DummyShardLock(shardId)); Store store = new Store(shardId, IndexSettingsModule.newIndexSettings("index", settings), directoryService,
new DummyShardLock(shardId));
long initialStoreSize = 0; long initialStoreSize = 0;
for (String extraFiles : store.directory().listAll()) { for (String extraFiles : store.directory().listAll()) {
assertTrue("expected extraFS file but got: " + extraFiles, extraFiles.startsWith("extra")); assertTrue("expected extraFS file but got: " + extraFiles, extraFiles.startsWith("extra"));
@ -1071,4 +1074,55 @@ public class StoreTests extends ESTestCase {
store.close(); store.close();
} }
public void testEnsureIndexHasHistoryUUID() throws IOException {
final ShardId shardId = new ShardId("index", "_na_", 1);
DirectoryService directoryService = new LuceneManagedDirectoryService(random());
try (Store store = new Store(shardId, INDEX_SETTINGS, directoryService, new DummyShardLock(shardId))) {
store.createEmpty();
// remove the history uuid
IndexWriterConfig iwc = new IndexWriterConfig(null)
.setCommitOnClose(false)
// we don't want merges to happen here - we call maybe merge on the engine
// later once we stared it up otherwise we would need to wait for it here
// we also don't specify a codec here and merges should use the engines for this index
.setMergePolicy(NoMergePolicy.INSTANCE)
.setOpenMode(IndexWriterConfig.OpenMode.APPEND);
try (IndexWriter writer = new IndexWriter(store.directory(), iwc)) {
Map<String, String> newCommitData = new HashMap<>();
for (Map.Entry<String, String> entry : writer.getLiveCommitData()) {
if (entry.getKey().equals(Engine.HISTORY_UUID_KEY) == false) {
newCommitData.put(entry.getKey(), entry.getValue());
}
}
writer.setLiveCommitData(newCommitData.entrySet());
writer.commit();
}
store.ensureIndexHasHistoryUUID();
SegmentInfos segmentInfos = Lucene.readSegmentInfos(store.directory());
assertThat(segmentInfos.getUserData(), hasKey(Engine.HISTORY_UUID_KEY));
}
}
public void testHistoryUUIDCanBeForced() throws IOException {
final ShardId shardId = new ShardId("index", "_na_", 1);
DirectoryService directoryService = new LuceneManagedDirectoryService(random());
try (Store store = new Store(shardId, INDEX_SETTINGS, directoryService, new DummyShardLock(shardId))) {
store.createEmpty();
SegmentInfos segmentInfos = Lucene.readSegmentInfos(store.directory());
assertThat(segmentInfos.getUserData(), hasKey(Engine.HISTORY_UUID_KEY));
final String oldHistoryUUID = segmentInfos.getUserData().get(Engine.HISTORY_UUID_KEY);
store.bootstrapNewHistory();
segmentInfos = Lucene.readSegmentInfos(store.directory());
assertThat(segmentInfos.getUserData(), hasKey(Engine.HISTORY_UUID_KEY));
assertThat(segmentInfos.getUserData().get(Engine.HISTORY_UUID_KEY), not(equalTo(oldHistoryUUID)));
}
}
} }

View File

@ -32,7 +32,6 @@ import org.apache.lucene.index.Term;
import org.apache.lucene.store.BaseDirectoryWrapper; import org.apache.lucene.store.BaseDirectoryWrapper;
import org.apache.lucene.store.Directory; import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IOContext;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
@ -48,6 +47,7 @@ import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.SegmentsStats; import org.elasticsearch.index.engine.SegmentsStats;

View File

@ -37,8 +37,6 @@ import org.apache.lucene.search.Sort;
import org.apache.lucene.search.TotalHitCountCollector; import org.apache.lucene.search.TotalHitCountCollector;
import org.apache.lucene.store.Directory; import org.apache.lucene.store.Directory;
import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRef;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
@ -51,6 +49,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.Index; import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType; import org.elasticsearch.index.VersionType;
@ -363,9 +362,14 @@ public abstract class EngineTestCase extends ESTestCase {
@Nullable BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier, @Nullable BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier,
@Nullable ToLongBiFunction<Engine, Engine.Operation> seqNoForOperation, @Nullable ToLongBiFunction<Engine, Engine.Operation> seqNoForOperation,
EngineConfig config) throws IOException { EngineConfig config) throws IOException {
final Directory directory = config.getStore().directory(); final Store store = config.getStore();
final Directory directory = store.directory();
if (Lucene.indexExists(directory) == false) { if (Lucene.indexExists(directory) == false) {
EngineDiskUtils.createEmpty(directory, config.getTranslogConfig().getTranslogPath(), config.getShardId()); store.createEmpty();
final String translogUuid =
Translog.createEmptyTranslog(config.getTranslogConfig().getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED, shardId);
store.associateIndexWithNewTranslog(translogUuid);
} }
InternalEngine internalEngine = createInternalEngine(indexWriterFactory, localCheckpointTrackerSupplier, seqNoForOperation, config); InternalEngine internalEngine = createInternalEngine(indexWriterFactory, localCheckpointTrackerSupplier, seqNoForOperation, config);
internalEngine.recoverFromTranslog(); internalEngine.recoverFromTranslog();

View File

@ -25,7 +25,6 @@ import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.store.Directory; import org.apache.lucene.store.Directory;
import org.apache.lucene.util.Bits; import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRef;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.admin.indices.flush.FlushRequest;
@ -46,6 +45,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.Index; import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexSettings;