diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneCacheWarmer.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneCacheWarmer.java index 15b11b4e65..801bc6bc1a 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneCacheWarmer.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneCacheWarmer.java @@ -17,15 +17,16 @@ package org.apache.nifi.provenance.index.lucene; -import java.io.File; -import java.util.concurrent.TimeUnit; - import org.apache.nifi.provenance.index.EventIndexSearcher; import org.apache.nifi.provenance.lucene.IndexManager; import org.apache.nifi.provenance.util.DirectoryUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.io.FileNotFoundException; +import java.util.concurrent.TimeUnit; + public class LuceneCacheWarmer implements Runnable { private static final Logger logger = LoggerFactory.getLogger(LuceneCacheWarmer.class); @@ -51,7 +52,14 @@ public class LuceneCacheWarmer implements Runnable { for (final File indexDir : indexDirs) { final long indexStartNanos = System.nanoTime(); - final EventIndexSearcher eventSearcher = indexManager.borrowIndexSearcher(indexDir); + final EventIndexSearcher eventSearcher; + try { + eventSearcher = indexManager.borrowIndexSearcher(indexDir); + } catch (final FileNotFoundException fnfe) { + logger.debug("Cannot warm Lucene Index directory {} because the directory no longer exists", indexDir); + continue; + } + indexManager.returnIndexSearcher(eventSearcher); final long indexWarmMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - indexStartNanos); diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/StandardIndexManager.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/StandardIndexManager.java index ca9ceefc90..65c64a9257 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/StandardIndexManager.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/StandardIndexManager.java @@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory; import java.io.Closeable; import java.io.File; +import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; @@ -49,7 +50,10 @@ import java.util.concurrent.TimeUnit; public class StandardIndexManager implements IndexManager { private static final Logger logger = LoggerFactory.getLogger(StandardIndexManager.class); - private final Map writerCounts = new HashMap<>(); // guarded by synchronizing on map itself + private final Object countMutex = new Object(); + private final Map writerCounts = new HashMap<>(); // guarded by synchronizing on countMutex + private final Map searcherCounts = new HashMap<>(); // guarded by synchronizing on countMutex + private final ExecutorService searchExecutor; private final RepositoryConfiguration repoConfig; @@ -72,7 +76,7 @@ public class StandardIndexManager implements IndexManager { searchExecutor.shutdownNow(); } - synchronized (writerCounts) { + synchronized (countMutex) { final Set closed = new HashSet<>(); for (final Map.Entry entry : writerCounts.entrySet()) { @@ -92,18 +96,33 @@ public class StandardIndexManager implements IndexManager { final File absoluteFile = indexDir.getAbsoluteFile(); final IndexWriterCount writerCount; - synchronized (writerCounts) { + synchronized (countMutex) { writerCount = writerCounts.remove(absoluteFile); - if (writerCount != null) { - // Increment writer count and create an Index Searcher based on the writer + // If there is an Index Writer already, increment writer count and create an Index Searcher based on the writer. This gives our searcher + // access to events that have been written by that writer and not necessarily yet committed to the index. Otherwise, we can just create + // an index searcher but must increment the number of Index Searchers we have active in order to avoid allowing the directory to be + // deleted while the Index Searcher is active. + if (writerCount == null) { + final Integer searcherCount = searcherCounts.remove(absoluteFile); + final int updatedSearcherCount = (searcherCount == null) ? 1 : searcherCount + 1; + searcherCounts.put(absoluteFile, updatedSearcherCount); + logger.debug("Index Searcher being borrowed for {}. No Active Writer so incrementing Searcher Count to {}", absoluteFile, updatedSearcherCount); + } else { + final int updatedWriterCount = writerCount.getCount() + 1; writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(), writerCount.getAnalyzer(), - writerCount.getDirectory(), writerCount.getCount() + 1, writerCount.isCloseableWhenUnused())); + writerCount.getDirectory(), updatedWriterCount, writerCount.isCloseableWhenUnused())); + logger.debug("Index Searcher being borrowed for {}. An Active Writer exists so incrementing Writer Count to {}", absoluteFile, updatedWriterCount); } } final DirectoryReader directoryReader; if (writerCount == null) { + final boolean directoryExists = indexDir.exists(); + if (!directoryExists) { + throw new FileNotFoundException("Cannot search Provenance Index Directory " + indexDir.getAbsolutePath() + " because the directory does not exist"); + } + logger.trace("Creating index searcher for {}", indexDir); final Directory directory = FSDirectory.open(indexDir.toPath()); directoryReader = DirectoryReader.open(directory); @@ -127,11 +146,20 @@ public class StandardIndexManager implements IndexManager { final IndexWriterCount count; boolean closeWriter = false; - synchronized (writerCounts) { + synchronized (countMutex) { final File absoluteFile = searcher.getIndexDirectory().getAbsoluteFile(); count = writerCounts.get(absoluteFile); if (count == null) { - logger.debug("Returning EventIndexSearcher for {}; there is no active writer for this searcher so will not decrement writerCounts", absoluteFile); + final Integer searcherCount = searcherCounts.remove(absoluteFile); + final int updatedSearcherCount = (searcherCount == null) ? 0 : searcherCount - 1; + if (updatedSearcherCount <= 0) { + searcherCounts.remove(absoluteFile); + } else { + searcherCounts.put(absoluteFile, updatedSearcherCount); + } + + logger.debug("Returning EventIndexSearcher for {}; there is no active writer for this searcher so will not decrement writerCounts. Decrementing Searcher Count to {}", + absoluteFile, updatedSearcherCount); return; } @@ -168,7 +196,13 @@ public class StandardIndexManager implements IndexManager { logger.debug("Attempting to remove index {} from SimpleIndexManager", absoluteFile); IndexWriterCount writerCount; - synchronized (writerCounts) { + synchronized (countMutex) { + final Integer numSearchers = searcherCounts.get(absoluteFile); + if (numSearchers != null && numSearchers > 0) { + logger.debug("Not allowing removal of index {} because the active searcher count for this directory is {}", absoluteFile, numSearchers); + return false; + } + writerCount = writerCounts.remove(absoluteFile); if (writerCount == null) { logger.debug("Allowing removal of index {} because there is no IndexWriterCount for this directory", absoluteFile); @@ -183,18 +217,18 @@ public class StandardIndexManager implements IndexManager { } try { + // A WriterCount exists and has a count of 0. logger.debug("Removing index {} from SimpleIndexManager and closing the writer", absoluteFile); close(writerCount); } catch (final Exception e) { logger.error("Failed to close Index Writer for {} while removing Index from the repository;" - + "this directory may need to be cleaned up manually.", e); + + "this directory may need to be cleaned up manually.", absoluteFile, e); } return true; } - private IndexWriterCount createWriter(final File indexDirectory) throws IOException { final List closeables = new ArrayList<>(); final Directory directory = FSDirectory.open(indexDirectory.toPath()); @@ -236,7 +270,7 @@ public class StandardIndexManager implements IndexManager { logger.trace("Borrowing index writer for {}", indexDirectory); IndexWriterCount writerCount; - synchronized (writerCounts) { + synchronized (countMutex) { writerCount = writerCounts.get(absoluteFile); if (writerCount == null) { @@ -272,7 +306,7 @@ public class StandardIndexManager implements IndexManager { IndexWriterCount count; boolean close = isCloseable; try { - synchronized (writerCounts) { + synchronized (countMutex) { count = writerCounts.get(absoluteFile); if (count != null && count.isCloseableWhenUnused()) { close = true; @@ -340,11 +374,17 @@ public class StandardIndexManager implements IndexManager { } protected int getWriterCount() { - synchronized (writerCounts) { + synchronized (countMutex) { return writerCounts.size(); } } + protected int getSearcherCount() { + synchronized (countMutex) { + return searcherCounts.size(); + } + } + private static void closeQuietly(final Closeable... closeables) { for (final Closeable closeable : closeables) { if (closeable == null) { diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/lucene/TestSimpleIndexManager.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/lucene/TestSimpleIndexManager.java index da14fc88d2..2f51636bdb 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/lucene/TestSimpleIndexManager.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/lucene/TestSimpleIndexManager.java @@ -26,15 +26,18 @@ import org.apache.nifi.provenance.RepositoryConfiguration; import org.apache.nifi.provenance.index.EventIndexSearcher; import org.apache.nifi.provenance.index.EventIndexWriter; import org.apache.nifi.util.file.FileUtils; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; import java.io.File; +import java.io.FileNotFoundException; import java.io.IOException; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; public class TestSimpleIndexManager { @@ -43,6 +46,47 @@ public class TestSimpleIndexManager { System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", "DEBUG"); } + @Test + public void testDeletingIndexWhileSearcherActive() throws IOException { + final StandardIndexManager mgr = new StandardIndexManager(new RepositoryConfiguration()); + final File dir = new File("target/" + UUID.randomUUID().toString()); + try { + final EventIndexWriter writer1 = mgr.borrowIndexWriter(dir); + final Document doc1 = new Document(); + doc1.add(new StringField("id", "1", Store.YES)); + writer1.index(doc1, 1); + + mgr.returnIndexWriter(writer1, true, true); + assertEquals(0, mgr.getWriterCount()); + + final EventIndexSearcher eventSearcher = mgr.borrowIndexSearcher(dir); + assertEquals(0, mgr.getWriterCount()); + assertEquals(1, mgr.getSearcherCount()); + + boolean removed = mgr.removeIndex(dir); + assertFalse(removed); + mgr.returnIndexSearcher(eventSearcher); + + assertEquals(0, mgr.getWriterCount()); + assertEquals(0, mgr.getSearcherCount()); + + FileUtils.deleteFile(dir, true); + assertFalse(dir.exists()); + + try { + mgr.borrowIndexSearcher(dir); + Assert.fail("Expected FileNotFoundException to be thrown"); + } catch (final FileNotFoundException fnfe) { + // expected + } + } finally { + if (dir.exists()) { + FileUtils.deleteFile(dir, true); + } + } + } + + @Test public void testMultipleWritersSimultaneouslySameIndex() throws IOException { final StandardIndexManager mgr = new StandardIndexManager(new RepositoryConfiguration());