NIFI-8210: When Index Reader/Searcher is used, do not allow the Lucene Index to be deleted until the reader/searcher is finsihed being used and closed.

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #4812
This commit is contained in:
Mark Payne 2021-02-08 12:33:31 -05:00 committed by Matthew Burgess
parent b9b131239c
commit d5d520764d
No known key found for this signature in database
GPG Key ID: 05D3DEB8126DAD24
3 changed files with 110 additions and 18 deletions

View File

@ -17,15 +17,16 @@
package org.apache.nifi.provenance.index.lucene; package org.apache.nifi.provenance.index.lucene;
import java.io.File;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.provenance.index.EventIndexSearcher; import org.apache.nifi.provenance.index.EventIndexSearcher;
import org.apache.nifi.provenance.lucene.IndexManager; import org.apache.nifi.provenance.lucene.IndexManager;
import org.apache.nifi.provenance.util.DirectoryUtils; import org.apache.nifi.provenance.util.DirectoryUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileNotFoundException;
import java.util.concurrent.TimeUnit;
public class LuceneCacheWarmer implements Runnable { public class LuceneCacheWarmer implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(LuceneCacheWarmer.class); private static final Logger logger = LoggerFactory.getLogger(LuceneCacheWarmer.class);
@ -51,7 +52,14 @@ public class LuceneCacheWarmer implements Runnable {
for (final File indexDir : indexDirs) { for (final File indexDir : indexDirs) {
final long indexStartNanos = System.nanoTime(); final long indexStartNanos = System.nanoTime();
final EventIndexSearcher eventSearcher = indexManager.borrowIndexSearcher(indexDir); final EventIndexSearcher eventSearcher;
try {
eventSearcher = indexManager.borrowIndexSearcher(indexDir);
} catch (final FileNotFoundException fnfe) {
logger.debug("Cannot warm Lucene Index directory {} because the directory no longer exists", indexDir);
continue;
}
indexManager.returnIndexSearcher(eventSearcher); indexManager.returnIndexSearcher(eventSearcher);
final long indexWarmMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - indexStartNanos); final long indexWarmMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - indexStartNanos);

View File

@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory;
import java.io.Closeable; import java.io.Closeable;
import java.io.File; import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
@ -49,7 +50,10 @@ import java.util.concurrent.TimeUnit;
public class StandardIndexManager implements IndexManager { public class StandardIndexManager implements IndexManager {
private static final Logger logger = LoggerFactory.getLogger(StandardIndexManager.class); private static final Logger logger = LoggerFactory.getLogger(StandardIndexManager.class);
private final Map<File, IndexWriterCount> writerCounts = new HashMap<>(); // guarded by synchronizing on map itself private final Object countMutex = new Object();
private final Map<File, IndexWriterCount> writerCounts = new HashMap<>(); // guarded by synchronizing on countMutex
private final Map<File, Integer> searcherCounts = new HashMap<>(); // guarded by synchronizing on countMutex
private final ExecutorService searchExecutor; private final ExecutorService searchExecutor;
private final RepositoryConfiguration repoConfig; private final RepositoryConfiguration repoConfig;
@ -72,7 +76,7 @@ public class StandardIndexManager implements IndexManager {
searchExecutor.shutdownNow(); searchExecutor.shutdownNow();
} }
synchronized (writerCounts) { synchronized (countMutex) {
final Set<File> closed = new HashSet<>(); final Set<File> closed = new HashSet<>();
for (final Map.Entry<File, IndexWriterCount> entry : writerCounts.entrySet()) { for (final Map.Entry<File, IndexWriterCount> entry : writerCounts.entrySet()) {
@ -92,18 +96,33 @@ public class StandardIndexManager implements IndexManager {
final File absoluteFile = indexDir.getAbsoluteFile(); final File absoluteFile = indexDir.getAbsoluteFile();
final IndexWriterCount writerCount; final IndexWriterCount writerCount;
synchronized (writerCounts) { synchronized (countMutex) {
writerCount = writerCounts.remove(absoluteFile); writerCount = writerCounts.remove(absoluteFile);
if (writerCount != null) { // If there is an Index Writer already, increment writer count and create an Index Searcher based on the writer. This gives our searcher
// Increment writer count and create an Index Searcher based on the writer // access to events that have been written by that writer and not necessarily yet committed to the index. Otherwise, we can just create
// an index searcher but must increment the number of Index Searchers we have active in order to avoid allowing the directory to be
// deleted while the Index Searcher is active.
if (writerCount == null) {
final Integer searcherCount = searcherCounts.remove(absoluteFile);
final int updatedSearcherCount = (searcherCount == null) ? 1 : searcherCount + 1;
searcherCounts.put(absoluteFile, updatedSearcherCount);
logger.debug("Index Searcher being borrowed for {}. No Active Writer so incrementing Searcher Count to {}", absoluteFile, updatedSearcherCount);
} else {
final int updatedWriterCount = writerCount.getCount() + 1;
writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(), writerCount.getAnalyzer(), writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(), writerCount.getAnalyzer(),
writerCount.getDirectory(), writerCount.getCount() + 1, writerCount.isCloseableWhenUnused())); writerCount.getDirectory(), updatedWriterCount, writerCount.isCloseableWhenUnused()));
logger.debug("Index Searcher being borrowed for {}. An Active Writer exists so incrementing Writer Count to {}", absoluteFile, updatedWriterCount);
} }
} }
final DirectoryReader directoryReader; final DirectoryReader directoryReader;
if (writerCount == null) { if (writerCount == null) {
final boolean directoryExists = indexDir.exists();
if (!directoryExists) {
throw new FileNotFoundException("Cannot search Provenance Index Directory " + indexDir.getAbsolutePath() + " because the directory does not exist");
}
logger.trace("Creating index searcher for {}", indexDir); logger.trace("Creating index searcher for {}", indexDir);
final Directory directory = FSDirectory.open(indexDir.toPath()); final Directory directory = FSDirectory.open(indexDir.toPath());
directoryReader = DirectoryReader.open(directory); directoryReader = DirectoryReader.open(directory);
@ -127,11 +146,20 @@ public class StandardIndexManager implements IndexManager {
final IndexWriterCount count; final IndexWriterCount count;
boolean closeWriter = false; boolean closeWriter = false;
synchronized (writerCounts) { synchronized (countMutex) {
final File absoluteFile = searcher.getIndexDirectory().getAbsoluteFile(); final File absoluteFile = searcher.getIndexDirectory().getAbsoluteFile();
count = writerCounts.get(absoluteFile); count = writerCounts.get(absoluteFile);
if (count == null) { if (count == null) {
logger.debug("Returning EventIndexSearcher for {}; there is no active writer for this searcher so will not decrement writerCounts", absoluteFile); final Integer searcherCount = searcherCounts.remove(absoluteFile);
final int updatedSearcherCount = (searcherCount == null) ? 0 : searcherCount - 1;
if (updatedSearcherCount <= 0) {
searcherCounts.remove(absoluteFile);
} else {
searcherCounts.put(absoluteFile, updatedSearcherCount);
}
logger.debug("Returning EventIndexSearcher for {}; there is no active writer for this searcher so will not decrement writerCounts. Decrementing Searcher Count to {}",
absoluteFile, updatedSearcherCount);
return; return;
} }
@ -168,7 +196,13 @@ public class StandardIndexManager implements IndexManager {
logger.debug("Attempting to remove index {} from SimpleIndexManager", absoluteFile); logger.debug("Attempting to remove index {} from SimpleIndexManager", absoluteFile);
IndexWriterCount writerCount; IndexWriterCount writerCount;
synchronized (writerCounts) { synchronized (countMutex) {
final Integer numSearchers = searcherCounts.get(absoluteFile);
if (numSearchers != null && numSearchers > 0) {
logger.debug("Not allowing removal of index {} because the active searcher count for this directory is {}", absoluteFile, numSearchers);
return false;
}
writerCount = writerCounts.remove(absoluteFile); writerCount = writerCounts.remove(absoluteFile);
if (writerCount == null) { if (writerCount == null) {
logger.debug("Allowing removal of index {} because there is no IndexWriterCount for this directory", absoluteFile); logger.debug("Allowing removal of index {} because there is no IndexWriterCount for this directory", absoluteFile);
@ -183,18 +217,18 @@ public class StandardIndexManager implements IndexManager {
} }
try { try {
// A WriterCount exists and has a count of 0.
logger.debug("Removing index {} from SimpleIndexManager and closing the writer", absoluteFile); logger.debug("Removing index {} from SimpleIndexManager and closing the writer", absoluteFile);
close(writerCount); close(writerCount);
} catch (final Exception e) { } catch (final Exception e) {
logger.error("Failed to close Index Writer for {} while removing Index from the repository;" logger.error("Failed to close Index Writer for {} while removing Index from the repository;"
+ "this directory may need to be cleaned up manually.", e); + "this directory may need to be cleaned up manually.", absoluteFile, e);
} }
return true; return true;
} }
private IndexWriterCount createWriter(final File indexDirectory) throws IOException { private IndexWriterCount createWriter(final File indexDirectory) throws IOException {
final List<Closeable> closeables = new ArrayList<>(); final List<Closeable> closeables = new ArrayList<>();
final Directory directory = FSDirectory.open(indexDirectory.toPath()); final Directory directory = FSDirectory.open(indexDirectory.toPath());
@ -236,7 +270,7 @@ public class StandardIndexManager implements IndexManager {
logger.trace("Borrowing index writer for {}", indexDirectory); logger.trace("Borrowing index writer for {}", indexDirectory);
IndexWriterCount writerCount; IndexWriterCount writerCount;
synchronized (writerCounts) { synchronized (countMutex) {
writerCount = writerCounts.get(absoluteFile); writerCount = writerCounts.get(absoluteFile);
if (writerCount == null) { if (writerCount == null) {
@ -272,7 +306,7 @@ public class StandardIndexManager implements IndexManager {
IndexWriterCount count; IndexWriterCount count;
boolean close = isCloseable; boolean close = isCloseable;
try { try {
synchronized (writerCounts) { synchronized (countMutex) {
count = writerCounts.get(absoluteFile); count = writerCounts.get(absoluteFile);
if (count != null && count.isCloseableWhenUnused()) { if (count != null && count.isCloseableWhenUnused()) {
close = true; close = true;
@ -340,11 +374,17 @@ public class StandardIndexManager implements IndexManager {
} }
protected int getWriterCount() { protected int getWriterCount() {
synchronized (writerCounts) { synchronized (countMutex) {
return writerCounts.size(); return writerCounts.size();
} }
} }
protected int getSearcherCount() {
synchronized (countMutex) {
return searcherCounts.size();
}
}
private static void closeQuietly(final Closeable... closeables) { private static void closeQuietly(final Closeable... closeables) {
for (final Closeable closeable : closeables) { for (final Closeable closeable : closeables) {
if (closeable == null) { if (closeable == null) {

View File

@ -26,15 +26,18 @@ import org.apache.nifi.provenance.RepositoryConfiguration;
import org.apache.nifi.provenance.index.EventIndexSearcher; import org.apache.nifi.provenance.index.EventIndexSearcher;
import org.apache.nifi.provenance.index.EventIndexWriter; import org.apache.nifi.provenance.index.EventIndexWriter;
import org.apache.nifi.util.file.FileUtils; import org.apache.nifi.util.file.FileUtils;
import org.junit.Assert;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import java.io.File; import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
public class TestSimpleIndexManager { public class TestSimpleIndexManager {
@ -43,6 +46,47 @@ public class TestSimpleIndexManager {
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", "DEBUG"); System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", "DEBUG");
} }
@Test
public void testDeletingIndexWhileSearcherActive() throws IOException {
final StandardIndexManager mgr = new StandardIndexManager(new RepositoryConfiguration());
final File dir = new File("target/" + UUID.randomUUID().toString());
try {
final EventIndexWriter writer1 = mgr.borrowIndexWriter(dir);
final Document doc1 = new Document();
doc1.add(new StringField("id", "1", Store.YES));
writer1.index(doc1, 1);
mgr.returnIndexWriter(writer1, true, true);
assertEquals(0, mgr.getWriterCount());
final EventIndexSearcher eventSearcher = mgr.borrowIndexSearcher(dir);
assertEquals(0, mgr.getWriterCount());
assertEquals(1, mgr.getSearcherCount());
boolean removed = mgr.removeIndex(dir);
assertFalse(removed);
mgr.returnIndexSearcher(eventSearcher);
assertEquals(0, mgr.getWriterCount());
assertEquals(0, mgr.getSearcherCount());
FileUtils.deleteFile(dir, true);
assertFalse(dir.exists());
try {
mgr.borrowIndexSearcher(dir);
Assert.fail("Expected FileNotFoundException to be thrown");
} catch (final FileNotFoundException fnfe) {
// expected
}
} finally {
if (dir.exists()) {
FileUtils.deleteFile(dir, true);
}
}
}
@Test @Test
public void testMultipleWritersSimultaneouslySameIndex() throws IOException { public void testMultipleWritersSimultaneouslySameIndex() throws IOException {
final StandardIndexManager mgr = new StandardIndexManager(new RepositoryConfiguration()); final StandardIndexManager mgr = new StandardIndexManager(new RepositoryConfiguration());