NIFI-596: If IndexWriter is opened for same directory as an IndexReader, mark IndexReader as poisoned and stop using it

NIFI-595: Delete .toc files when expiring an event file

NIFI-597: Only increment counter for number of documents retrieved after reading the record
This commit is contained in:
Mark Payne 2015-05-06 15:43:20 -04:00
parent c12778f17f
commit b760505bf3
3 changed files with 92 additions and 22 deletions

View File

@ -20,7 +20,7 @@ import java.io.File;
import java.io.IOException;
import org.apache.nifi.provenance.lucene.DeleteIndexAction;
import org.apache.nifi.provenance.toc.TocUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -30,16 +30,32 @@ public class FileRemovalAction implements ExpirationAction {
@Override
public File execute(final File expiredFile) throws IOException {
final boolean removed = remove(expiredFile);
if (removed) {
logger.info("Removed expired Provenance Event file {}", expiredFile);
} else {
logger.warn("Failed to remove old Provenance Event file {}; this file should be cleaned up manually", expiredFile);
}
final File tocFile = TocUtil.getTocFile(expiredFile);
if (remove(tocFile)) {
logger.info("Removed expired Provenance Table-of-Contents file {}", tocFile);
} else {
logger.warn("Failed to remove old Provenance Table-of-Contents file {}; this file should be cleaned up manually", expiredFile);
}
return removed ? null : expiredFile;
}
private boolean remove(final File file) {
boolean removed = false;
for (int i = 0; i < 10 && !removed; i++) {
if ((removed = expiredFile.delete())) {
logger.info("Removed expired Provenance Event file {}", expiredFile);
return null;
if (removed = file.delete()) {
return true;
}
}
logger.warn("Failed to remove old Provenance Event file {}", expiredFile);
return expiredFile;
return false;
}
@Override

View File

@ -64,14 +64,11 @@ public class DocsReader {
final int docId = scoreDoc.doc;
final Document d = indexReader.document(docId);
docs.add(d);
if ( retrievalCount.incrementAndGet() >= maxResults ) {
break;
}
}
final long readDocuments = System.nanoTime() - start;
logger.debug("Reading {} Lucene Documents took {} millis", docs.size(), TimeUnit.NANOSECONDS.toMillis(readDocuments));
return read(docs, allProvenanceLogFiles);
return read(docs, allProvenanceLogFiles, retrievalCount, maxResults);
}
@ -88,7 +85,7 @@ public class DocsReader {
private ProvenanceEventRecord getRecord(final Document d, final RecordReader reader) throws IOException {
IndexableField blockField = d.getField(FieldNames.BLOCK_INDEX);
final IndexableField blockField = d.getField(FieldNames.BLOCK_INDEX);
if ( blockField == null ) {
reader.skipTo(getByteOffset(d, reader));
} else {
@ -97,7 +94,7 @@ public class DocsReader {
StandardProvenanceEventRecord record;
while ( (record = reader.nextRecord()) != null) {
IndexableField idField = d.getField(SearchableFields.Identifier.getSearchableFieldName());
final IndexableField idField = d.getField(SearchableFields.Identifier.getSearchableFieldName());
if ( idField == null || idField.numericValue().longValue() == record.getEventId() ) {
break;
}
@ -111,7 +108,11 @@ public class DocsReader {
}
public Set<ProvenanceEventRecord> read(final List<Document> docs, final Collection<Path> allProvenanceLogFiles) throws IOException {
public Set<ProvenanceEventRecord> read(final List<Document> docs, final Collection<Path> allProvenanceLogFiles, final AtomicInteger retrievalCount, final int maxResults) throws IOException {
if (retrievalCount.get() >= maxResults) {
return Collections.emptySet();
}
LuceneUtil.sortDocsForRetrieval(docs);
RecordReader reader = null;
@ -133,6 +134,10 @@ public class DocsReader {
try {
if (reader != null && storageFilename.equals(lastStorageFilename)) {
matchingRecords.add(getRecord(d, reader));
if ( retrievalCount.incrementAndGet() >= maxResults ) {
break;
}
} else {
logger.debug("Opening log file {}", storageFilename);
@ -141,7 +146,7 @@ public class DocsReader {
reader.close();
}
List<File> potentialFiles = LuceneUtil.getProvenanceLogFiles(storageFilename, allProvenanceLogFiles);
final List<File> potentialFiles = LuceneUtil.getProvenanceLogFiles(storageFilename, allProvenanceLogFiles);
if (potentialFiles.isEmpty()) {
logger.warn("Could not find Provenance Log File with basename {} in the "
+ "Provenance Repository; assuming file has expired and continuing without it", storageFilename);
@ -158,6 +163,10 @@ public class DocsReader {
try {
reader = RecordReaders.newRecordReader(file, allProvenanceLogFiles);
matchingRecords.add(getRecord(d, reader));
if ( retrievalCount.incrementAndGet() >= maxResults ) {
break;
}
} catch (final IOException e) {
throw new IOException("Failed to retrieve record " + d + " from Provenance File " + file + " due to " + e, e);
}

View File

@ -119,6 +119,14 @@ public class IndexManager implements Closeable {
}
writerCounts.put(absoluteFile, writerCount);
// Mark any active searchers as poisoned because we are updating the index
final List<ActiveIndexSearcher> searchers = activeSearchers.get(absoluteFile);
if ( searchers != null ) {
for (final ActiveIndexSearcher activeSearcher : searchers) {
activeSearcher.poison();
}
}
} else {
logger.debug("Providing existing index writer for {} and incrementing count to {}", indexingDirectory, writerCount.getCount() + 1);
writerCounts.put(absoluteFile, new IndexWriterCount(writerCount.getWriter(),
@ -137,7 +145,7 @@ public class IndexManager implements Closeable {
lock.lock();
try {
IndexWriterCount count = writerCounts.remove(absoluteFile);
final IndexWriterCount count = writerCounts.remove(absoluteFile);
try {
if ( count == null ) {
@ -184,6 +192,15 @@ public class IndexManager implements Closeable {
try {
for ( final ActiveIndexSearcher searcher : currentlyCached ) {
if ( searcher.isCache() ) {
// if the searcher is poisoned, we want to close and expire it.
if ( searcher.isPoisoned() ) {
logger.debug("Index Searcher for {} is poisoned; removing cached searcher", absoluteFile);
expired.add(searcher);
continue;
}
// if there are no references to the reader, it will have been closed. Since there is no
// isClosed() method, this is how we determine whether it's been closed or not.
final int refCount = searcher.getSearcher().getIndexReader().getRefCount();
if ( refCount <= 0 ) {
// if refCount == 0, then the reader has been closed, so we need to discard the searcher
@ -212,7 +229,7 @@ public class IndexManager implements Closeable {
}
}
IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
final IndexWriterCount writerCount = writerCounts.remove(absoluteFile);
if ( writerCount == null ) {
final Directory directory = FSDirectory.open(absoluteFile);
logger.debug("No Index Writer currently exists for {}; creating a cachable reader", indexDir);
@ -270,21 +287,40 @@ public class IndexManager implements Closeable {
lock.lock();
try {
// check if we already have a reader cached.
List<ActiveIndexSearcher> currentlyCached = activeSearchers.get(absoluteFile);
final List<ActiveIndexSearcher> currentlyCached = activeSearchers.get(absoluteFile);
if ( currentlyCached == null ) {
logger.warn("Received Index Searcher for {} but no searcher was provided for that directory; this could "
+ "result in a resource leak", indexDirectory);
return;
}
// Check if the given searcher is in our list. We use an Iterator to do this so that if we
// find it we can call remove() on the iterator if need be.
final Iterator<ActiveIndexSearcher> itr = currentlyCached.iterator();
while (itr.hasNext()) {
final ActiveIndexSearcher activeSearcher = itr.next();
if ( activeSearcher.getSearcher().equals(searcher) ) {
if ( activeSearcher.isCache() ) {
// if the searcher is poisoned, close it and remove from "pool".
if ( activeSearcher.isPoisoned() ) {
itr.remove();
try {
logger.debug("Closing Index Searcher for {} because it is poisoned", indexDirectory);
activeSearcher.close();
} catch (final IOException ioe) {
logger.warn("Failed to close Index Searcher for {} due to {}", absoluteFile, ioe);
if ( logger.isDebugEnabled() ) {
logger.warn("", ioe);
}
}
return;
} else {
// the searcher is cached. Just leave it open.
logger.debug("Index searcher for {} is cached; leaving open", indexDirectory);
return;
}
} else {
// searcher is not cached. It was created from a writer, and we want
// the newest updates the next time that we get a searcher, so we will
@ -405,9 +441,10 @@ public class IndexManager implements Closeable {
private final DirectoryReader directoryReader;
private final Directory directory;
private final boolean cache;
private boolean poisoned = false;
public ActiveIndexSearcher(IndexSearcher searcher, DirectoryReader directoryReader,
Directory directory, final boolean cache) {
public ActiveIndexSearcher(final IndexSearcher searcher, final DirectoryReader directoryReader,
final Directory directory, final boolean cache) {
this.searcher = searcher;
this.directoryReader = directoryReader;
this.directory = directory;
@ -422,6 +459,14 @@ public class IndexManager implements Closeable {
return searcher;
}
public boolean isPoisoned() {
return poisoned;
}
public void poison() {
this.poisoned = true;
}
@Override
public void close() throws IOException {
IndexManager.close(directoryReader, directory);