From 8e0907024666ceaaa78faef9a89926c3ded01992 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 18 Feb 2015 15:58:33 +0100 Subject: [PATCH] [RESTORE] Refactor how restore cleans up files after snapshot was restored Today we restore files by running through the directory removing all files not in the snapshot. Some files in that directory might belong there even though we remove them. This commit moves the responsibility of cleaning up pending files to lucene by utilizing IndexWriter#IndexFileDeleter --- .../elasticsearch/common/lucene/Lucene.java | 114 +++++++++++++++++- .../BlobStoreIndexShardRepository.java | 33 ++--- .../org/elasticsearch/index/store/Store.java | 25 ++++ .../common/lucene/LuceneTest.java | 69 ++++++++++- .../SharedClusterSnapshotRestoreTests.java | 1 - 5 files changed, 211 insertions(+), 31 deletions(-) diff --git a/src/main/java/org/elasticsearch/common/lucene/Lucene.java b/src/main/java/org/elasticsearch/common/lucene/Lucene.java index 9a0f9702a96..7db64677d2f 100644 --- a/src/main/java/org/elasticsearch/common/lucene/Lucene.java +++ b/src/main/java/org/elasticsearch/common/lucene/Lucene.java @@ -64,10 +64,7 @@ import org.elasticsearch.index.fielddata.IndexFieldData; import java.io.IOException; import java.text.ParseException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; +import java.util.*; import static org.elasticsearch.common.lucene.search.NoopCollector.NOOP_COLLECTOR; @@ -136,6 +133,52 @@ public class Lucene { return SegmentInfos.readCommit(directory, commit.getSegmentsFileName()); } + /** + * Reads the segments infos from the given segments file name, failing if it fails to load + */ + private static SegmentInfos readSegmentInfos(String segmentsFileName, Directory directory) throws IOException { + return SegmentInfos.readCommit(directory, segmentsFileName); + } + + /** + * This method removes all files from the given directory that are not referenced by the given 
segments file. + * This method will open an IndexWriter and relies on index file deleter to remove all unreferenced files. Segment files + * that are newer than the given segments file are removed forcefully to prevent problems with IndexWriter opening a potentially + * broken commit point / leftover. + * Note: this method will fail if there is another IndexWriter open on the given directory. This method will also acquire + * a write lock from the directory while pruning unused files. + */ + public static SegmentInfos pruneUnreferencedFiles(String segmentsFileName, Directory directory) throws IOException { + final SegmentInfos si = readSegmentInfos(segmentsFileName, directory); + while (true) { + /** + * we could also use a deletion policy here but in the case of snapshot and restore + * sometimes we restore an index and override files that were referenced by a "future" + * commit. If such a commit is opened by the IW it would likely throw a corrupted index exception + * since checksums don't match anymore. That's why we prune the name here directly. + * We also want the caller to know if we were not able to remove a segments_N file. 
+ * + */ + String lastSegmentsFile = SegmentInfos.getLastCommitSegmentsFileName(directory); + if (lastSegmentsFile == null) { + throw new IllegalStateException("no commit found in the directory"); + } + if (lastSegmentsFile.equals(si.getSegmentsFileName())) { + break; + } + directory.deleteFile(lastSegmentsFile); + } + final CommitPoint cp = new CommitPoint(si, directory); + try (IndexWriter writer = new IndexWriter(directory, new IndexWriterConfig(Lucene.STANDARD_ANALYZER) + .setIndexCommit(cp) + .setCommitOnClose(false) + .setMergePolicy(NoMergePolicy.INSTANCE) + .setOpenMode(IndexWriterConfig.OpenMode.APPEND))) { + // do nothing: closing the writer kicks off IndexFileDeleter, which removes all files not referenced by the commit + } + return si; + } + public static void checkSegmentInfoIntegrity(final Directory directory) throws IOException { new SegmentInfos.FindSegmentsFile(directory) { @@ -645,4 +688,67 @@ public class Lucene { } }; } + + private static final class CommitPoint extends IndexCommit { + private final String segmentsFileName; + private final Collection files; + private final Directory dir; + private final long generation; + private final Map userData; + private final int segmentCount; + + private CommitPoint(SegmentInfos infos, Directory dir) throws IOException { + segmentsFileName = infos.getSegmentsFileName(); + this.dir = dir; + userData = infos.getUserData(); + files = Collections.unmodifiableCollection(infos.files(dir, true)); + generation = infos.getGeneration(); + segmentCount = infos.size(); + } + + @Override + public String toString() { + return "DirectoryReader.ReaderCommit(" + segmentsFileName + ")"; + } + + @Override + public int getSegmentCount() { + return segmentCount; + } + + @Override + public String getSegmentsFileName() { + return segmentsFileName; + } + + @Override + public Collection getFileNames() { + return files; + } + + @Override + public Directory getDirectory() { + return dir; + } + + @Override + public long getGeneration() { + return generation; + } 
+ + @Override + public boolean isDeleted() { + return false; + } + + @Override + public Map getUserData() { + return userData; + } + + @Override + public void delete() { + throw new UnsupportedOperationException("This IndexCommit does not support deletions"); + } + } } diff --git a/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java b/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java index 1c90b795865..3539b386bfb 100644 --- a/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java +++ b/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java @@ -22,9 +22,7 @@ package org.elasticsearch.index.snapshots.blobstore; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; -import org.apache.lucene.index.CorruptIndexException; -import org.apache.lucene.index.IndexFormatTooNewException; -import org.apache.lucene.index.IndexFormatTooOldException; +import org.apache.lucene.index.*; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; @@ -789,32 +787,19 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements } catch (IOException ex) { throw new IndexShardRestoreFailedException(shardId, "Failed to recover index", ex); } + final StoreFileMetaData restoredSegmentsFile = sourceMetaData.getSegmentsFile(); + if (restoredSegmentsFile == null) { + throw new IndexShardRestoreFailedException(shardId, "Snapshot has no segments file"); + } + assert restoredSegmentsFile != null; // read the snapshot data persisted - long version = -1; + final SegmentInfos segmentCommitInfos; try { - if (Lucene.indexExists(store.directory())) { - version = Lucene.readSegmentInfos(store.directory()).getVersion(); - } + segmentCommitInfos = 
Lucene.pruneUnreferencedFiles(restoredSegmentsFile.name(), store.directory()); } catch (IOException e) { throw new IndexShardRestoreFailedException(shardId, "Failed to fetch index version after copying it over", e); } - recoveryState.getIndex().updateVersion(version); - - /// now, go over and clean files that are in the store, but were not in the snapshot - try { - for (String storeFile : store.directory().listAll()) { - if (!Store.isChecksum(storeFile) && !snapshot.containPhysicalIndexFile(storeFile)) { - try { - store.deleteFile("restore", storeFile); - store.directory().deleteFile(storeFile); - } catch (IOException e) { - // ignore - } - } - } - } catch (IOException e) { - // ignore - } + recoveryState.getIndex().updateVersion(segmentCommitInfos.getVersion()); } finally { store.decRef(); } diff --git a/src/main/java/org/elasticsearch/index/store/Store.java b/src/main/java/org/elasticsearch/index/store/Store.java index d0c5658530d..9c79e3cc07d 100644 --- a/src/main/java/org/elasticsearch/index/store/Store.java +++ b/src/main/java/org/elasticsearch/index/store/Store.java @@ -691,6 +691,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref MetadataSnapshot(IndexCommit commit, Directory directory, ESLogger logger) throws IOException { metadata = buildMetadata(commit, directory, logger); + assert metadata.isEmpty() || numSegmentFiles() == 1 : "numSegmentFiles: " + numSegmentFiles(); } ImmutableMap buildMetadata(IndexCommit commit, Directory directory, ESLogger logger) throws IOException { @@ -986,6 +987,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref builder.put(meta.name(), meta); } this.metadata = builder.build(); + assert metadata.isEmpty() || numSegmentFiles() == 1 : "numSegmentFiles: " + numSegmentFiles(); } @Override @@ -1002,6 +1004,29 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref public boolean contains(String existingFile) { return 
metadata.containsKey(existingFile); } + + /** + * Returns the segments file that this metadata snapshot represents or null if the snapshot is empty. + */ + public StoreFileMetaData getSegmentsFile() { + for (StoreFileMetaData file : this) { + if (file.name().startsWith(IndexFileNames.SEGMENTS)) { + return file; + } + } + assert metadata.isEmpty(); + return null; + } + + private final int numSegmentFiles() { // only for asserts + int count = 0; + for (StoreFileMetaData file : this) { + if (file.name().startsWith(IndexFileNames.SEGMENTS)) { + count++; + } + } + return count; + } } /** diff --git a/src/test/java/org/elasticsearch/common/lucene/LuceneTest.java b/src/test/java/org/elasticsearch/common/lucene/LuceneTest.java index bab7e74eec9..2de38507be5 100644 --- a/src/test/java/org/elasticsearch/common/lucene/LuceneTest.java +++ b/src/test/java/org/elasticsearch/common/lucene/LuceneTest.java @@ -17,14 +17,22 @@ * under the License. */ package org.elasticsearch.common.lucene; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.TextField; +import org.apache.lucene.index.*; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.TermQuery; +import org.apache.lucene.store.MockDirectoryWrapper; import org.apache.lucene.util.Version; -import org.elasticsearch.test.ElasticsearchTestCase; +import org.elasticsearch.test.ElasticsearchLuceneTestCase; import org.junit.Test; +import java.io.IOException; /** * */ -public class LuceneTest extends ElasticsearchTestCase { +public class LuceneTest extends ElasticsearchLuceneTestCase { /* @@ -35,4 +43,61 @@ public class LuceneTest extends ElasticsearchTestCase { // note this is just a silly sanity check, we test it in lucene, and we point to it this way assertEquals(Lucene.VERSION, Version.LATEST); } + + public void testPruneUnreferencedFiles() throws IOException { + MockDirectoryWrapper dir = newMockDirectory(); + 
dir.setEnableVirusScanner(false); + IndexWriterConfig iwc = newIndexWriterConfig(); + iwc.setIndexDeletionPolicy(NoDeletionPolicy.INSTANCE); + iwc.setMergePolicy(NoMergePolicy.INSTANCE); + iwc.setMaxBufferedDocs(2); + IndexWriter writer = new IndexWriter(dir, iwc); + Document doc = new Document(); + doc.add(new TextField("id", "1", random().nextBoolean() ? Field.Store.YES : Field.Store.NO)); + writer.addDocument(doc); + writer.commit(); + + doc = new Document(); + doc.add(new TextField("id", "2", random().nextBoolean() ? Field.Store.YES : Field.Store.NO)); + writer.addDocument(doc); + + doc = new Document(); + doc.add(new TextField("id", "3", random().nextBoolean() ? Field.Store.YES : Field.Store.NO)); + writer.addDocument(doc); + + writer.commit(); + SegmentInfos segmentCommitInfos = Lucene.readSegmentInfos(dir); + + doc = new Document(); + doc.add(new TextField("id", "4", random().nextBoolean() ? Field.Store.YES : Field.Store.NO)); + writer.addDocument(doc); + + writer.deleteDocuments(new Term("id", "2")); + writer.commit(); + DirectoryReader open = DirectoryReader.open(writer, true); + assertEquals(3, open.numDocs()); + assertEquals(1, open.numDeletedDocs()); + assertEquals(4, open.maxDoc()); + open.close(); + writer.close(); + SegmentInfos si = Lucene.pruneUnreferencedFiles(segmentCommitInfos.getSegmentsFileName(), dir); + assertEquals(segmentCommitInfos.getSegmentsFileName(), si.getSegmentsFileName()); + open = DirectoryReader.open(dir); + assertEquals(3, open.numDocs()); + assertEquals(0, open.numDeletedDocs()); + assertEquals(3, open.maxDoc()); + + IndexSearcher s = new IndexSearcher(open); + assertEquals(1, s.search(new TermQuery(new Term("id", "1")), 1).totalHits); + assertEquals(1, s.search(new TermQuery(new Term("id", "2")), 1).totalHits); + assertEquals(1, s.search(new TermQuery(new Term("id", "3")), 1).totalHits); + assertEquals(0, s.search(new TermQuery(new Term("id", "4")), 1).totalHits); + + for (String file : dir.listAll()) { + 
assertFalse("unexpected file: " + file, file.equals("segments_3") || file.startsWith("_2")); + } + open.close(); + dir.close(); + + } } diff --git a/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreTests.java b/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreTests.java index 8850f6c8c0b..c67da7f628e 100644 --- a/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreTests.java +++ b/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreTests.java @@ -19,7 +19,6 @@ package org.elasticsearch.snapshots; -import com.carrotsearch.randomizedtesting.LifecycleScope; import com.google.common.base.Predicate; import com.google.common.collect.ImmutableList;