NIFI-2681: Refactored IndexManager into an interface and renamed the existing implementation to CachingIndexManager. Implemented a new SimpleIndexManager that performs no caching of IndexSearchers.

This closes #958.

Signed-off-by: Bryan Bende <bbende@apache.org>
Mark Payne, 2016-08-26 20:03:16 -04:00; committed by Bryan Bende
Parent: a9d029d74e
Commit: 088125451b
7 changed files with 932 additions and 607 deletions
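In outline: IndexManager becomes an interface, the previous caching implementation is renamed CachingIndexManager, and the new SimpleIndexManager opens a fresh IndexSearcher on every borrow. A minimal sketch of what the interface buys callers, assuming a hypothetical cacheSearchers flag (the repository itself simply constructs a SimpleIndexManager, as the first hunk below shows):

import org.apache.nifi.provenance.lucene.CachingIndexManager;
import org.apache.nifi.provenance.lucene.IndexManager;
import org.apache.nifi.provenance.lucene.SimpleIndexManager;

public class IndexManagerSelection {
    // IndexManager is now an interface, so swapping implementations is a
    // one-line change; the flag below is hypothetical and only makes the
    // choice explicit.
    public static IndexManager create(final boolean cacheSearchers) {
        return cacheSearchers ? new CachingIndexManager() : new SimpleIndexManager();
    }
}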

PersistentProvenanceRepository.java

@@ -16,58 +16,6 @@
*/
package org.apache.nifi.provenance;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexNotFoundException;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.FSDirectory;
import org.apache.nifi.authorization.AccessDeniedException;
import org.apache.nifi.authorization.AuthorizationResult;
import org.apache.nifi.authorization.AuthorizationResult.Result;
import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.authorization.RequestAction;
import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.provenance.expiration.ExpirationAction;
import org.apache.nifi.provenance.expiration.FileRemovalAction;
import org.apache.nifi.provenance.lineage.ComputeLineageSubmission;
import org.apache.nifi.provenance.lineage.FlowFileLineage;
import org.apache.nifi.provenance.lineage.Lineage;
import org.apache.nifi.provenance.lineage.LineageComputationType;
import org.apache.nifi.provenance.lucene.DeleteIndexAction;
import org.apache.nifi.provenance.lucene.FieldNames;
import org.apache.nifi.provenance.lucene.IndexManager;
import org.apache.nifi.provenance.lucene.IndexSearch;
import org.apache.nifi.provenance.lucene.IndexingAction;
import org.apache.nifi.provenance.lucene.LineageQuery;
import org.apache.nifi.provenance.lucene.LuceneUtil;
import org.apache.nifi.provenance.lucene.UpdateMinimumEventId;
import org.apache.nifi.provenance.search.Query;
import org.apache.nifi.provenance.search.QueryResult;
import org.apache.nifi.provenance.search.QuerySubmission;
import org.apache.nifi.provenance.search.SearchableField;
import org.apache.nifi.provenance.serialization.RecordReader;
import org.apache.nifi.provenance.serialization.RecordReaders;
import org.apache.nifi.provenance.serialization.RecordWriter;
import org.apache.nifi.provenance.serialization.RecordWriters;
import org.apache.nifi.provenance.toc.TocReader;
import org.apache.nifi.provenance.toc.TocUtil;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.RingBuffer;
import org.apache.nifi.util.RingBuffer.ForEachEvaluator;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.Tuple;
import org.apache.nifi.web.ResourceNotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.EOFException;
import java.io.File;
import java.io.FileFilter;
@@ -113,6 +61,59 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexNotFoundException;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.FSDirectory;
import org.apache.nifi.authorization.AccessDeniedException;
import org.apache.nifi.authorization.AuthorizationResult;
import org.apache.nifi.authorization.AuthorizationResult.Result;
import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.authorization.RequestAction;
import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.provenance.expiration.ExpirationAction;
import org.apache.nifi.provenance.expiration.FileRemovalAction;
import org.apache.nifi.provenance.lineage.ComputeLineageSubmission;
import org.apache.nifi.provenance.lineage.FlowFileLineage;
import org.apache.nifi.provenance.lineage.Lineage;
import org.apache.nifi.provenance.lineage.LineageComputationType;
import org.apache.nifi.provenance.lucene.DeleteIndexAction;
import org.apache.nifi.provenance.lucene.FieldNames;
import org.apache.nifi.provenance.lucene.IndexManager;
import org.apache.nifi.provenance.lucene.IndexSearch;
import org.apache.nifi.provenance.lucene.IndexingAction;
import org.apache.nifi.provenance.lucene.LineageQuery;
import org.apache.nifi.provenance.lucene.LuceneUtil;
import org.apache.nifi.provenance.lucene.SimpleIndexManager;
import org.apache.nifi.provenance.lucene.UpdateMinimumEventId;
import org.apache.nifi.provenance.search.Query;
import org.apache.nifi.provenance.search.QueryResult;
import org.apache.nifi.provenance.search.QuerySubmission;
import org.apache.nifi.provenance.search.SearchableField;
import org.apache.nifi.provenance.serialization.RecordReader;
import org.apache.nifi.provenance.serialization.RecordReaders;
import org.apache.nifi.provenance.serialization.RecordWriter;
import org.apache.nifi.provenance.serialization.RecordWriters;
import org.apache.nifi.provenance.toc.TocReader;
import org.apache.nifi.provenance.toc.TocUtil;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.RingBuffer;
import org.apache.nifi.util.RingBuffer.ForEachEvaluator;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.Tuple;
import org.apache.nifi.web.ResourceNotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PersistentProvenanceRepository implements ProvenanceRepository {
public static final String EVENT_CATEGORY = "Provenance Repository";
@@ -226,7 +227,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
this.maxPartitionMillis = configuration.getMaxEventFileLife(TimeUnit.MILLISECONDS);
this.maxPartitionBytes = configuration.getMaxEventFileCapacity();
this.indexConfig = new IndexConfiguration(configuration);
this.indexManager = new IndexManager();
this.indexManager = new SimpleIndexManager();
this.alwaysSync = configuration.isAlwaysSync();
this.rolloverCheckMillis = rolloverCheckMillis;
@@ -1303,57 +1304,61 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
final Runnable rolloverRunnable = new Runnable() {
@Override
public void run() {
File fileRolledOver = null;
try {
fileRolledOver = mergeJournals(journalsToMerge, getMergeFile(journalsToMerge, storageDir), eventReporter);
} catch (final IOException ioe) {
logger.error("Failed to merge Journal Files {} into a Provenance Log File due to {}", journalsToMerge, ioe.toString());
logger.error("", ioe);
}
if (fileRolledOver != null) {
final File file = fileRolledOver;
// update our map of id to Path
// We need to make sure that another thread doesn't also update the map at the same time. We cannot
// use the write lock when purging old events, and we want to use the same approach here.
boolean updated = false;
final Long fileFirstEventId = Long.valueOf(LuceneUtil.substringBefore(fileRolledOver.getName(), "."));
while (!updated) {
final SortedMap<Long, Path> existingPathMap = idToPathMap.get();
final SortedMap<Long, Path> newIdToPathMap = new TreeMap<>(new PathMapComparator());
newIdToPathMap.putAll(existingPathMap);
newIdToPathMap.put(fileFirstEventId, file.toPath());
updated = idToPathMap.compareAndSet(existingPathMap, newIdToPathMap);
try {
fileRolledOver = mergeJournals(journalsToMerge, getMergeFile(journalsToMerge, storageDir), eventReporter);
} catch (final IOException ioe) {
logger.error("Failed to merge Journal Files {} into a Provenance Log File due to {}", journalsToMerge, ioe.toString());
logger.error("", ioe);
}
logger.info("Successfully Rolled over Provenance Event file containing {} records", recordsWritten);
}
if (fileRolledOver != null) {
//if files were rolled over or if out of retries stop the future
if (fileRolledOver != null || retryAttempts.decrementAndGet() == 0) {
final File file = fileRolledOver;
if (fileRolledOver == null && retryAttempts.get() == 0) {
logger.error("Failed to merge Journal Files {} after {} attempts. ", journalsToMerge, MAX_JOURNAL_ROLLOVER_RETRIES);
}
rolloverCompletions.getAndIncrement();
// Cancel the future so that we don't run anymore
Future<?> future;
while ((future = futureReference.get()) == null) {
try {
Thread.sleep(10L);
} catch (final InterruptedException ie) {
// update our map of id to Path
// We need to make sure that another thread doesn't also update the map at the same time. We cannot
// use the write lock when purging old events, and we want to use the same approach here.
boolean updated = false;
final Long fileFirstEventId = Long.valueOf(LuceneUtil.substringBefore(fileRolledOver.getName(), "."));
while (!updated) {
final SortedMap<Long, Path> existingPathMap = idToPathMap.get();
final SortedMap<Long, Path> newIdToPathMap = new TreeMap<>(new PathMapComparator());
newIdToPathMap.putAll(existingPathMap);
newIdToPathMap.put(fileFirstEventId, file.toPath());
updated = idToPathMap.compareAndSet(existingPathMap, newIdToPathMap);
}
}
future.cancel(false);
} else {
logger.warn("Couldn't merge journals. Will try again. journalsToMerge: {}, storageDir: {}", journalsToMerge, storageDir);
logger.info("Successfully Rolled over Provenance Event file containing {} records", recordsWritten);
}
//if files were rolled over or if out of retries stop the future
if (fileRolledOver != null || retryAttempts.decrementAndGet() == 0) {
if (fileRolledOver == null && retryAttempts.get() == 0) {
logger.error("Failed to merge Journal Files {} after {} attempts. ", journalsToMerge, MAX_JOURNAL_ROLLOVER_RETRIES);
}
rolloverCompletions.getAndIncrement();
// Cancel the future so that we don't run anymore
Future<?> future;
while ((future = futureReference.get()) == null) {
try {
Thread.sleep(10L);
} catch (final InterruptedException ie) {
}
}
future.cancel(false);
} else {
logger.warn("Couldn't merge journals. Will try again. journalsToMerge: {}, storageDir: {}", journalsToMerge, storageDir);
}
} catch (final Exception e) {
logger.error("Failed to merge journals. Will try again. journalsToMerge: {}, storageDir: {}, cause: {}", journalsToMerge, storageDir, e.toString());
logger.error("", e);
}
}
};
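For readability, here is the retry pattern that the reworked runnable above implements, reduced to a self-contained sketch: attemptMerge() stands in for mergeJournals(), the Future cancellation and idToPathMap update are omitted, and the retry budget value is illustrative.

import java.io.File;
import java.util.concurrent.atomic.AtomicInteger;

public class RolloverRetrySketch {
    static final int MAX_JOURNAL_ROLLOVER_RETRIES = 3; // illustrative value
    final AtomicInteger retryAttempts = new AtomicInteger(MAX_JOURNAL_ROLLOVER_RETRIES);

    // Stand-in for mergeJournals(); returns null to simulate a failed merge.
    File attemptMerge() {
        return null;
    }

    // Returns true when the periodic task should stop rescheduling itself.
    boolean runOnce() {
        File fileRolledOver = null;
        try {
            fileRolledOver = attemptMerge();
        } catch (final Exception e) {
            // log and fall through; the next scheduled run will retry
        }
        // stop once files were rolled over or the retry budget is spent
        return fileRolledOver != null || retryAttempts.decrementAndGet() == 0;
    }
}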

CachingIndexManager.java (new file)

@@ -0,0 +1,536 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.provenance.lucene;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CachingIndexManager implements Closeable, IndexManager {
private static final Logger logger = LoggerFactory.getLogger(CachingIndexManager.class);
private final Lock lock = new ReentrantLock();
private final Map<File, IndexWriterCount> writerCounts = new HashMap<>();
private final Map<File, List<ActiveIndexSearcher>> activeSearchers = new HashMap<>();
public void removeIndex(final File indexDirectory) {
final File absoluteFile = indexDirectory.getAbsoluteFile();
logger.info("Removing index {}", indexDirectory);
lock.lock();
try {
final IndexWriterCount count = writerCounts.remove(absoluteFile);
if ( count != null ) {
try {
count.close();
} catch (final IOException ioe) {
logger.warn("Failed to close Index Writer {} for {}", count.getWriter(), absoluteFile);
if ( logger.isDebugEnabled() ) {
logger.warn("", ioe);
}
}
}
final List<ActiveIndexSearcher> searcherList = activeSearchers.remove(absoluteFile);
if (searcherList != null) {
for ( final ActiveIndexSearcher searcher : searcherList ) {
try {
searcher.close();
} catch (final IOException ioe) {
logger.warn("Failed to close Index Searcher {} for {} due to {}",
searcher.getSearcher(), absoluteFile, ioe);
if ( logger.isDebugEnabled() ) {
logger.warn("", ioe);
}
}
}
}
} finally {
lock.unlock();
}
}
public IndexWriter borrowIndexWriter(final File indexingDirectory) throws IOException {
final File absoluteFile = indexingDirectory.getAbsoluteFile();
logger.trace("Borrowing index writer for {}", indexingDirectory);
lock.lock();
try {
IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
if ( writerCount == null ) {
final List<Closeable> closeables = new ArrayList<>();
final Directory directory = FSDirectory.open(indexingDirectory);
closeables.add(directory);
try {
final Analyzer analyzer = new StandardAnalyzer();
closeables.add(analyzer);
final IndexWriterConfig config = new IndexWriterConfig(LuceneUtil.LUCENE_VERSION, analyzer);
config.setWriteLockTimeout(300000L);
final IndexWriter indexWriter = new IndexWriter(directory, config);
writerCount = new IndexWriterCount(indexWriter, analyzer, directory, 1);
logger.debug("Providing new index writer for {}", indexingDirectory);
} catch (final IOException ioe) {
for ( final Closeable closeable : closeables ) {
try {
closeable.close();
} catch (final IOException ioe2) {
ioe.addSuppressed(ioe2);
}
}
throw ioe;
}
writerCounts.put(absoluteFile, writerCount);
// Mark any active searchers as poisoned because we are updating the index
final List<ActiveIndexSearcher> searchers = activeSearchers.get(absoluteFile);
if ( searchers != null ) {
for (final ActiveIndexSearcher activeSearcher : searchers) {
logger.debug("Poisoning {} because it is searching {}, which is getting updated", activeSearcher, indexingDirectory);
activeSearcher.poison();
}
}
} else {
logger.debug("Providing existing index writer for {} and incrementing count to {}", indexingDirectory, writerCount.getCount() + 1);
writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1));
}
return writerCount.getWriter();
} finally {
lock.unlock();
}
}
public void returnIndexWriter(final File indexingDirectory, final IndexWriter writer) {
final File absoluteFile = indexingDirectory.getAbsoluteFile();
logger.trace("Returning Index Writer for {} to IndexManager", indexingDirectory);
lock.lock();
try {
final IndexWriterCount count = writerCounts.remove(absoluteFile);
try {
if ( count == null ) {
logger.warn("Index Writer {} was returned to IndexManager for {}, but this writer is not known. "
+ "This could potentially lead to a resource leak", writer, indexingDirectory);
writer.close();
} else if ( count.getCount() <= 1 ) {
// we are finished with this writer.
logger.debug("Decrementing count for Index Writer for {} to {}; Closing writer", indexingDirectory, count.getCount() - 1);
count.close();
} else {
// decrement the count.
logger.debug("Decrementing count for Index Writer for {} to {}", indexingDirectory, count.getCount() - 1);
writerCounts.put(absoluteFile, new IndexWriterCount(count.getWriter(), count.getAnalyzer(), count.getDirectory(), count.getCount() - 1));
}
} catch (final IOException ioe) {
logger.warn("Failed to close Index Writer {} due to {}", writer, ioe);
if ( logger.isDebugEnabled() ) {
logger.warn("", ioe);
}
}
} finally {
lock.unlock();
}
}
public IndexSearcher borrowIndexSearcher(final File indexDir) throws IOException {
final File absoluteFile = indexDir.getAbsoluteFile();
logger.trace("Borrowing index searcher for {}", indexDir);
lock.lock();
try {
// check if we already have a reader cached.
List<ActiveIndexSearcher> currentlyCached = activeSearchers.get(absoluteFile);
if ( currentlyCached == null ) {
currentlyCached = new ArrayList<>();
activeSearchers.put(absoluteFile, currentlyCached);
} else {
// keep track of any searchers that have been closed so that we can remove them
// from our cache later.
for (final ActiveIndexSearcher searcher : currentlyCached) {
if (searcher.isCache()) {
// if the searcher is poisoned, we want to close and expire it.
if (searcher.isPoisoned()) {
continue;
}
// if there are no references to the reader, it will have been closed. Since there is no
// isClosed() method, this is how we determine whether it's been closed or not.
final int refCount = searcher.getSearcher().getIndexReader().getRefCount();
if (refCount <= 0) {
// if refCount == 0, then the reader has been closed, so we cannot use the searcher
logger.debug("Reference count for cached Index Searcher for {} is currently {}; "
+ "removing cached searcher", absoluteFile, refCount);
continue;
}
final int referenceCount = searcher.incrementReferenceCount();
logger.debug("Providing previously cached index searcher for {} and incrementing Reference Count to {}", indexDir, referenceCount);
return searcher.getSearcher();
}
}
}
// We found no cached Index Readers. Create a new one. To do this, we need to check
// if we have an Index Writer, and if so create a Reader based on the Index Writer.
// This will provide us a 'near real time' index reader.
final IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
if ( writerCount == null ) {
final Directory directory = FSDirectory.open(absoluteFile);
logger.debug("No Index Writer currently exists for {}; creating a cachable reader", indexDir);
try {
final DirectoryReader directoryReader = DirectoryReader.open(directory);
final IndexSearcher searcher = new IndexSearcher(directoryReader);
// we want to cache the searcher that we create, since it's just a reader.
final ActiveIndexSearcher cached = new ActiveIndexSearcher(searcher, absoluteFile, directoryReader, directory, true);
currentlyCached.add(cached);
return cached.getSearcher();
} catch (final IOException e) {
logger.error("Failed to create Index Searcher for {} due to {}", absoluteFile, e.toString());
logger.error("", e);
try {
directory.close();
} catch (final IOException ioe) {
e.addSuppressed(ioe);
}
throw e;
}
} else {
logger.debug("Index Writer currently exists for {}; creating a non-cachable reader and incrementing "
+ "counter to {}", indexDir, writerCount.getCount() + 1);
// increment the writer count to ensure that it's kept open.
writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1));
// create a new Index Searcher from the writer so that we don't have an issue with trying
// to read from a directory that's locked. If we get the "no segments* file found" with
// Lucene, this indicates that an IndexWriter already has the directory open.
final IndexWriter writer = writerCount.getWriter();
final DirectoryReader directoryReader = DirectoryReader.open(writer, false);
final IndexSearcher searcher = new IndexSearcher(directoryReader);
// we don't want to cache this searcher because it's based on a writer, so we want to get
// new values the next time that we search.
final ActiveIndexSearcher activeSearcher = new ActiveIndexSearcher(searcher, absoluteFile, directoryReader, null, false);
currentlyCached.add(activeSearcher);
return activeSearcher.getSearcher();
}
} finally {
lock.unlock();
}
}
public void returnIndexSearcher(final File indexDirectory, final IndexSearcher searcher) {
final File absoluteFile = indexDirectory.getAbsoluteFile();
logger.trace("Returning index searcher for {} to IndexManager", indexDirectory);
lock.lock();
try {
// check if we already have a reader cached.
final List<ActiveIndexSearcher> currentlyCached = activeSearchers.get(absoluteFile);
if ( currentlyCached == null ) {
logger.warn("Received Index Searcher for {} but no searcher was provided for that directory; this could "
+ "result in a resource leak", indexDirectory);
return;
}
// Check if the given searcher is in our list. We use an Iterator to do this so that if we
// find it we can call remove() on the iterator if need be.
final Iterator<ActiveIndexSearcher> itr = new ArrayList<>(currentlyCached).iterator();
boolean activeSearcherFound = false;
while (itr.hasNext()) {
final ActiveIndexSearcher activeSearcher = itr.next();
if ( activeSearcher.getSearcher().equals(searcher) ) {
activeSearcherFound = true;
if ( activeSearcher.isCache() ) {
// if the searcher is poisoned, close it and remove from "pool". Otherwise,
// just decrement the count. Note here that when we call close() it won't actually close
// the underlying directory reader unless there are no more references to it
if ( activeSearcher.isPoisoned() ) {
itr.remove();
try {
activeSearcher.close();
} catch (final IOException ioe) {
logger.warn("Failed to close Index Searcher for {} due to {}", absoluteFile, ioe);
if ( logger.isDebugEnabled() ) {
logger.warn("", ioe);
}
}
return;
} else {
// the searcher is cached. Just leave it open.
final int refCount = activeSearcher.decrementReferenceCount();
logger.debug("Index searcher for {} is cached; leaving open with reference count of {}", indexDirectory, refCount);
return;
}
} else {
// searcher is not cached. It was created from a writer, and we want
// the newest updates the next time that we get a searcher, so we will
// go ahead and close this one out.
itr.remove();
// decrement the writer count because we incremented it when creating the searcher
final IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
if ( writerCount != null ) {
if ( writerCount.getCount() <= 1 ) {
try {
logger.debug("Index searcher for {} is not cached. Writer count is "
+ "decremented to {}; closing writer", indexDirectory, writerCount.getCount() - 1);
writerCount.close();
} catch (final IOException ioe) {
logger.warn("Failed to close Index Writer for {} due to {}", absoluteFile, ioe);
if ( logger.isDebugEnabled() ) {
logger.warn("", ioe);
}
}
} else {
logger.debug("Index searcher for {} is not cached. Writer count is decremented "
+ "to {}; leaving writer open", indexDirectory, writerCount.getCount() - 1);
writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
writerCount.getAnalyzer(), writerCount.getDirectory(),
writerCount.getCount() - 1));
}
}
try {
logger.debug("Closing Index Searcher for {}", indexDirectory);
final boolean allReferencesClosed = activeSearcher.close();
if (!allReferencesClosed) {
currentlyCached.add(activeSearcher);
}
} catch (final IOException ioe) {
logger.warn("Failed to close Index Searcher for {} due to {}", absoluteFile, ioe);
if ( logger.isDebugEnabled() ) {
logger.warn("", ioe);
}
}
}
}
}
if (!activeSearcherFound) {
logger.debug("Index Searcher {} was returned for {} but found no Active Searcher for it. "
+ "This will occur if the Index Searcher was already returned while being poisoned.", searcher, indexDirectory);
}
} finally {
lock.unlock();
}
}
@Override
public void close() throws IOException {
logger.debug("Closing Index Manager");
lock.lock();
try {
IOException ioe = null;
for ( final IndexWriterCount count : writerCounts.values() ) {
try {
count.close();
} catch (final IOException e) {
if ( ioe == null ) {
ioe = e;
} else {
ioe.addSuppressed(e);
}
}
}
for (final List<ActiveIndexSearcher> searcherList : activeSearchers.values()) {
for (final ActiveIndexSearcher searcher : searcherList) {
try {
searcher.close();
} catch (final IOException e) {
if ( ioe == null ) {
ioe = e;
} else {
ioe.addSuppressed(e);
}
}
}
}
if ( ioe != null ) {
throw ioe;
}
} finally {
lock.unlock();
}
}
private static void close(final Closeable... closeables) throws IOException {
IOException ioe = null;
for ( final Closeable closeable : closeables ) {
if ( closeable == null ) {
continue;
}
try {
closeable.close();
} catch (final IOException e) {
if ( ioe == null ) {
ioe = e;
} else {
ioe.addSuppressed(e);
}
}
}
if ( ioe != null ) {
throw ioe;
}
}
private static class ActiveIndexSearcher {
private final IndexSearcher searcher;
private final DirectoryReader directoryReader;
private final File indexDirectory;
private final Directory directory;
private final boolean cache;
private final AtomicInteger referenceCount = new AtomicInteger(1);
private volatile boolean poisoned = false;
public ActiveIndexSearcher(final IndexSearcher searcher, final File indexDirectory, final DirectoryReader directoryReader,
final Directory directory, final boolean cache) {
this.searcher = searcher;
this.directoryReader = directoryReader;
this.indexDirectory = indexDirectory;
this.directory = directory;
this.cache = cache;
}
public boolean isCache() {
return cache;
}
public IndexSearcher getSearcher() {
return searcher;
}
public boolean isPoisoned() {
return poisoned;
}
public void poison() {
this.poisoned = true;
}
public int incrementReferenceCount() {
return referenceCount.incrementAndGet();
}
public int decrementReferenceCount() {
return referenceCount.decrementAndGet();
}
public boolean close() throws IOException {
final int updatedRefCount = referenceCount.decrementAndGet();
if (updatedRefCount <= 0) {
logger.debug("Decremented Reference Count for {} to {}; closing underlying directory reader", this, updatedRefCount);
CachingIndexManager.close(directoryReader, directory);
return true;
} else {
logger.debug("Decremented Reference Count for {} to {}; leaving underlying directory reader open", this, updatedRefCount);
return false;
}
}
@Override
public String toString() {
return "ActiveIndexSearcher[directory=" + indexDirectory + ", cached=" + cache + ", poisoned=" + poisoned + "]";
}
}
private static class IndexWriterCount implements Closeable {
private final IndexWriter writer;
private final Analyzer analyzer;
private final Directory directory;
private final int count;
public IndexWriterCount(final IndexWriter writer, final Analyzer analyzer, final Directory directory, final int count) {
this.writer = writer;
this.analyzer = analyzer;
this.directory = directory;
this.count = count;
}
public Analyzer getAnalyzer() {
return analyzer;
}
public Directory getDirectory() {
return directory;
}
public IndexWriter getWriter() {
return writer;
}
public int getCount() {
return count;
}
@Override
public void close() throws IOException {
CachingIndexManager.close(writer, analyzer, directory);
}
}
}
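A short usage sketch for the class above, assuming an existing Lucene index at an illustrative path: a borrowed searcher must always be returned so the manager can decrement the cached reader's reference count.

import java.io.File;
import java.io.IOException;

import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.nifi.provenance.lucene.CachingIndexManager;

public class CachingIndexManagerUsage {
    public static void main(final String[] args) throws IOException {
        final CachingIndexManager manager = new CachingIndexManager();
        final File indexDir = new File("target/example-index"); // illustrative
        try {
            final IndexSearcher searcher = manager.borrowIndexSearcher(indexDir);
            try {
                searcher.search(new MatchAllDocsQuery(), 10);
            } finally {
                // decrements the reference count; the reader stays cached
                manager.returnIndexSearcher(indexDir, searcher);
            }
        } finally {
            manager.close();
        }
    }
}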

IndexManager.java

@@ -14,522 +14,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.provenance.lucene;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class IndexManager implements Closeable {
private static final Logger logger = LoggerFactory.getLogger(IndexManager.class);
public interface IndexManager extends Closeable {
IndexSearcher borrowIndexSearcher(File indexDir) throws IOException;
private final Lock lock = new ReentrantLock();
private final Map<File, IndexWriterCount> writerCounts = new HashMap<>();
private final Map<File, List<ActiveIndexSearcher>> activeSearchers = new HashMap<>();
IndexWriter borrowIndexWriter(File indexingDirectory) throws IOException;
void removeIndex(final File indexDirectory);
public void removeIndex(final File indexDirectory) {
final File absoluteFile = indexDirectory.getAbsoluteFile();
logger.info("Removing index {}", indexDirectory);
void returnIndexSearcher(File indexDirectory, IndexSearcher searcher);
lock.lock();
try {
final IndexWriterCount count = writerCounts.remove(absoluteFile);
if ( count != null ) {
try {
count.close();
} catch (final IOException ioe) {
logger.warn("Failed to close Index Writer {} for {}", count.getWriter(), absoluteFile);
if ( logger.isDebugEnabled() ) {
logger.warn("", ioe);
}
}
}
for ( final List<ActiveIndexSearcher> searcherList : activeSearchers.values() ) {
for ( final ActiveIndexSearcher searcher : searcherList ) {
try {
searcher.close();
} catch (final IOException ioe) {
logger.warn("Failed to close Index Searcher {} for {} due to {}",
searcher.getSearcher(), absoluteFile, ioe);
if ( logger.isDebugEnabled() ) {
logger.warn("", ioe);
}
}
}
}
} finally {
lock.unlock();
}
}
public IndexWriter borrowIndexWriter(final File indexingDirectory) throws IOException {
final File absoluteFile = indexingDirectory.getAbsoluteFile();
logger.trace("Borrowing index writer for {}", indexingDirectory);
lock.lock();
try {
IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
if ( writerCount == null ) {
final List<Closeable> closeables = new ArrayList<>();
final Directory directory = FSDirectory.open(indexingDirectory);
closeables.add(directory);
try {
final Analyzer analyzer = new StandardAnalyzer();
closeables.add(analyzer);
final IndexWriterConfig config = new IndexWriterConfig(LuceneUtil.LUCENE_VERSION, analyzer);
config.setWriteLockTimeout(300000L);
final IndexWriter indexWriter = new IndexWriter(directory, config);
writerCount = new IndexWriterCount(indexWriter, analyzer, directory, 1);
logger.debug("Providing new index writer for {}", indexingDirectory);
} catch (final IOException ioe) {
for ( final Closeable closeable : closeables ) {
try {
closeable.close();
} catch (final IOException ioe2) {
ioe.addSuppressed(ioe2);
}
}
throw ioe;
}
writerCounts.put(absoluteFile, writerCount);
// Mark any active searchers as poisoned because we are updating the index
final List<ActiveIndexSearcher> searchers = activeSearchers.get(absoluteFile);
if ( searchers != null ) {
for (final ActiveIndexSearcher activeSearcher : searchers) {
logger.debug("Poisoning {} because it is searching {}, which is getting updated", activeSearcher, indexingDirectory);
activeSearcher.poison();
}
}
} else {
logger.debug("Providing existing index writer for {} and incrementing count to {}", indexingDirectory, writerCount.getCount() + 1);
writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1));
}
return writerCount.getWriter();
} finally {
lock.unlock();
}
}
public void returnIndexWriter(final File indexingDirectory, final IndexWriter writer) {
final File absoluteFile = indexingDirectory.getAbsoluteFile();
logger.trace("Returning Index Writer for {} to IndexManager", indexingDirectory);
lock.lock();
try {
final IndexWriterCount count = writerCounts.remove(absoluteFile);
try {
if ( count == null ) {
logger.warn("Index Writer {} was returned to IndexManager for {}, but this writer is not known. "
+ "This could potentially lead to a resource leak", writer, indexingDirectory);
writer.close();
} else if ( count.getCount() <= 1 ) {
// we are finished with this writer.
logger.debug("Decrementing count for Index Writer for {} to {}; Closing writer", indexingDirectory, count.getCount() - 1);
count.close();
} else {
// decrement the count.
logger.debug("Decrementing count for Index Writer for {} to {}", indexingDirectory, count.getCount() - 1);
writerCounts.put(absoluteFile, new IndexWriterCount(count.getWriter(), count.getAnalyzer(), count.getDirectory(), count.getCount() - 1));
}
} catch (final IOException ioe) {
logger.warn("Failed to close Index Writer {} due to {}", writer, ioe);
if ( logger.isDebugEnabled() ) {
logger.warn("", ioe);
}
}
} finally {
lock.unlock();
}
}
public IndexSearcher borrowIndexSearcher(final File indexDir) throws IOException {
final File absoluteFile = indexDir.getAbsoluteFile();
logger.trace("Borrowing index searcher for {}", indexDir);
lock.lock();
try {
// check if we already have a reader cached.
List<ActiveIndexSearcher> currentlyCached = activeSearchers.get(absoluteFile);
if ( currentlyCached == null ) {
currentlyCached = new ArrayList<>();
activeSearchers.put(absoluteFile, currentlyCached);
} else {
// keep track of any searchers that have been closed so that we can remove them
// from our cache later.
for (final ActiveIndexSearcher searcher : currentlyCached) {
if (searcher.isCache()) {
// if the searcher is poisoned, we want to close and expire it.
if (searcher.isPoisoned()) {
continue;
}
// if there are no references to the reader, it will have been closed. Since there is no
// isClosed() method, this is how we determine whether it's been closed or not.
final int refCount = searcher.getSearcher().getIndexReader().getRefCount();
if (refCount <= 0) {
// if refCount == 0, then the reader has been closed, so we cannot use the searcher
logger.debug("Reference count for cached Index Searcher for {} is currently {}; "
+ "removing cached searcher", absoluteFile, refCount);
continue;
}
final int referenceCount = searcher.incrementReferenceCount();
logger.debug("Providing previously cached index searcher for {} and incrementing Reference Count to {}", indexDir, referenceCount);
return searcher.getSearcher();
}
}
}
// We found no cached Index Readers. Create a new one. To do this, we need to check
// if we have an Index Writer, and if so create a Reader based on the Index Writer.
// This will provide us a 'near real time' index reader.
final IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
if ( writerCount == null ) {
final Directory directory = FSDirectory.open(absoluteFile);
logger.debug("No Index Writer currently exists for {}; creating a cachable reader", indexDir);
try {
final DirectoryReader directoryReader = DirectoryReader.open(directory);
final IndexSearcher searcher = new IndexSearcher(directoryReader);
// we want to cache the searcher that we create, since it's just a reader.
final ActiveIndexSearcher cached = new ActiveIndexSearcher(searcher, absoluteFile, directoryReader, directory, true);
currentlyCached.add(cached);
return cached.getSearcher();
} catch (final IOException e) {
logger.error("Failed to create Index Searcher for {} due to {}", absoluteFile, e.toString());
logger.error("", e);
try {
directory.close();
} catch (final IOException ioe) {
e.addSuppressed(ioe);
}
throw e;
}
} else {
logger.debug("Index Writer currently exists for {}; creating a non-cachable reader and incrementing "
+ "counter to {}", indexDir, writerCount.getCount() + 1);
// increment the writer count to ensure that it's kept open.
writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1));
// create a new Index Searcher from the writer so that we don't have an issue with trying
// to read from a directory that's locked. If we get the "no segments* file found" with
// Lucene, this indicates that an IndexWriter already has the directory open.
final IndexWriter writer = writerCount.getWriter();
final DirectoryReader directoryReader = DirectoryReader.open(writer, false);
final IndexSearcher searcher = new IndexSearcher(directoryReader);
// we don't want to cache this searcher because it's based on a writer, so we want to get
// new values the next time that we search.
final ActiveIndexSearcher activeSearcher = new ActiveIndexSearcher(searcher, absoluteFile, directoryReader, null, false);
currentlyCached.add(activeSearcher);
return activeSearcher.getSearcher();
}
} finally {
lock.unlock();
}
}
public void returnIndexSearcher(final File indexDirectory, final IndexSearcher searcher) {
final File absoluteFile = indexDirectory.getAbsoluteFile();
logger.trace("Returning index searcher for {} to IndexManager", indexDirectory);
lock.lock();
try {
// check if we already have a reader cached.
final List<ActiveIndexSearcher> currentlyCached = activeSearchers.get(absoluteFile);
if ( currentlyCached == null ) {
logger.warn("Received Index Searcher for {} but no searcher was provided for that directory; this could "
+ "result in a resource leak", indexDirectory);
return;
}
// Check if the given searcher is in our list. We use an Iterator to do this so that if we
// find it we can call remove() on the iterator if need be.
final Iterator<ActiveIndexSearcher> itr = new ArrayList<>(currentlyCached).iterator();
boolean activeSearcherFound = false;
while (itr.hasNext()) {
final ActiveIndexSearcher activeSearcher = itr.next();
if ( activeSearcher.getSearcher().equals(searcher) ) {
activeSearcherFound = true;
if ( activeSearcher.isCache() ) {
// if the searcher is poisoned, close it and remove from "pool". Otherwise,
// just decrement the count. Note here that when we call close() it won't actually close
// the underlying directory reader unless there are no more references to it
if ( activeSearcher.isPoisoned() ) {
itr.remove();
try {
activeSearcher.close();
} catch (final IOException ioe) {
logger.warn("Failed to close Index Searcher for {} due to {}", absoluteFile, ioe);
if ( logger.isDebugEnabled() ) {
logger.warn("", ioe);
}
}
return;
} else {
// the searcher is cached. Just leave it open.
final int refCount = activeSearcher.decrementReferenceCount();
logger.debug("Index searcher for {} is cached; leaving open with reference count of {}", indexDirectory, refCount);
return;
}
} else {
// searcher is not cached. It was created from a writer, and we want
// the newest updates the next time that we get a searcher, so we will
// go ahead and close this one out.
itr.remove();
// decrement the writer count because we incremented it when creating the searcher
final IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
if ( writerCount != null ) {
if ( writerCount.getCount() <= 1 ) {
try {
logger.debug("Index searcher for {} is not cached. Writer count is "
+ "decremented to {}; closing writer", indexDirectory, writerCount.getCount() - 1);
writerCount.close();
} catch (final IOException ioe) {
logger.warn("Failed to close Index Writer for {} due to {}", absoluteFile, ioe);
if ( logger.isDebugEnabled() ) {
logger.warn("", ioe);
}
}
} else {
logger.debug("Index searcher for {} is not cached. Writer count is decremented "
+ "to {}; leaving writer open", indexDirectory, writerCount.getCount() - 1);
writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
writerCount.getAnalyzer(), writerCount.getDirectory(),
writerCount.getCount() - 1));
}
}
try {
logger.debug("Closing Index Searcher for {}", indexDirectory);
final boolean allReferencesClosed = activeSearcher.close();
if (!allReferencesClosed) {
currentlyCached.add(activeSearcher);
}
} catch (final IOException ioe) {
logger.warn("Failed to close Index Searcher for {} due to {}", absoluteFile, ioe);
if ( logger.isDebugEnabled() ) {
logger.warn("", ioe);
}
}
}
}
}
if (!activeSearcherFound) {
logger.debug("Index Searcher {} was returned for {} but found no Active Searcher for it. "
+ "This will occur if the Index Searcher was already returned while being poisoned.", searcher, indexDirectory);
}
} finally {
lock.unlock();
}
}
@Override
public void close() throws IOException {
logger.debug("Closing Index Manager");
lock.lock();
try {
IOException ioe = null;
for ( final IndexWriterCount count : writerCounts.values() ) {
try {
count.close();
} catch (final IOException e) {
if ( ioe == null ) {
ioe = e;
} else {
ioe.addSuppressed(e);
}
}
}
for (final List<ActiveIndexSearcher> searcherList : activeSearchers.values()) {
for (final ActiveIndexSearcher searcher : searcherList) {
try {
searcher.close();
} catch (final IOException e) {
if ( ioe == null ) {
ioe = e;
} else {
ioe.addSuppressed(e);
}
}
}
}
if ( ioe != null ) {
throw ioe;
}
} finally {
lock.unlock();
}
}
private static void close(final Closeable... closeables) throws IOException {
IOException ioe = null;
for ( final Closeable closeable : closeables ) {
if ( closeable == null ) {
continue;
}
try {
closeable.close();
} catch (final IOException e) {
if ( ioe == null ) {
ioe = e;
} else {
ioe.addSuppressed(e);
}
}
}
if ( ioe != null ) {
throw ioe;
}
}
private static class ActiveIndexSearcher {
private final IndexSearcher searcher;
private final DirectoryReader directoryReader;
private final File indexDirectory;
private final Directory directory;
private final boolean cache;
private final AtomicInteger referenceCount = new AtomicInteger(1);
private volatile boolean poisoned = false;
public ActiveIndexSearcher(final IndexSearcher searcher, final File indexDirectory, final DirectoryReader directoryReader,
final Directory directory, final boolean cache) {
this.searcher = searcher;
this.directoryReader = directoryReader;
this.indexDirectory = indexDirectory;
this.directory = directory;
this.cache = cache;
}
public boolean isCache() {
return cache;
}
public IndexSearcher getSearcher() {
return searcher;
}
public boolean isPoisoned() {
return poisoned;
}
public void poison() {
this.poisoned = true;
}
public int incrementReferenceCount() {
return referenceCount.incrementAndGet();
}
public int decrementReferenceCount() {
return referenceCount.decrementAndGet();
}
public boolean close() throws IOException {
final int updatedRefCount = referenceCount.decrementAndGet();
if (updatedRefCount <= 0) {
logger.debug("Decremented Reference Count for {} to {}; closing underlying directory reader", this, updatedRefCount);
IndexManager.close(directoryReader, directory);
return true;
} else {
logger.debug("Decremented Reference Count for {} to {}; leaving underlying directory reader open", this, updatedRefCount);
return false;
}
}
@Override
public String toString() {
return "ActiveIndexSearcher[directory=" + indexDirectory + ", cached=" + cache + ", poisoned=" + poisoned + "]";
}
}
private static class IndexWriterCount implements Closeable {
private final IndexWriter writer;
private final Analyzer analyzer;
private final Directory directory;
private final int count;
public IndexWriterCount(final IndexWriter writer, final Analyzer analyzer, final Directory directory, final int count) {
this.writer = writer;
this.analyzer = analyzer;
this.directory = directory;
this.count = count;
}
public Analyzer getAnalyzer() {
return analyzer;
}
public Directory getDirectory() {
return directory;
}
public IndexWriter getWriter() {
return writer;
}
public int getCount() {
return count;
}
@Override
public void close() throws IOException {
IndexManager.close(writer, analyzer, directory);
}
}
}
void returnIndexWriter(File indexingDirectory, IndexWriter writer);
}
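Reassembled from the retained lines above, the new interface reads as follows (the import list is inferred from the method signatures):

package org.apache.nifi.provenance.lucene;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;

import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.search.IndexSearcher;

public interface IndexManager extends Closeable {
    IndexSearcher borrowIndexSearcher(File indexDir) throws IOException;

    IndexWriter borrowIndexWriter(File indexingDirectory) throws IOException;

    void removeIndex(final File indexDirectory);

    void returnIndexSearcher(File indexDirectory, IndexSearcher searcher);

    void returnIndexWriter(File indexingDirectory, IndexWriter writer);
}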

SimpleIndexManager.java (new file)

@@ -0,0 +1,209 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.provenance.lucene;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SimpleIndexManager implements IndexManager {
private static final Logger logger = LoggerFactory.getLogger(SimpleIndexManager.class);
private final ConcurrentMap<Object, List<Closeable>> closeables = new ConcurrentHashMap<>();
private final Map<File, IndexWriterCount> writerCounts = new HashMap<>();
@Override
public void close() throws IOException {
}
@Override
public IndexSearcher borrowIndexSearcher(final File indexDir) throws IOException {
logger.debug("Creating index searcher for {}", indexDir);
final Directory directory = FSDirectory.open(indexDir);
final DirectoryReader directoryReader = DirectoryReader.open(directory);
final IndexSearcher searcher = new IndexSearcher(directoryReader);
final List<Closeable> closeableList = new ArrayList<>(2);
closeableList.add(directoryReader);
closeableList.add(directory);
closeables.put(searcher, closeableList);
logger.debug("Created index searcher {} for {}", searcher, indexDir);
return searcher;
}
@Override
public void returnIndexSearcher(final File indexDirectory, final IndexSearcher searcher) {
logger.debug("Closing index searcher {} for {}", searcher, indexDirectory);
final List<Closeable> closeableList = closeables.get(searcher);
if (closeableList != null) {
for (final Closeable closeable : closeableList) {
closeQuietly(closeable);
}
}
logger.debug("Closed index searcher {}", searcher);
}
@Override
public void removeIndex(final File indexDirectory) {
}
@Override
public synchronized IndexWriter borrowIndexWriter(final File indexingDirectory) throws IOException {
final File absoluteFile = indexingDirectory.getAbsoluteFile();
logger.trace("Borrowing index writer for {}", indexingDirectory);
IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
if (writerCount == null) {
final List<Closeable> closeables = new ArrayList<>();
final Directory directory = FSDirectory.open(indexingDirectory);
closeables.add(directory);
try {
final Analyzer analyzer = new StandardAnalyzer();
closeables.add(analyzer);
final IndexWriterConfig config = new IndexWriterConfig(LuceneUtil.LUCENE_VERSION, analyzer);
config.setWriteLockTimeout(300000L);
final IndexWriter indexWriter = new IndexWriter(directory, config);
writerCount = new IndexWriterCount(indexWriter, analyzer, directory, 1);
logger.debug("Providing new index writer for {}", indexingDirectory);
} catch (final IOException ioe) {
for (final Closeable closeable : closeables) {
try {
closeable.close();
} catch (final IOException ioe2) {
ioe.addSuppressed(ioe2);
}
}
throw ioe;
}
writerCounts.put(absoluteFile, writerCount);
} else {
logger.debug("Providing existing index writer for {} and incrementing count to {}", indexingDirectory, writerCount.getCount() + 1);
writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
writerCount.getAnalyzer(), writerCount.getDirectory(), writerCount.getCount() + 1));
}
return writerCount.getWriter();
}
@Override
public synchronized void returnIndexWriter(final File indexingDirectory, final IndexWriter writer) {
final File absoluteFile = indexingDirectory.getAbsoluteFile();
logger.trace("Returning Index Writer for {} to IndexManager", indexingDirectory);
final IndexWriterCount count = writerCounts.remove(absoluteFile);
try {
if (count == null) {
logger.warn("Index Writer {} was returned to IndexManager for {}, but this writer is not known. "
+ "This could potentially lead to a resource leak", writer, indexingDirectory);
writer.close();
} else if (count.getCount() <= 1) {
// we are finished with this writer.
logger.debug("Decrementing count for Index Writer for {} to {}; Closing writer", indexingDirectory, count.getCount() - 1);
writer.commit();
count.close();
} else {
// decrement the count.
logger.debug("Decrementing count for Index Writer for {} to {}", indexingDirectory, count.getCount() - 1);
writerCounts.put(absoluteFile, new IndexWriterCount(count.getWriter(), count.getAnalyzer(), count.getDirectory(), count.getCount() - 1));
}
} catch (final IOException ioe) {
logger.warn("Failed to close Index Writer {} due to {}", writer, ioe);
if (logger.isDebugEnabled()) {
logger.warn("", ioe);
}
}
}
private static void closeQuietly(final Closeable... closeables) {
for (final Closeable closeable : closeables) {
if (closeable == null) {
continue;
}
try {
closeable.close();
} catch (final Exception e) {
logger.warn("Failed to close {} due to {}", closeable, e);
}
}
}
private static class IndexWriterCount implements Closeable {
private final IndexWriter writer;
private final Analyzer analyzer;
private final Directory directory;
private final int count;
public IndexWriterCount(final IndexWriter writer, final Analyzer analyzer, final Directory directory, final int count) {
this.writer = writer;
this.analyzer = analyzer;
this.directory = directory;
this.count = count;
}
public Analyzer getAnalyzer() {
return analyzer;
}
public Directory getDirectory() {
return directory;
}
public IndexWriter getWriter() {
return writer;
}
public int getCount() {
return count;
}
@Override
public void close() throws IOException {
closeQuietly(writer, analyzer, directory);
}
}
}

TestPersistentProvenanceRepository.java

@@ -64,6 +64,7 @@ import org.apache.nifi.provenance.lineage.Lineage;
import org.apache.nifi.provenance.lineage.LineageEdge;
import org.apache.nifi.provenance.lineage.LineageNode;
import org.apache.nifi.provenance.lineage.LineageNodeType;
import org.apache.nifi.provenance.lucene.CachingIndexManager;
import org.apache.nifi.provenance.lucene.IndexManager;
import org.apache.nifi.provenance.lucene.IndexingAction;
import org.apache.nifi.provenance.search.Query;
@@ -496,16 +497,16 @@ public class TestPersistentProvenanceRepository {
final CountDownLatch obtainIndexSearcherLatch = new CountDownLatch(2);
repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS) {
private IndexManager wrappedManager = null;
private CachingIndexManager wrappedManager = null;
// Create an IndexManager that adds a delay before returning the Index Searcher.
@Override
protected synchronized IndexManager getIndexManager() {
protected synchronized CachingIndexManager getIndexManager() {
if (wrappedManager == null) {
final IndexManager mgr = super.getIndexManager();
final Logger logger = LoggerFactory.getLogger("IndexManager");
wrappedManager = new IndexManager() {
wrappedManager = new CachingIndexManager() {
final AtomicInteger indexSearcherCount = new AtomicInteger(0);
@Override

TestIndexManager.java → TestCachingIndexManager.java (renamed)

@@ -42,15 +42,15 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TestIndexManager {
public class TestCachingIndexManager {
private File indexDir;
private IndexManager manager;
private CachingIndexManager manager;
@Before
public void setup() {
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", "DEBUG");
manager = new IndexManager();
manager = new CachingIndexManager();
indexDir = new File("target/testIndexManager/" + UUID.randomUUID().toString());
indexDir.mkdirs();

TestSimpleIndexManager.java (new file)

@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.provenance.lucene;
import static org.junit.Assert.assertEquals;
import java.io.File;
import java.io.IOException;
import java.util.UUID;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.nifi.util.file.FileUtils;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestSimpleIndexManager {
@BeforeClass
public static void setLogLevel() {
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", "DEBUG");
}
@Test
public void testMultipleWritersSimultaneouslySameIndex() throws IOException {
final SimpleIndexManager mgr = new SimpleIndexManager();
final File dir = new File("target/" + UUID.randomUUID().toString());
try {
final IndexWriter writer1 = mgr.borrowIndexWriter(dir);
final IndexWriter writer2 = mgr.borrowIndexWriter(dir);
final Document doc1 = new Document();
doc1.add(new StringField("id", "1", Store.YES));
final Document doc2 = new Document();
doc2.add(new StringField("id", "2", Store.YES));
writer1.addDocument(doc1);
writer2.addDocument(doc2);
mgr.returnIndexWriter(dir, writer2);
mgr.returnIndexWriter(dir, writer1);
final IndexSearcher searcher = mgr.borrowIndexSearcher(dir);
final TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), 2);
assertEquals(2, topDocs.totalHits);
mgr.returnIndexSearcher(dir, searcher);
} finally {
FileUtils.deleteFile(dir, true);
}
}
}