NIFI-803: Ensure that if an OutOfMemoryError occurs, the Provenance Repo won't become corrupt

Author: Mark Payne
Date: 2015-08-03 13:25:53 -04:00
Parent: 496ebfb3be
Commit: 6a0a321b64
5 changed files with 294 additions and 80 deletions

PersistentProvenanceRepository.java

@ -152,7 +152,8 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
private final AtomicInteger rolloverCompletions = new AtomicInteger(0);
private final AtomicBoolean initialized = new AtomicBoolean(false);
private final AtomicBoolean repoDirty = new AtomicBoolean(false);
private final AtomicInteger dirtyWriterCount = new AtomicInteger(0);
// we keep the last 1000 records on hand so that when the UI is opened and asks for the last 1000 records, we don't need to
// read them from disk. Since keeping them is very cheap, it's worth the tiny expense for the improved user experience.
private final RingBuffer<ProvenanceEventRecord> latestRecords = new RingBuffer<>(1000);
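A minimal, generic sketch of the idea behind latestRecords (illustrative only; the code above uses NiFi's own RingBuffer class): keep only the newest N events in memory so the UI's "most recent events" query never has to touch disk.

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Generic illustration only; not NiFi's RingBuffer implementation.
class LatestEventsSketch<T> {
    private final int capacity;
    private final Deque<T> events = new ArrayDeque<>();

    LatestEventsSketch(final int capacity) {
        this.capacity = capacity;
    }

    synchronized void add(final T event) {
        if (events.size() == capacity) {
            events.removeFirst(); // drop the oldest so only the newest 'capacity' events remain
        }
        events.addLast(event);
    }

    synchronized List<T> snapshot() {
        return new ArrayList<>(events); // oldest-first snapshot served to the UI
    }
}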
@ -338,7 +339,8 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
return config;
}
static RecordWriter[] createWriters(final RepositoryConfiguration config, final long initialRecordId) throws IOException {
// protected in order to override for unit tests
protected RecordWriter[] createWriters(final RepositoryConfiguration config, final long initialRecordId) throws IOException {
final List<File> storageDirectories = config.getStorageDirectories();
final RecordWriter[] writers = new RecordWriter[config.getJournalCount()];
@ -561,13 +563,6 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
idGenerator.set(maxId + 1);
// TODO: Consider refactoring this so that we do the rollover actions at the same time that we merge the journals.
// This would require a few different changes:
// * Rollover Actions would take a ProvenanceEventRecord at a time, not a File at a time. Would have to be either
// constructed or "started" for each file and then closed after each file
// * The recovery would have to then read through all of the journals with the highest starting ID to determine
// the action max id instead of reading the merged file
// * We would write to a temporary file and then rename once the merge is complete. This allows us to fail and restart
try {
final Set<File> recoveredJournals = recoverJournalFiles();
filesToRecover.addAll(recoveredJournals);
@ -654,13 +649,6 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
final long totalJournalSize;
readLock.lock();
try {
if (repoDirty.get()) {
logger.debug("Cannot persist provenance record because there was an IOException last time a record persistence was attempted. "
+ "Will not attempt to persist more records until the repo has been rolled over.");
return;
}
final RecordWriter[] recordWriters = this.writers;
long bytesWritten = 0L;
// obtain a lock on one of the RecordWriter's so that no other thread is able to write to this writer until we're finished.
@ -669,6 +657,13 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
boolean locked = false;
RecordWriter writer;
do {
final RecordWriter[] recordWriters = this.writers;
final int numDirty = dirtyWriterCount.get();
if (numDirty >= recordWriters.length) {
throw new IllegalStateException("Cannot update repository because all partitions are unusable at this time. Writing to the repository would cause corruption. "
+ "This most often happens as a result of the repository running out of disk space or the JMV running out of memory.");
}
final long idx = writerIndex.getAndIncrement();
writer = recordWriters[(int) (idx % recordWriters.length)];
locked = writer.tryLock();
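A simplified sketch (hypothetical names, not part of the commit) of the partition-selection loop above: writers are tried round-robin, tryLock() skips writers that are busy or have been marked dirty, and the loop bails out once every partition has become unusable.

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

class WriterSelectionSketch {
    // stand-in for RecordWriter; tryLock() returns false if the writer is busy or marked dirty
    interface JournalWriter {
        boolean tryLock();
        void unlock();
    }

    private final AtomicLong writerIndex = new AtomicLong(0L);
    private final AtomicInteger dirtyWriterCount = new AtomicInteger(0);

    JournalWriter acquireWriter(final JournalWriter[] writers) {
        JournalWriter writer;
        boolean locked;
        do {
            // if every partition is dirty, any write would risk appending to a partially written journal
            if (dirtyWriterCount.get() >= writers.length) {
                throw new IllegalStateException("Cannot update repository because all partitions are unusable at this time");
            }
            final long idx = writerIndex.getAndIncrement();
            writer = writers[(int) (idx % writers.length)];
            locked = writer.tryLock();
        } while (!locked);
        return writer;
    }
}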
@ -688,24 +683,31 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
totalJournalSize = bytesWrittenSinceRollover.addAndGet(bytesWritten);
recordsWrittenSinceRollover.getAndIncrement();
} catch (final IOException ioe) {
} catch (final Throwable t) {
// We need to set the repoDirty flag before we release the lock for this journal.
// Otherwise, another thread may write to this journal -- this is a problem because
// the journal contains part of our record but not all of it. Writing to the end of this
// journal will result in corruption!
repoDirty.set(true);
writer.markDirty();
dirtyWriterCount.incrementAndGet();
streamStartTime.set(0L); // force rollover to happen soon.
throw ioe;
throw t;
} finally {
writer.unlock();
}
} catch (final IOException ioe) {
logger.error("Failed to persist Provenance Event due to {}. Will not attempt to write to the Provenance Repository again until the repository has rolled over.", ioe.toString());
// warn about the failure
logger.error("Failed to persist Provenance Event due to {}.", ioe.toString());
logger.error("", ioe);
eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to persist Provenance Event due to " + ioe.toString() +
". Will not attempt to write to the Provenance Repository again until the repository has rolled over");
eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to persist Provenance Event due to " + ioe.toString());
// Switch from readLock to writeLock so that we can perform rollover
// Attempt to perform a rollover. An IOException in this part of the code generally is the result of
// running out of disk space. If we have multiple partitions, we may well be able to roll over. This helps
// in two ways: it compresses the journal files which frees up space, and if it ends up merging to a different
// partition/storage directory, we can delete the journals from this directory that ran out of space.
// In order to do this, though, we must switch from a read lock to a write lock.
// This part of the code gets a little bit messy, and we could potentially refactor it a bit in order to
// make the code cleaner.
readLock.unlock();
try {
writeLock.lock();
@ -720,6 +722,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
logger.error("", e);
eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to Rollover Provenance Event Repository file due to " + e.toString());
} finally {
// we must re-lock the readLock, as the finally block below is going to unlock it.
readLock.lock();
}
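A ReentrantReadWriteLock cannot be upgraded in place, so the code above releases the read lock, takes the write lock for the rollover attempt, and then re-acquires the read lock so the caller's finally block can release it. A condensed sketch of the pattern, with illustrative names:

import java.util.concurrent.locks.ReentrantReadWriteLock;

class LockSwitchSketch {
    private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
    private final ReentrantReadWriteLock.ReadLock readLock = rwLock.readLock();
    private final ReentrantReadWriteLock.WriteLock writeLock = rwLock.writeLock();

    // called with the read lock held; returns with the read lock held again
    void rolloverAfterWriteFailure() {
        readLock.unlock();                // a read lock cannot be upgraded, so release it first
        try {
            writeLock.lock();
            try {
                rollover(true);           // force a rollover; compressing/merging journals may free disk space
            } catch (final Exception e) {
                // log and report; the scheduled rollover check will retry later
            } finally {
                writeLock.unlock();
            }
        } finally {
            readLock.lock();              // the caller's finally block expects to unlock the read lock
        }
    }

    void rollover(final boolean force) {
        // merge journal files into a single Provenance Event Log File (omitted)
    }
}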
@ -752,6 +755,9 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
}
}
/**
* @return all of the Provenance Event Log Files (not the journals, the merged files) available across all storage directories.
*/
private List<File> getLogFiles() {
final List<File> files = new ArrayList<>();
for (final Path path : idToPathMap.get().values()) {
@ -817,6 +823,10 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
}
}
// This comparator sorts the data based on the "basename" of the files. I.e., the numeric portion.
// We do this because the numeric portion represents the ID of the first event in the log file.
// As a result, we are sorting based on time, since the ID is monotonically increasing. By doing this,
// we are able to avoid hitting disk continually to check timestamps.
final Comparator<File> sortByBasenameComparator = new Comparator<File>() {
@Override
public int compare(final File o1, final File o2) {
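The compare body falls outside this hunk; below is a hedged sketch of a comparator like the one described, assuming the numeric basename (everything before the first '.') is parsed as a long. The real code uses LuceneUtil helpers and may handle edge cases differently.

import java.io.File;
import java.util.Comparator;

class BasenameComparatorSketch {
    static final Comparator<File> SORT_BY_BASENAME = new Comparator<File>() {
        @Override
        public int compare(final File o1, final File o2) {
            return Long.compare(basenameId(o1), basenameId(o2));
        }

        private long basenameId(final File file) {
            final String name = file.getName();
            final int dot = name.indexOf('.');
            final String basename = dot < 0 ? name : name.substring(0, dot);
            try {
                // the numeric basename is the ID of the first event in the file, so this orders files by time
                return Long.parseLong(basename);
            } catch (final NumberFormatException nfe) {
                return 0L; // non-numeric names sort first in this sketch
            }
        }
    };
}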
@ -930,7 +940,10 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
}
}
public void waitForRollover() throws IOException {
/**
* Blocks the calling thread until the repository rolls over. This is intended for unit testing.
*/
public void waitForRollover() {
final int count = rolloverCompletions.get();
while (rolloverCompletions.get() == count) {
try {
@ -940,6 +953,9 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
}
}
/**
* @return the number of journal files that exist across all storage directories
*/
// made protected for testing purposes
protected int getJournalCount() {
// determine how many 'journals' we have in the journals directories
@ -956,7 +972,12 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
}
/**
* MUST be called with the write lock held
* <p>
* MUST be called with the write lock held.
* </p>
*
* Rolls over the data in the journal files, merging them into a single Provenance Event Log File, and
* compressing and indexing as needed.
*
* @param force if true, will force a rollover regardless of whether or not data has been written
* @throws IOException if unable to complete rollover
@ -968,7 +989,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
// If this is the first time we're creating the out stream, or if we
// have written something to the stream, then roll over
if (recordsWrittenSinceRollover.get() > 0L || repoDirty.get() || force) {
if (force || recordsWrittenSinceRollover.get() > 0L || dirtyWriterCount.get() > 0) {
final List<File> journalsToMerge = new ArrayList<>();
for (final RecordWriter writer : writers) {
final File writerFile = writer.getFile();
@ -986,10 +1007,12 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
logger.debug("Going to merge {} files for journals starting with ID {}", journalsToMerge.size(), LuceneUtil.substringBefore(journalsToMerge.get(0).getName(), "."));
}
// Choose a storage directory to store the merged file in.
final long storageDirIdx = storageDirectoryIndex.getAndIncrement();
final List<File> storageDirs = configuration.getStorageDirectories();
final File storageDir = storageDirs.get((int) (storageDirIdx % storageDirs.size()));
// Run the rollover logic in a background thread.
final AtomicReference<Future<?>> futureReference = new AtomicReference<>();
final int recordsWritten = recordsWrittenSinceRollover.getAndSet(0);
final Runnable rolloverRunnable = new Runnable() {
@ -999,10 +1022,8 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
final File fileRolledOver;
try {
fileRolledOver = mergeJournals(journalsToMerge, storageDir, getMergeFile(journalsToMerge, storageDir), eventReporter, latestRecords);
repoDirty.set(false);
fileRolledOver = mergeJournals(journalsToMerge, getMergeFile(journalsToMerge, storageDir), eventReporter);
} catch (final IOException ioe) {
repoDirty.set(true);
logger.error("Failed to merge Journal Files {} into a Provenance Log File due to {}", journalsToMerge, ioe.toString());
logger.error("", ioe);
return;
@ -1046,7 +1067,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
}
};
// We are going to schedule the future to run every 10 seconds. This allows us to keep retrying if we
// We are going to schedule the future to run immediately and then repeat every 10 seconds. This allows us to keep retrying if we
// fail for some reason. When we succeed, the Runnable will cancel itself.
final Future<?> future = rolloverExecutor.scheduleWithFixedDelay(rolloverRunnable, 0, 10, TimeUnit.SECONDS);
futureReference.set(future);
@ -1061,6 +1082,13 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
final int journalCountThreshold = configuration.getJournalCount() * 5;
final long sizeThreshold = (long) (configuration.getMaxStorageCapacity() * 1.1D); // allow up to 10% over the configured max capacity
// check if we need to apply backpressure.
// If we have too many journal files, or if the repo becomes too large, backpressure is necessary. Without it,
// if the rate at which provenance events are registered exceeds the rate at which we can compress/merge/index them,
// then eventually we will end up with all of the data stored in the 'journals' directory and not yet indexed. This
// would mean that the data would never even be accessible. In order to prevent this, if we exceed 110% of the configured
// max capacity for the repo, or if we have 5 sets of journal files waiting to be merged, we will block here until
// that is no longer the case.
if (journalFileCount > journalCountThreshold || repoSize > sizeThreshold) {
logger.warn("The rate of the dataflow is exceeding the provenance recording rate. "
+ "Slowing down flow to accomodate. Currently, there are {} journal files ({} bytes) and "
@ -1086,14 +1114,17 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
+ "journal files to be rolled over is {}", journalFileCount);
}
// we've finished rolling over successfully. Create new writers and reset state.
writers = createWriters(configuration, idGenerator.get());
dirtyWriterCount.set(0);
streamStartTime.set(System.currentTimeMillis());
recordsWrittenSinceRollover.getAndSet(0);
}
}
private Set<File> recoverJournalFiles() throws IOException {
// protected for use in unit tests
protected Set<File> recoverJournalFiles() throws IOException {
if (!configuration.isAllowRollover()) {
return Collections.emptySet();
}
@ -1133,7 +1164,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
for (final List<File> journalFileSet : journalMap.values()) {
final long storageDirIdx = storageDirectoryIndex.getAndIncrement();
final File storageDir = storageDirs.get((int) (storageDirIdx % storageDirs.size()));
final File mergedFile = mergeJournals(journalFileSet, storageDir, getMergeFile(journalFileSet, storageDir), eventReporter, latestRecords);
final File mergedFile = mergeJournals(journalFileSet, getMergeFile(journalFileSet, storageDir), eventReporter);
if (mergedFile != null) {
mergedFiles.add(mergedFile);
}
@ -1160,11 +1191,30 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
return mergedFile;
}
File mergeJournals(final List<File> journalFiles, final File storageDir, final File mergedFile, final EventReporter eventReporter,
final RingBuffer<ProvenanceEventRecord> ringBuffer) throws IOException {
logger.debug("Merging {} to {}", journalFiles, mergedFile);
/**
* <p>
* Merges all of the given Journal Files into a single, merged Provenance Event Log File. As these records are merged, they will be compressed, if the repository is configured to compress records,
* and will be indexed.
* </p>
*
* <p>
* If the repository is configured to compress the data, the file written to may not be the same as the <code>suggestedMergeFile</code>, as a filename extension of '.gz' may be appended. If the
* journals are successfully merged, the file that they were merged into will be returned. If unable to merge the records (for instance, because the repository has been closed or because the list
* of journal files was empty), this method will return <code>null</code>.
* </p>
*
* @param journalFiles the journal files to merge
* @param suggestedMergeFile the file to write the merged records to
* @param eventReporter the event reporter to report any warnings or errors to; may be null.
*
* @return the file that the given journals were merged into, or <code>null</code> if no records were merged.
*
* @throws IOException if a problem occurs writing to the mergedFile, reading from a journal, or updating the Lucene Index.
*/
File mergeJournals(final List<File> journalFiles, final File suggestedMergeFile, final EventReporter eventReporter) throws IOException {
logger.debug("Merging {} to {}", journalFiles, suggestedMergeFile);
if ( this.closed ) {
logger.info("Provenance Repository has been closed; will not merge journal files to {}", mergedFile);
logger.info("Provenance Repository has been closed; will not merge journal files to {}", suggestedMergeFile);
return null;
}
@ -1194,7 +1244,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
// check if we have all of the "partial" files for the journal.
if (allPartialFiles) {
if ( mergedFile.exists() ) {
if (suggestedMergeFile.exists()) {
// we have all "partial" files and there is already a merged file. Delete the data from the index
// because the merge file may not be fully merged. We will re-merge.
logger.warn("Merged Journal File {} already exists; however, all partial journal files also exist "
@ -1202,9 +1252,9 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
final DeleteIndexAction deleteAction = new DeleteIndexAction(this, indexConfig, indexManager);
try {
deleteAction.execute(mergedFile);
deleteAction.execute(suggestedMergeFile);
} catch (final Exception e) {
logger.warn("Failed to delete records from Journal File {} from the index; this could potentially result in duplicates. Failure was due to {}", mergedFile, e.toString());
logger.warn("Failed to delete records from Journal File {} from the index; this could potentially result in duplicates. Failure was due to {}", suggestedMergeFile, e.toString());
if ( logger.isDebugEnabled() ) {
logger.warn("", e);
}
@ -1213,15 +1263,15 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
// Since we only store the file's basename, block offset, and event ID, and because the newly created file could end up on
// a different Storage Directory than the original, we need to ensure that we delete both the partially merged
// file and the TOC file. Otherwise, we could get the wrong copy and have issues retrieving events.
if ( !mergedFile.delete() ) {
if (!suggestedMergeFile.delete()) {
logger.error("Failed to delete partially written Provenance Journal File {}. This may result in events from this journal "
+ "file not being able to be displayed. This file should be deleted manually.", mergedFile);
+ "file not being able to be displayed. This file should be deleted manually.", suggestedMergeFile);
}
final File tocFile = TocUtil.getTocFile(mergedFile);
final File tocFile = TocUtil.getTocFile(suggestedMergeFile);
if ( tocFile.exists() && !tocFile.delete() ) {
logger.error("Failed to delete .toc file {}; this may result in not being able to read the Provenance Events from the {} Journal File. "
+ "This can be corrected by manually deleting the {} file", tocFile, mergedFile, tocFile);
+ "This can be corrected by manually deleting the {} file", tocFile, suggestedMergeFile, tocFile);
}
}
} else {
@ -1245,7 +1295,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
int records = 0;
final boolean isCompress = configuration.isCompressOnRollover();
final File writerFile = isCompress ? new File(mergedFile.getParentFile(), mergedFile.getName() + ".gz") : mergedFile;
final File writerFile = isCompress ? new File(suggestedMergeFile.getParentFile(), suggestedMergeFile.getName() + ".gz") : suggestedMergeFile;
try {
for (final File journalFile : journalFiles) {
@ -1293,9 +1343,11 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
logger.warn("", e);
}
if (eventReporter != null) {
eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to read Provenance Event Record from Journal due to " + e +
"; it's possible that hte record wasn't completely written to the file. This record will be skipped.");
}
}
if (record == null) {
continue;
@ -1332,6 +1384,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
try {
long maxId = 0L;
try {
while (!recordToReaderMap.isEmpty()) {
final Map.Entry<StandardProvenanceEventRecord, RecordReader> entry = recordToReaderMap.entrySet().iterator().next();
final StandardProvenanceEventRecord record = entry.getKey();
@ -1361,8 +1414,12 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
recordToReaderMap.put(nextRecord, reader);
}
}
indexWriter.commit();
} catch (final Throwable t) {
indexWriter.rollback();
throw t;
}
indexConfig.setMaxIdIndexed(maxId);
} finally {
indexManager.returnIndexWriter(indexingDirectory, indexWriter);
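A condensed sketch (illustrative only, not the commit's code) of the commit-or-rollback pattern introduced above: if anything goes wrong while indexing the merged records, the Lucene IndexWriter is rolled back so the index is not left referencing a merge that never completed.

import java.io.IOException;
import java.util.List;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexWriter;

class IndexCommitSketch {
    static void indexAll(final IndexWriter indexWriter, final List<Document> documents) throws IOException {
        try {
            for (final Document document : documents) {
                indexWriter.addDocument(document);
            }
            indexWriter.commit();       // make the newly indexed documents durable and visible
        } catch (final Throwable t) {
            indexWriter.rollback();     // discard uncommitted changes; note that rollback() also closes the writer
            throw t;
        }
    }
}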
@ -1370,10 +1427,11 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
}
// records should now be available in the repository. We can copy the latest records collected during the merge into the repository's in-memory ring buffer.
final RingBuffer<ProvenanceEventRecord> latestRecordBuffer = this.latestRecords;
latestRecords.forEach(new ForEachEvaluator<ProvenanceEventRecord>() {
@Override
public boolean evaluate(final ProvenanceEventRecord event) {
ringBuffer.add(event);
latestRecordBuffer.add(event);
return true;
}
});
@ -1390,13 +1448,21 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
for (final File journalFile : journalFiles) {
if (!journalFile.delete() && journalFile.exists()) {
logger.warn("Failed to remove temporary journal file {}; this file should be cleaned up manually", journalFile.getAbsolutePath());
eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to remove temporary journal file " + journalFile.getAbsolutePath() + "; this file should be cleaned up manually");
if (eventReporter != null) {
eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to remove temporary journal file " +
journalFile.getAbsolutePath() + "; this file should be cleaned up manually");
}
}
final File tocFile = TocUtil.getTocFile(journalFile);
if (!tocFile.delete() && tocFile.exists()) {
logger.warn("Failed to remove temporary journal TOC file {}; this file should be cleaned up manually", tocFile.getAbsolutePath());
eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to remove temporary journal TOC file " + tocFile.getAbsolutePath() + "; this file should be cleaned up manually");
if (eventReporter != null) {
eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, "Failed to remove temporary journal TOC file " +
tocFile.getAbsolutePath() + "; this file should be cleaned up manually");
}
}
}
@ -1406,7 +1472,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
} else {
final long nanos = System.nanoTime() - startNanos;
final long millis = TimeUnit.MILLISECONDS.convert(nanos, TimeUnit.NANOSECONDS);
logger.info("Successfully merged {} journal files ({} records) into single Provenance Log File {} in {} milliseconds", journalFiles.size(), records, mergedFile, millis);
logger.info("Successfully merged {} journal files ({} records) into single Provenance Log File {} in {} milliseconds", journalFiles.size(), records, suggestedMergeFile, millis);
}
return writerFile;
@ -1850,7 +1916,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
return true;
}
if (repoDirty.get() || writtenSinceRollover > 0 && System.currentTimeMillis() > streamStartTime.get() + maxPartitionMillis) {
if ((dirtyWriterCount.get() > 0) || (writtenSinceRollover > 0 && System.currentTimeMillis() > streamStartTime.get() + maxPartitionMillis)) {
return true;
}

StandardRecordReader.java

@ -27,6 +27,7 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.regex.Pattern;
import java.util.zip.GZIPInputStream;
import org.apache.nifi.provenance.serialization.RecordReader;
@ -40,6 +41,7 @@ import org.slf4j.LoggerFactory;
public class StandardRecordReader implements RecordReader {
private static final Logger logger = LoggerFactory.getLogger(StandardRecordReader.class);
private static final Pattern UUID_PATTERN = Pattern.compile("[a-fA-F0-9]{8}\\-([a-fA-F0-9]{4}\\-){3}[a-fA-F0-9]{12}");
private final ByteCountingInputStream rawInputStream;
private final String filename;
@ -394,7 +396,11 @@ public class StandardRecordReader implements RecordReader {
// write less data. However, in version 8 we changed to just writing
// out the string because it's extremely expensive to call UUID.fromString.
// In the end, since we generally compress, the savings is minimal anyway.
return in.readUTF();
final String uuid = in.readUTF();
if (!UUID_PATTERN.matcher(uuid).matches()) {
throw new IOException("Failed to parse Provenance Event Record: expected a UUID but got: " + uuid);
}
return uuid;
}
}
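A small standalone illustration (not part of the commit) of what the UUID_PATTERN check above accepts and rejects; a truncated or garbage value read from a partially written record now fails fast with an IOException instead of silently producing a corrupt event.

import java.util.regex.Pattern;

public class UuidPatternExample {
    private static final Pattern UUID_PATTERN = Pattern.compile("[a-fA-F0-9]{8}\\-([a-fA-F0-9]{4}\\-){3}[a-fA-F0-9]{12}");

    public static void main(final String[] args) {
        // a well-formed UUID from an intact record matches
        System.out.println(UUID_PATTERN.matcher("12345678-0000-0000-0000-012345678912").matches()); // true
        // truncated or garbage data from a partially written record does not
        System.out.println(UUID_PATTERN.matcher("12345678-0000").matches());                        // false
        System.out.println(UUID_PATTERN.matcher("not-a-uuid-at-all").matches());                    // false
    }
}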

StandardRecordWriter.java

@ -22,6 +22,7 @@ import java.io.IOException;
import java.io.OutputStream;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@ -44,6 +45,7 @@ public class StandardRecordWriter implements RecordWriter {
private final TocWriter tocWriter;
private final boolean compressed;
private final int uncompressedBlockSize;
private final AtomicBoolean dirtyFlag = new AtomicBoolean(false);
private DataOutputStream out;
private ByteCountingOutputStream byteCountingOut;
@ -65,11 +67,11 @@ public class StandardRecordWriter implements RecordWriter {
this.tocWriter = writer;
}
static void writeUUID(final DataOutputStream out, final String uuid) throws IOException {
protected void writeUUID(final DataOutputStream out, final String uuid) throws IOException {
out.writeUTF(uuid);
}
static void writeUUIDs(final DataOutputStream out, final Collection<String> list) throws IOException {
protected void writeUUIDs(final DataOutputStream out, final Collection<String> list) throws IOException {
if (list == null) {
out.writeInt(0);
} else {
@ -241,7 +243,7 @@ public class StandardRecordWriter implements RecordWriter {
return byteCountingOut.getBytesWritten() - startBytes;
}
private void writeNullableString(final DataOutputStream out, final String toWrite) throws IOException {
protected void writeNullableString(final DataOutputStream out, final String toWrite) throws IOException {
if (toWrite == null) {
out.writeBoolean(false);
} else {
@ -304,7 +306,16 @@ public class StandardRecordWriter implements RecordWriter {
@Override
public boolean tryLock() {
return lock.tryLock();
final boolean obtainedLock = lock.tryLock();
if (obtainedLock && dirtyFlag.get()) {
// once we have obtained the lock, we need to check if the writer
// has been marked dirty. If so, we cannot write to the underlying
// file, so we need to unlock and return false. Otherwise, it's okay
// to write to the underlying file, so return true.
lock.unlock();
return false;
}
return obtainedLock;
}
@Override
@ -324,4 +335,9 @@ public class StandardRecordWriter implements RecordWriter {
public TocWriter getTocWriter() {
return tocWriter;
}
@Override
public void markDirty() {
dirtyFlag.set(true);
}
}

RecordWriter.java

@ -67,15 +67,23 @@ public interface RecordWriter extends Closeable {
/**
* Attempts to obtain a mutually exclusive lock for this Writer so that
* operations that must be atomic can be achieved atomically. If the lock is
* not immediately available, returns <code>false</code>; otherwise, obtains
* the lock and returns <code>true</code>.
* not immediately available, or if the writer is 'dirty' (see {@link #markDirty()}),
* returns <code>false</code>; otherwise, obtains the lock and returns <code>true</code>.
*
* @return <code>true</code> if the lock was obtained, <code>false</code> otherwise.
*/
boolean tryLock();
/**
* Indicates that this Record Writer is 'dirty', meaning that it can no longer be
* updated. This can happen, for example, if a partial record is written. In this case,
* writing to this RecordWriter again could cause corruption.
*/
void markDirty();
/**
* Syncs the content written to this writer to disk.
*
* @throws IOException if unable to sync content to disk
*/
void sync() throws IOException;
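A hedged sketch (not from the commit) of the caller-side contract established by tryLock() and markDirty() above: if a write fails partway, the caller marks the writer dirty before unlocking so that no other thread appends to the now-partial journal, and later tryLock() calls on that writer return false. The writeRecord signature here is assumed from how PersistentProvenanceRepository uses the writer.

import java.io.IOException;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.serialization.RecordWriter;

class DirtyWriterContractSketch {
    boolean writeIfAvailable(final RecordWriter writer, final ProvenanceEventRecord event, final long eventId) throws IOException {
        if (!writer.tryLock()) {
            return false;              // writer is busy or already dirty; the caller should try another partition
        }
        try {
            writer.writeRecord(event, eventId);   // assumed signature
            return true;
        } catch (final Throwable t) {
            writer.markDirty();        // prevent any further writes to this journal
            throw t;
        } finally {
            writer.unlock();
        }
    }
}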

TestPersistentProvenanceRepository.java

@ -28,10 +28,12 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@ -58,15 +60,21 @@ import org.apache.nifi.provenance.search.SearchTerms;
import org.apache.nifi.provenance.search.SearchableField;
import org.apache.nifi.provenance.serialization.RecordReader;
import org.apache.nifi.provenance.serialization.RecordReaders;
import org.apache.nifi.provenance.serialization.RecordWriter;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.stream.io.DataOutputStream;
import org.apache.nifi.util.file.FileUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
public class TestPersistentProvenanceRepository {
@ -549,7 +557,12 @@ public class TestPersistentProvenanceRepository {
builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
for (int i = 0; i < 10; i++) {
attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + String.valueOf(i + j * 10));
String uuidSuffix = String.valueOf(i + j * 10);
if (uuidSuffix.length() < 2) {
uuidSuffix = "0" + uuidSuffix;
}
attributes.put("uuid", "00000000-0000-0000-0000-0000000000" + uuidSuffix);
builder.fromFlowFile(createFlowFile(i + j * 10, 3000L, attributes));
repo.registerEvent(builder.build());
}
@ -1202,4 +1215,109 @@ public class TestPersistentProvenanceRepository {
assertEquals("12345678901234567890123456789012345678901234567890", retrieved.getAttributes().get("75chars"));
}
@Test
public void testBehaviorOnOutOfMemory() throws IOException, InterruptedException {
final RepositoryConfiguration config = createConfiguration();
config.setMaxEventFileLife(3, TimeUnit.MINUTES);
config.setJournalCount(4);
// Create a repository that overrides the createWriters() method so that we can return writers that will throw
// OutOfMemoryError where we want to
final AtomicBoolean causeOOME = new AtomicBoolean(false);
repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS) {
@Override
protected RecordWriter[] createWriters(RepositoryConfiguration config, long initialRecordId) throws IOException {
final RecordWriter[] recordWriters = super.createWriters(config, initialRecordId);
// Spy on each of the writers so that a call to writeUUID throws an OutOfMemoryError if we set the
// causeOOME flag to true
final StandardRecordWriter[] spiedWriters = new StandardRecordWriter[recordWriters.length];
for (int i = 0; i < recordWriters.length; i++) {
final StandardRecordWriter writer = (StandardRecordWriter) recordWriters[i];
spiedWriters[i] = Mockito.spy(writer);
Mockito.doAnswer(new Answer<Object>() {
@Override
public Object answer(final InvocationOnMock invocation) throws Throwable {
if (causeOOME.get()) {
throw new OutOfMemoryError();
} else {
writer.writeUUID(invocation.getArgumentAt(0, DataOutputStream.class), invocation.getArgumentAt(1, String.class));
}
return null;
}
}).when(spiedWriters[i]).writeUUID(Mockito.any(DataOutputStream.class), Mockito.any(String.class));
}
// return the writers that we are spying on
return spiedWriters;
}
};
repo.initialize(getEventReporter());
final Map<String, String> attributes = new HashMap<>();
attributes.put("75chars", "123456789012345678901234567890123456789012345678901234567890123456789012345");
final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
builder.setEventTime(System.currentTimeMillis());
builder.setEventType(ProvenanceEventType.RECEIVE);
builder.setTransitUri("nifi://unit-test");
attributes.put("uuid", "12345678-0000-0000-0000-012345678912");
builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
builder.setComponentId("1234");
builder.setComponentType("dummy processor");
// first make sure that we are able to write to the repo successfully.
for (int i = 0; i < 4; i++) {
final ProvenanceEventRecord record = builder.build();
repo.registerEvent(record);
}
// cause OOME to occur
causeOOME.set(true);
// write 4 times to make sure that we mark all partitions as dirty
for (int i = 0; i < 4; i++) {
final ProvenanceEventRecord record = builder.build();
try {
repo.registerEvent(record);
Assert.fail("Expected OutOfMmeoryError but was able to register event");
} catch (final OutOfMemoryError oome) {
}
}
// now that all partitions are dirty, ensure that as we keep trying to write, we get an IllegalStateException
// and that we don't corrupt the repository by writing partial records
for (int i = 0; i < 8; i++) {
final ProvenanceEventRecord record = builder.build();
try {
repo.registerEvent(record);
Assert.fail("Expected OutOfMmeoryError but was able to register event");
} catch (final IllegalStateException ise) {
}
}
// close repo so that we can create a new one to recover records
repo.close();
// make sure we can recover
final PersistentProvenanceRepository recoveryRepo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS) {
@Override
protected Set<File> recoverJournalFiles() throws IOException {
try {
return super.recoverJournalFiles();
} catch (final IOException ioe) {
Assert.fail("Failed to recover properly");
return null;
}
}
};
try {
recoveryRepo.initialize(getEventReporter());
} finally {
recoveryRepo.close();
}
}
}