[STORE] Improve safety when deleting files from the store

Today if we delete files from the index directory we never acquire the
write lock. Yet, for safety reasons we should get the write lock before
we modify / delete any files. Where we can we should leave the deletion
to the index writer and only delete that are necessary to delete ourself.
This commit is contained in:
Simon Willnauer 2015-02-20 22:40:05 +01:00
parent 73faa2e5ed
commit c54bd2f7ad
8 changed files with 160 additions and 60 deletions

View File

@ -44,9 +44,7 @@ import org.apache.lucene.search.TimeLimitingCollector;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopFieldDocs;
import org.apache.lucene.search.TotalHitCountCollector;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.*;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.Counter;
import org.apache.lucene.util.Version;
@ -148,10 +146,15 @@ public class Lucene {
* that are newer than the given segments file are removed forcefully to prevent problems with IndexWriter opening a potentially
* broken commit point / leftover.
* <b>Note:</b> this method will fail if there is another IndexWriter open on the given directory. This method will also acquire
* a write lock from the directory while pruning unused files.
* a write lock from the directory while pruning unused files. This method expects an existing index in the given directory that has
* the given segments file.
*/
public static SegmentInfos pruneUnreferencedFiles(String segmentsFileName, Directory directory) throws IOException {
final SegmentInfos si = readSegmentInfos(segmentsFileName, directory);
try (Lock writeLock = directory.makeLock(IndexWriter.WRITE_LOCK_NAME)) {
if (!writeLock.obtain(IndexWriterConfig.getDefaultWriteLockTimeout())) { // obtain write lock
throw new LockObtainFailedException("Index locked for write: " + writeLock);
}
int foundSegmentFiles = 0;
for (final String file : directory.listAll()) {
/**
@ -161,7 +164,7 @@ public class Lucene {
* since checksums don's match anymore. that's why we prune the name here directly.
* We also want the caller to know if we were not able to remove a segments_N file.
*/
if (file.startsWith(IndexFileNames.SEGMENTS)) {
if (file.startsWith(IndexFileNames.SEGMENTS) || file.equals(IndexFileNames.OLD_SEGMENTS_GEN)) {
foundSegmentFiles++;
if (file.equals(si.getSegmentsFileName()) == false) {
directory.deleteFile(file); // remove all segment_N files except of the one we wanna keep
@ -172,6 +175,7 @@ public class Lucene {
if (foundSegmentFiles == 0) {
throw new IllegalStateException("no commit found in the directory");
}
}
final CommitPoint cp = new CommitPoint(si, directory);
try (IndexWriter _ = new IndexWriter(directory, new IndexWriterConfig(Lucene.STANDARD_ANALYZER)
.setIndexCommit(cp)
@ -183,6 +187,31 @@ public class Lucene {
return si;
}
/**
* This method removes all lucene files from the given directory. It will first try to delete all commit points / segments
* files to ensure broken commits or corrupted indices will not be opened in the future. If any of the segment files can't be deleted
* this operation fails.
*/
public static void cleanLuceneIndex(Directory directory) throws IOException {
try (Lock writeLock = directory.makeLock(IndexWriter.WRITE_LOCK_NAME)) {
if (!writeLock.obtain(IndexWriterConfig.getDefaultWriteLockTimeout())) { // obtain write lock
throw new LockObtainFailedException("Index locked for write: " + writeLock);
}
for (final String file : directory.listAll()) {
if (file.startsWith(IndexFileNames.SEGMENTS) || file.equals(IndexFileNames.OLD_SEGMENTS_GEN)) {
directory.deleteFile(file); // remove all segment_N files
}
}
}
try (IndexWriter _ = new IndexWriter(directory, new IndexWriterConfig(Lucene.STANDARD_ANALYZER)
.setMergePolicy(NoMergePolicy.INSTANCE) // no merges
.setCommitOnClose(false) // no commits
.setOpenMode(IndexWriterConfig.OpenMode.CREATE))) // force creation - don't append...
{
// do nothing and close this will kick of IndexFileDeleter which will remove all pending files
}
}
public static void checkSegmentInfoIntegrity(final Directory directory) throws IOException {
new SegmentInfos.FindSegmentsFile(directory) {

View File

@ -249,7 +249,10 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
metadataLock.writeLock().lock();
// we make sure that nobody fetches the metadata while we do this rename operation here to ensure we don't
// get exceptions if files are still open.
try {
try (Lock writeLock = directory.makeLock(IndexWriter.WRITE_LOCK_NAME)) {
if (!writeLock.obtain(IndexWriterConfig.getDefaultWriteLockTimeout())) { // obtain write lock
throw new LockObtainFailedException("Index locked for write: " + writeLock);
}
for (Map.Entry<String, String> entry : entries) {
String tempFile = entry.getKey();
String origFile = entry.getValue();
@ -271,25 +274,6 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
}
/**
* Deletes the content of a shard store. Be careful calling this!.
*/
public void deleteContent() throws IOException {
ensureOpen();
final String[] files = directory.listAll();
final List<IOException> exceptions = new ArrayList<>();
for (String file : files) {
try {
directory.deleteFile(file);
} catch (NoSuchFileException | FileNotFoundException e) {
// ignore
} catch (IOException e) {
exceptions.add(e);
}
}
ExceptionsHelper.rethrowAndSuppress(exceptions);
}
public StoreStats stats() throws IOException {
ensureOpen();
return statsCache.getOrRefresh();
@ -558,19 +542,29 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetaData) throws IOException {
failIfCorrupted();
metadataLock.writeLock().lock();
try {
try (Lock writeLock = directory.makeLock(IndexWriter.WRITE_LOCK_NAME)) {
if (!writeLock.obtain(IndexWriterConfig.getDefaultWriteLockTimeout())) { // obtain write lock
throw new LockObtainFailedException("Index locked for write: " + writeLock);
}
final StoreDirectory dir = directory;
for (String existingFile : dir.listAll()) {
// don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete checksum)
if (!sourceMetaData.contains(existingFile) && !Store.isChecksum(existingFile)) {
if (existingFile.equals(IndexWriter.WRITE_LOCK_NAME) || Store.isChecksum(existingFile) || sourceMetaData.contains(existingFile)) {
continue; // don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete checksum)
}
try {
dir.deleteFile(reason, existingFile);
} catch (Exception ex) {
// FNF should not happen since we hold a write lock?
} catch (IOException ex) {
if (existingFile.startsWith(IndexFileNames.SEGMENTS)
|| existingFile.equals(IndexFileNames.OLD_SEGMENTS_GEN)) {
// TODO do we need to also fail this if we can't delete the pending commit file?
// if one of those files can't be deleted we better fail the cleanup otherwise we might leave an old commit point around?
throw new ElasticsearchIllegalStateException("Can't delete " + existingFile + " - cleanup failed", ex);
}
logger.debug("failed to delete file [{}]", ex, existingFile);
// ignore, we don't really care, will get deleted later on
}
}
}
final Store.MetadataSnapshot metadataOrEmpty = getMetadata();
verifyAfterCleanup(sourceMetaData, metadataOrEmpty);
} finally {

View File

@ -33,6 +33,7 @@ import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
@ -407,7 +408,12 @@ public class RecoveryTarget extends AbstractComponent {
// broken. We have to clean up this shard entirely, remove all files and bubble it up to the
// source shard since this index might be broken there as well? The Source can handle this and checks
// its content on disk if possible.
store.deleteContent(); // clean up and delete all files
try {
Lucene.cleanLuceneIndex(store.directory()); // clean up and delete all files
} catch (Throwable e) {
logger.debug("Failed to clean lucene index", e);
ex.addSuppressed(e);
}
throw new RecoveryFailedException(recoveryStatus.state(), "failed to clean after recovery", ex);
} catch (Exception ex) {
throw new RecoveryFailedException(recoveryStatus.state(), "failed to clean after recovery", ex);

View File

@ -48,6 +48,57 @@ public class LuceneTest extends ElasticsearchLuceneTestCase {
assertEquals(Lucene.VERSION, Version.LATEST);
}
public void testCleanIndex() throws IOException {
MockDirectoryWrapper dir = newMockDirectory();
dir.setEnableVirusScanner(false);
IndexWriterConfig iwc = newIndexWriterConfig();
iwc.setIndexDeletionPolicy(NoDeletionPolicy.INSTANCE);
iwc.setMergePolicy(NoMergePolicy.INSTANCE);
iwc.setMaxBufferedDocs(2);
IndexWriter writer = new IndexWriter(dir, iwc);
Document doc = new Document();
doc.add(new TextField("id", "1", random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
writer.addDocument(doc);
writer.commit();
doc = new Document();
doc.add(new TextField("id", "2", random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
writer.addDocument(doc);
doc = new Document();
doc.add(new TextField("id", "3", random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
writer.addDocument(doc);
writer.commit();
doc = new Document();
doc.add(new TextField("id", "4", random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
writer.addDocument(doc);
writer.deleteDocuments(new Term("id", "2"));
writer.commit();
try (DirectoryReader open = DirectoryReader.open(writer, true)) {
assertEquals(3, open.numDocs());
assertEquals(1, open.numDeletedDocs());
assertEquals(4, open.maxDoc());
}
writer.close();
if (random().nextBoolean()) {
for (String file : dir.listAll()) {
if (file.startsWith("_1")) {
// delete a random file
dir.deleteFile(file);
break;
}
}
}
Lucene.cleanLuceneIndex(dir);
if (dir.listAll().length > 0) {
assertEquals(dir.listAll().length, 1);
assertEquals(dir.listAll()[0], "write.lock");
}
dir.close();
}
public void testPruneUnreferencedFiles() throws IOException {
MockDirectoryWrapper dir = newMockDirectory();
dir.setEnableVirusScanner(false);

View File

@ -41,6 +41,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
@ -136,9 +137,9 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
.build(); // TODO randomize more settings
threadPool = new ThreadPool(getClass().getName());
store = createStore();
store.deleteContent();
storeReplica = createStore();
storeReplica.deleteContent();
Lucene.cleanLuceneIndex(store.directory());
Lucene.cleanLuceneIndex(storeReplica.directory());
translog = createTranslog();
engine = createEngine(store, translog);
LiveIndexWriterConfig currentIndexWriterConfig = ((InternalEngine)engine).getCurrentIndexWriterConfig();

View File

@ -34,6 +34,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
@ -127,9 +128,9 @@ public class ShadowEngineTests extends ElasticsearchLuceneTestCase {
threadPool = new ThreadPool(getClass().getName());
dirPath = newTempDirPath(LifecycleScope.TEST);
store = createStore(dirPath);
store.deleteContent();
storeReplica = createStore(dirPath);
storeReplica.deleteContent();
Lucene.cleanLuceneIndex(store.directory());
Lucene.cleanLuceneIndex(storeReplica.directory());
translog = createTranslog();
primaryEngine = createInternalEngine(store, translog);
LiveIndexWriterConfig currentIndexWriterConfig = ((InternalEngine)primaryEngine).getCurrentIndexWriterConfig();

View File

@ -31,6 +31,7 @@ import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.TestUtil;
import org.apache.lucene.util.Version;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
@ -680,7 +681,7 @@ public class StoreTest extends ElasticsearchLuceneTestCase {
}
public void assertDeleteContent(Store store, DirectoryService service) throws IOException {
store.deleteContent();
deleteContent(store.directory());
assertThat(Arrays.toString(store.directory().listAll()), store.directory().listAll().length, equalTo(0));
assertThat(store.stats().sizeInBytes(), equalTo(0l));
for (Directory dir : service.build()) {
@ -898,7 +899,7 @@ public class StoreTest extends ElasticsearchLuceneTestCase {
assertThat(newCommitDiff.missing.size(), equalTo(4)); // an entire segment must be missing (single doc segment got dropped) plus the commit is different
}
store.deleteContent();
deleteContent(store.directory());
IOUtils.close(store);
}
@ -1002,7 +1003,7 @@ public class StoreTest extends ElasticsearchLuceneTestCase {
assertEquals("we wrote one checksum but it's gone now? - checksums are supposed to be kept", numChecksums, 1);
}
store.deleteContent();
deleteContent(store.directory());
IOUtils.close(store);
}
@ -1025,7 +1026,7 @@ public class StoreTest extends ElasticsearchLuceneTestCase {
}
store.verifyAfterCleanup(snapshot, snapshot);
store.deleteContent();
deleteContent(store.directory());
IOUtils.close(store);
}
@ -1078,7 +1079,23 @@ public class StoreTest extends ElasticsearchLuceneTestCase {
stats = store.stats();
assertEquals(stats.getSizeInBytes(), length);
store.deleteContent();
deleteContent(store.directory());
IOUtils.close(store);
}
public static void deleteContent(Directory directory) throws IOException {
final String[] files = directory.listAll();
final List<IOException> exceptions = new ArrayList<>();
for (String file : files) {
try {
directory.deleteFile(file);
} catch (NoSuchFileException | FileNotFoundException e) {
// ignore
} catch (IOException e) {
exceptions.add(e);
}
}
ExceptionsHelper.rethrowAndSuppress(exceptions);
}
}

View File

@ -67,6 +67,7 @@ public class RecoveryStatusTests extends ElasticsearchSingleNodeTest {
}
}
assertNotNull(expectedFile);
indexShard.close("foo", false);// we have to close it here otherwise rename fails since the write.lock is held by the engine
status.renameAllTempFiles();
strings = Sets.newHashSet(status.store().directory().listAll());
assertTrue(strings.toString(), strings.contains("foo.bar"));