From b760505bf388848d5329a658bb429d4ef34afcf7 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Wed, 6 May 2015 15:43:20 -0400 Subject: [PATCH] NIFI-596: If IndexWriter is opened for same directory as an IndexReader, mark IndexReader as poisoned and stop using it NIFI-595: Delete .toc files when expiring an event file NIFI-597: Only increment counter for number of documents retrieved after reading the record --- .../expiration/FileRemovalAction.java | 28 +++++++-- .../nifi/provenance/lucene/DocsReader.java | 25 +++++--- .../nifi/provenance/lucene/IndexManager.java | 61 ++++++++++++++++--- 3 files changed, 92 insertions(+), 22 deletions(-) diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/expiration/FileRemovalAction.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/expiration/FileRemovalAction.java index 1b4bafe5a8..23330796de 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/expiration/FileRemovalAction.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/expiration/FileRemovalAction.java @@ -20,7 +20,7 @@ import java.io.File; import java.io.IOException; import org.apache.nifi.provenance.lucene.DeleteIndexAction; - +import org.apache.nifi.provenance.toc.TocUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,16 +30,32 @@ public class FileRemovalAction implements ExpirationAction { @Override public File execute(final File expiredFile) throws IOException { + final boolean removed = remove(expiredFile); + if (removed) { + logger.info("Removed expired Provenance Event file {}", expiredFile); + } else { + logger.warn("Failed to remove old Provenance Event file {}; this file should be cleaned up manually", expiredFile); + } + + final File tocFile = TocUtil.getTocFile(expiredFile); + if (remove(tocFile)) { + logger.info("Removed expired Provenance Table-of-Contents file {}", tocFile); + } else { + logger.warn("Failed to remove old Provenance Table-of-Contents file {}; this file should be cleaned up manually", expiredFile); + } + + return removed ? null : expiredFile; + } + + private boolean remove(final File file) { boolean removed = false; for (int i = 0; i < 10 && !removed; i++) { - if ((removed = expiredFile.delete())) { - logger.info("Removed expired Provenance Event file {}", expiredFile); - return null; + if (removed = file.delete()) { + return true; } } - logger.warn("Failed to remove old Provenance Event file {}", expiredFile); - return expiredFile; + return false; } @Override diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java index 98137fbd41..02fd5c3e28 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/DocsReader.java @@ -64,14 +64,11 @@ public class DocsReader { final int docId = scoreDoc.doc; final Document d = indexReader.document(docId); docs.add(d); - if ( retrievalCount.incrementAndGet() >= maxResults ) { - break; - } } final long readDocuments = System.nanoTime() - start; logger.debug("Reading {} Lucene Documents took {} millis", docs.size(), TimeUnit.NANOSECONDS.toMillis(readDocuments)); - return read(docs, allProvenanceLogFiles); + return read(docs, allProvenanceLogFiles, retrievalCount, maxResults); } @@ -88,7 +85,7 @@ public class DocsReader { private ProvenanceEventRecord getRecord(final Document d, final RecordReader reader) throws IOException { - IndexableField blockField = d.getField(FieldNames.BLOCK_INDEX); + final IndexableField blockField = d.getField(FieldNames.BLOCK_INDEX); if ( blockField == null ) { reader.skipTo(getByteOffset(d, reader)); } else { @@ -97,7 +94,7 @@ public class DocsReader { StandardProvenanceEventRecord record; while ( (record = reader.nextRecord()) != null) { - IndexableField idField = d.getField(SearchableFields.Identifier.getSearchableFieldName()); + final IndexableField idField = d.getField(SearchableFields.Identifier.getSearchableFieldName()); if ( idField == null || idField.numericValue().longValue() == record.getEventId() ) { break; } @@ -111,7 +108,11 @@ public class DocsReader { } - public Set read(final List docs, final Collection allProvenanceLogFiles) throws IOException { + public Set read(final List docs, final Collection allProvenanceLogFiles, final AtomicInteger retrievalCount, final int maxResults) throws IOException { + if (retrievalCount.get() >= maxResults) { + return Collections.emptySet(); + } + LuceneUtil.sortDocsForRetrieval(docs); RecordReader reader = null; @@ -133,6 +134,10 @@ public class DocsReader { try { if (reader != null && storageFilename.equals(lastStorageFilename)) { matchingRecords.add(getRecord(d, reader)); + + if ( retrievalCount.incrementAndGet() >= maxResults ) { + break; + } } else { logger.debug("Opening log file {}", storageFilename); @@ -141,7 +146,7 @@ public class DocsReader { reader.close(); } - List potentialFiles = LuceneUtil.getProvenanceLogFiles(storageFilename, allProvenanceLogFiles); + final List potentialFiles = LuceneUtil.getProvenanceLogFiles(storageFilename, allProvenanceLogFiles); if (potentialFiles.isEmpty()) { logger.warn("Could not find Provenance Log File with basename {} in the " + "Provenance Repository; assuming file has expired and continuing without it", storageFilename); @@ -158,6 +163,10 @@ public class DocsReader { try { reader = RecordReaders.newRecordReader(file, allProvenanceLogFiles); matchingRecords.add(getRecord(d, reader)); + + if ( retrievalCount.incrementAndGet() >= maxResults ) { + break; + } } catch (final IOException e) { throw new IOException("Failed to retrieve record " + d + " from Provenance File " + file + " due to " + e, e); } diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java index 9c3ec31633..31f31a55da 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexManager.java @@ -119,6 +119,14 @@ public class IndexManager implements Closeable { } writerCounts.put(absoluteFile, writerCount); + + // Mark any active searchers as poisoned because we are updating the index + final List searchers = activeSearchers.get(absoluteFile); + if ( searchers != null ) { + for (final ActiveIndexSearcher activeSearcher : searchers) { + activeSearcher.poison(); + } + } } else { logger.debug("Providing existing index writer for {} and incrementing count to {}", indexingDirectory, writerCount.getCount() + 1); writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(), @@ -137,7 +145,7 @@ public class IndexManager implements Closeable { lock.lock(); try { - IndexWriterCount count = writerCounts.remove(absoluteFile); + final IndexWriterCount count = writerCounts.remove(absoluteFile); try { if ( count == null ) { @@ -184,6 +192,15 @@ public class IndexManager implements Closeable { try { for ( final ActiveIndexSearcher searcher : currentlyCached ) { if ( searcher.isCache() ) { + // if the searcher is poisoned, we want to close and expire it. + if ( searcher.isPoisoned() ) { + logger.debug("Index Searcher for {} is poisoned; removing cached searcher", absoluteFile); + expired.add(searcher); + continue; + } + + // if there are no references to the reader, it will have been closed. Since there is no + // isClosed() method, this is how we determine whether it's been closed or not. final int refCount = searcher.getSearcher().getIndexReader().getRefCount(); if ( refCount <= 0 ) { // if refCount == 0, then the reader has been closed, so we need to discard the searcher @@ -212,7 +229,7 @@ public class IndexManager implements Closeable { } } - IndexWriterCount writerCount = writerCounts.remove(absoluteFile); + final IndexWriterCount writerCount = writerCounts.remove(absoluteFile); if ( writerCount == null ) { final Directory directory = FSDirectory.open(absoluteFile); logger.debug("No Index Writer currently exists for {}; creating a cachable reader", indexDir); @@ -270,21 +287,40 @@ public class IndexManager implements Closeable { lock.lock(); try { // check if we already have a reader cached. - List currentlyCached = activeSearchers.get(absoluteFile); + final List currentlyCached = activeSearchers.get(absoluteFile); if ( currentlyCached == null ) { logger.warn("Received Index Searcher for {} but no searcher was provided for that directory; this could " + "result in a resource leak", indexDirectory); return; } + // Check if the given searcher is in our list. We use an Iterator to do this so that if we + // find it we can call remove() on the iterator if need be. final Iterator itr = currentlyCached.iterator(); while (itr.hasNext()) { final ActiveIndexSearcher activeSearcher = itr.next(); if ( activeSearcher.getSearcher().equals(searcher) ) { if ( activeSearcher.isCache() ) { - // the searcher is cached. Just leave it open. - logger.debug("Index searcher for {} is cached; leaving open", indexDirectory); - return; + // if the searcher is poisoned, close it and remove from "pool". + if ( activeSearcher.isPoisoned() ) { + itr.remove(); + + try { + logger.debug("Closing Index Searcher for {} because it is poisoned", indexDirectory); + activeSearcher.close(); + } catch (final IOException ioe) { + logger.warn("Failed to close Index Searcher for {} due to {}", absoluteFile, ioe); + if ( logger.isDebugEnabled() ) { + logger.warn("", ioe); + } + } + + return; + } else { + // the searcher is cached. Just leave it open. + logger.debug("Index searcher for {} is cached; leaving open", indexDirectory); + return; + } } else { // searcher is not cached. It was created from a writer, and we want // the newest updates the next time that we get a searcher, so we will @@ -405,9 +441,10 @@ public class IndexManager implements Closeable { private final DirectoryReader directoryReader; private final Directory directory; private final boolean cache; + private boolean poisoned = false; - public ActiveIndexSearcher(IndexSearcher searcher, DirectoryReader directoryReader, - Directory directory, final boolean cache) { + public ActiveIndexSearcher(final IndexSearcher searcher, final DirectoryReader directoryReader, + final Directory directory, final boolean cache) { this.searcher = searcher; this.directoryReader = directoryReader; this.directory = directory; @@ -422,6 +459,14 @@ public class IndexManager implements Closeable { return searcher; } + public boolean isPoisoned() { + return poisoned; + } + + public void poison() { + this.poisoned = true; + } + @Override public void close() throws IOException { IndexManager.close(directoryReader, directory);