diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java index f8bb667c00..38722c53cf 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java @@ -125,6 +125,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { public static final Pattern LOG_FILENAME_PATTERN = Pattern.compile("(\\d+).*\\.prov"); public static final int MAX_UNDELETED_QUERY_RESULTS = 10; public static final int MAX_INDEXING_FAILURE_COUNT = 5; // how many indexing failures we will tolerate before skipping indexing for a prov file + public static final int MAX_JOURNAL_ROLLOVER_RETRIES = 5; private static final Logger logger = LoggerFactory.getLogger(PersistentProvenanceRepository.class); @@ -1267,25 +1268,23 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { if (!journalsToMerge.isEmpty()) { // Run the rollover logic in a background thread. final AtomicReference> futureReference = new AtomicReference<>(); + final AtomicInteger retryAttempts = new AtomicInteger(MAX_JOURNAL_ROLLOVER_RETRIES); final int recordsWritten = recordsWrittenSinceRollover.getAndSet(0); final Runnable rolloverRunnable = new Runnable() { @Override public void run() { + + File fileRolledOver = null; + try { - final File fileRolledOver; + fileRolledOver = mergeJournals(journalsToMerge, getMergeFile(journalsToMerge, storageDir), eventReporter); + } catch (final IOException ioe) { + logger.error("Failed to merge Journal Files {} into a Provenance Log File due to {}", journalsToMerge, ioe.toString()); + logger.error("", ioe); + } - try { - fileRolledOver = mergeJournals(journalsToMerge, getMergeFile(journalsToMerge, storageDir), eventReporter); - } catch (final IOException ioe) { - logger.error("Failed to merge Journal Files {} into a Provenance Log File due to {}", journalsToMerge, ioe.toString()); - logger.error("", ioe); - return; - } + if (fileRolledOver != null) { - if (fileRolledOver == null) { - logger.debug("Couldn't merge journals. Will try again in 10 seconds. journalsToMerge: {}, storageDir: {}", journalsToMerge, storageDir); - return; - } final File file = fileRolledOver; // update our map of id to Path @@ -1302,9 +1301,18 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { } logger.info("Successfully Rolled over Provenance Event file containing {} records", recordsWritten); + } + + //if files were rolled over or if out of retries stop the future + if(fileRolledOver != null || retryAttempts.decrementAndGet() == 0) { + + if(fileRolledOver== null && retryAttempts.get() == 0){ + logger.error("Failed to merge Journal Files {} after {} attempts. ",journalsToMerge, MAX_JOURNAL_ROLLOVER_RETRIES); + } + rolloverCompletions.getAndIncrement(); - // We have finished successfully. Cancel the future so that we don't run anymore + // Cancel the future so that we don't run anymore Future future; while ((future = futureReference.get()) == null) { try { @@ -1312,17 +1320,16 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { } catch (final InterruptedException ie) { } } - future.cancel(false); - } catch (final Throwable t) { - logger.error("Failed to rollover Provenance repository due to {}", t.toString()); - logger.error("", t); + + }else{ + logger.warn("Couldn't merge journals. Will try again in 10 seconds. journalsToMerge: {}, storageDir: {}", journalsToMerge, storageDir); } } }; // We are going to schedule the future to run immediately and then repeat every 10 seconds. This allows us to keep retrying if we - // fail for some reason. When we succeed, the Runnable will cancel itself. + // fail for some reason. When we succeed or if retries are exceeded, the Runnable will cancel itself. future = rolloverExecutor.scheduleWithFixedDelay(rolloverRunnable, 0, 10, TimeUnit.SECONDS); futureReference.set(future); } @@ -1398,7 +1405,6 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { } } - // protected for use in unit tests protected Set recoverJournalFiles() throws IOException { if (!configuration.isAllowRollover()) { @@ -1467,6 +1473,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { return mergedFile; } + /** *

* Merges all of the given Journal Files into a single, merged Provenance Event Log File. As these records are merged, they will be compressed, if the repository is configured to compress records, @@ -1515,12 +1522,12 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { } }); - final String firstJournalFile = journalFiles.get(0).getName(); - final String firstFileSuffix = LuceneUtil.substringAfterLast(firstJournalFile, "."); - final boolean allPartialFiles = firstFileSuffix.equals("0"); + //Search for any missing files. At this point they should have been written to disk otherwise cannot continue + //missing files is most likely due to incomplete cleanup of files post merge + final long numAvailableFiles = journalFiles.size() - journalFiles.stream().filter(file -> !file.exists()).count(); // check if we have all of the "partial" files for the journal. - if (allPartialFiles) { + if (numAvailableFiles > 0) { if (suggestedMergeFile.exists()) { // we have all "partial" files and there is already a merged file. Delete the data from the index // because the merge file may not be fully merged. We will re-merge. @@ -1552,16 +1559,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { } } } else { - logger.warn("Cannot merge journal files {} because expected first file to end with extension '.0' " - + "but it did not; assuming that the files were already merged but only some finished deletion " - + "before restart. Deleting remaining partial journal files.", journalFiles); - - for ( final File file : journalFiles ) { - if ( !file.delete() && file.exists() ) { - logger.warn("Failed to delete unneeded journal file {}; this file should be cleaned up manually", file); - } - } - + logger.warn("Cannot merge journal files {} because they do not exist on disk", journalFiles); return null; } @@ -1839,7 +1837,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository { } else { final long nanos = System.nanoTime() - startNanos; final long millis = TimeUnit.MILLISECONDS.convert(nanos, TimeUnit.NANOSECONDS); - logger.info("Successfully merged {} journal files ({} records) into single Provenance Log File {} in {} milliseconds", journalFiles.size(), records, suggestedMergeFile, millis); + logger.info("Successfully merged {} journal files ({} records) into single Provenance Log File {} in {} milliseconds", numAvailableFiles, records, suggestedMergeFile, millis); } return writerFile; diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java index 12f4a73dc1..ff12691adc 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java @@ -68,6 +68,7 @@ import java.io.FileFilter; import java.io.FileOutputStream; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -1668,6 +1669,62 @@ public class TestPersistentProvenanceRepository { assertEquals(10000, counter); } + @Test + public void testRolloverRetry() throws IOException, InterruptedException { + final AtomicInteger retryAmount = new AtomicInteger(0); + final RepositoryConfiguration config = createConfiguration(); + config.setMaxEventFileLife(3, TimeUnit.SECONDS); + + repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS){ + @Override + File mergeJournals(List journalFiles, File suggestedMergeFile, EventReporter eventReporter) throws IOException { + retryAmount.incrementAndGet(); + return super.mergeJournals(journalFiles, suggestedMergeFile, eventReporter); + } + }; + repo.initialize(getEventReporter(), null, null); + + final Map attributes = new HashMap<>(); + + final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder(); + builder.setEventTime(System.currentTimeMillis()); + builder.setEventType(ProvenanceEventType.RECEIVE); + builder.setTransitUri("nifi://unit-test"); + attributes.put("uuid", "12345678-0000-0000-0000-012345678912"); + builder.fromFlowFile(createFlowFile(3L, 3000L, attributes)); + builder.setComponentId("1234"); + builder.setComponentType("dummy processor"); + + final ProvenanceEventRecord record = builder.build(); + + final ExecutorService exec = Executors.newFixedThreadPool(10); + for (int i = 0; i < 10000; i++) { + exec.submit(new Runnable() { + @Override + public void run() { + repo.registerEvent(record); + } + }); + } + + final File storageDir = config.getStorageDirectories().get(0); + //trigger retry through full file deletion + Arrays.asList(storageDir.listFiles()) + .stream() + .map(file -> new File(storageDir, "journals")) + .map(journalDir -> Arrays.asList(journalDir.listFiles())) + .flatMap(partials -> partials.stream()) + .filter(partial -> partial.exists()) + .forEach(file -> { + file.delete(); + }); + + repo.waitForRollover(); + + assertEquals(5,retryAmount.get()); + + } + @Test public void testTruncateAttributes() throws IOException, InterruptedException { final RepositoryConfiguration config = createConfiguration();