Mark translog as upgraded in the engine even if a legacy generation exists
Today we mark a translog as upgraded by adding a marker to the engine commit. Yet, this commit was only added if there was no translog present before ie. only if we have a fresh engine which is missing the entire point. Yet, this commit adds a backwards index tests that ensures we can open old indices more than once ie. mark the index as upgraded. Closes #11858
This commit is contained in:
parent
74aed85c36
commit
8e8526c746
|
@ -178,8 +178,12 @@ public class InternalEngine extends Engine {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
final Translog translog = new Translog(translogConfig);
|
final Translog translog = new Translog(translogConfig);
|
||||||
if (generation == null) {
|
if (generation == null || generation.translogUUID == null) {
|
||||||
logger.debug("no translog ID present in the current generation - creating one");
|
if (generation == null) {
|
||||||
|
logger.debug("no translog ID present in the current generation - creating one");
|
||||||
|
} else if (generation.translogUUID == null) {
|
||||||
|
logger.debug("upgraded translog to pre 2.0 format, associating translog with index - writing translog UUID");
|
||||||
|
}
|
||||||
boolean success = false;
|
boolean success = false;
|
||||||
try {
|
try {
|
||||||
commitIndexWriter(writer, translog);
|
commitIndexWriter(writer, translog);
|
||||||
|
|
|
@ -210,9 +210,10 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
|
||||||
if (translogGeneration.translogUUID != null) {
|
if (translogGeneration.translogUUID != null) {
|
||||||
throw new IllegalArgumentException("TranslogGeneration has a non-null UUID - index must have already been upgraded");
|
throw new IllegalArgumentException("TranslogGeneration has a non-null UUID - index must have already been upgraded");
|
||||||
}
|
}
|
||||||
assert translogGeneration.translogUUID == null : "Already upgrade";
|
|
||||||
try {
|
try {
|
||||||
assert Checkpoint.read(translogPath.resolve(CHECKPOINT_FILE_NAME)) == null;
|
if (Checkpoint.read(translogPath.resolve(CHECKPOINT_FILE_NAME)) != null) {
|
||||||
|
throw new IllegalStateException(CHECKPOINT_FILE_NAME + " file already present, translog is already upgraded");
|
||||||
|
}
|
||||||
} catch (NoSuchFileException | FileNotFoundException ex) {
|
} catch (NoSuchFileException | FileNotFoundException ex) {
|
||||||
logger.debug("upgrading translog - no checkpoint found");
|
logger.debug("upgrading translog - no checkpoint found");
|
||||||
}
|
}
|
||||||
|
|
|
@ -38,13 +38,16 @@ import org.apache.lucene.store.AlreadyClosedException;
|
||||||
import org.apache.lucene.store.Directory;
|
import org.apache.lucene.store.Directory;
|
||||||
import org.apache.lucene.store.MockDirectoryWrapper;
|
import org.apache.lucene.store.MockDirectoryWrapper;
|
||||||
import org.apache.lucene.util.IOUtils;
|
import org.apache.lucene.util.IOUtils;
|
||||||
|
import org.apache.lucene.util.TestUtil;
|
||||||
import org.elasticsearch.Version;
|
import org.elasticsearch.Version;
|
||||||
|
import org.elasticsearch.bwcompat.OldIndexBackwardsCompatibilityTests;
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
import org.elasticsearch.common.Base64;
|
import org.elasticsearch.common.Base64;
|
||||||
import org.elasticsearch.common.Nullable;
|
import org.elasticsearch.common.Nullable;
|
||||||
import org.elasticsearch.common.bytes.BytesArray;
|
import org.elasticsearch.common.bytes.BytesArray;
|
||||||
import org.elasticsearch.common.bytes.BytesReference;
|
import org.elasticsearch.common.bytes.BytesReference;
|
||||||
import org.elasticsearch.common.collect.Tuple;
|
import org.elasticsearch.common.collect.Tuple;
|
||||||
|
import org.elasticsearch.common.io.FileSystemUtils;
|
||||||
import org.elasticsearch.common.lucene.Lucene;
|
import org.elasticsearch.common.lucene.Lucene;
|
||||||
import org.elasticsearch.common.lucene.uid.Versions;
|
import org.elasticsearch.common.lucene.uid.Versions;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
@ -83,10 +86,12 @@ import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
import java.nio.charset.Charset;
|
import java.nio.charset.Charset;
|
||||||
|
import java.nio.file.DirectoryStream;
|
||||||
|
import java.nio.file.Files;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
import java.util.Arrays;
|
import java.util.*;
|
||||||
import java.util.List;
|
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
@ -1711,6 +1716,102 @@ public class InternalEngineTests extends ElasticsearchTestCase {
|
||||||
return new Mapping(Version.CURRENT, root, new RootMapper[0], new Mapping.SourceTransform[0], ImmutableMap.<String, Object>of());
|
return new Mapping(Version.CURRENT, root, new RootMapper[0], new Mapping.SourceTransform[0], ImmutableMap.<String, Object>of());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testUpgradeOldIndex() throws IOException {
|
||||||
|
List<Path> indexes = new ArrayList<>();
|
||||||
|
Path dir = getDataPath("/" + OldIndexBackwardsCompatibilityTests.class.getPackage().getName().replace('.', '/')); // the files are in the same pkg as the OldIndexBackwardsCompatibilityTests test
|
||||||
|
try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, "index-*.zip")) {
|
||||||
|
for (Path path : stream) {
|
||||||
|
indexes.add(path);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Collections.shuffle(indexes, random());
|
||||||
|
for (Path indexFile : indexes.subList(0, scaledRandomIntBetween(1, indexes.size() / 2))) {
|
||||||
|
final String indexName = indexFile.getFileName().toString().replace(".zip", "").toLowerCase(Locale.ROOT);
|
||||||
|
Version version = Version.fromString(indexName.replace("index-", ""));
|
||||||
|
if (version.onOrAfter(Version.V_2_0_0)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
Path unzipDir = createTempDir();
|
||||||
|
Path unzipDataDir = unzipDir.resolve("data");
|
||||||
|
// decompress the index
|
||||||
|
try (InputStream stream = Files.newInputStream(indexFile)) {
|
||||||
|
TestUtil.unzip(stream, unzipDir);
|
||||||
|
}
|
||||||
|
// check it is unique
|
||||||
|
assertTrue(Files.exists(unzipDataDir));
|
||||||
|
Path[] list = filterExtraFSFiles(FileSystemUtils.files(unzipDataDir));
|
||||||
|
|
||||||
|
if (list.length != 1) {
|
||||||
|
throw new IllegalStateException("Backwards index must contain exactly one cluster but was " + list.length + " " + Arrays.toString(list));
|
||||||
|
}
|
||||||
|
// the bwc scripts packs the indices under this path
|
||||||
|
Path src = list[0].resolve("nodes/0/indices/" + indexName);
|
||||||
|
Path translog = list[0].resolve("nodes/0/indices/" + indexName).resolve("0").resolve("translog");
|
||||||
|
assertTrue("[" + indexFile + "] missing index dir: " + src.toString(), Files.exists(src));
|
||||||
|
assertTrue("[" + indexFile + "] missing translog dir: " + translog.toString(), Files.exists(translog));
|
||||||
|
Path[] tlogFiles = filterExtraFSFiles(FileSystemUtils.files(translog));
|
||||||
|
assertEquals(Arrays.toString(tlogFiles), tlogFiles.length, 1);
|
||||||
|
final long size = Files.size(tlogFiles[0]);
|
||||||
|
|
||||||
|
final long generation = Translog.parseIdFromFileName(tlogFiles[0]);
|
||||||
|
assertTrue(generation >= 1);
|
||||||
|
logger.debug("upgrading index {} file: {} size: {}", indexName, tlogFiles[0].getFileName(), size);
|
||||||
|
Directory directory = newFSDirectory(src.resolve("0").resolve("index"));
|
||||||
|
Store store = createStore(directory);
|
||||||
|
final int iters = randomIntBetween(0, 2);
|
||||||
|
int numDocs = -1;
|
||||||
|
for (int i = 0; i < iters; i++) { // make sure we can restart on an upgraded index
|
||||||
|
try (InternalEngine engine = createEngine(store, translog)) {
|
||||||
|
try (Searcher searcher = engine.acquireSearcher("test")) {
|
||||||
|
if (i > 0) {
|
||||||
|
assertEquals(numDocs, searcher.reader().numDocs());
|
||||||
|
}
|
||||||
|
TopDocs search = searcher.searcher().search(new MatchAllDocsQuery(), 1);
|
||||||
|
numDocs = searcher.reader().numDocs();
|
||||||
|
assertTrue(search.totalHits > 1);
|
||||||
|
}
|
||||||
|
CommitStats commitStats = engine.commitStats();
|
||||||
|
Map<String, String> userData = commitStats.getUserData();
|
||||||
|
assertTrue("userdata dosn't contain uuid",userData.containsKey(Translog.TRANSLOG_UUID_KEY));
|
||||||
|
assertTrue("userdata doesn't contain generation key", userData.containsKey(Translog.TRANSLOG_GENERATION_KEY));
|
||||||
|
assertFalse("userdata contains legacy marker", userData.containsKey("translog_id"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
try (InternalEngine engine = createEngine(store, translog)) {
|
||||||
|
if (numDocs == -1) {
|
||||||
|
try (Searcher searcher = engine.acquireSearcher("test")) {
|
||||||
|
numDocs = searcher.reader().numDocs();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
final int numExtraDocs = randomIntBetween(1, 10);
|
||||||
|
for (int i = 0; i < numExtraDocs; i++) {
|
||||||
|
ParsedDocument doc = testParsedDocument("extra" + Integer.toString(i), "extra" + Integer.toString(i), "test", null, -1, -1, testDocument(), new BytesArray("{}"), null);
|
||||||
|
Engine.Create firstIndexRequest = new Engine.Create(null, newUid(Integer.toString(i)), doc, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(), false, false);
|
||||||
|
engine.create(firstIndexRequest);
|
||||||
|
assertThat(firstIndexRequest.version(), equalTo(1l));
|
||||||
|
}
|
||||||
|
engine.refresh("test");
|
||||||
|
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
|
||||||
|
TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), randomIntBetween(numDocs, numDocs + numExtraDocs));
|
||||||
|
assertThat(topDocs.totalHits, equalTo(numDocs + numExtraDocs));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
IOUtils.close(store, directory);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Path[] filterExtraFSFiles(Path[] files) {
|
||||||
|
List<Path> paths = new ArrayList<>();
|
||||||
|
for (Path p : files) {
|
||||||
|
if (p.getFileName().toString().startsWith("extra")) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
paths.add(p);
|
||||||
|
}
|
||||||
|
return paths.toArray(new Path[0]);
|
||||||
|
}
|
||||||
|
|
||||||
public void testTranslogReplay() throws IOException {
|
public void testTranslogReplay() throws IOException {
|
||||||
boolean canHaveDuplicates = true;
|
boolean canHaveDuplicates = true;
|
||||||
boolean autoGeneratedId = true;
|
boolean autoGeneratedId = true;
|
||||||
|
@ -1833,6 +1934,13 @@ public class InternalEngineTests extends ElasticsearchTestCase {
|
||||||
protected void operationProcessed() {
|
protected void operationProcessed() {
|
||||||
recoveredOps.incrementAndGet();
|
recoveredOps.incrementAndGet();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void performRecoveryOperation(Engine engine, Translog.Operation operation, boolean allowMappingUpdates) {
|
||||||
|
if (operation.opType() != Translog.Operation.Type.DELETE_BY_QUERY) { // we don't support del by query in this test
|
||||||
|
super.performRecoveryOperation(engine, operation, allowMappingUpdates);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testRecoverFromForeignTranslog() throws IOException {
|
public void testRecoverFromForeignTranslog() throws IOException {
|
||||||
|
|
Loading…
Reference in New Issue