From fbd3201157883e2606746af448f5f3057d214d1e Mon Sep 17 00:00:00 2001
From: "Yolanda M. Davis"
Date: Thu, 11 Aug 2016 05:42:19 -0400
Subject: [PATCH] NIFI-2524 - Fixes to improve handling of missing journal files during rollover/merge execution. Includes:

This closes #840

Removed partial file check (based on missing first file)
Added condition to merge if at least one journal file is available on disk.
If all files are missing from disk, that is considered an error.
Added retry logic to prevent endless thread execution when encountering
errors (such as missing files).
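
For review context, the retry behavior added here follows a self-cancelling
scheduled-task pattern: the rollover Runnable is scheduled with
scheduleWithFixedDelay, counts down an AtomicInteger on every failed attempt,
and cancels its own Future once the merge succeeds or the retry budget is
exhausted. The stand-alone sketch below (not part of the patch; the class name
RetryingRolloverSketch and the attemptMerge() helper are illustrative
placeholders, not NiFi APIs) shows that pattern in isolation:

    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicReference;

    public class RetryingRolloverSketch {

        private static final int MAX_RETRIES = 5;

        // Stand-in for the real merge work; always fails so the retry path is exercised.
        private static boolean attemptMerge() {
            return false;
        }

        public static void main(final String[] args) {
            final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
            final AtomicReference<Future<?>> futureReference = new AtomicReference<>();
            final AtomicInteger retriesLeft = new AtomicInteger(MAX_RETRIES);

            final Runnable task = () -> {
                final boolean merged = attemptMerge();

                // Stop when the work succeeds or the retry budget is used up.
                if (merged || retriesLeft.decrementAndGet() == 0) {
                    // The first run (initial delay 0) can begin before the Future is published, so wait for it.
                    Future<?> future;
                    while ((future = futureReference.get()) == null) {
                        try {
                            Thread.sleep(10L);
                        } catch (final InterruptedException ie) {
                            Thread.currentThread().interrupt();
                        }
                    }
                    future.cancel(false);
                    executor.shutdown(); // demo only, so this JVM can exit
                } else {
                    System.out.println("Merge failed; will try again in 10 seconds");
                }
            };

            // Run immediately, then every 10 seconds, until the task cancels itself.
            futureReference.set(executor.scheduleWithFixedDelay(task, 0, 10, TimeUnit.SECONDS));
        }
    }
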
---
 .../PersistentProvenanceRepository.java       | 66 +++++++++----------
 .../TestPersistentProvenanceRepository.java   | 57 ++++++++++++++++
 2 files changed, 89 insertions(+), 34 deletions(-)

diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
index f8bb667c00..38722c53cf 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
@@ -125,6 +125,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
     public static final Pattern LOG_FILENAME_PATTERN = Pattern.compile("(\\d+).*\\.prov");
     public static final int MAX_UNDELETED_QUERY_RESULTS = 10;
     public static final int MAX_INDEXING_FAILURE_COUNT = 5; // how many indexing failures we will tolerate before skipping indexing for a prov file
+    public static final int MAX_JOURNAL_ROLLOVER_RETRIES = 5;
 
     private static final Logger logger = LoggerFactory.getLogger(PersistentProvenanceRepository.class);
 
@@ -1267,25 +1268,23 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
         if (!journalsToMerge.isEmpty()) {
             // Run the rollover logic in a background thread.
             final AtomicReference<Future<?>> futureReference = new AtomicReference<>();
+            final AtomicInteger retryAttempts = new AtomicInteger(MAX_JOURNAL_ROLLOVER_RETRIES);
             final int recordsWritten = recordsWrittenSinceRollover.getAndSet(0);
             final Runnable rolloverRunnable = new Runnable() {
                 @Override
                 public void run() {
+
+                    File fileRolledOver = null;
+
                     try {
-                        final File fileRolledOver;
+                        fileRolledOver = mergeJournals(journalsToMerge, getMergeFile(journalsToMerge, storageDir), eventReporter);
+                    } catch (final IOException ioe) {
+                        logger.error("Failed to merge Journal Files {} into a Provenance Log File due to {}", journalsToMerge, ioe.toString());
+                        logger.error("", ioe);
+                    }
 
-                        try {
-                            fileRolledOver = mergeJournals(journalsToMerge, getMergeFile(journalsToMerge, storageDir), eventReporter);
-                        } catch (final IOException ioe) {
-                            logger.error("Failed to merge Journal Files {} into a Provenance Log File due to {}", journalsToMerge, ioe.toString());
-                            logger.error("", ioe);
-                            return;
-                        }
+                    if (fileRolledOver != null) {
 
-                        if (fileRolledOver == null) {
-                            logger.debug("Couldn't merge journals. Will try again in 10 seconds. journalsToMerge: {}, storageDir: {}", journalsToMerge, storageDir);
-                            return;
-                        }
                         final File file = fileRolledOver;
 
                         // update our map of id to Path
@@ -1302,9 +1301,18 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
                         }
 
                         logger.info("Successfully Rolled over Provenance Event file containing {} records", recordsWritten);
+                    }
+
+                    //if files were rolled over or if out of retries stop the future
+                    if(fileRolledOver != null || retryAttempts.decrementAndGet() == 0) {
+
+                        if(fileRolledOver== null && retryAttempts.get() == 0){
+                            logger.error("Failed to merge Journal Files {} after {} attempts. ",journalsToMerge, MAX_JOURNAL_ROLLOVER_RETRIES);
+                        }
+
                         rolloverCompletions.getAndIncrement();
 
-                        // We have finished successfully. Cancel the future so that we don't run anymore
+                        // Cancel the future so that we don't run anymore
                         Future<?> future;
                         while ((future = futureReference.get()) == null) {
                             try {
@@ -1312,17 +1320,16 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
                             } catch (final InterruptedException ie) {
                             }
                         }
-
                         future.cancel(false);
-                    } catch (final Throwable t) {
-                        logger.error("Failed to rollover Provenance repository due to {}", t.toString());
-                        logger.error("", t);
+
+                    }else{
+                        logger.warn("Couldn't merge journals. Will try again in 10 seconds. journalsToMerge: {}, storageDir: {}", journalsToMerge, storageDir);
                     }
                 }
             };
 
             // We are going to schedule the future to run immediately and then repeat every 10 seconds. This allows us to keep retrying if we
-            // fail for some reason. When we succeed, the Runnable will cancel itself.
+            // fail for some reason. When we succeed or if retries are exceeded, the Runnable will cancel itself.
             future = rolloverExecutor.scheduleWithFixedDelay(rolloverRunnable, 0, 10, TimeUnit.SECONDS);
             futureReference.set(future);
         }
@@ -1398,7 +1405,6 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
         }
     }
 
-
     // protected for use in unit tests
     protected Set<File> recoverJournalFiles() throws IOException {
         if (!configuration.isAllowRollover()) {
@@ -1467,6 +1473,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
 
         return mergedFile;
     }
+
     /**
      * <p>
      * Merges all of the given Journal Files into a single, merged Provenance Event Log File. As these records are merged, they will be compressed, if the repository is configured to compress records,
@@ -1515,12 +1522,12 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
             }
         });
 
-        final String firstJournalFile = journalFiles.get(0).getName();
-        final String firstFileSuffix = LuceneUtil.substringAfterLast(firstJournalFile, ".");
-        final boolean allPartialFiles = firstFileSuffix.equals("0");
+        //Search for any missing files. At this point they should have been written to disk otherwise cannot continue
+        //missing files is most likely due to incomplete cleanup of files post merge
+        final long numAvailableFiles = journalFiles.size() - journalFiles.stream().filter(file -> !file.exists()).count();
 
         // check if we have all of the "partial" files for the journal.
-        if (allPartialFiles) {
+        if (numAvailableFiles > 0) {
             if (suggestedMergeFile.exists()) {
                 // we have all "partial" files and there is already a merged file. Delete the data from the index
                 // because the merge file may not be fully merged. We will re-merge.
@@ -1552,16 +1559,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
                 }
             }
         } else {
-            logger.warn("Cannot merge journal files {} because expected first file to end with extension '.0' "
-                + "but it did not; assuming that the files were already merged but only some finished deletion "
-                + "before restart. Deleting remaining partial journal files.", journalFiles);
-
-            for ( final File file : journalFiles ) {
-                if ( !file.delete() && file.exists() ) {
-                    logger.warn("Failed to delete unneeded journal file {}; this file should be cleaned up manually", file);
-                }
-            }
-
+            logger.warn("Cannot merge journal files {} because they do not exist on disk", journalFiles);
             return null;
         }
 
@@ -1839,7 +1837,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
         } else {
             final long nanos = System.nanoTime() - startNanos;
             final long millis = TimeUnit.MILLISECONDS.convert(nanos, TimeUnit.NANOSECONDS);
-            logger.info("Successfully merged {} journal files ({} records) into single Provenance Log File {} in {} milliseconds", journalFiles.size(), records, suggestedMergeFile, millis);
+            logger.info("Successfully merged {} journal files ({} records) into single Provenance Log File {} in {} milliseconds", numAvailableFiles, records, suggestedMergeFile, millis);
         }
 
         return writerFile;
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
index 12f4a73dc1..ff12691adc 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
@@ -68,6 +68,7 @@ import java.io.FileFilter;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -1668,6 +1669,62 @@ public class TestPersistentProvenanceRepository {
         assertEquals(10000, counter);
     }
 
+    @Test
+    public void testRolloverRetry() throws IOException, InterruptedException {
+        final AtomicInteger retryAmount = new AtomicInteger(0);
+        final RepositoryConfiguration config = createConfiguration();
+        config.setMaxEventFileLife(3, TimeUnit.SECONDS);
+
+        repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS){
+            @Override
+            File mergeJournals(List<File> journalFiles, File suggestedMergeFile, EventReporter eventReporter) throws IOException {
+                retryAmount.incrementAndGet();
+                return super.mergeJournals(journalFiles, suggestedMergeFile, eventReporter);
+            }
+        };
+        repo.initialize(getEventReporter(), null, null);
+
+        final Map<String, String> attributes = new HashMap<>();
+
+        final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
+        builder.setEventTime(System.currentTimeMillis());
+        builder.setEventType(ProvenanceEventType.RECEIVE);
+        builder.setTransitUri("nifi://unit-test");
+        attributes.put("uuid", "12345678-0000-0000-0000-012345678912");
+        builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
+        builder.setComponentId("1234");
+        builder.setComponentType("dummy processor");
+
+        final ProvenanceEventRecord record = builder.build();
+
+        final ExecutorService exec = Executors.newFixedThreadPool(10);
+        for (int i = 0; i < 10000; i++) {
+            exec.submit(new Runnable() {
+                @Override
+                public void run() {
+                    repo.registerEvent(record);
+                }
+            });
+        }
+
+        final File storageDir = config.getStorageDirectories().get(0);
+        //trigger retry through full file deletion
+        Arrays.asList(storageDir.listFiles())
+                .stream()
+                .map(file -> new File(storageDir, "journals"))
+                .map(journalDir -> Arrays.asList(journalDir.listFiles()))
+                .flatMap(partials -> partials.stream())
+                .filter(partial -> partial.exists())
+                .forEach(file -> {
+                    file.delete();
+                });
+
+        repo.waitForRollover();
+
+        assertEquals(5,retryAmount.get());
+
+    }
+
     @Test
     public void testTruncateAttributes() throws IOException, InterruptedException {
         final RepositoryConfiguration config = createConfiguration();