NIFI-2524 - Fixes to improve handling of missing journal files during rollover/merge execution. Includes:

This closes #840

- Removed the partial-file check (based on a missing first file).
- Added a condition to merge if at least one journal file is available on disk; if all files are missing from disk, that is considered an error.
- Added retry logic to prevent endless thread execution when encountering errors such as missing files (a standalone sketch of this retry pattern follows the commit metadata below).
Yolanda M. Davis 2016-08-11 05:42:19 -04:00 committed by Oleg Zhurakousky
parent 44057f097a
commit fbd3201157
2 changed files with 89 additions and 34 deletions
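The core of the fix is a bounded-retry, self-canceling scheduled task. What follows is a minimal, self-contained sketch of that pattern; the names (BoundedRetryTask, attemptMerge, MAX_RETRIES) are hypothetical, and the sketch only illustrates the approach, not the NiFi code itself.

import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public class BoundedRetryTask {
    static final int MAX_RETRIES = 5; // hypothetical; mirrors MAX_JOURNAL_ROLLOVER_RETRIES

    public static void main(String[] args) {
        final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        final AtomicReference<Future<?>> futureRef = new AtomicReference<>();
        final AtomicInteger retriesLeft = new AtomicInteger(MAX_RETRIES);

        final Runnable task = () -> {
            final boolean succeeded = attemptMerge(); // stand-in for the real merge work

            // Stop when the work succeeds or the retry budget is exhausted.
            if (succeeded || retriesLeft.decrementAndGet() == 0) {
                Future<?> self;
                // The first run may begin before the future is published below; wait for it.
                while ((self = futureRef.get()) == null) {
                    try {
                        Thread.sleep(10);
                    } catch (final InterruptedException ie) {
                        Thread.currentThread().interrupt();
                    }
                }
                self.cancel(false);
                executor.shutdown();
            }
        };

        // Run immediately, then every 10 seconds, until the task cancels itself.
        futureRef.set(executor.scheduleWithFixedDelay(task, 0, 10, TimeUnit.SECONDS));
    }

    private static boolean attemptMerge() {
        return Math.random() < 0.3; // simulated flaky operation
    }
}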

PersistentProvenanceRepository.java

@@ -125,6 +125,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
     public static final Pattern LOG_FILENAME_PATTERN = Pattern.compile("(\\d+).*\\.prov");
     public static final int MAX_UNDELETED_QUERY_RESULTS = 10;
     public static final int MAX_INDEXING_FAILURE_COUNT = 5; // how many indexing failures we will tolerate before skipping indexing for a prov file
+    public static final int MAX_JOURNAL_ROLLOVER_RETRIES = 5;
 
     private static final Logger logger = LoggerFactory.getLogger(PersistentProvenanceRepository.class);
@@ -1267,25 +1268,23 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
         if (!journalsToMerge.isEmpty()) {
             // Run the rollover logic in a background thread.
             final AtomicReference<Future<?>> futureReference = new AtomicReference<>();
+            final AtomicInteger retryAttempts = new AtomicInteger(MAX_JOURNAL_ROLLOVER_RETRIES);
             final int recordsWritten = recordsWrittenSinceRollover.getAndSet(0);
             final Runnable rolloverRunnable = new Runnable() {
                 @Override
                 public void run() {
+                    File fileRolledOver = null;
+
                     try {
-                        final File fileRolledOver;
-
-                        try {
-                            fileRolledOver = mergeJournals(journalsToMerge, getMergeFile(journalsToMerge, storageDir), eventReporter);
-                        } catch (final IOException ioe) {
-                            logger.error("Failed to merge Journal Files {} into a Provenance Log File due to {}", journalsToMerge, ioe.toString());
-                            logger.error("", ioe);
-                            return;
-                        }
-
-                        if (fileRolledOver == null) {
-                            logger.debug("Couldn't merge journals. Will try again in 10 seconds. journalsToMerge: {}, storageDir: {}", journalsToMerge, storageDir);
-                            return;
-                        }
+                        fileRolledOver = mergeJournals(journalsToMerge, getMergeFile(journalsToMerge, storageDir), eventReporter);
+                    } catch (final IOException ioe) {
+                        logger.error("Failed to merge Journal Files {} into a Provenance Log File due to {}", journalsToMerge, ioe.toString());
+                        logger.error("", ioe);
+                    }
+
+                    if (fileRolledOver != null) {
                         final File file = fileRolledOver;
 
                         // update our map of id to Path
@@ -1302,9 +1301,18 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
                         }
 
                         logger.info("Successfully Rolled over Provenance Event file containing {} records", recordsWritten);
+                    }
+
+                    // if files were rolled over or if out of retries, stop the future
+                    if (fileRolledOver != null || retryAttempts.decrementAndGet() == 0) {
+
+                        if (fileRolledOver == null && retryAttempts.get() == 0) {
+                            logger.error("Failed to merge Journal Files {} after {} attempts.", journalsToMerge, MAX_JOURNAL_ROLLOVER_RETRIES);
+                        }
 
                         rolloverCompletions.getAndIncrement();
 
-                        // We have finished successfully. Cancel the future so that we don't run anymore
+                        // Cancel the future so that we don't run anymore
                         Future<?> future;
                         while ((future = futureReference.get()) == null) {
                             try {
@@ -1312,17 +1320,16 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
                             } catch (final InterruptedException ie) {
                             }
                         }
 
                         future.cancel(false);
-                    } catch (final Throwable t) {
-                        logger.error("Failed to rollover Provenance repository due to {}", t.toString());
-                        logger.error("", t);
+                    } else {
+                        logger.warn("Couldn't merge journals. Will try again in 10 seconds. journalsToMerge: {}, storageDir: {}", journalsToMerge, storageDir);
                     }
                 }
             };
 
             // We are going to schedule the future to run immediately and then repeat every 10 seconds. This allows us to keep retrying if we
-            // fail for some reason. When we succeed, the Runnable will cancel itself.
+            // fail for some reason. When we succeed or if retries are exceeded, the Runnable will cancel itself.
             future = rolloverExecutor.scheduleWithFixedDelay(rolloverRunnable, 0, 10, TimeUnit.SECONDS);
             futureReference.set(future);
         }
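A note on the cancellation handshake in this hunk: because the task is scheduled with an initial delay of 0, its first run can reach the cancellation branch before futureReference.set(future) has published the ScheduledFuture. The while ((future = futureReference.get()) == null) loop waits for that reference to appear before calling cancel(false), which lets any in-flight run complete rather than interrupting it.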
@@ -1398,7 +1405,6 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
         }
     }
 
     // protected for use in unit tests
     protected Set<File> recoverJournalFiles() throws IOException {
         if (!configuration.isAllowRollover()) {
@@ -1467,6 +1473,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
         return mergedFile;
     }
 
     /**
      * <p>
      * Merges all of the given Journal Files into a single, merged Provenance Event Log File. As these records are merged, they will be compressed, if the repository is configured to compress records,
@@ -1515,12 +1522,12 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
             }
         });
 
-        final String firstJournalFile = journalFiles.get(0).getName();
-        final String firstFileSuffix = LuceneUtil.substringAfterLast(firstJournalFile, ".");
-        final boolean allPartialFiles = firstFileSuffix.equals("0");
+        // Search for any missing files. At this point they should have been written to disk; otherwise we cannot continue.
+        // Missing files are most likely due to incomplete cleanup of files post merge.
+        final long numAvailableFiles = journalFiles.size() - journalFiles.stream().filter(file -> !file.exists()).count();
 
         // check if we have all of the "partial" files for the journal.
-        if (allPartialFiles) {
+        if (numAvailableFiles > 0) {
             if (suggestedMergeFile.exists()) {
                 // we have all "partial" files and there is already a merged file. Delete the data from the index
                 // because the merge file may not be fully merged. We will re-merge.
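As an aside, the availability count in this hunk (total size minus the number of missing files) is equivalent to counting the files that still exist. A more direct form, shown here only as a sketch and not what the commit actually does, would be:

// Equivalent formulation of the availability check (sketch only):
final long numAvailableFiles = journalFiles.stream().filter(File::exists).count();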
@@ -1552,16 +1559,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
                 }
             }
         } else {
-            logger.warn("Cannot merge journal files {} because expected first file to end with extension '.0' "
-                + "but it did not; assuming that the files were already merged but only some finished deletion "
-                + "before restart. Deleting remaining partial journal files.", journalFiles);
-
-            for (final File file : journalFiles) {
-                if (!file.delete() && file.exists()) {
-                    logger.warn("Failed to delete unneeded journal file {}; this file should be cleaned up manually", file);
-                }
-            }
-
+            logger.warn("Cannot merge journal files {} because they do not exist on disk", journalFiles);
             return null;
         }
@@ -1839,7 +1837,7 @@ public class PersistentProvenanceRepository implements ProvenanceRepository {
         } else {
             final long nanos = System.nanoTime() - startNanos;
             final long millis = TimeUnit.MILLISECONDS.convert(nanos, TimeUnit.NANOSECONDS);
-            logger.info("Successfully merged {} journal files ({} records) into single Provenance Log File {} in {} milliseconds", journalFiles.size(), records, suggestedMergeFile, millis);
+            logger.info("Successfully merged {} journal files ({} records) into single Provenance Log File {} in {} milliseconds", numAvailableFiles, records, suggestedMergeFile, millis);
         }
 
         return writerFile;

TestPersistentProvenanceRepository.java

@@ -68,6 +68,7 @@ import java.io.FileFilter;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -1668,6 +1669,62 @@ public class TestPersistentProvenanceRepository {
         assertEquals(10000, counter);
     }
 
+    @Test
+    public void testRolloverRetry() throws IOException, InterruptedException {
+        final AtomicInteger retryAmount = new AtomicInteger(0);
+        final RepositoryConfiguration config = createConfiguration();
+        config.setMaxEventFileLife(3, TimeUnit.SECONDS);
+
+        repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS) {
+            @Override
+            File mergeJournals(List<File> journalFiles, File suggestedMergeFile, EventReporter eventReporter) throws IOException {
+                retryAmount.incrementAndGet();
+                return super.mergeJournals(journalFiles, suggestedMergeFile, eventReporter);
+            }
+        };
+        repo.initialize(getEventReporter(), null, null);
+
+        final Map<String, String> attributes = new HashMap<>();
+
+        final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
+        builder.setEventTime(System.currentTimeMillis());
+        builder.setEventType(ProvenanceEventType.RECEIVE);
+        builder.setTransitUri("nifi://unit-test");
+        attributes.put("uuid", "12345678-0000-0000-0000-012345678912");
+        builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
+        builder.setComponentId("1234");
+        builder.setComponentType("dummy processor");
+
+        final ProvenanceEventRecord record = builder.build();
+
+        final ExecutorService exec = Executors.newFixedThreadPool(10);
+        for (int i = 0; i < 10000; i++) {
+            exec.submit(new Runnable() {
+                @Override
+                public void run() {
+                    repo.registerEvent(record);
+                }
+            });
+        }
+
+        final File storageDir = config.getStorageDirectories().get(0);
+
+        // trigger retry through full file deletion
+        Arrays.asList(storageDir.listFiles())
+                .stream()
+                .map(file -> new File(storageDir, "journals"))
+                .map(journalDir -> Arrays.asList(journalDir.listFiles()))
+                .flatMap(partials -> partials.stream())
+                .filter(partial -> partial.exists())
+                .forEach(file -> file.delete());
+
+        repo.waitForRollover();
+
+        assertEquals(5, retryAmount.get());
+    }
+
     @Test
     public void testTruncateAttributes() throws IOException, InterruptedException {
         final RepositoryConfiguration config = createConfiguration();