NIFI-2395 This closes #734. Ensure that if we fail to index provenance events we do not prevent the repo from continuing to merge journals

This commit is contained in:
Mark Payne 2016-07-28 10:19:45 -04:00 committed by joewitt
parent cddbe7d41f
commit cfc8a9613c
2 changed files with 105 additions and 11 deletions

View File

@ -123,6 +123,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
public static final Pattern INDEX_PATTERN = Pattern.compile("index-\\d+");
public static final Pattern LOG_FILENAME_PATTERN = Pattern.compile("(\\d+).*\\.prov");
public static final int MAX_UNDELETED_QUERY_RESULTS = 10;
public static final int MAX_INDEXING_FAILURE_COUNT = 5; // how many indexing failures we will tolerate before skipping indexing for a prov file
private static final Logger logger = LoggerFactory.getLogger(PersistentProvenanceRepository.class);
@ -1648,7 +1649,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
try (final RecordWriter writer = RecordWriters.newRecordWriter(writerFile, configuration.isCompressOnRollover(), true)) {
writer.writeHeader(minEventId);
final IndexingAction indexingAction = new IndexingAction(this);
final IndexingAction indexingAction = createIndexingAction();
final File indexingDirectory = indexConfig.getWritableIndexDirectory(writerFile, earliestTimestamp);
long maxId = 0L;
@ -1668,16 +1669,19 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
}
});
final AtomicInteger indexingFailureCount = new AtomicInteger(0);
try {
for (int i = 0; i < configuration.getIndexThreadPoolSize(); i++) {
final Callable<Object> callable = new Callable<Object>() {
@Override
public Object call() throws IOException {
while (!eventQueue.isEmpty() || !finishedAdding.get()) {
try {
final Tuple<StandardProvenanceEventRecord, Integer> tuple;
try {
tuple = eventQueue.poll(10, TimeUnit.MILLISECONDS);
} catch (final InterruptedException ie) {
Thread.currentThread().interrupt();
continue;
}
@ -1686,6 +1690,12 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
}
indexingAction.index(tuple.getKey(), indexWriter, tuple.getValue());
} catch (final Throwable t) {
logger.error("Failed to index Provenance Event for " + writerFile + " to " + indexingDirectory, t);
if (indexingFailureCount.incrementAndGet() >= MAX_INDEXING_FAILURE_COUNT) {
return null;
}
}
}
return null;
@ -1696,6 +1706,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
futures.add(future);
}
boolean indexEvents = true;
while (!recordToReaderMap.isEmpty()) {
final Map.Entry<StandardProvenanceEventRecord, RecordReader> entry = recordToReaderMap.entrySet().iterator().next();
final StandardProvenanceEventRecord record = entry.getKey();
@ -1705,12 +1716,30 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
final int blockIndex = writer.getTocWriter().getCurrentBlockIndex();
boolean accepted = false;
while (!accepted) {
while (!accepted && indexEvents) {
try {
accepted = eventQueue.offer(new Tuple<>(record, blockIndex), 10, TimeUnit.MILLISECONDS);
} catch (final InterruptedException ie) {
Thread.currentThread().interrupt();
}
// If we weren't able to add anything to the queue, check if we have reached our max failure count.
// We do this here because if we do reach our max failure count, all of the indexing threads will stop
// performing their jobs. As a result, the queue will fill and we won't be able to add anything to it.
// So, if the queue is filled, we will check if this is the case.
if (!accepted && indexingFailureCount.get() >= MAX_INDEXING_FAILURE_COUNT) {
indexEvents = false; // don't add anything else to the queue.
eventQueue.clear();
final String warning = String.format("Indexing Provenance Events for %s has failed %s times. This exceeds the maximum threshold of %s failures, "
+ "so no more Provenance Events will be indexed for this Provenance file.", writerFile, indexingFailureCount.get(), MAX_INDEXING_FAILURE_COUNT);
logger.warn(warning);
if (eventReporter != null) {
eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, warning);
}
}
}
maxId = record.getEventId();
latestRecords.add(truncateAttributes(record));
@ -1747,6 +1776,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
throw new RuntimeException(t);
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Thread interrupted");
}
}
@ -1810,6 +1840,15 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
return writerFile;
}
/**
* This method is protected and exists for testing purposes. This allows unit tests to extend this class and
* override the createIndexingAction so that they can mock out the Indexing Action to throw Exceptions, count
* events indexed, etc.
*/
protected IndexingAction createIndexingAction() {
return new IndexingAction(this);
}
private StandardProvenanceEventRecord truncateAttributes(final StandardProvenanceEventRecord original) {
boolean requireTruncation = false;
@ -2264,6 +2303,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
throw new AccessDeniedException("Cannot retrieve Provenance Query Submission because " + user.getIdentity() + " is not the user who submitted the request");
}
@Override
public ProvenanceEventRecord getEvent(final long id) throws IOException {
final List<ProvenanceEventRecord> records = getEvents(id, 1);
if (records.isEmpty()) {

View File

@ -20,6 +20,7 @@ import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.core.SimpleAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.queryparser.classic.ParseException;
import org.apache.lucene.queryparser.classic.QueryParser;
import org.apache.lucene.search.IndexSearcher;
@ -35,6 +36,7 @@ import org.apache.nifi.provenance.lineage.Lineage;
import org.apache.nifi.provenance.lineage.LineageEdge;
import org.apache.nifi.provenance.lineage.LineageNode;
import org.apache.nifi.provenance.lineage.LineageNodeType;
import org.apache.nifi.provenance.lucene.IndexingAction;
import org.apache.nifi.provenance.search.Query;
import org.apache.nifi.provenance.search.QueryResult;
import org.apache.nifi.provenance.search.QuerySubmission;
@ -1536,6 +1538,58 @@ public class TestPersistentProvenanceRepository {
}
@Test(timeout=5000)
public void testExceptionOnIndex() throws IOException {
final RepositoryConfiguration config = createConfiguration();
config.setMaxAttributeChars(50);
config.setMaxEventFileLife(3, TimeUnit.SECONDS);
config.setIndexThreadPoolSize(1);
final int numEventsToIndex = 10;
final AtomicInteger indexedEventCount = new AtomicInteger(0);
repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS) {
@Override
protected synchronized IndexingAction createIndexingAction() {
return new IndexingAction(repo) {
@Override
public void index(StandardProvenanceEventRecord record, IndexWriter indexWriter, Integer blockIndex) throws IOException {
final int count = indexedEventCount.incrementAndGet();
if (count <= numEventsToIndex) {
return;
}
throw new IOException("Unit Test - Intentional Exception");
}
};
}
};
repo.initialize(getEventReporter(), null, null);
final Map<String, String> attributes = new HashMap<>();
attributes.put("uuid", "12345678-0000-0000-0000-012345678912");
final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
builder.setEventTime(System.currentTimeMillis());
builder.setEventType(ProvenanceEventType.RECEIVE);
builder.setTransitUri("nifi://unit-test");
builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
builder.setComponentId("1234");
builder.setComponentType("dummy processor");
for (int i=0; i < 1000; i++) {
final ProvenanceEventRecord record = builder.build();
repo.registerEvent(record);
}
repo.waitForRollover();
assertEquals(numEventsToIndex + PersistentProvenanceRepository.MAX_INDEXING_FAILURE_COUNT, indexedEventCount.get());
assertEquals(1, reportedEvents.size());
final ReportedEvent event = reportedEvents.get(0);
assertEquals(Severity.WARNING, event.getSeverity());
}
@Test
public void testFailureToCreateWriterDoesNotPreventSubsequentRollover() throws IOException, InterruptedException {
final RepositoryConfiguration config = createConfiguration();