[RESTORE] Refactor how restore cleans up files after snapshot was restored

Today we restore files by running through the directory removeing all files
not in the snapshot. Some files in that direcotry might belong there even though
we remove them. This commit moves the responsiblity of cleaning up pending files
to lucene by utilizing IndexWriter#IndexFileDeleter
This commit is contained in:
Simon Willnauer 2015-02-18 15:58:33 +01:00
parent dae0ed168a
commit 8e09070246
5 changed files with 211 additions and 31 deletions

View File

@ -64,10 +64,7 @@ import org.elasticsearch.index.fielddata.IndexFieldData;
import java.io.IOException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.*;
import static org.elasticsearch.common.lucene.search.NoopCollector.NOOP_COLLECTOR;
@ -136,6 +133,52 @@ public class Lucene {
return SegmentInfos.readCommit(directory, commit.getSegmentsFileName());
}
/**
* Reads the segments infos from the given segments file name, failing if it fails to load
*/
private static SegmentInfos readSegmentInfos(String segmentsFileName, Directory directory) throws IOException {
return SegmentInfos.readCommit(directory, segmentsFileName);
}
/**
* This method removes all files from the given directory that are not referenced by the given segments file.
* This method will open an IndexWriter and relies on index file deleter to remove all unreferenced files. Segment files
* that are newer than the given segments file are removed forcefully to prevent problems with IndexWriter opening a potentially
* broken commit point / leftover.
* <b>Note:</b> this method will fail if there is another IndexWriter open on the given directory. This method will also acquire
* a write lock from the directory while pruning unused files.
*/
public static SegmentInfos pruneUnreferencedFiles(String segmentsFileName, Directory directory) throws IOException {
final SegmentInfos si = readSegmentInfos(segmentsFileName, directory);
while (true) {
/**
* we could also use a deletion policy here but in the case of snapshot and restore
* sometimes we restore an index and override files that were referenced by a "future"
* commit. If such a commit is opened by the IW it would likely throw a corrupted index exception
* since checksums don's match anymore. that's why we prune the name here directly.
* We also want the caller to know if we were not able to remove a segments_N file.
*
*/
String lastSegmentsFile = SegmentInfos.getLastCommitSegmentsFileName(directory);
if (lastSegmentsFile == null) {
throw new IllegalStateException("no commit found in the directory");
}
if (lastSegmentsFile.equals(si.getSegmentsFileName())) {
break;
}
directory.deleteFile(lastSegmentsFile);
}
final CommitPoint cp = new CommitPoint(si, directory);
try (IndexWriter _ = new IndexWriter(directory, new IndexWriterConfig(Lucene.STANDARD_ANALYZER)
.setIndexCommit(cp)
.setCommitOnClose(false)
.setMergePolicy(NoMergePolicy.INSTANCE)
.setOpenMode(IndexWriterConfig.OpenMode.APPEND))) {
// do nothing and close this will kick of IndexFileDeleter which will remove all pending files
}
return si;
}
public static void checkSegmentInfoIntegrity(final Directory directory) throws IOException {
new SegmentInfos.FindSegmentsFile(directory) {
@ -645,4 +688,67 @@ public class Lucene {
}
};
}
private static final class CommitPoint extends IndexCommit {
private String segmentsFileName;
private final Collection<String> files;
private final Directory dir;
private final long generation;
private final Map<String,String> userData;
private final int segmentCount;
private CommitPoint(SegmentInfos infos, Directory dir) throws IOException {
segmentsFileName = infos.getSegmentsFileName();
this.dir = dir;
userData = infos.getUserData();
files = Collections.unmodifiableCollection(infos.files(dir, true));
generation = infos.getGeneration();
segmentCount = infos.size();
}
@Override
public String toString() {
return "DirectoryReader.ReaderCommit(" + segmentsFileName + ")";
}
@Override
public int getSegmentCount() {
return segmentCount;
}
@Override
public String getSegmentsFileName() {
return segmentsFileName;
}
@Override
public Collection<String> getFileNames() {
return files;
}
@Override
public Directory getDirectory() {
return dir;
}
@Override
public long getGeneration() {
return generation;
}
@Override
public boolean isDeleted() {
return false;
}
@Override
public Map<String,String> getUserData() {
return userData;
}
@Override
public void delete() {
throw new UnsupportedOperationException("This IndexCommit does not support deletions");
}
}
}

View File

@ -22,9 +22,7 @@ package org.elasticsearch.index.snapshots.blobstore;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexFormatTooNewException;
import org.apache.lucene.index.IndexFormatTooOldException;
import org.apache.lucene.index.*;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
@ -789,32 +787,19 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
} catch (IOException ex) {
throw new IndexShardRestoreFailedException(shardId, "Failed to recover index", ex);
}
final StoreFileMetaData restoredSegmentsFile = sourceMetaData.getSegmentsFile();
if (recoveryTargetMetadata == null) {
throw new IndexShardRestoreFailedException(shardId, "Snapshot has no segments file");
}
assert restoredSegmentsFile != null;
// read the snapshot data persisted
long version = -1;
final SegmentInfos segmentCommitInfos;
try {
if (Lucene.indexExists(store.directory())) {
version = Lucene.readSegmentInfos(store.directory()).getVersion();
}
segmentCommitInfos = Lucene.pruneUnreferencedFiles(restoredSegmentsFile.name(), store.directory());
} catch (IOException e) {
throw new IndexShardRestoreFailedException(shardId, "Failed to fetch index version after copying it over", e);
}
recoveryState.getIndex().updateVersion(version);
/// now, go over and clean files that are in the store, but were not in the snapshot
try {
for (String storeFile : store.directory().listAll()) {
if (!Store.isChecksum(storeFile) && !snapshot.containPhysicalIndexFile(storeFile)) {
try {
store.deleteFile("restore", storeFile);
store.directory().deleteFile(storeFile);
} catch (IOException e) {
// ignore
}
}
}
} catch (IOException e) {
// ignore
}
recoveryState.getIndex().updateVersion(segmentCommitInfos.getVersion());
} finally {
store.decRef();
}

