From 6c44d6be2e31eae491f236f3be39eb3309c45cda Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Fri, 24 Feb 2017 11:45:26 -0500 Subject: [PATCH] NIFI-3523: Ensure that if we are unable to read Provenance events on startup that we don't prevent NiFi from starting This closes #1542 --- .../WriteAheadProvenanceRepository.java | 7 ++++++- .../provenance/lucene/SimpleIndexManager.java | 2 ++ .../CompressableRecordReader.java | 15 ++++++++++++++- .../store/WriteAheadStorePartition.java | 4 ++-- .../TestPersistentProvenanceRepository.java | 19 ++----------------- 5 files changed, 26 insertions(+), 21 deletions(-) diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/WriteAheadProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/WriteAheadProvenanceRepository.java index 229a96d5d9..89750282de 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/WriteAheadProvenanceRepository.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/WriteAheadProvenanceRepository.java @@ -141,7 +141,12 @@ public class WriteAheadProvenanceRepository implements ProvenanceRepository { eventStore.initialize(); eventIndex.initialize(eventStore); - eventStore.reindexLatestEvents(eventIndex); + try { + eventStore.reindexLatestEvents(eventIndex); + } catch (final Exception e) { + logger.error("Failed to re-index some of the Provenance Events. It is possible that some of the latest " + + "events will not be available from the Provenance Repository when a query is issued.", e); + } } @Override diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/SimpleIndexManager.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/SimpleIndexManager.java index b0b01e592b..4d6c11d946 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/SimpleIndexManager.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/SimpleIndexManager.java @@ -318,7 +318,9 @@ public class SimpleIndexManager implements IndexManager { // This method exists solely for unit testing purposes. protected void close(final IndexWriterCount count) throws IOException { + logger.debug("Closing Index Writer for {}...", count.getWriter().getDirectory()); count.close(); + logger.debug("Finished closing Index Writer for {}...", count.getWriter().getDirectory()); } protected int getWriterCount() { diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordReader.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordReader.java index 1a6c3c55dc..93c066963e 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordReader.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordReader.java @@ -283,7 +283,20 @@ public abstract class CompressableRecordReader implements RecordReader { } if (isData()) { - return nextRecord(dis, serializationVersion); + while (true) { + try { + return nextRecord(dis, serializationVersion); + } catch (final IOException ioe) { + throw ioe; + } catch (final Exception e) { + // This would only happen if a bug were to exist such that an 'invalid' event were written + // out. For example an Event that has no FlowFile UUID. While there is in fact an underlying + // cause that would need to be sorted out in this case, the Provenance Repository should be + // resilient enough to handle this. Otherwise, we end up throwing an Exception, which may + // prevent iterating over additional events in the repository. + logger.error("Failed to read Provenance Event from " + filename + "; will skip this event and continue reading subsequent events", e); + } + } } else { return null; } diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java index a25043a50f..fde76f5063 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/store/WriteAheadStorePartition.java @@ -128,8 +128,8 @@ public class WriteAheadStorePartition implements EventStorePartition { maxEventId = eventId; break; } - } catch (final IOException ioe) { - logger.warn("Could not read file {}; if this file contains Provenance Events, new events may be created with the same event identifiers", file, ioe); + } catch (final Exception e) { + logger.warn("Could not read file {}; if this file contains Provenance Events, new events may be created with the same event identifiers", file, e); } } diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java index 48d8e09aff..e3f729b996 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java @@ -1905,13 +1905,6 @@ public class TestPersistentProvenanceRepository { testRepo.recoverJournalFiles(); - assertEquals("mergeJournals() should report a skipped journal", 1, reportedEvents.size()); - assertEquals("mergeJournals() should report a skipped journal", - "Failed to read Provenance Event Record from Journal due to java.lang.IllegalArgumentException: " - + "No enum constant org.apache.nifi.provenance.ProvenanceEventType.BADTYPE; it's possible " - + "that the record wasn't completely written to the file. This journal will be skipped.", - reportedEvents.get(reportedEvents.size() - 1).getMessage()); - final File storageDir = config.getStorageDirectories().values().iterator().next(); assertTrue(checkJournalRecords(storageDir, false) < 10000); } @@ -1937,7 +1930,7 @@ public class TestPersistentProvenanceRepository { final ProvenanceEventRecord record = builder.build(); final ExecutorService exec = Executors.newFixedThreadPool(10); - final List futures = new ArrayList<>(); + final List> futures = new ArrayList<>(); for (int i = 0; i < 10000; i++) { futures.add(exec.submit(new Runnable() { @Override @@ -1948,7 +1941,7 @@ public class TestPersistentProvenanceRepository { } // corrupt the first record of the first journal file - for (Future future : futures) { + for (Future future : futures) { while (!future.isDone()) { Thread.sleep(10); } @@ -1958,14 +1951,6 @@ public class TestPersistentProvenanceRepository { testRepo.recoverJournalFiles(); - assertEquals("mergeJournals should report a skipped journal", 1, reportedEvents.size()); - assertEquals("mergeJournals should report a skipped journal", - "Failed to read Provenance Event Record from Journal due to java.lang.IllegalArgumentException: " - + "No enum constant org.apache.nifi.provenance.ProvenanceEventType.BADTYPE; it's possible " - + "that the record wasn't completely written to the file. The remainder of this journal will " - + "be skipped.", - reportedEvents.get(reportedEvents.size() - 1).getMessage()); - final File storageDir = config.getStorageDirectories().values().iterator().next(); assertTrue(checkJournalRecords(storageDir, false) < 10000); }