From 6a0a321b64568c70e417b78b2c52ab39af1c3d63 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Mon, 3 Aug 2015 13:25:53 -0400 Subject: [PATCH] NIFI-803: Ensure that if an OutOfMemoryError occurs, the Provenance Repo won't become corrupt --- .../PersistentProvenanceRepository.java | 210 ++++++++++++------ .../nifi/provenance/StandardRecordReader.java | 8 +- .../nifi/provenance/StandardRecordWriter.java | 24 +- .../serialization/RecordWriter.java | 12 +- .../TestPersistentProvenanceRepository.java | 120 +++++++++- 5 files changed, 294 insertions(+), 80 deletions(-)
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java index 81d883a74e..a1063f0f0e 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
@@ -152,7 +152,8 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository private final AtomicInteger rolloverCompletions = new AtomicInteger(0); private final AtomicBoolean initialized = new AtomicBoolean(false); - private final AtomicBoolean repoDirty = new AtomicBoolean(false); + private final AtomicInteger dirtyWriterCount = new AtomicInteger(0); + // we keep the last 1000 records on hand so that when the UI is opened and it asks for the last 1000 records we don't need to // read them. Since this is a very cheap operation to keep them, it's worth the tiny expense for the improved user experience. private final RingBuffer<ProvenanceEventRecord> latestRecords = new RingBuffer<>(1000);
@@ -338,7 +339,8 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository return config; } - static RecordWriter[] createWriters(final RepositoryConfiguration config, final long initialRecordId) throws IOException { + // protected in order to override for unit tests + protected RecordWriter[] createWriters(final RepositoryConfiguration config, final long initialRecordId) throws IOException { final List<File> storageDirectories = config.getStorageDirectories(); final RecordWriter[] writers = new RecordWriter[config.getJournalCount()];
@@ -561,13 +563,6 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository idGenerator.set(maxId + 1); - // TODO: Consider refactoring this so that we do the rollover actions at the same time that we merge the journals. - // This would require a few different changes: - // * Rollover Actions would take a ProvenanceEventRecord at a time, not a File at a time. Would have to be either - // constructed or "started" for each file and then closed after each file - // * The recovery would have to then read through all of the journals with the highest starting ID to determine - // the action max id instead of reading the merged file - // * We would write to a temporary file and then rename once the merge is complete.
This allows us to fail and restart try { final Set<File> recoveredJournals = recoverJournalFiles(); filesToRecover.addAll(recoveredJournals);
@@ -654,13 +649,6 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository final long totalJournalSize; readLock.lock(); try { - if (repoDirty.get()) { - logger.debug("Cannot persist provenance record because there was an IOException last time a record persistence was attempted. " - + "Will not attempt to persist more records until the repo has been rolled over."); - return; - } - - final RecordWriter[] recordWriters = this.writers; long bytesWritten = 0L; // obtain a lock on one of the RecordWriter's so that no other thread is able to write to this writer until we're finished.
@@ -669,6 +657,13 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository boolean locked = false; RecordWriter writer; do { + final RecordWriter[] recordWriters = this.writers; + final int numDirty = dirtyWriterCount.get(); + if (numDirty >= recordWriters.length) { + throw new IllegalStateException("Cannot update repository because all partitions are unusable at this time. Writing to the repository would cause corruption. " + + "This most often happens as a result of the repository running out of disk space or the JVM running out of memory."); + } + final long idx = writerIndex.getAndIncrement(); writer = recordWriters[(int) (idx % recordWriters.length)]; locked = writer.tryLock(); @@ -688,24 +683,31 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository totalJournalSize = bytesWrittenSinceRollover.addAndGet(bytesWritten); recordsWrittenSinceRollover.getAndIncrement(); - } catch (final IOException ioe) { + } catch (final Throwable t) { // We need to set the repoDirty flag before we release the lock for this journal. // Otherwise, another thread may write to this journal -- this is a problem because // the journal contains part of our record but not all of it. Writing to the end of this // journal will result in corruption! - repoDirty.set(true); + writer.markDirty(); + dirtyWriterCount.incrementAndGet(); streamStartTime.set(0L); // force rollover to happen soon. - throw ioe; + throw t; } finally { writer.unlock(); } } catch (final IOException ioe) { - logger.error("Failed to persist Provenance Event due to {}. Will not attempt to write to the Provenance Repository again until the repository has rolled over.", ioe.toString()); + // warn about the failure + logger.error("Failed to persist Provenance Event due to {}.", ioe.toString()); logger.error("", ioe); - eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to persist Provenance Event due to " + ioe.toString() + - ". Will not attempt to write to the Provenance Repository again until the repository has rolled over"); + eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to persist Provenance Event due to " + ioe.toString()); - // Switch from readLock to writeLock so that we can perform rollover + // Attempt to perform a rollover. An IOException in this part of the code generally is the result of + // running out of disk space. If we have multiple partitions, we may well be able to rollover. This helps + // in two ways: it compresses the journal files which frees up space, and if it ends up merging to a different + // partition/storage directory, we can delete the journals from this directory that ran out of space. + // In order to do this, though, we must switch from a read lock to a write lock. + // This part of the code gets a little bit messy, and we could potentially refactor it a bit in order to + // make the code cleaner.
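The write-path guard above is easier to see in isolation: writers are handed out round-robin, a writer that threw mid-record is never handed out again, and once every partition is dirty the repository fails fast instead of corrupting a journal. A minimal sketch of that pattern follows; PartitionWriter and PartitionSelector are hypothetical stand-ins for RecordWriter and the registerEvent() loop, not NiFi classes:

    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicLong;
    import java.util.concurrent.locks.ReentrantLock;

    // Hypothetical stand-in for RecordWriter: tryLock() refuses a writer once it is dirty.
    class PartitionWriter {
        private final ReentrantLock lock = new ReentrantLock();
        private volatile boolean dirty = false;

        boolean tryLock() {
            if (!lock.tryLock()) {
                return false;
            }
            if (dirty) {
                // a partial record was written here; handing out the lock would risk corruption
                lock.unlock();
                return false;
            }
            return true;
        }

        void unlock() {
            lock.unlock();
        }

        void markDirty() {
            dirty = true;
        }
    }

    class PartitionSelector {
        private final PartitionWriter[] writers;
        private final AtomicLong writerIndex = new AtomicLong(0L);
        private final AtomicInteger dirtyWriterCount = new AtomicInteger(0);

        PartitionSelector(final PartitionWriter[] writers) {
            this.writers = writers;
        }

        PartitionWriter acquire() {
            PartitionWriter writer;
            do {
                // if every partition is dirty, any write would corrupt a journal, so fail fast
                if (dirtyWriterCount.get() >= writers.length) {
                    throw new IllegalStateException("All partitions are unusable at this time");
                }
                final long idx = writerIndex.getAndIncrement();
                writer = writers[(int) (idx % writers.length)];
            } while (!writer.tryLock()); // spin until a clean, unlocked partition is found
            return writer;
        }

        // called from the catch(Throwable) path after a write fails partway through
        void markDirty(final PartitionWriter writer) {
            writer.markDirty();
            dirtyWriterCount.incrementAndGet();
        }
    }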
readLock.unlock(); try { writeLock.lock(); @@ -720,6 +722,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository logger.error("", e); eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to Rollover Provenance Event Repository file due to " + e.toString()); } finally { + // we must re-lock the readLock, as the finally block below is going to unlock it. readLock.lock(); }
@@ -752,6 +755,9 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository } } + /** + * @return all of the Provenance Event Log Files (not the journals, the merged files) available across all storage directories. + */ private List<File> getLogFiles() { final List<File> files = new ArrayList<>(); for (final Path path : idToPathMap.get().values()) {
@@ -817,6 +823,10 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository } } + // This comparator sorts the data based on the "basename" of the files. I.e., the numeric portion. + // We do this because the numeric portion represents the ID of the first event in the log file. + // As a result, we are sorting based on time, since the ID is monotonically increasing. By doing this, + // we are able to avoid hitting disk continually to check timestamps final Comparator<File> sortByBasenameComparator = new Comparator<File>() { @Override public int compare(final File o1, final File o2) {
@@ -930,7 +940,10 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository } } - public void waitForRollover() throws IOException { + /** + * Blocks the calling thread until the repository rolls over. This is intended for unit testing. + */ + public void waitForRollover() { final int count = rolloverCompletions.get(); while (rolloverCompletions.get() == count) { try {
@@ -940,6 +953,9 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository } } + /** + * @return the number of journal files that exist across all storage directories + */ // made protected for testing purposes protected int getJournalCount() { // determine how many 'journals' we have in the journals directories
@@ -956,7 +972,12 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository } /** - * MUST be called with the write lock held + * <p>

+ * MUST be called with the write lock held. + * </p>

+ * + * Rolls over the data in the journal files, merging them into a single Provenance Event Log File, and + * compressing and indexing as needed. * * @param force if true, will force a rollover regardless of whether or not data has been written * @throws IOException if unable to complete rollover
@@ -968,7 +989,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository // If this is the first time we're creating the out stream, or if we // have written something to the stream, then roll over - if (recordsWrittenSinceRollover.get() > 0L || repoDirty.get() || force) { + if (force || recordsWrittenSinceRollover.get() > 0L || dirtyWriterCount.get() > 0) { final List<File> journalsToMerge = new ArrayList<>(); for (final RecordWriter writer : writers) { final File writerFile = writer.getFile();
@@ -986,10 +1007,12 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository logger.debug("Going to merge {} files for journals starting with ID {}", journalsToMerge.size(), LuceneUtil.substringBefore(journalsToMerge.get(0).getName(), ".")); } + // Choose a storage directory to store the merged file in. final long storageDirIdx = storageDirectoryIndex.getAndIncrement(); final List<File> storageDirs = configuration.getStorageDirectories(); final File storageDir = storageDirs.get((int) (storageDirIdx % storageDirs.size())); + // Run the rollover logic in a background thread. final AtomicReference<Future<?>> futureReference = new AtomicReference<>(); final int recordsWritten = recordsWrittenSinceRollover.getAndSet(0); final Runnable rolloverRunnable = new Runnable() {
@@ -999,10 +1022,8 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository final File fileRolledOver; try { - fileRolledOver = mergeJournals(journalsToMerge, storageDir, getMergeFile(journalsToMerge, storageDir), eventReporter, latestRecords); - repoDirty.set(false); + fileRolledOver = mergeJournals(journalsToMerge, getMergeFile(journalsToMerge, storageDir), eventReporter); } catch (final IOException ioe) { - repoDirty.set(true); logger.error("Failed to merge Journal Files {} into a Provenance Log File due to {}", journalsToMerge, ioe.toString()); logger.error("", ioe); return;
@@ -1046,7 +1067,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository } }; - // We are going to schedule the future to run every 10 seconds. This allows us to keep retrying if we + // We are going to schedule the future to run immediately and then repeat every 10 seconds. This allows us to keep retrying if we // fail for some reason. When we succeed, the Runnable will cancel itself. final Future<?> future = rolloverExecutor.scheduleWithFixedDelay(rolloverRunnable, 0, 10, TimeUnit.SECONDS); futureReference.set(future);
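The scheduling idiom above -- a repeating task that cancels its own Future once it succeeds -- is worth seeing standalone. A hedged sketch, with the delays shortened to milliseconds for the demo (the patch itself uses an initial delay of 0 and a 10-second repeat):

    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicReference;

    public class SelfCancellingRetry {
        public static void main(final String[] args) {
            final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
            final AtomicReference<Future<?>> futureReference = new AtomicReference<>();

            final Runnable retryable = new Runnable() {
                private int attempts = 0;

                @Override
                public void run() {
                    attempts++;
                    if (attempts < 3) {
                        // simulated transient failure (e.g., out of disk space); returning
                        // without cancelling leaves the task scheduled, so it retries
                        System.out.println("attempt " + attempts + " failed; will retry");
                        return;
                    }

                    System.out.println("attempt " + attempts + " succeeded");

                    // success: cancel our own Future so the task stops repeating
                    final Future<?> future = futureReference.get();
                    if (future != null) {
                        future.cancel(false);
                    }
                    executor.shutdown();
                }
            };

            // run immediately, then repeat until the task cancels itself
            final Future<?> future = executor.scheduleWithFixedDelay(retryable, 0, 100, TimeUnit.MILLISECONDS);
            futureReference.set(future);
        }
    }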
@@ -1061,6 +1082,13 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository final int journalCountThreshold = configuration.getJournalCount() * 5; final long sizeThreshold = (long) (configuration.getMaxStorageCapacity() * 1.1D); // do not go more than 10% over max capacity + // check if we need to apply backpressure. + // If we have too many journal files, or if the repo becomes too large, backpressure is necessary. Without it, + // if the rate at which provenance events are registered exceeds the rate at which we can compress/merge/index them, + // then eventually we will end up with all of the data stored in the 'journals' directory and not yet indexed. This + // would mean that the data would never even be accessible. In order to prevent this, if we exceed 110% of the configured + // max capacity for the repo, or if we have 5 sets of journal files waiting to be merged, we will block here until + // that is no longer the case. if (journalFileCount > journalCountThreshold || repoSize > sizeThreshold) { logger.warn("The rate of the dataflow is exceeding the provenance recording rate. " + "Slowing down flow to accommodate. Currently, there are {} journal files ({} bytes) and "
@@ -1086,14 +1114,17 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository + "journal files to be rolled over is {}", journalFileCount); } + // we've finished rolling over successfully. Create new writers and reset state. writers = createWriters(configuration, idGenerator.get()); + dirtyWriterCount.set(0); streamStartTime.set(System.currentTimeMillis()); recordsWrittenSinceRollover.getAndSet(0); } } - private Set<File> recoverJournalFiles() throws IOException { + // protected for use in unit tests + protected Set<File> recoverJournalFiles() throws IOException { if (!configuration.isAllowRollover()) { return Collections.emptySet(); }
@@ -1133,7 +1164,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository for (final List<File> journalFileSet : journalMap.values()) { final long storageDirIdx = storageDirectoryIndex.getAndIncrement(); final File storageDir = storageDirs.get((int) (storageDirIdx % storageDirs.size())); - final File mergedFile = mergeJournals(journalFileSet, storageDir, getMergeFile(journalFileSet, storageDir), eventReporter, latestRecords); + final File mergedFile = mergeJournals(journalFileSet, getMergeFile(journalFileSet, storageDir), eventReporter); if (mergedFile != null) { mergedFiles.add(mergedFile); }
@@ -1160,11 +1191,30 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository return mergedFile; } - File mergeJournals(final List<File> journalFiles, final File storageDir, final File mergedFile, final EventReporter eventReporter, - final RingBuffer<ProvenanceEventRecord> ringBuffer) throws IOException { - logger.debug("Merging {} to {}", journalFiles, mergedFile); + /** + * <p>

+ * Merges all of the given Journal Files into a single, merged Provenance Event Log File. As these records are merged, they will be compressed, if the repository is configured to compress records, + * and will be indexed. + * </p>

+ * + * <p>

+ * If the repository is configured to compress the data, the file written to may not be the same as the suggestedMergeFile, as a filename extension of '.gz' may be appended. If the + * journals are successfully merged, the file that they were merged into will be returned. If unable to merge the records (for instance, because the repository has been closed or because the list + * of journal files was empty), this method will return null. + * </p>

+ * + * @param journalFiles the journal files to merge + * @param suggestedMergeFile the file to write the merged records to + * @param eventReporter the event reporter to report any warnings or errors to; may be null. + * + * @return the file that the given journals were merged into, or null if no records were merged. + * + * @throws IOException if a problem occurs writing to the merged file, reading from a journal, or updating the Lucene Index. + */ + File mergeJournals(final List<File> journalFiles, final File suggestedMergeFile, final EventReporter eventReporter) throws IOException { + logger.debug("Merging {} to {}", journalFiles, suggestedMergeFile); if ( this.closed ) { - logger.info("Provenance Repository has been closed; will not merge journal files to {}", mergedFile); + logger.info("Provenance Repository has been closed; will not merge journal files to {}", suggestedMergeFile); return null; }
@@ -1194,7 +1244,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository // check if we have all of the "partial" files for the journal. if (allPartialFiles) { - if ( mergedFile.exists() ) { + if (suggestedMergeFile.exists()) { // we have all "partial" files and there is already a merged file. Delete the data from the index // because the merge file may not be fully merged. We will re-merge. logger.warn("Merged Journal File {} already exists; however, all partial journal files also exist " final DeleteIndexAction deleteAction = new DeleteIndexAction(this, indexConfig, indexManager); try { - deleteAction.execute(mergedFile); + deleteAction.execute(suggestedMergeFile); } catch (final Exception e) { - logger.warn("Failed to delete records from Journal File {} from the index; this could potentially result in duplicates. Failure was due to {}", mergedFile, e.toString()); + logger.warn("Failed to delete records from Journal File {} from the index; this could potentially result in duplicates. Failure was due to {}", suggestedMergeFile, e.toString()); if ( logger.isDebugEnabled() ) { logger.warn("", e); } } // Since we only store the file's basename, block offset, and event ID, and because the newly created file could end up on // a different Storage Directory than the original, we need to ensure that we delete both the partially merged // file and the TOC file. Otherwise, we could get the wrong copy and have issues retrieving events. - if ( !mergedFile.delete() ) { + if (!suggestedMergeFile.delete()) { logger.error("Failed to delete partially written Provenance Journal File {}. This may result in events from this journal " - + "file not being able to be displayed. This file should be deleted manually.", mergedFile); + + "file not being able to be displayed. This file should be deleted manually.", suggestedMergeFile); } - final File tocFile = TocUtil.getTocFile(mergedFile); + final File tocFile = TocUtil.getTocFile(suggestedMergeFile); if ( tocFile.exists() && !tocFile.delete() ) { logger.error("Failed to delete .toc file {}; this may result in not being able to read the Provenance Events from the {} Journal File. " - + "This can be corrected by manually deleting the {} file", tocFile, mergedFile, tocFile); + + "This can be corrected by manually deleting the {} file", tocFile, suggestedMergeFile, tocFile); } } } else {
@@ -1245,7 +1295,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository int records = 0; final boolean isCompress = configuration.isCompressOnRollover(); - final File writerFile = isCompress ? new File(mergedFile.getParentFile(), mergedFile.getName() + ".gz") : mergedFile; + final File writerFile = isCompress ? new File(suggestedMergeFile.getParentFile(), suggestedMergeFile.getName() + ".gz") : suggestedMergeFile; try { for (final File journalFile : journalFiles) {
@@ -1293,8 +1343,10 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository logger.warn("", e); } - eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to read Provenance Event Record from Journal due to " + e + + if (eventReporter != null) { + eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to read Provenance Event Record from Journal due to " + e + "; it's possible that the record wasn't completely written to the file. This record will be skipped."); + } } if (record == null) {
@@ -1332,37 +1384,42 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository try { long maxId = 0L; - while (!recordToReaderMap.isEmpty()) { - final Map.Entry<StandardProvenanceEventRecord, RecordReader> entry = recordToReaderMap.entrySet().iterator().next(); - final StandardProvenanceEventRecord record = entry.getKey(); - final RecordReader reader = entry.getValue(); + try { + while (!recordToReaderMap.isEmpty()) { + final Map.Entry<StandardProvenanceEventRecord, RecordReader> entry = recordToReaderMap.entrySet().iterator().next(); + final StandardProvenanceEventRecord record = entry.getKey(); + final RecordReader reader = entry.getValue(); - writer.writeRecord(record, record.getEventId()); - final int blockIndex = writer.getTocWriter().getCurrentBlockIndex(); + writer.writeRecord(record, record.getEventId()); + final int blockIndex = writer.getTocWriter().getCurrentBlockIndex(); - indexingAction.index(record, indexWriter, blockIndex); - maxId = record.getEventId(); + indexingAction.index(record, indexWriter, blockIndex); + maxId = record.getEventId(); - latestRecords.add(truncateAttributes(record)); - records++; + latestRecords.add(truncateAttributes(record)); + records++; - // Remove this entry from the map - recordToReaderMap.remove(record); + // Remove this entry from the map + recordToReaderMap.remove(record); - // Get the next entry from this reader and add it to the map - StandardProvenanceEventRecord nextRecord = null; + // Get the next entry from this reader and add it to the map + StandardProvenanceEventRecord nextRecord = null; - try { - nextRecord = reader.nextRecord(); - } catch (final EOFException eof) { - } - - if (nextRecord != null) { - recordToReaderMap.put(nextRecord, reader); + try { + nextRecord = reader.nextRecord(); + } catch (final EOFException eof) { + } + + if (nextRecord != null) { + recordToReaderMap.put(nextRecord, reader); + } } + indexWriter.commit(); + } catch (final Throwable t) { + indexWriter.rollback(); + throw t; } - indexWriter.commit(); indexConfig.setMaxIdIndexed(maxId); } finally { indexManager.returnIndexWriter(indexingDirectory, indexWriter); } } // record should now be available in the repository. We can copy the values from latestRecords to ringBuffer.
+ final RingBuffer<ProvenanceEventRecord> latestRecordBuffer = this.latestRecords; latestRecords.forEach(new ForEachEvaluator<ProvenanceEventRecord>() { @Override public boolean evaluate(final ProvenanceEventRecord event) { - ringBuffer.add(event); + latestRecordBuffer.add(event); return true; } });
@@ -1390,13 +1448,21 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository for (final File journalFile : journalFiles) { if (!journalFile.delete() && journalFile.exists()) { logger.warn("Failed to remove temporary journal file {}; this file should be cleaned up manually", journalFile.getAbsolutePath()); - eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to remove temporary journal file " + journalFile.getAbsolutePath() + "; this file should be cleaned up manually"); + + if (eventReporter != null) { + eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to remove temporary journal file " + + journalFile.getAbsolutePath() + "; this file should be cleaned up manually"); + } } final File tocFile = TocUtil.getTocFile(journalFile); if (!tocFile.delete() && tocFile.exists()) { logger.warn("Failed to remove temporary journal TOC file {}; this file should be cleaned up manually", tocFile.getAbsolutePath()); - eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to remove temporary journal TOC file " + tocFile.getAbsolutePath() + "; this file should be cleaned up manually"); + + if (eventReporter != null) { + eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to remove temporary journal TOC file " + + tocFile.getAbsolutePath() + "; this file should be cleaned up manually"); + } } }
@@ -1406,7 +1472,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository } else { final long nanos = System.nanoTime() - startNanos; final long millis = TimeUnit.MILLISECONDS.convert(nanos, TimeUnit.NANOSECONDS); - logger.info("Successfully merged {} journal files ({} records) into single Provenance Log File {} in {} milliseconds", journalFiles.size(), records, mergedFile, millis); + logger.info("Successfully merged {} journal files ({} records) into single Provenance Log File {} in {} milliseconds", journalFiles.size(), records, suggestedMergeFile, millis); } return writerFile;
@@ -1850,7 +1916,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository return true; } - if (repoDirty.get() || writtenSinceRollover > 0 && System.currentTimeMillis() > streamStartTime.get() + maxPartitionMillis) { + if ((dirtyWriterCount.get() > 0) || (writtenSinceRollover > 0 && System.currentTimeMillis() > streamStartTime.get() + maxPartitionMillis)) { return true; }
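The commit-or-rollback bracket that mergeJournals() now places around the index update generalizes to any Lucene indexing batch: either every record of the merged file lands in the index, or none do. A minimal sketch of the same pattern -- IndexWriter.commit() and rollback() are real Lucene API, while the IndexingWork callback is an illustrative abstraction, not a NiFi or Lucene type:

    import java.io.IOException;

    import org.apache.lucene.index.IndexWriter;

    final class IndexTransaction {

        interface IndexingWork {
            void run(IndexWriter writer) throws IOException;
        }

        private IndexTransaction() {
        }

        // Runs the indexing work and either commits all of it or rolls all of it back.
        static void indexAtomically(final IndexWriter indexWriter, final IndexingWork work) throws IOException {
            try {
                work.run(indexWriter);
                indexWriter.commit();
            } catch (final Throwable t) {
                // discards everything added since the last commit; note that rollback()
                // also closes the writer, so it must not be reused afterward
                indexWriter.rollback();
                throw t;
            }
        }
    }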
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java index 5221ebc3da..42bc8e99e6 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordReader.java
@@ -27,6 +27,7 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.regex.Pattern; import java.util.zip.GZIPInputStream; import org.apache.nifi.provenance.serialization.RecordReader;
@@ -40,6 +41,7 @@ import org.slf4j.LoggerFactory; public class StandardRecordReader implements RecordReader { private static final Logger logger = LoggerFactory.getLogger(StandardRecordReader.class); + private static final Pattern UUID_PATTERN = Pattern.compile("[a-fA-F0-9]{8}\\-([a-fA-F0-9]{4}\\-){3}[a-fA-F0-9]{12}"); private final ByteCountingInputStream rawInputStream; private final String filename;
@@ -394,7 +396,11 @@ public class StandardRecordReader implements RecordReader { // write less data. However, in version 8 we changed to just writing // out the string because it's extremely expensive to call UUID.fromString. // In the end, since we generally compress, the savings is minimal anyway. - return in.readUTF(); + final String uuid = in.readUTF(); + if (!UUID_PATTERN.matcher(uuid).matches()) { + throw new IOException("Failed to parse Provenance Event Record: expected a UUID but got: " + uuid); + } + return uuid; } }
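The reader-side change above is a fail-fast integrity check: after a crash mid-write, the bytes at a supposed UUID position are unlikely to form a valid UUID, so a malformed value is reported as corruption at that record boundary instead of propagating garbage downstream. The same idea in isolation (UuidFieldReader is an illustrative name; the regular expression matches the one the patch adds):

    import java.io.DataInputStream;
    import java.io.IOException;
    import java.util.regex.Pattern;

    final class UuidFieldReader {

        // 8-4-4-4-12 hex digits, the same shape the patch validates against
        private static final Pattern UUID_PATTERN =
                Pattern.compile("[a-fA-F0-9]{8}\\-([a-fA-F0-9]{4}\\-){3}[a-fA-F0-9]{12}");

        private UuidFieldReader() {
        }

        // Reads a UUID written with writeUTF() and fails fast if the stream is corrupt.
        static String readUuid(final DataInputStream in) throws IOException {
            final String uuid = in.readUTF();
            if (!UUID_PATTERN.matcher(uuid).matches()) {
                throw new IOException("Expected a UUID but got: " + uuid);
            }
            return uuid;
        }
    }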
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java index 50caee1ef4..a8c0dd0d7b 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
@@ -22,6 +22,7 @@ import java.io.IOException; import java.io.OutputStream; import java.util.Collection; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock;
@@ -44,6 +45,7 @@ public class StandardRecordWriter implements RecordWriter { private final TocWriter tocWriter; private final boolean compressed; private final int uncompressedBlockSize; + private final AtomicBoolean dirtyFlag = new AtomicBoolean(false); private DataOutputStream out; private ByteCountingOutputStream byteCountingOut;
@@ -65,11 +67,11 @@ public class StandardRecordWriter implements RecordWriter { this.tocWriter = writer; } - static void writeUUID(final DataOutputStream out, final String uuid) throws IOException { + protected void writeUUID(final DataOutputStream out, final String uuid) throws IOException { out.writeUTF(uuid); } - static void writeUUIDs(final DataOutputStream out, final Collection<String> list) throws IOException { + protected void writeUUIDs(final DataOutputStream out, final Collection<String> list) throws IOException { if (list == null) { out.writeInt(0); } else {
@@ -241,7 +243,7 @@ public class StandardRecordWriter implements RecordWriter { return byteCountingOut.getBytesWritten() - startBytes; } - private void writeNullableString(final DataOutputStream out, final String toWrite) throws IOException { + protected void writeNullableString(final DataOutputStream out, final String toWrite) throws IOException { if (toWrite == null) { out.writeBoolean(false); } else {
@@ -304,7 +306,16 @@ public class StandardRecordWriter implements RecordWriter { @Override public boolean tryLock() { - return lock.tryLock(); + final boolean obtainedLock = lock.tryLock(); + if (obtainedLock && dirtyFlag.get()) { + // once we have obtained the lock, we need to check if the writer + // has been marked dirty. If so, we cannot write to the underlying + // file, so we need to unlock and return false. Otherwise, it's okay + // to write to the underlying file, so return true. + lock.unlock(); + return false; + } + return obtainedLock; } @Override
@@ -324,4 +335,9 @@ public class StandardRecordWriter implements RecordWriter { public TocWriter getTocWriter() { return tocWriter; } + + @Override + public void markDirty() { + dirtyFlag.set(true); + } }
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java index 7c9bcc0973..03f1ad0c6f 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
@@ -67,15 +67,23 @@ public interface RecordWriter extends Closeable { /** * Attempts to obtain a mutually exclusive lock for this Writer so that * operations that must be atomic can be achieved atomically. If the lock is - * not immediately available, returns false; otherwise, obtains - * the lock and returns true. + * not immediately available, or if the writer is 'dirty' (see {@link #markDirty()}), + * returns false; otherwise, obtains the lock and returns true. * * @return true if the lock was obtained, false otherwise. */ boolean tryLock(); + /** + * Indicates that this Record Writer is 'dirty', meaning that it can no longer be + * updated. This can happen, for example, if a partial record is written. In this case, + * writing to this RecordWriter again could cause corruption. + */ + void markDirty(); + /** * Syncs the content written to this writer to disk.
+ * * @throws IOException if unable to sync content to disk */ void sync() throws IOException; diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java index 3737588d38..713180f206 100644 --- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java +++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java @@ -28,10 +28,12 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -58,15 +60,21 @@ import org.apache.nifi.provenance.search.SearchTerms; import org.apache.nifi.provenance.search.SearchableField; import org.apache.nifi.provenance.serialization.RecordReader; import org.apache.nifi.provenance.serialization.RecordReaders; +import org.apache.nifi.provenance.serialization.RecordWriter; import org.apache.nifi.reporting.Severity; +import org.apache.nifi.stream.io.DataOutputStream; import org.apache.nifi.util.file.FileUtils; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; public class TestPersistentProvenanceRepository { @@ -549,7 +557,12 @@ public class TestPersistentProvenanceRepository { builder.fromFlowFile(createFlowFile(3L, 3000L, attributes)); for (int i = 0; i < 10; i++) { - attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + String.valueOf(i + j * 10)); + String uuidSuffix = String.valueOf(i + j * 10); + if (uuidSuffix.length() < 2) { + uuidSuffix = "0" + uuidSuffix; + } + + attributes.put("uuid", "00000000-0000-0000-0000-0000000000" + uuidSuffix); builder.fromFlowFile(createFlowFile(i + j * 10, 3000L, attributes)); repo.registerEvent(builder.build()); } @@ -1202,4 +1215,109 @@ public class TestPersistentProvenanceRepository { assertEquals("12345678901234567890123456789012345678901234567890", retrieved.getAttributes().get("75chars")); } + @Test + public void testBehaviorOnOutOfMemory() throws IOException, InterruptedException { + final RepositoryConfiguration config = createConfiguration(); + config.setMaxEventFileLife(3, TimeUnit.MINUTES); + config.setJournalCount(4); + + // Create a repository that overrides the createWriters() method so that we can return writers that will throw + // OutOfMemoryError where we want to + final AtomicBoolean causeOOME = new AtomicBoolean(false); + repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS) { + @Override + protected RecordWriter[] createWriters(RepositoryConfiguration config, long initialRecordId) throws IOException { + final 
RecordWriter[] recordWriters = super.createWriters(config, initialRecordId); + + // Spy on each of the writers so that a call to writeUUID throws an OutOfMemoryError if we set the + // causeOOME flag to true + final StandardRecordWriter[] spiedWriters = new StandardRecordWriter[recordWriters.length]; + for (int i = 0; i < recordWriters.length; i++) { + final StandardRecordWriter writer = (StandardRecordWriter) recordWriters[i]; + + spiedWriters[i] = Mockito.spy(writer); + Mockito.doAnswer(new Answer<Object>() { + @Override + public Object answer(final InvocationOnMock invocation) throws Throwable { + if (causeOOME.get()) { + throw new OutOfMemoryError(); + } else { + writer.writeUUID(invocation.getArgumentAt(0, DataOutputStream.class), invocation.getArgumentAt(1, String.class)); + } + return null; + } + }).when(spiedWriters[i]).writeUUID(Mockito.any(DataOutputStream.class), Mockito.any(String.class)); + } + + // return the writers that we are spying on + return spiedWriters; + } + }; + repo.initialize(getEventReporter()); + + final Map<String, String> attributes = new HashMap<>(); + attributes.put("75chars", "123456789012345678901234567890123456789012345678901234567890123456789012345"); + + final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder(); + builder.setEventTime(System.currentTimeMillis()); + builder.setEventType(ProvenanceEventType.RECEIVE); + builder.setTransitUri("nifi://unit-test"); + attributes.put("uuid", "12345678-0000-0000-0000-012345678912"); + builder.fromFlowFile(createFlowFile(3L, 3000L, attributes)); + builder.setComponentId("1234"); + builder.setComponentType("dummy processor"); + + // first make sure that we are able to write to the repo successfully. + for (int i = 0; i < 4; i++) { + final ProvenanceEventRecord record = builder.build(); + repo.registerEvent(record); + } + + // cause OOME to occur + causeOOME.set(true); + + // write 4 times to make sure that we mark all partitions as dirty + for (int i = 0; i < 4; i++) { + final ProvenanceEventRecord record = builder.build(); + try { + repo.registerEvent(record); + Assert.fail("Expected OutOfMemoryError but was able to register event"); + } catch (final OutOfMemoryError oome) { + } + } + + // now that all partitions are dirty, ensure that as we keep trying to write, we get an IllegalStateException + // and that we don't corrupt the repository by writing partial records + for (int i = 0; i < 8; i++) { + final ProvenanceEventRecord record = builder.build(); + try { + repo.registerEvent(record); + Assert.fail("Expected IllegalStateException but was able to register event"); + } catch (final IllegalStateException ise) { + } + } + + // close repo so that we can create a new one to recover records + repo.close(); + + // make sure we can recover + final PersistentProvenanceRepository recoveryRepo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS) { + @Override + protected Set<File> recoverJournalFiles() throws IOException { + try { + return super.recoverJournalFiles(); + } catch (final IOException ioe) { + Assert.fail("Failed to recover properly"); + return null; + } + } + }; + + try { + recoveryRepo.initialize(getEventReporter()); + } finally { + recoveryRepo.close(); + } + } }
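A final note on why the write path catches Throwable rather than IOException: an Error such as the OutOfMemoryError this test injects sails straight past an IOException handler, so any cleanup keyed to IOException would be skipped and the half-written journal would go unmarked. A tiny, self-contained demonstration of that difference (hypothetical names, simulating the failure the test injects through its Mockito spies):

    public class ThrowableVsIOExceptionDemo {

        private static boolean dirty = false;

        // simulates a write that dies with an Error partway through a record
        private static void writePartialRecord() {
            throw new OutOfMemoryError("simulated");
        }

        public static void main(final String[] args) {
            try {
                try {
                    writePartialRecord();
                } catch (final Throwable t) {
                    // an IOException-only catch would never run for an OutOfMemoryError,
                    // leaving the journal half-written but not marked dirty
                    dirty = true;
                    throw t;
                }
            } catch (final Throwable t) {
                System.out.println("writer marked dirty: " + dirty + " after " + t);
            }
        }
    }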