Set Lucene version upon index creation. (#36038)

It is important that all shards of a given index have the same
`indexCreatedVersionMajor` to Lucene, or eg. merging those shards is going to
be considered illegal. At the moment, we use the latest Lucene version when
creating a shard, which could cause shards to have different created versions
eg. in case of forced allocation. This commit makes sure to reuse the
appropriate Lucene version in order to avoid such issues.

Closes #33826
This commit is contained in:
Adrien Grand 2018-12-04 17:53:20 +01:00 committed by GitHub
parent b59deb573e
commit 0df08dd458
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 114 additions and 42 deletions

View File

@ -36,6 +36,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
public class Version implements Comparable<Version>, ToXContentFragment {
/*
@ -192,7 +193,30 @@ public class Version implements Comparable<Version>, ToXContentFragment {
case V_EMPTY_ID:
return V_EMPTY;
default:
return new Version(id, org.apache.lucene.util.Version.LATEST);
// We need at least the major of the Lucene version to be correct.
// Our best guess is to use the same Lucene version as the previous
// version in the list, assuming that it didn't change. This is at
// least correct for patch versions of known minors since we never
// update the Lucene dependency for patch versions.
List<Version> versions = DeclaredVersionsHolder.DECLARED_VERSIONS;
Version tmp = new Version(id, org.apache.lucene.util.Version.LATEST);
int index = Collections.binarySearch(versions, tmp);
if (index < 0) {
index = -2 - index;
} else {
assert false : "Version [" + tmp + "] is declared but absent from the switch statement in Version#fromId";
}
final org.apache.lucene.util.Version luceneVersion;
if (index == -1) {
// this version is older than any supported version, so we
// assume it is the previous major to the oldest Lucene version
// that we know about
luceneVersion = org.apache.lucene.util.Version.fromBits(
versions.get(0).luceneVersion.major - 1, 0, 0);
} else {
luceneVersion = versions.get(index).luceneVersion;
}
return new Version(id, luceneVersion);
}
}
@ -300,7 +324,7 @@ public class Version implements Comparable<Version>, ToXContentFragment {
this.minor = (byte) ((id / 10000) % 100);
this.revision = (byte) ((id / 100) % 100);
this.build = (byte) (id % 100);
this.luceneVersion = luceneVersion;
this.luceneVersion = Objects.requireNonNull(luceneVersion);
}
public boolean after(Version version) {

View File

@ -149,11 +149,8 @@ final class StoreRecovery {
final long maxSeqNo, final long maxUnsafeAutoIdTimestamp, IndexMetaData indexMetaData, int shardId, boolean split,
boolean hasNested) throws IOException {
// clean target directory (if previous recovery attempt failed) and create a fresh segment file with the proper lucene version
Lucene.cleanLuceneIndex(target);
assert sources.length > 0;
final int luceneIndexCreatedVersionMajor = Lucene.readSegmentInfos(sources[0]).getIndexCreatedVersionMajor();
new SegmentInfos(luceneIndexCreatedVersionMajor).commit(target);
final Directory hardLinkOrCopyTarget = new org.apache.lucene.store.HardlinkCopyDirectoryWrapper(target);
@ -164,7 +161,8 @@ final class StoreRecovery {
// later once we stared it up otherwise we would need to wait for it here
// we also don't specify a codec here and merges should use the engines for this index
.setMergePolicy(NoMergePolicy.INSTANCE)
.setOpenMode(IndexWriterConfig.OpenMode.APPEND);
.setOpenMode(IndexWriterConfig.OpenMode.CREATE)
.setIndexCreatedVersionMajor(luceneIndexCreatedVersionMajor);
if (indexSort != null) {
iwc.setIndexSort(indexSort);
}
@ -417,7 +415,7 @@ final class StoreRecovery {
logger.debug("failed to list file details", e);
}
} else {
store.createEmpty();
store.createEmpty(indexShard.indexSettings().getIndexVersionCreated().luceneVersion);
final String translogUUID = Translog.createEmptyTranslog(
indexShard.shardPath().resolveTranslog(), SequenceNumbers.NO_OPS_PERFORMED, shardId,
indexShard.getPendingPrimaryTerm());

View File

@ -1404,9 +1404,9 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
/**
* creates an empty lucene index and a corresponding empty translog. Any existing data will be deleted.
*/
public void createEmpty() throws IOException {
public void createEmpty(Version luceneVersion) throws IOException {
metadataLock.writeLock().lock();
try (IndexWriter writer = newIndexWriter(IndexWriterConfig.OpenMode.CREATE, directory, null)) {
try (IndexWriter writer = newEmptyIndexWriter(directory, luceneVersion)) {
final Map<String, String> map = new HashMap<>();
map.put(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID());
map.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(SequenceNumbers.NO_OPS_PERFORMED));
@ -1443,7 +1443,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
*/
public void bootstrapNewHistory(long maxSeqNo) throws IOException {
metadataLock.writeLock().lock();
try (IndexWriter writer = newIndexWriter(IndexWriterConfig.OpenMode.APPEND, directory, null)) {
try (IndexWriter writer = newAppendingIndexWriter(directory, null)) {
final Map<String, String> map = new HashMap<>();
map.put(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID());
map.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo));
@ -1461,7 +1461,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
*/
public void associateIndexWithNewTranslog(final String translogUUID) throws IOException {
metadataLock.writeLock().lock();
try (IndexWriter writer = newIndexWriter(IndexWriterConfig.OpenMode.APPEND, directory, null)) {
try (IndexWriter writer = newAppendingIndexWriter(directory, null)) {
if (translogUUID.equals(getUserData(writer).get(Translog.TRANSLOG_UUID_KEY))) {
throw new IllegalArgumentException("a new translog uuid can't be equal to existing one. got [" + translogUUID + "]");
}
@ -1480,7 +1480,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
*/
public void ensureIndexHasHistoryUUID() throws IOException {
metadataLock.writeLock().lock();
try (IndexWriter writer = newIndexWriter(IndexWriterConfig.OpenMode.APPEND, directory, null)) {
try (IndexWriter writer = newAppendingIndexWriter(directory, null)) {
final Map<String, String> userData = getUserData(writer);
if (userData.containsKey(Engine.HISTORY_UUID_KEY) == false) {
updateCommitData(writer, Collections.singletonMap(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID()));
@ -1546,7 +1546,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
+ translogUUID + "]");
}
if (startingIndexCommit.equals(existingCommits.get(existingCommits.size() - 1)) == false) {
try (IndexWriter writer = newIndexWriter(IndexWriterConfig.OpenMode.APPEND, directory, startingIndexCommit)) {
try (IndexWriter writer = newAppendingIndexWriter(directory, startingIndexCommit)) {
// this achieves two things:
// - by committing a new commit based on the starting commit, it make sure the starting commit will be opened
// - deletes any other commit (by lucene standard deletion policy)
@ -1578,19 +1578,28 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
return userData;
}
private static IndexWriter newIndexWriter(final IndexWriterConfig.OpenMode openMode, final Directory dir, final IndexCommit commit)
throws IOException {
assert openMode == IndexWriterConfig.OpenMode.APPEND || commit == null : "can't specify create flag with a commit";
IndexWriterConfig iwc = new IndexWriterConfig(null)
.setSoftDeletesField(Lucene.SOFT_DELETES_FIELD)
.setCommitOnClose(false)
private static IndexWriter newAppendingIndexWriter(final Directory dir, final IndexCommit commit) throws IOException {
IndexWriterConfig iwc = newIndexWriterConfig()
.setIndexCommit(commit)
// we don't want merges to happen here - we call maybe merge on the engine
// later once we stared it up otherwise we would need to wait for it here
// we also don't specify a codec here and merges should use the engines for this index
.setMergePolicy(NoMergePolicy.INSTANCE)
.setOpenMode(openMode);
.setOpenMode(IndexWriterConfig.OpenMode.APPEND);
return new IndexWriter(dir, iwc);
}
private static IndexWriter newEmptyIndexWriter(final Directory dir, final Version luceneVersion) throws IOException {
IndexWriterConfig iwc = newIndexWriterConfig()
.setOpenMode(IndexWriterConfig.OpenMode.CREATE)
.setIndexCreatedVersionMajor(luceneVersion.major);
return new IndexWriter(dir, iwc);
}
private static IndexWriterConfig newIndexWriterConfig() {
return new IndexWriterConfig(null)
.setSoftDeletesField(Lucene.SOFT_DELETES_FIELD)
.setCommitOnClose(false)
// we don't want merges to happen here - we call maybe merge on the engine
// later once we stared it up otherwise we would need to wait for it here
// we also don't specify a codec here and merges should use the engines for this index
.setMergePolicy(NoMergePolicy.INSTANCE);
}
}

View File

@ -27,8 +27,6 @@ import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexFormatTooNewException;
import org.apache.lucene.index.IndexFormatTooOldException;
import org.apache.lucene.index.IndexNotFoundException;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
@ -1495,11 +1493,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
// version number and no checksum, even though the index itself is perfectly fine to restore, this
// empty shard would cause exceptions to be thrown. Since there is no data to restore from an empty
// shard anyway, we just create the empty shard here and then exit.
IndexWriter writer = new IndexWriter(store.directory(), new IndexWriterConfig(null)
.setSoftDeletesField(Lucene.SOFT_DELETES_FIELD)
.setOpenMode(IndexWriterConfig.OpenMode.CREATE)
.setCommitOnClose(true));
writer.close();
store.createEmpty(targetShard.indexSettings().getIndexMetaData().getCreationVersion().luceneVersion);
return;
}

View File

@ -26,16 +26,19 @@ import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.store.Directory;
import org.elasticsearch.Version;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.VersionFieldMapper;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.hamcrest.MatcherAssert;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.loadDocIdAndVersion;
@ -193,4 +196,25 @@ public class VersionsTests extends ESTestCase {
assertEquals(size, VersionsAndSeqNoResolver.lookupStates.size());
dir.close();
}
public void testLuceneVersionOnUnknownVersions() {
List<Version> allVersions = VersionUtils.allVersions();
// should have the same Lucene version as the latest 6.x version
Version version = Version.fromString("6.88.50");
assertEquals(allVersions.get(Collections.binarySearch(allVersions, Version.V_7_0_0) - 1).luceneVersion,
version.luceneVersion);
// between two known versions, should use the lucene version of the previous version
version = Version.fromString("6.2.50");
assertEquals(VersionUtils.getPreviousVersion(Version.V_6_2_4).luceneVersion, version.luceneVersion);
// too old version, major should be the oldest supported lucene version minus 1
version = Version.fromString("5.2.1");
assertEquals(Version.V_6_0_0.luceneVersion.major - 1, version.luceneVersion.major);
// future version, should be the same version as today
version = Version.fromString("7.77.1");
assertEquals(Version.CURRENT.luceneVersion, version.luceneVersion);
}
}

View File

@ -67,7 +67,6 @@ import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.FixedBitSet;
import org.apache.lucene.util.LuceneTestCase.AwaitsFix;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.index.IndexRequest;
@ -127,6 +126,7 @@ import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.IndexSettingsModule;
import org.elasticsearch.test.VersionUtils;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
@ -2605,7 +2605,7 @@ public class InternalEngineTests extends EngineTestCase {
// create
{
store.createEmpty();
store.createEmpty(Version.CURRENT.luceneVersion);
final String translogUUID =
Translog.createEmptyTranslog(config.getTranslogConfig().getTranslogPath(),
SequenceNumbers.NO_OPS_PERFORMED, shardId, primaryTerm.get());
@ -2769,7 +2769,7 @@ public class InternalEngineTests extends EngineTestCase {
final Path translogPath = createTempDir();
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
final LongSupplier globalCheckpointSupplier = () -> globalCheckpoint.get();
store.createEmpty();
store.createEmpty(Version.CURRENT.luceneVersion);
final String translogUUID = Translog.createEmptyTranslog(translogPath, globalCheckpoint.get(), shardId, primaryTerm.get());
store.associateIndexWithNewTranslog(translogUUID);
try (InternalEngine engine =
@ -4585,7 +4585,7 @@ public class InternalEngineTests extends EngineTestCase {
final Path translogPath = createTempDir();
store = createStore();
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
store.createEmpty();
store.createEmpty(Version.CURRENT.luceneVersion);
final String translogUUID = Translog.createEmptyTranslog(translogPath, globalCheckpoint.get(), shardId, primaryTerm.get());
store.associateIndexWithNewTranslog(translogUUID);
@ -5500,4 +5500,25 @@ public class InternalEngineTests extends EngineTestCase {
assertThat(message, engine.getNumDocUpdates(), equalTo(expectedUpdates));
assertThat(message, engine.getNumDocDeletes(), equalTo(expectedDeletes));
}
public void testStoreHonorsLuceneVersion() throws IOException {
for (Version createdVersion : Arrays.asList(
Version.CURRENT, VersionUtils.getPreviousMinorVersion(), VersionUtils.getFirstVersion())) {
Settings settings = Settings.builder()
.put(indexSettings())
.put(IndexMetaData.SETTING_VERSION_CREATED, createdVersion).build();
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test", settings);
try (Store store = createStore();
InternalEngine engine = createEngine(config(indexSettings, store, createTempDir(), NoMergePolicy.INSTANCE, null))) {
ParsedDocument doc = testParsedDocument("1", null, new Document(),
new BytesArray("{}".getBytes("UTF-8")), null);
engine.index(appendOnlyPrimary(doc, false, 1));
engine.refresh("test");
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
LeafReader leafReader = getOnlyLeafReader(searcher.reader());
assertEquals(createdVersion.luceneVersion.major, leafReader.getMetaData().getCreatedVersionMajor());
}
}
}
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.index.engine;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.Version;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.mapper.ParsedDocument;
@ -143,7 +144,7 @@ public class ReadOnlyEngineTests extends EngineTestCase {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
try (Store store = createStore()) {
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get);
store.createEmpty();
store.createEmpty(Version.CURRENT.luceneVersion);
try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null , null, true, Function.identity())) {
Class<? extends Throwable> expectedException = LuceneTestCase.TEST_ASSERTS_ENABLED ? AssertionError.class :
UnsupportedOperationException.class;

View File

@ -26,6 +26,7 @@ import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.store.Directory;
import org.elasticsearch.Version;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesArray;
@ -114,7 +115,7 @@ public class RefreshListenersTests extends ESTestCase {
// we don't need to notify anybody in this test
}
};
store.createEmpty();
store.createEmpty(Version.CURRENT.luceneVersion);
final long primaryTerm = randomNonNegativeLong();
final String translogUUID =
Translog.createEmptyTranslog(translogConfig.getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED, shardId, primaryTerm);

View File

@ -1035,7 +1035,7 @@ public class StoreTests extends ESTestCase {
final ShardId shardId = new ShardId("index", "_na_", 1);
try (Store store = new Store(shardId, INDEX_SETTINGS, StoreTests.newDirectory(random()), new DummyShardLock(shardId))) {
store.createEmpty();
store.createEmpty(Version.LATEST);
// remove the history uuid
IndexWriterConfig iwc = new IndexWriterConfig(null)
@ -1067,7 +1067,7 @@ public class StoreTests extends ESTestCase {
final ShardId shardId = new ShardId("index", "_na_", 1);
try (Store store = new Store(shardId, INDEX_SETTINGS, StoreTests.newDirectory(random()), new DummyShardLock(shardId))) {
store.createEmpty();
store.createEmpty(Version.LATEST);
SegmentInfos segmentInfos = Lucene.readSegmentInfos(store.directory());
assertThat(segmentInfos.getUserData(), hasKey(Engine.HISTORY_UUID_KEY));

View File

@ -500,7 +500,7 @@ public abstract class EngineTestCase extends ESTestCase {
final Store store = config.getStore();
final Directory directory = store.directory();
if (Lucene.indexExists(directory) == false) {
store.createEmpty();
store.createEmpty(config.getIndexSettings().getIndexVersionCreated().luceneVersion);
final String translogUuid = Translog.createEmptyTranslog(config.getTranslogConfig().getTranslogPath(),
SequenceNumbers.NO_OPS_PERFORMED, shardId, primaryTerm.get());
store.associateIndexWithNewTranslog(translogUuid);

View File

@ -280,7 +280,7 @@ public class FollowingEngineTests extends ESTestCase {
}
private FollowingEngine createEngine(Store store, EngineConfig config) throws IOException {
store.createEmpty();
store.createEmpty(config.getIndexSettings().getIndexVersionCreated().luceneVersion);
final String translogUuid = Translog.createEmptyTranslog(config.getTranslogConfig().getTranslogPath(),
SequenceNumbers.NO_OPS_PERFORMED, shardId, 1L);
store.associateIndexWithNewTranslog(translogUuid);
@ -485,7 +485,7 @@ public class FollowingEngineTests extends ESTestCase {
IndexMetaData leaderIndexMetaData = IndexMetaData.builder(index.getName()).settings(leaderSettings).build();
IndexSettings leaderIndexSettings = new IndexSettings(leaderIndexMetaData, leaderSettings);
try (Store leaderStore = createStore(shardId, leaderIndexSettings, newDirectory())) {
leaderStore.createEmpty();
leaderStore.createEmpty(leaderIndexMetaData.getCreationVersion().luceneVersion);
EngineConfig leaderConfig = engineConfig(shardId, leaderIndexSettings, threadPool, leaderStore, logger, xContentRegistry());
leaderStore.associateIndexWithNewTranslog(Translog.createEmptyTranslog(
leaderConfig.getTranslogConfig().getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED, shardId, 1L));