NIFI-2600: Ensure that we do not close Index Searchers prematurely, even if they are poisoned

This closes #896
Mark Payne 2016-08-18 15:29:34 -04:00 committed by Matt Burgess
parent a2d3d0c289
commit 95b5877f5d
4 changed files with 195 additions and 96 deletions

File: org/apache/nifi/provenance/PersistentProvenanceRepository.java

@@ -1241,6 +1241,10 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
         }
     }
 
+    protected long getRolloverRetryMillis() {
+        return 10000L;
+    }
+
     /**
      * <p>
      * MUST be called with the write lock held.
@@ -1349,14 +1353,14 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
                         future.cancel(false);
                     } else {
-                        logger.warn("Couldn't merge journals. Will try again in 10 seconds. journalsToMerge: {}, storageDir: {}", journalsToMerge, storageDir);
+                        logger.warn("Couldn't merge journals. Will try again. journalsToMerge: {}, storageDir: {}", journalsToMerge, storageDir);
                     }
                 }
             };
 
             // We are going to schedule the future to run immediately and then repeat every 10 seconds. This allows us to keep retrying if we
             // fail for some reason. When we succeed or if retries are exceeded, the Runnable will cancel itself.
-            future = rolloverExecutor.scheduleWithFixedDelay(rolloverRunnable, 0, 10, TimeUnit.SECONDS);
+            future = rolloverExecutor.scheduleWithFixedDelay(rolloverRunnable, 0, getRolloverRetryMillis(), TimeUnit.MILLISECONDS);
             futureReference.set(future);
         }
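For reference, the retry mechanics above follow a common self-cancelling pattern: the task is scheduled with a fixed delay and cancels its own ScheduledFuture once it succeeds (or exhausts its retries), so the executor stops re-running it. A minimal standalone sketch of that pattern, with an illustrative three-attempt success condition standing in for the journal merge:

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicReference;

    public class SelfCancellingRetry {
        public static void main(final String[] args) {
            final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
            final AtomicReference<ScheduledFuture<?>> futureReference = new AtomicReference<>();
            final AtomicInteger attempts = new AtomicInteger(0);

            final Runnable task = () -> {
                // Stand-in for the merge attempt: succeed on the third try.
                final boolean succeeded = attempts.incrementAndGet() >= 3;
                if (succeeded) {
                    // On success, cancel our own future so the executor stops re-running us.
                    final ScheduledFuture<?> future = futureReference.get();
                    if (future != null) {
                        future.cancel(false);
                    }
                    executor.shutdown();
                }
            };

            // Run immediately, then retry on a fixed delay until the task cancels itself,
            // mirroring scheduleWithFixedDelay(rolloverRunnable, 0, getRolloverRetryMillis(), TimeUnit.MILLISECONDS).
            futureReference.set(executor.scheduleWithFixedDelay(task, 0, 10, TimeUnit.MILLISECONDS));
        }
    }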
@@ -1499,6 +1503,10 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
         return mergedFile;
     }
 
+    protected List<File> filterUnavailableFiles(final List<File> journalFiles) {
+        return journalFiles.stream().filter(file -> file.exists()).collect(Collectors.toList());
+    }
+
     /**
      * <p>
      * Merges all of the given Journal Files into a single, merged Provenance
@@ -1555,9 +1563,10 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
                 }
             });
 
-            //Search for any missing files. At this point they should have been written to disk otherwise cannot continue
-            //missing files is most likely due to incomplete cleanup of files post merge
-            final long numAvailableFiles = journalFiles.size() - journalFiles.stream().filter(file -> !file.exists()).count();
+            // Search for any missing files. At this point they should have been written to disk, otherwise we cannot continue.
+            // Missing files are most likely due to incomplete cleanup of files post merge.
+            final List<File> availableFiles = filterUnavailableFiles(journalFiles);
+            final int numAvailableFiles = availableFiles.size();
 
             // check if we have all of the "partial" files for the journal.
             if (numAvailableFiles > 0) {
@@ -1606,7 +1615,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
             final File writerFile = isCompress ? new File(suggestedMergeFile.getParentFile(), suggestedMergeFile.getName() + ".gz") : suggestedMergeFile;
 
             try {
-                for (final File journalFile : journalFiles) {
+                for (final File journalFile : availableFiles) {
                     try {
                         // Use MAX_VALUE for number of chars because we don't want to truncate the value as we write it
                         // out. This allows us to later decide that we want more characters and still be able to retrieve
@@ -1842,7 +1851,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
             }
 
             // Success. Remove all of the journal files, as they're no longer needed, now that they've been merged.
-            for (final File journalFile : journalFiles) {
+            for (final File journalFile : availableFiles) {
                 if (!journalFile.delete() && journalFile.exists()) {
                     logger.warn("Failed to remove temporary journal file {}; this file should be cleaned up manually", journalFile.getAbsolutePath());

File: org/apache/nifi/provenance/lucene/IndexManager.java

@@ -187,47 +187,26 @@ public class IndexManager implements Closeable {
             } else {
-                // keep track of any searchers that have been closed so that we can remove them
-                // from our cache later.
-                final List<ActiveIndexSearcher> expired = new ArrayList<>();
-                try {
-                    for ( final ActiveIndexSearcher searcher : currentlyCached ) {
-                        if ( searcher.isCache() ) {
-                            // if the searcher is poisoned, we want to close and expire it.
-                            if ( searcher.isPoisoned() ) {
-                                logger.debug("Index Searcher for {} is poisoned; removing cached searcher", absoluteFile);
-                                expired.add(searcher);
-                                continue;
-                            }
-
-                            // if there are no references to the reader, it will have been closed. Since there is no
-                            // isClosed() method, this is how we determine whether it's been closed or not.
-                            final int refCount = searcher.getSearcher().getIndexReader().getRefCount();
-                            if ( refCount <= 0 ) {
-                                // if refCount == 0, then the reader has been closed, so we need to discard the searcher
-                                logger.debug("Reference count for cached Index Searcher for {} is currently {}; "
-                                    + "removing cached searcher", absoluteFile, refCount);
-                                expired.add(searcher);
-                                continue;
-                            }
-
-                            final int referenceCount = searcher.incrementReferenceCount();
-                            logger.debug("Providing previously cached index searcher for {} and incrementing Reference Count to {}", indexDir, referenceCount);
-                            return searcher.getSearcher();
-                        }
-                    }
-                } finally {
-                    // if we have any expired index searchers, we need to close them and remove them
-                    // from the cache so that we don't try to use them again later.
-                    for ( final ActiveIndexSearcher searcher : expired ) {
-                        try {
-                            logger.debug("Closing {}", searcher);
-                            searcher.close();
-                            logger.trace("Closed {}", searcher);
-                        } catch (final Exception e) {
-                            logger.debug("Failed to close 'expired' IndexSearcher {}", searcher);
-                        }
-
-                        currentlyCached.remove(searcher);
-                    }
-                }
+                for (final ActiveIndexSearcher searcher : currentlyCached) {
+                    if (searcher.isCache()) {
+                        // if the searcher is poisoned, we cannot use it, so skip it; it will be
+                        // closed, reference count permitting, when it is returned.
+                        if (searcher.isPoisoned()) {
+                            continue;
+                        }
+
+                        // if there are no references to the reader, it will have been closed. Since there is no
+                        // isClosed() method, this is how we determine whether it's been closed or not.
+                        final int refCount = searcher.getSearcher().getIndexReader().getRefCount();
+                        if (refCount <= 0) {
+                            // if refCount == 0, then the reader has been closed, so we cannot use the searcher
+                            logger.debug("Reference count for cached Index Searcher for {} is currently {}; "
+                                + "removing cached searcher", absoluteFile, refCount);
+                            continue;
+                        }
+
+                        final int referenceCount = searcher.incrementReferenceCount();
+                        logger.debug("Providing previously cached index searcher for {} and incrementing Reference Count to {}", indexDir, referenceCount);
+                        return searcher.getSearcher();
+                    }
+                }
             }
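Two reference counts are in play in the block above: Lucene's own IndexReader refCount, which tells the cache whether the underlying reader is still open, and the count that ActiveIndexSearcher increments on every borrow. The commit's fix is that closing a borrowed, poisoned searcher merely decrements the latter, deferring the real close until the last borrower returns it. A minimal sketch of that reference-counted close, with illustrative names (RefCountedSearcher is not NiFi's ActiveIndexSearcher):

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.atomic.AtomicInteger;

    class RefCountedSearcher {
        private final Closeable underlying; // e.g. a Lucene DirectoryReader
        private final AtomicInteger referenceCount = new AtomicInteger(1);
        private final AtomicBoolean poisoned = new AtomicBoolean(false);

        RefCountedSearcher(final Closeable underlying) {
            this.underlying = underlying;
        }

        int incrementReferenceCount() {
            return referenceCount.incrementAndGet();
        }

        void poison() {
            poisoned.set(true);
        }

        boolean isPoisoned() {
            return poisoned.get();
        }

        // Decrements the reference count and closes the underlying resource only when the
        // count reaches zero. Returns true if the resource was actually closed, so a caller
        // can keep the wrapper around (as the pre-commit code did) until the last release.
        boolean close() throws IOException {
            if (referenceCount.decrementAndGet() <= 0) {
                underlying.close();
                return true;
            }
            return false;
        }
    }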
@@ -312,16 +291,14 @@ public class IndexManager implements Closeable {
                     if ( activeSearcher.getSearcher().equals(searcher) ) {
                         activeSearcherFound = true;
                         if ( activeSearcher.isCache() ) {
-                            // if the searcher is poisoned, close it and remove from "pool".
+                            // if the searcher is poisoned, close it and remove from "pool". Otherwise,
+                            // just decrement the count. Note here that when we call close() it won't actually close
+                            // the underlying directory reader unless there are no more references to it
                             if ( activeSearcher.isPoisoned() ) {
                                 itr.remove();
 
                                 try {
-                                    logger.debug("Closing Index Searcher for {} because it is poisoned", indexDirectory);
-                                    final boolean allReferencesClosed = activeSearcher.close();
-                                    if (!allReferencesClosed) {
-                                        currentlyCached.add(activeSearcher);
-                                    }
+                                    activeSearcher.close();
                                 } catch (final IOException ioe) {
                                     logger.warn("Failed to close Index Searcher for {} due to {}", absoluteFile, ioe);
                                     if ( logger.isDebugEnabled() ) {
@@ -384,7 +361,8 @@ public class IndexManager implements Closeable {
                 }
 
                 if (!activeSearcherFound) {
-                    logger.error("Index Searcher {} was returned for {} but found no Active Searcher for it", searcher, indexDirectory);
+                    logger.debug("Index Searcher {} was returned for {} but found no Active Searcher for it. "
+                        + "This will occur if the Index Searcher was already returned while being poisoned.", searcher, indexDirectory);
                 }
             } finally {
                 lock.unlock();

File: org/apache/nifi/provenance/TestPersistentProvenanceRepository.java

@@ -16,6 +16,34 @@
  */
 package org.apache.nifi.provenance;
 
+import static org.apache.nifi.provenance.TestUtil.createFlowFile;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import java.util.zip.GZIPOutputStream;
+
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.analysis.core.SimpleAnalyzer;
 import org.apache.lucene.document.Document;
@@ -63,35 +91,6 @@ import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.FileFilter;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
-import java.util.zip.GZIPOutputStream;
-
-import static org.apache.nifi.provenance.TestUtil.createFlowFile;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-
 public class TestPersistentProvenanceRepository {
 
     @Rule
@@ -1681,6 +1680,17 @@ public class TestPersistentProvenanceRepository {
                 retryAmount.incrementAndGet();
                 return super.mergeJournals(journalFiles, suggestedMergeFile, eventReporter);
             }
+
+            // Indicate that there are no files available.
+            @Override
+            protected List<File> filterUnavailableFiles(List<File> journalFiles) {
+                return Collections.emptyList();
+            }
+
+            @Override
+            protected long getRolloverRetryMillis() {
+                return 10L; // retry quickly.
+            }
         };
 
         repo.initialize(getEventReporter(), null, null);
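The overrides above work because the repository exposes filterUnavailableFiles and getRolloverRetryMillis as protected hooks: the test can simulate journals vanishing mid-rollover and shrink the ten-second retry delay without deleting real files or waiting on the clock. A minimal sketch of the same protected-hook seam, with illustrative class names (Roller is not a NiFi class):

    import java.util.Collections;
    import java.util.List;

    class Roller {
        // Production defaults; protected so tests can substitute deterministic behavior.
        protected long retryMillis() {
            return 10000L;
        }

        protected List<String> availableJournals(final List<String> journals) {
            return journals;
        }
    }

    class FailingRoller extends Roller {
        @Override
        protected long retryMillis() {
            return 10L; // keep the test fast
        }

        @Override
        protected List<String> availableJournals(final List<String> journals) {
            return Collections.emptyList(); // pretend every journal has vanished
        }
    }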
@@ -1706,23 +1716,11 @@ public class TestPersistentProvenanceRepository {
                     }
                 });
             }
 
-        final File storageDir = config.getStorageDirectories().get(0);
-        //trigger retry through full file deletion
-        Arrays.asList(storageDir.listFiles())
-                .stream()
-                .map(file -> new File(storageDir, "journals"))
-                .map(journalDir -> Arrays.asList(journalDir.listFiles()))
-                .flatMap(partials -> partials.stream())
-                .filter(partial -> partial.exists())
-                .forEach(file -> {
-                    file.delete();
-                });
-
         exec.shutdown();
         exec.awaitTermination(10, TimeUnit.SECONDS);
 
         repo.waitForRollover();
 
         assertEquals(5,retryAmount.get());
     }
 
     @Test

File: org/apache/nifi/provenance/lucene/TestIndexManager.java (new file)

@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.lucene;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.UUID;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field.Store;
+import org.apache.lucene.document.StringField;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.BooleanClause;
+import org.apache.lucene.search.BooleanQuery;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.search.TopDocs;
+import org.apache.lucene.search.BooleanClause.Occur;
+import org.apache.nifi.util.file.FileUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestIndexManager {
+    private File indexDir;
+    private IndexManager manager;
+
+    @Before
+    public void setup() {
+        System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", "DEBUG");
+        manager = new IndexManager();
+
+        indexDir = new File("target/testIndexManager/" + UUID.randomUUID().toString());
+        indexDir.mkdirs();
+    }
+
+    @After
+    public void cleanup() throws IOException {
+        manager.close();
+        FileUtils.deleteFiles(Collections.singleton(indexDir), true);
+    }
+    @Test
+    public void test() throws IOException {
+        // Create an IndexWriter and add a document to the index, then close the writer.
+        // This gives us something that we can query.
+        final IndexWriter writer = manager.borrowIndexWriter(indexDir);
+        final Document doc = new Document();
+        doc.add(new StringField("unit test", "true", Store.YES));
+        writer.addDocument(doc);
+        manager.returnIndexWriter(indexDir, writer);
+
+        // Get an Index Searcher that we can use to query the index.
+        final IndexSearcher cachedSearcher = manager.borrowIndexSearcher(indexDir);
+
+        // Ensure that we get the expected results.
+        assertCount(cachedSearcher, 1);
+
+        // While we already have an Index Searcher, get a writer for the same index.
+        // This will cause the Index Searcher to be marked as poisoned.
+        final IndexWriter writer2 = manager.borrowIndexWriter(indexDir);
+
+        // Obtain a new Index Searcher with the writer open. This Index Searcher should *NOT*
+        // be the same as the previous searcher because the new one will be a Near-Real-Time Index Searcher
+        // while the other is not.
+        final IndexSearcher nrtSearcher = manager.borrowIndexSearcher(indexDir);
+        assertNotSame(cachedSearcher, nrtSearcher);
+
+        // Ensure that we get the expected query results.
+        assertCount(nrtSearcher, 1);
+
+        // Return the writer, so that there is no longer an active writer for the index.
+        manager.returnIndexWriter(indexDir, writer2);
+
+        // Ensure that we still get the same result.
+        assertCount(cachedSearcher, 1);
+        manager.returnIndexSearcher(indexDir, cachedSearcher);
+
+        // Ensure that our near-real-time index searcher still gets the same result.
+        assertCount(nrtSearcher, 1);
+        manager.returnIndexSearcher(indexDir, nrtSearcher);
+    }
+    private void assertCount(final IndexSearcher searcher, final int count) throws IOException {
+        final BooleanQuery query = new BooleanQuery();
+        query.add(new BooleanClause(new TermQuery(new Term("unit test", "true")), Occur.MUST));
+
+        final TopDocs topDocs = searcher.search(query, count * 10);
+        assertNotNull(topDocs);
+        assertEquals(count, topDocs.totalHits);
+    }
+}
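The assertNotSame check in the test above rests on a Lucene distinction: while a writer is open for an index, a near-real-time reader obtained from that writer also sees documents still buffered in the writer, whereas a plain directory reader sees only committed segments, so the two searchers cannot be the same object. A minimal sketch of the two reader types using the plain Lucene 4.x API, independent of NiFi's IndexManager:

    import java.io.IOException;

    import org.apache.lucene.index.DirectoryReader;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.search.IndexSearcher;

    public class NrtVsCommittedSearchers {

        // A plain directory reader: sees only segments that have been committed.
        static IndexSearcher committedSearcher(final IndexWriter writer) throws IOException {
            return new IndexSearcher(DirectoryReader.open(writer.getDirectory()));
        }

        // A near-real-time reader obtained from the writer: also sees documents
        // still buffered in the writer, before any commit.
        static IndexSearcher nearRealTimeSearcher(final IndexWriter writer) throws IOException {
            return new IndexSearcher(DirectoryReader.open(writer, true));
        }
    }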