diff --git a/src/main/java/org/elasticsearch/common/lucene/Lucene.java b/src/main/java/org/elasticsearch/common/lucene/Lucene.java
index 1a58b15b839..7c7eb9e3ff0 100644
--- a/src/main/java/org/elasticsearch/common/lucene/Lucene.java
+++ b/src/main/java/org/elasticsearch/common/lucene/Lucene.java
@@ -44,9 +44,7 @@ import org.apache.lucene.search.TimeLimitingCollector;
 import org.apache.lucene.search.TopDocs;
 import org.apache.lucene.search.TopFieldDocs;
 import org.apache.lucene.search.TotalHitCountCollector;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.IOContext;
-import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.*;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.Counter;
 import org.apache.lucene.util.Version;
@@ -148,29 +146,35 @@ public class Lucene {
      * that are newer than the given segments file are removed forcefully to prevent problems with IndexWriter opening a potentially
      * broken commit point / leftover.
      * Note: this method will fail if there is another IndexWriter open on the given directory. This method will also acquire
-     * a write lock from the directory while pruning unused files.
+     * a write lock from the directory while pruning unused files. This method expects an existing index in the given directory that has
+     * the given segments file.
      */
     public static SegmentInfos pruneUnreferencedFiles(String segmentsFileName, Directory directory) throws IOException {
         final SegmentInfos si = readSegmentInfos(segmentsFileName, directory);
-        int foundSegmentFiles = 0;
-        for (final String file : directory.listAll()) {
-            /**
-             * we could also use a deletion policy here but in the case of snapshot and restore
-             * sometimes we restore an index and override files that were referenced by a "future"
-             * commit. If such a commit is opened by the IW it would likely throw a corrupted index exception
-             * since checksums don's match anymore. that's why we prune the name here directly.
-             * We also want the caller to know if we were not able to remove a segments_N file.
-             */
-            if (file.startsWith(IndexFileNames.SEGMENTS)) {
-                foundSegmentFiles++;
-                if (file.equals(si.getSegmentsFileName()) == false) {
-                    directory.deleteFile(file); // remove all segment_N files except of the one we wanna keep
+        try (Lock writeLock = directory.makeLock(IndexWriter.WRITE_LOCK_NAME)) {
+            if (!writeLock.obtain(IndexWriterConfig.getDefaultWriteLockTimeout())) { // obtain write lock
+                throw new LockObtainFailedException("Index locked for write: " + writeLock);
+            }
+            int foundSegmentFiles = 0;
+            for (final String file : directory.listAll()) {
+                /**
+                 * we could also use a deletion policy here but in the case of snapshot and restore
+                 * sometimes we restore an index and override files that were referenced by a "future"
+                 * commit. If such a commit is opened by the IW it would likely throw a corrupted index exception
+                 * since checksums don't match anymore. That's why we prune the name here directly.
+                 * We also want the caller to know if we were not able to remove a segments_N file.
+                 */
+                if (file.startsWith(IndexFileNames.SEGMENTS) || file.equals(IndexFileNames.OLD_SEGMENTS_GEN)) {
+                    foundSegmentFiles++;
+                    if (file.equals(si.getSegmentsFileName()) == false) {
+                        directory.deleteFile(file); // remove all segments_N files except the one we want to keep
+                    }
                 }
             }
-        }
-        assert SegmentInfos.getLastCommitSegmentsFileName(directory).equals(segmentsFileName);
-        if (foundSegmentFiles == 0) {
-            throw new IllegalStateException("no commit found in the directory");
+            assert SegmentInfos.getLastCommitSegmentsFileName(directory).equals(segmentsFileName);
+            if (foundSegmentFiles == 0) {
+                throw new IllegalStateException("no commit found in the directory");
+            }
         }
         final CommitPoint cp = new CommitPoint(si, directory);
         try (IndexWriter _ = new IndexWriter(directory, new IndexWriterConfig(Lucene.STANDARD_ANALYZER)
@@ -183,6 +187,31 @@ public class Lucene {
         return si;
     }
 
+    /**
+     * This method removes all Lucene files from the given directory. It will first try to delete all commit points / segments
+     * files to ensure broken commits or corrupted indices will not be opened in the future. If any of the segments files can't be
+     * deleted, this operation fails.
+     */
+    public static void cleanLuceneIndex(Directory directory) throws IOException {
+        try (Lock writeLock = directory.makeLock(IndexWriter.WRITE_LOCK_NAME)) {
+            if (!writeLock.obtain(IndexWriterConfig.getDefaultWriteLockTimeout())) { // obtain write lock
+                throw new LockObtainFailedException("Index locked for write: " + writeLock);
+            }
+            for (final String file : directory.listAll()) {
+                if (file.startsWith(IndexFileNames.SEGMENTS) || file.equals(IndexFileNames.OLD_SEGMENTS_GEN)) {
+                    directory.deleteFile(file); // remove all segments_N files
+                }
+            }
+        }
+        try (IndexWriter _ = new IndexWriter(directory, new IndexWriterConfig(Lucene.STANDARD_ANALYZER)
+                .setMergePolicy(NoMergePolicy.INSTANCE) // no merges
+                .setCommitOnClose(false) // no commits
+                .setOpenMode(IndexWriterConfig.OpenMode.CREATE))) // force creation - don't append...
+        {
+            // do nothing and close; this will kick off IndexFileDeleter, which will remove all pending files
+        }
+    }
+
     public static void checkSegmentInfoIntegrity(final Directory directory) throws IOException {
 
         new SegmentInfos.FindSegmentsFile(directory) {
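A note on the new cleanLuceneIndex helper: it is deliberately a two-phase wipe (drop every commit point under the write lock first, then let IndexFileDeleter collect the rest). A minimal usage sketch, using only APIs that appear in this patch plus a RAMDirectory as a stand-in for a real shard directory (the class name is illustrative):

    import org.apache.lucene.store.Directory;
    import org.apache.lucene.store.RAMDirectory;
    import org.elasticsearch.common.lucene.Lucene;

    public class CleanIndexSketch {
        public static void main(String[] args) throws Exception {
            Directory dir = new RAMDirectory(); // stand-in for a real shard directory
            // phase 1 deletes segments_N / segments.gen under the write lock;
            // phase 2 opens an IndexWriter in CREATE mode and closes it without
            // committing, so IndexFileDeleter sweeps any remaining index files
            Lucene.cleanLuceneIndex(dir);
            assert dir.listAll().length <= 1; // at most write.lock survives (see testCleanIndex below)
            dir.close();
        }
    }
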
diff --git a/src/main/java/org/elasticsearch/index/store/Store.java b/src/main/java/org/elasticsearch/index/store/Store.java
index e5ea9a833d7..6fe624bf0dd 100644
--- a/src/main/java/org/elasticsearch/index/store/Store.java
+++ b/src/main/java/org/elasticsearch/index/store/Store.java
@@ -249,7 +249,10 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
         metadataLock.writeLock().lock();
         // we make sure that nobody fetches the metadata while we do this rename operation here to ensure we don't
         // get exceptions if files are still open.
-        try {
+        try (Lock writeLock = directory.makeLock(IndexWriter.WRITE_LOCK_NAME)) {
+            if (!writeLock.obtain(IndexWriterConfig.getDefaultWriteLockTimeout())) { // obtain write lock
+                throw new LockObtainFailedException("Index locked for write: " + writeLock);
+            }
             for (Map.Entry<String, String> entry : entries) {
                 String tempFile = entry.getKey();
                 String origFile = entry.getValue();
@@ -271,25 +274,6 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
         }
 
-    /**
-     * Deletes the content of a shard store. Be careful calling this!.
-     */
-    public void deleteContent() throws IOException {
-        ensureOpen();
-        final String[] files = directory.listAll();
-        final List<IOException> exceptions = new ArrayList<>();
-        for (String file : files) {
-            try {
-                directory.deleteFile(file);
-            } catch (NoSuchFileException | FileNotFoundException e) {
-                // ignore
-            } catch (IOException e) {
-                exceptions.add(e);
-            }
-        }
-        ExceptionsHelper.rethrowAndSuppress(exceptions);
-    }
-
     public StoreStats stats() throws IOException {
         ensureOpen();
         return statsCache.getOrRefresh();
@@ -558,17 +542,27 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
     public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetaData) throws IOException {
         failIfCorrupted();
         metadataLock.writeLock().lock();
-        try {
+        try (Lock writeLock = directory.makeLock(IndexWriter.WRITE_LOCK_NAME)) {
+            if (!writeLock.obtain(IndexWriterConfig.getDefaultWriteLockTimeout())) { // obtain write lock
+                throw new LockObtainFailedException("Index locked for write: " + writeLock);
+            }
             final StoreDirectory dir = directory;
             for (String existingFile : dir.listAll()) {
-                // don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete checksum)
-                if (!sourceMetaData.contains(existingFile) && !Store.isChecksum(existingFile)) {
-                    try {
-                        dir.deleteFile(reason, existingFile);
-                    } catch (Exception ex) {
-                        logger.debug("failed to delete file [{}]", ex, existingFile);
-                        // ignore, we don't really care, will get deleted later on
+                if (existingFile.equals(IndexWriter.WRITE_LOCK_NAME) || Store.isChecksum(existingFile) || sourceMetaData.contains(existingFile)) {
+                    continue; // don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete checksum)
+                }
+                try {
+                    dir.deleteFile(reason, existingFile);
+                    // FNF should not happen since we hold a write lock?
+                } catch (IOException ex) {
+                    if (existingFile.startsWith(IndexFileNames.SEGMENTS)
+                            || existingFile.equals(IndexFileNames.OLD_SEGMENTS_GEN)) {
+                        // TODO do we need to also fail this if we can't delete the pending commit file?
+                        // if one of those files can't be deleted we better fail the cleanup, otherwise we might leave an old commit point around
+                        throw new ElasticsearchIllegalStateException("Can't delete " + existingFile + " - cleanup failed", ex);
                     }
+                    logger.debug("failed to delete file [{}]", ex, existingFile);
+                    // ignore, we don't really care, will get deleted later on
                 }
             }
             final Store.MetadataSnapshot metadataOrEmpty = getMetadata();
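The obtain-or-throw idiom above now appears verbatim in pruneUnreferencedFiles, cleanLuceneIndex, the temp-file rename path, and cleanupAndVerify. A sketch of how it could be factored into a shared helper; the withWriteLock name and the IORunnable interface are hypothetical, only the Lucene lock APIs are taken from this patch:

    import java.io.IOException;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.IndexWriterConfig;
    import org.apache.lucene.store.Directory;
    import org.apache.lucene.store.Lock;
    import org.apache.lucene.store.LockObtainFailedException;

    final class WriteLockHelper {
        interface IORunnable { void run() throws IOException; } // hypothetical

        // runs the action while holding Lucene's write lock, so no IndexWriter
        // (and no concurrent cleanup) can touch the directory at the same time
        static void withWriteLock(Directory directory, IORunnable action) throws IOException {
            try (Lock writeLock = directory.makeLock(IndexWriter.WRITE_LOCK_NAME)) {
                if (!writeLock.obtain(IndexWriterConfig.getDefaultWriteLockTimeout())) {
                    throw new LockObtainFailedException("Index locked for write: " + writeLock);
                }
                action.run();
            }
        }
    }
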
diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
index 6f8efa5ebfc..553d1633f4a 100644
--- a/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
+++ b/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
@@ -33,6 +33,7 @@ import org.elasticsearch.common.StopWatch;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
@@ -407,7 +408,12 @@ public class RecoveryTarget extends AbstractComponent {
                 // broken. We have to clean up this shard entirely, remove all files and bubble it up to the
                 // source shard since this index might be broken there as well? The Source can handle this and checks
                 // its content on disk if possible.
-                store.deleteContent(); // clean up and delete all files
+                try {
+                    Lucene.cleanLuceneIndex(store.directory()); // clean up and delete all files
+                } catch (Throwable e) {
+                    logger.debug("Failed to clean lucene index", e);
+                    ex.addSuppressed(e);
+                }
                 throw new RecoveryFailedException(recoveryStatus.state(), "failed to clean after recovery", ex);
             } catch (Exception ex) {
                 throw new RecoveryFailedException(recoveryStatus.state(), "failed to clean after recovery", ex);
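Note the error handling here: if the cleanup itself blows up, its failure is attached to the original exception with addSuppressed rather than replacing it, so the root cause of the failed recovery is what gets reported. A self-contained illustration of that JDK behavior:

    public class SuppressedDemo {
        public static void main(String[] args) {
            Exception primary = new Exception("failed to clean after recovery");
            try {
                throw new IllegalStateException("cleanup failed as well");
            } catch (IllegalStateException cleanupFailure) {
                primary.addSuppressed(cleanupFailure); // root cause stays primary
            }
            // the suppressed cleanup failure prints beneath the main stack trace
            primary.printStackTrace();
        }
    }
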
diff --git a/src/test/java/org/elasticsearch/common/lucene/LuceneTest.java b/src/test/java/org/elasticsearch/common/lucene/LuceneTest.java
index b7f1c5e23f1..87a68784d62 100644
--- a/src/test/java/org/elasticsearch/common/lucene/LuceneTest.java
+++ b/src/test/java/org/elasticsearch/common/lucene/LuceneTest.java
@@ -48,6 +48,57 @@ public class LuceneTest extends ElasticsearchLuceneTestCase {
         assertEquals(Lucene.VERSION, Version.LATEST);
     }
 
+    public void testCleanIndex() throws IOException {
+        MockDirectoryWrapper dir = newMockDirectory();
+        dir.setEnableVirusScanner(false);
+        IndexWriterConfig iwc = newIndexWriterConfig();
+        iwc.setIndexDeletionPolicy(NoDeletionPolicy.INSTANCE);
+        iwc.setMergePolicy(NoMergePolicy.INSTANCE);
+        iwc.setMaxBufferedDocs(2);
+        IndexWriter writer = new IndexWriter(dir, iwc);
+        Document doc = new Document();
+        doc.add(new TextField("id", "1", random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
+        writer.addDocument(doc);
+        writer.commit();
+
+        doc = new Document();
+        doc.add(new TextField("id", "2", random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
+        writer.addDocument(doc);
+
+        doc = new Document();
+        doc.add(new TextField("id", "3", random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
+        writer.addDocument(doc);
+
+        writer.commit();
+        doc = new Document();
+        doc.add(new TextField("id", "4", random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
+        writer.addDocument(doc);
+
+        writer.deleteDocuments(new Term("id", "2"));
+        writer.commit();
+        try (DirectoryReader open = DirectoryReader.open(writer, true)) {
+            assertEquals(3, open.numDocs());
+            assertEquals(1, open.numDeletedDocs());
+            assertEquals(4, open.maxDoc());
+        }
+        writer.close();
+        if (random().nextBoolean()) {
+            for (String file : dir.listAll()) {
+                if (file.startsWith("_1")) {
+                    // delete a random file
+                    dir.deleteFile(file);
+                    break;
+                }
+            }
+        }
+        Lucene.cleanLuceneIndex(dir);
+        if (dir.listAll().length > 0) {
+            assertEquals(dir.listAll().length, 1);
+            assertEquals(dir.listAll()[0], "write.lock");
+        }
+        dir.close();
+    }
+
     public void testPruneUnreferencedFiles() throws IOException {
         MockDirectoryWrapper dir = newMockDirectory();
         dir.setEnableVirusScanner(false);
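testCleanIndex relies on NoDeletionPolicy so that every commit point survives until cleanLuceneIndex runs. As a sanity check, one could list the accumulated commits right after writer.close() and before the random file deletion; this extra assertion is not part of the patch, but as written the test should hold exactly three commit points:

    import java.util.List;
    import org.apache.lucene.index.DirectoryReader;
    import org.apache.lucene.index.IndexCommit;

    // inside testCleanIndex, right after writer.close():
    List<IndexCommit> commits = DirectoryReader.listCommits(dir);
    assertEquals(3, commits.size()); // one per writer.commit() call, thanks to NoDeletionPolicy
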
diff --git a/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
index 3c581fceb05..6d6d6ab43dd 100644
--- a/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
+++ b/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
@@ -41,6 +41,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.lucene.uid.Versions;
 import org.elasticsearch.common.settings.ImmutableSettings;
 import org.elasticsearch.common.settings.Settings;
@@ -136,9 +137,9 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
                 .build(); // TODO randomize more settings
         threadPool = new ThreadPool(getClass().getName());
         store = createStore();
-        store.deleteContent();
         storeReplica = createStore();
-        storeReplica.deleteContent();
+        Lucene.cleanLuceneIndex(store.directory());
+        Lucene.cleanLuceneIndex(storeReplica.directory());
         translog = createTranslog();
         engine = createEngine(store, translog);
         LiveIndexWriterConfig currentIndexWriterConfig = ((InternalEngine)engine).getCurrentIndexWriterConfig();
diff --git a/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java b/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java
index 45a128551c2..68675b14af5 100644
--- a/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java
+++ b/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java
@@ -34,6 +34,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.settings.ImmutableSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.Index;
@@ -127,9 +128,9 @@ public class ShadowEngineTests extends ElasticsearchLuceneTestCase {
         threadPool = new ThreadPool(getClass().getName());
         dirPath = newTempDirPath(LifecycleScope.TEST);
         store = createStore(dirPath);
-        store.deleteContent();
         storeReplica = createStore(dirPath);
-        storeReplica.deleteContent();
+        Lucene.cleanLuceneIndex(store.directory());
+        Lucene.cleanLuceneIndex(storeReplica.directory());
         translog = createTranslog();
         primaryEngine = createInternalEngine(store, translog);
         LiveIndexWriterConfig currentIndexWriterConfig = ((InternalEngine)primaryEngine).getCurrentIndexWriterConfig();
diff --git a/src/test/java/org/elasticsearch/index/store/StoreTest.java b/src/test/java/org/elasticsearch/index/store/StoreTest.java
index 95f742243c4..ce27b4f3a2e 100644
--- a/src/test/java/org/elasticsearch/index/store/StoreTest.java
+++ b/src/test/java/org/elasticsearch/index/store/StoreTest.java
@@ -31,6 +31,7 @@ import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.TestUtil;
 import org.apache.lucene.util.Version;
+import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.common.settings.ImmutableSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
@@ -680,7 +681,7 @@ public class StoreTest extends ElasticsearchLuceneTestCase {
     }
 
     public void assertDeleteContent(Store store, DirectoryService service) throws IOException {
-        store.deleteContent();
+        deleteContent(store.directory());
         assertThat(Arrays.toString(store.directory().listAll()), store.directory().listAll().length, equalTo(0));
         assertThat(store.stats().sizeInBytes(), equalTo(0l));
         for (Directory dir : service.build()) {
@@ -898,7 +899,7 @@ public class StoreTest extends ElasticsearchLuceneTestCase {
             assertThat(newCommitDiff.missing.size(), equalTo(4)); // an entire segment must be missing (single doc segment got dropped) plus the commit is different
         }
 
-        store.deleteContent();
+        deleteContent(store.directory());
         IOUtils.close(store);
     }
 
@@ -1002,14 +1003,14 @@ public class StoreTest extends ElasticsearchLuceneTestCase {
             assertEquals("we wrote one checksum but it's gone now? - checksums are supposed to be kept", numChecksums, 1);
         }
 
-        store.deleteContent();
+        deleteContent(store.directory());
         IOUtils.close(store);
     }
 
     @Test
     public void testCleanUpWithLegacyChecksums() throws IOException {
         Map<String, StoreFileMetaData> metaDataMap = new HashMap<>();
-        metaDataMap.put("segments_1", new StoreFileMetaData("segments_1", 50, null, null, new BytesRef(new byte[] {1})));
+        metaDataMap.put("segments_1", new StoreFileMetaData("segments_1", 50, null, null, new BytesRef(new byte[]{1})));
         metaDataMap.put("_0_1.del", new StoreFileMetaData("_0_1.del", 42, "foobarbaz", null, new BytesRef()));
         Store.MetadataSnapshot snapshot = new Store.MetadataSnapshot(metaDataMap);
 
@@ -1025,7 +1026,7 @@ public class StoreTest extends ElasticsearchLuceneTestCase {
         }
         store.verifyAfterCleanup(snapshot, snapshot);
-        store.deleteContent();
+        deleteContent(store.directory());
         IOUtils.close(store);
     }
 
@@ -1078,7 +1079,23 @@ public class StoreTest extends ElasticsearchLuceneTestCase {
         stats = store.stats();
         assertEquals(stats.getSizeInBytes(), length);
 
-        store.deleteContent();
+        deleteContent(store.directory());
         IOUtils.close(store);
     }
+
+
+    public static void deleteContent(Directory directory) throws IOException {
+        final String[] files = directory.listAll();
+        final List<IOException> exceptions = new ArrayList<>();
+        for (String file : files) {
+            try {
+                directory.deleteFile(file);
+            } catch (NoSuchFileException | FileNotFoundException e) {
+                // ignore
+            } catch (IOException e) {
+                exceptions.add(e);
+            }
+        }
+        ExceptionsHelper.rethrowAndSuppress(exceptions);
+    }
 }
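The relocated deleteContent helper (now test-only) delegates the final rethrow to ExceptionsHelper.rethrowAndSuppress. Roughly, that utility collapses the collected exceptions into a single one; a sketch of the idiom, not the actual Elasticsearch implementation:

    import java.io.IOException;
    import java.util.List;

    final class RethrowSketch {
        static void rethrowAndSuppress(List<IOException> exceptions) throws IOException {
            if (exceptions.isEmpty()) {
                return; // every file was deleted (or was already gone)
            }
            IOException main = exceptions.get(0);
            for (IOException other : exceptions.subList(1, exceptions.size())) {
                main.addSuppressed(other); // keep the other failures visible too
            }
            throw main;
        }
    }
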
diff --git a/src/test/java/org/elasticsearch/indices/recovery/RecoveryStatusTests.java b/src/test/java/org/elasticsearch/indices/recovery/RecoveryStatusTests.java
index 84206f4ef77..55ffeafb165 100644
--- a/src/test/java/org/elasticsearch/indices/recovery/RecoveryStatusTests.java
+++ b/src/test/java/org/elasticsearch/indices/recovery/RecoveryStatusTests.java
@@ -67,6 +67,7 @@ public class RecoveryStatusTests extends ElasticsearchSingleNodeTest {
             }
         }
         assertNotNull(expectedFile);
+        indexShard.close("foo", false); // we have to close it here, otherwise the rename fails since write.lock is held by the engine
         status.renameAllTempFiles();
         strings = Sets.newHashSet(status.store().directory().listAll());
         assertTrue(strings.toString(), strings.contains("foo.bar"));
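The indexShard.close(...) fix works because Lucene's write lock is exclusive: while the engine's IndexWriter holds write.lock, the lock acquisition that renameAllTempFiles now performs (via the Store change above) cannot succeed. A small demo of that exclusivity, assuming the same Lucene APIs used in this patch:

    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.IndexWriterConfig;
    import org.apache.lucene.store.Directory;
    import org.apache.lucene.store.Lock;
    import org.apache.lucene.store.RAMDirectory;
    import org.elasticsearch.common.lucene.Lucene;

    public class WriteLockDemo {
        public static void main(String[] args) throws Exception {
            Directory dir = new RAMDirectory();
            // an open IndexWriter (the engine, in the test) holds write.lock
            IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.STANDARD_ANALYZER));
            Lock lock = dir.makeLock(IndexWriter.WRITE_LOCK_NAME);
            System.out.println(lock.obtain()); // false while the writer is open
            writer.close();
            dir.close();
        }
    }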