NIFI-3523: Ensure that if we are unable to read Provenance events on startup that we don't prevent NiFi from starting

This closes #1542
This commit is contained in:
Mark Payne 2017-02-24 11:45:26 -05:00 committed by Oleg Zhurakousky
parent 4ed64e7561
commit 6c44d6be2e
5 changed files with 26 additions and 21 deletions

View File

@ -141,7 +141,12 @@ public class WriteAheadProvenanceRepository implements ProvenanceRepository {
eventStore.initialize(); eventStore.initialize();
eventIndex.initialize(eventStore); eventIndex.initialize(eventStore);
eventStore.reindexLatestEvents(eventIndex); try {
eventStore.reindexLatestEvents(eventIndex);
} catch (final Exception e) {
logger.error("Failed to re-index some of the Provenance Events. It is possible that some of the latest "
+ "events will not be available from the Provenance Repository when a query is issued.", e);
}
} }
@Override @Override

View File

@ -318,7 +318,9 @@ public class SimpleIndexManager implements IndexManager {
// This method exists solely for unit testing purposes. // This method exists solely for unit testing purposes.
protected void close(final IndexWriterCount count) throws IOException { protected void close(final IndexWriterCount count) throws IOException {
logger.debug("Closing Index Writer for {}...", count.getWriter().getDirectory());
count.close(); count.close();
logger.debug("Finished closing Index Writer for {}...", count.getWriter().getDirectory());
} }
protected int getWriterCount() { protected int getWriterCount() {

View File

@ -283,7 +283,20 @@ public abstract class CompressableRecordReader implements RecordReader {
} }
if (isData()) { if (isData()) {
return nextRecord(dis, serializationVersion); while (true) {
try {
return nextRecord(dis, serializationVersion);
} catch (final IOException ioe) {
throw ioe;
} catch (final Exception e) {
// This would only happen if a bug were to exist such that an 'invalid' event were written
// out. For example an Event that has no FlowFile UUID. While there is in fact an underlying
// cause that would need to be sorted out in this case, the Provenance Repository should be
// resilient enough to handle this. Otherwise, we end up throwing an Exception, which may
// prevent iterating over additional events in the repository.
logger.error("Failed to read Provenance Event from " + filename + "; will skip this event and continue reading subsequent events", e);
}
}
} else { } else {
return null; return null;
} }

View File

@ -128,8 +128,8 @@ public class WriteAheadStorePartition implements EventStorePartition {
maxEventId = eventId; maxEventId = eventId;
break; break;
} }
} catch (final IOException ioe) { } catch (final Exception e) {
logger.warn("Could not read file {}; if this file contains Provenance Events, new events may be created with the same event identifiers", file, ioe); logger.warn("Could not read file {}; if this file contains Provenance Events, new events may be created with the same event identifiers", file, e);
} }
} }

View File

@ -1905,13 +1905,6 @@ public class TestPersistentProvenanceRepository {
testRepo.recoverJournalFiles(); testRepo.recoverJournalFiles();
assertEquals("mergeJournals() should report a skipped journal", 1, reportedEvents.size());
assertEquals("mergeJournals() should report a skipped journal",
"Failed to read Provenance Event Record from Journal due to java.lang.IllegalArgumentException: "
+ "No enum constant org.apache.nifi.provenance.ProvenanceEventType.BADTYPE; it's possible "
+ "that the record wasn't completely written to the file. This journal will be skipped.",
reportedEvents.get(reportedEvents.size() - 1).getMessage());
final File storageDir = config.getStorageDirectories().values().iterator().next(); final File storageDir = config.getStorageDirectories().values().iterator().next();
assertTrue(checkJournalRecords(storageDir, false) < 10000); assertTrue(checkJournalRecords(storageDir, false) < 10000);
} }
@ -1937,7 +1930,7 @@ public class TestPersistentProvenanceRepository {
final ProvenanceEventRecord record = builder.build(); final ProvenanceEventRecord record = builder.build();
final ExecutorService exec = Executors.newFixedThreadPool(10); final ExecutorService exec = Executors.newFixedThreadPool(10);
final List<Future> futures = new ArrayList<>(); final List<Future<?>> futures = new ArrayList<>();
for (int i = 0; i < 10000; i++) { for (int i = 0; i < 10000; i++) {
futures.add(exec.submit(new Runnable() { futures.add(exec.submit(new Runnable() {
@Override @Override
@ -1948,7 +1941,7 @@ public class TestPersistentProvenanceRepository {
} }
// corrupt the first record of the first journal file // corrupt the first record of the first journal file
for (Future future : futures) { for (Future<?> future : futures) {
while (!future.isDone()) { while (!future.isDone()) {
Thread.sleep(10); Thread.sleep(10);
} }
@ -1958,14 +1951,6 @@ public class TestPersistentProvenanceRepository {
testRepo.recoverJournalFiles(); testRepo.recoverJournalFiles();
assertEquals("mergeJournals should report a skipped journal", 1, reportedEvents.size());
assertEquals("mergeJournals should report a skipped journal",
"Failed to read Provenance Event Record from Journal due to java.lang.IllegalArgumentException: "
+ "No enum constant org.apache.nifi.provenance.ProvenanceEventType.BADTYPE; it's possible "
+ "that the record wasn't completely written to the file. The remainder of this journal will "
+ "be skipped.",
reportedEvents.get(reportedEvents.size() - 1).getMessage());
final File storageDir = config.getStorageDirectories().values().iterator().next(); final File storageDir = config.getStorageDirectories().values().iterator().next();
assertTrue(checkJournalRecords(storageDir, false) < 10000); assertTrue(checkJournalRecords(storageDir, false) < 10000);
} }