View File

@ -691,6 +691,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
MetadataSnapshot(IndexCommit commit, Directory directory, ESLogger logger) throws IOException {
metadata = buildMetadata(commit, directory, logger);
assert metadata.isEmpty() || numSegmentFiles() == 1 : "numSegmentFiles: " + numSegmentFiles();
}
ImmutableMap<String, StoreFileMetaData> buildMetadata(IndexCommit commit, Directory directory, ESLogger logger) throws IOException {
@ -986,6 +987,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
builder.put(meta.name(), meta);
}
this.metadata = builder.build();
assert metadata.isEmpty() || numSegmentFiles() == 1 : "numSegmentFiles: " + numSegmentFiles();
}
@Override
@ -1002,6 +1004,29 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
public boolean contains(String existingFile) {
return metadata.containsKey(existingFile);
}
/**
* Returns the segments file that this metadata snapshot represents or null if the snapshot is empty.
*/
public StoreFileMetaData getSegmentsFile() {
for (StoreFileMetaData file : this) {
if (file.name().startsWith(IndexFileNames.SEGMENTS)) {
return file;
}
}
assert metadata.isEmpty();
return null;
}
private final int numSegmentFiles() { // only for asserts
int count = 0;
for (StoreFileMetaData file : this) {
if (file.name().startsWith(IndexFileNames.SEGMENTS)) {
count++;
}
}
return count;
}
}
/**

View File

@ -17,14 +17,22 @@
* under the License.
*/
package org.elasticsearch.common.lucene;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.*;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.util.Version;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.elasticsearch.test.ElasticsearchLuceneTestCase;
import org.junit.Test;
import java.io.IOException;
/**
*
*/
public class LuceneTest extends ElasticsearchTestCase {
public class LuceneTest extends ElasticsearchLuceneTestCase {
/*
@ -35,4 +43,61 @@ public class LuceneTest extends ElasticsearchTestCase {
// note this is just a silly sanity check, we test it in lucene, and we point to it this way
assertEquals(Lucene.VERSION, Version.LATEST);
}
public void testPruneUnreferencedFiles() throws IOException {
MockDirectoryWrapper dir = newMockDirectory();
dir.setEnableVirusScanner(false);
IndexWriterConfig iwc = newIndexWriterConfig();
iwc.setIndexDeletionPolicy(NoDeletionPolicy.INSTANCE);
iwc.setMergePolicy(NoMergePolicy.INSTANCE);
iwc.setMaxBufferedDocs(2);
IndexWriter writer = new IndexWriter(dir, iwc);
Document doc = new Document();
doc.add(new TextField("id", "1", random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
writer.addDocument(doc);
writer.commit();
doc = new Document();
doc.add(new TextField("id", "2", random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
writer.addDocument(doc);
doc = new Document();
doc.add(new TextField("id", "3", random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
writer.addDocument(doc);
writer.commit();
SegmentInfos segmentCommitInfos = Lucene.readSegmentInfos(dir);
doc = new Document();
doc.add(new TextField("id", "4", random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
writer.addDocument(doc);
writer.deleteDocuments(new Term("id", "2"));
writer.commit();
DirectoryReader open = DirectoryReader.open(writer, true);
assertEquals(3, open.numDocs());
assertEquals(1, open.numDeletedDocs());
assertEquals(4, open.maxDoc());
open.close();
writer.close();
SegmentInfos si = Lucene.pruneUnreferencedFiles(segmentCommitInfos.getSegmentsFileName(), dir);
assertEquals(si.getSegmentsFileName(), segmentCommitInfos.getSegmentsFileName());
open = DirectoryReader.open(dir);
assertEquals(3, open.numDocs());
assertEquals(0, open.numDeletedDocs());
assertEquals(3, open.maxDoc());
IndexSearcher s = new IndexSearcher(open);
assertEquals(s.search(new TermQuery(new Term("id", "1")), 1).totalHits, 1);
assertEquals(s.search(new TermQuery(new Term("id", "2")), 1).totalHits, 1);
assertEquals(s.search(new TermQuery(new Term("id", "3")), 1).totalHits, 1);
assertEquals(s.search(new TermQuery(new Term("id", "4")), 1).totalHits, 0);
for (String file : dir.listAll()) {
assertFalse("unexpected file: " + file, file.equals("segments_3") || file.startsWith("_2"));
}
open.close();
dir.close();
}
}

View File

@ -19,7 +19,6 @@
package org.elasticsearch.snapshots;
import com.carrotsearch.randomizedtesting.LifecycleScope;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;