diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
index 1740f51521..2b7843a317 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/PersistentProvenanceRepository.java
@@ -240,6 +240,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
} catch (final Exception e) {
logger.error("Failed to roll over Provenance Event Log due to {}", e.toString());
logger.error("", e);
+ eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to roll over Provenance Event Log due to " + e.toString());
}
}
} finally {
@@ -730,7 +731,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
} catch (final Exception e) {
logger.error("Failed to Rollover Provenance Event Repository file due to {}", e.toString());
logger.error("", e);
- eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to Rollover Provenance Event Repository file due to " + e.toString());
+ eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to Rollover Provenance Event Log due to " + e.toString());
} finally {
// we must re-lock the readLock, as the finally block below is going to unlock it.
readLock.lock();
@@ -756,7 +757,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
} catch (final IOException e) {
logger.error("Failed to Rollover Provenance Event Repository file due to {}", e.toString());
logger.error("", e);
- eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to Rollover Provenance Event Repository file due to " + e.toString());
+ eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to Rollover Provenance Event Log due to " + e.toString());
}
}
} finally {
@@ -1142,6 +1143,22 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
return journalFileCount;
}
+
+ /**
+ * Method is exposed for unit testing
+ *
+ * @param force whether or not to force a rollover.
+ * @throws IOException if unable to complete rollover
+ */
+ void rolloverWithLock(final boolean force) throws IOException {
+ writeLock.lock();
+ try {
+ rollover(force);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
/**
*
* MUST be called with the write lock held.
@@ -1163,19 +1180,26 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
if (force || recordsWrittenSinceRollover.get() > 0L || dirtyWriterCount.get() > 0) {
final List<File> journalsToMerge = new ArrayList<>();
for (final RecordWriter writer : writers) {
- final File writerFile = writer.getFile();
- journalsToMerge.add(writerFile);
- try {
- writer.close();
- } catch (final IOException ioe) {
- logger.warn("Failed to close {} due to {}", writer, ioe.toString());
- if ( logger.isDebugEnabled() ) {
- logger.warn("", ioe);
+ if (!writer.isClosed()) {
+ final File writerFile = writer.getFile();
+ journalsToMerge.add(writerFile);
+ try {
+ writer.close();
+ } catch (final IOException ioe) {
+ logger.warn("Failed to close {} due to {}", writer, ioe.toString());
+ if (logger.isDebugEnabled()) {
+ logger.warn("", ioe);
+ }
}
}
}
- if ( logger.isDebugEnabled() ) {
- logger.debug("Going to merge {} files for journals starting with ID {}", journalsToMerge.size(), LuceneUtil.substringBefore(journalsToMerge.get(0).getName(), "."));
+
+ if (logger.isDebugEnabled()) {
+ if (journalsToMerge.isEmpty()) {
+ logger.debug("No journals to merge; all RecordWriters were already closed");
+ } else {
+ logger.debug("Going to merge {} files for journals starting with ID {}", journalsToMerge.size(), LuceneUtil.substringBefore(journalsToMerge.get(0).getName(), "."));
+ }
}
// Choose a storage directory to store the merged file in.
@@ -1183,66 +1207,69 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
final List<File> storageDirs = configuration.getStorageDirectories();
final File storageDir = storageDirs.get((int) (storageDirIdx % storageDirs.size()));
- // Run the rollover logic in a background thread.
- final AtomicReference<Future<?>> futureReference = new AtomicReference<>();
- final int recordsWritten = recordsWrittenSinceRollover.getAndSet(0);
- final Runnable rolloverRunnable = new Runnable() {
- @Override
- public void run() {
- try {
- final File fileRolledOver;
-
+ Future<?> future = null;
+ if (!journalsToMerge.isEmpty()) {
+ // Run the rollover logic in a background thread.
+ final AtomicReference<Future<?>> futureReference = new AtomicReference<>();
+ final int recordsWritten = recordsWrittenSinceRollover.getAndSet(0);
+ final Runnable rolloverRunnable = new Runnable() {
+ @Override
+ public void run() {
try {
- fileRolledOver = mergeJournals(journalsToMerge, getMergeFile(journalsToMerge, storageDir), eventReporter);
- } catch (final IOException ioe) {
- logger.error("Failed to merge Journal Files {} into a Provenance Log File due to {}", journalsToMerge, ioe.toString());
- logger.error("", ioe);
- return;
- }
+ final File fileRolledOver;
- if (fileRolledOver == null) {
- logger.debug("Couldn't merge journals. Will try again in 10 seconds. journalsToMerge: {}, storageDir: {}", journalsToMerge, storageDir);
- return;
- }
- final File file = fileRolledOver;
-
- // update our map of id to Path
- // We need to make sure that another thread doesn't also update the map at the same time. We cannot
- // use the write lock when purging old events, and we want to use the same approach here.
- boolean updated = false;
- final Long fileFirstEventId = Long.valueOf(LuceneUtil.substringBefore(fileRolledOver.getName(), "."));
- while (!updated) {
- final SortedMap<Long, Path> existingPathMap = idToPathMap.get();
- final SortedMap<Long, Path> newIdToPathMap = new TreeMap<>(new PathMapComparator());
- newIdToPathMap.putAll(existingPathMap);
- newIdToPathMap.put(fileFirstEventId, file.toPath());
- updated = idToPathMap.compareAndSet(existingPathMap, newIdToPathMap);
- }
-
- logger.info("Successfully Rolled over Provenance Event file containing {} records", recordsWritten);
- rolloverCompletions.getAndIncrement();
-
- // We have finished successfully. Cancel the future so that we don't run anymore
- Future<?> future;
- while ((future = futureReference.get()) == null) {
try {
- Thread.sleep(10L);
- } catch (final InterruptedException ie) {
+ fileRolledOver = mergeJournals(journalsToMerge, getMergeFile(journalsToMerge, storageDir), eventReporter);
+ } catch (final IOException ioe) {
+ logger.error("Failed to merge Journal Files {} into a Provenance Log File due to {}", journalsToMerge, ioe.toString());
+ logger.error("", ioe);
+ return;
}
+
+ if (fileRolledOver == null) {
+ logger.debug("Couldn't merge journals. Will try again in 10 seconds. journalsToMerge: {}, storageDir: {}", journalsToMerge, storageDir);
+ return;
+ }
+ final File file = fileRolledOver;
+
+ // update our map of id to Path
+ // We need to make sure that another thread doesn't also update the map at the same time. We cannot
+ // use the write lock when purging old events, and we want to use the same approach here.
+ boolean updated = false;
+ final Long fileFirstEventId = Long.valueOf(LuceneUtil.substringBefore(fileRolledOver.getName(), "."));
+ while (!updated) {
+ final SortedMap<Long, Path> existingPathMap = idToPathMap.get();
+ final SortedMap<Long, Path> newIdToPathMap = new TreeMap<>(new PathMapComparator());
+ newIdToPathMap.putAll(existingPathMap);
+ newIdToPathMap.put(fileFirstEventId, file.toPath());
+ updated = idToPathMap.compareAndSet(existingPathMap, newIdToPathMap);
+ }
+
+ logger.info("Successfully Rolled over Provenance Event file containing {} records", recordsWritten);
+ rolloverCompletions.getAndIncrement();
+
+ // We have finished successfully. Cancel the future so that we don't run anymore
+ Future<?> future;
+ while ((future = futureReference.get()) == null) {
+ try {
+ Thread.sleep(10L);
+ } catch (final InterruptedException ie) {
+ }
+ }
+
+ future.cancel(false);
+ } catch (final Throwable t) {
+ logger.error("Failed to rollover Provenance repository due to {}", t.toString());
+ logger.error("", t);
}
-
- future.cancel(false);
- } catch (final Throwable t) {
- logger.error("Failed to rollover Provenance repository due to {}", t.toString());
- logger.error("", t);
}
- }
- };
+ };
- // We are going to schedule the future to run immediately and then repeat every 10 seconds. This allows us to keep retrying if we
- // fail for some reason. When we succeed, the Runnable will cancel itself.
- final Future<?> future = rolloverExecutor.scheduleWithFixedDelay(rolloverRunnable, 0, 10, TimeUnit.SECONDS);
- futureReference.set(future);
+ // We are going to schedule the future to run immediately and then repeat every 10 seconds. This allows us to keep retrying if we
+ // fail for some reason. When we succeed, the Runnable will cancel itself.
+ future = rolloverExecutor.scheduleWithFixedDelay(rolloverRunnable, 0, 10, TimeUnit.SECONDS);
+ futureReference.set(future);
+ }
streamStartTime.set(System.currentTimeMillis());
bytesWrittenSinceRollover.set(0);
@@ -1271,7 +1298,10 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
while (journalFileCount > journalCountThreshold || repoSize > sizeThreshold) {
// if a shutdown happens while we are in this loop, kill the rollover thread and break
if (this.closed.get()) {
- future.cancel(true);
+ if (future != null) {
+ future.cancel(true);
+ }
+
break;
}
@@ -1504,7 +1534,7 @@ public class PersistentProvenanceRepository implements ProvenanceEventRepository
}
if (eventReporter != null) {
- eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to merge Journal Files due to " + ioe.toString());
+ eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, "Failed to merge Journal Files due to " + ioe.toString());
}
}
}
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
index 981301e6e7..a5c121abda 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
@@ -51,6 +51,7 @@ public class StandardRecordWriter implements RecordWriter {
private ByteCountingOutputStream byteCountingOut;
private long lastBlockOffset = 0L;
private int recordCount = 0;
+ private volatile boolean closed = false;
private final Lock lock = new ReentrantLock();
@@ -295,6 +296,8 @@ public class StandardRecordWriter implements RecordWriter {
@Override
public synchronized void close() throws IOException {
+ closed = true;
+
logger.trace("Closing Record Writer for {}", file.getName());
lock();
@@ -330,7 +333,11 @@ public class StandardRecordWriter implements RecordWriter {
} finally {
unlock();
}
+ }
+ @Override
+ public boolean isClosed() {
+ return closed;
}
@Override
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
index 03f1ad0c6f..b157ccc9cb 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
@@ -92,4 +92,9 @@ public interface RecordWriter extends Closeable {
* @return the TOC Writer that is being used to write the Table of Contents for this journal
*/
TocWriter getTocWriter();
+
+ /**
+ * @return <code>true</code> if this Writer has been closed via the {@link #close()} method, <code>false</code> otherwise
+ */
+ boolean isClosed();
}
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
index 02b9216604..4a5c08c4a6 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestPersistentProvenanceRepository.java
@@ -27,6 +27,7 @@ import java.io.FileFilter;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -89,6 +90,8 @@ public class TestPersistentProvenanceRepository {
private RepositoryConfiguration config;
public static final int DEFAULT_ROLLOVER_MILLIS = 2000;
+ private EventReporter eventReporter;
+ private List<ReportedEvent> reportedEvents = Collections.synchronizedList(new ArrayList<ReportedEvent>());
private RepositoryConfiguration createConfiguration() {
config = new RepositoryConfiguration();
@@ -107,6 +110,17 @@ public class TestPersistentProvenanceRepository {
@Before
public void printTestName() {
System.out.println("\n\n\n*********************** " + name.getMethodName() + " *****************************");
+
+ reportedEvents.clear();
+ eventReporter = new EventReporter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void reportEvent(Severity severity, String category, String message) {
+ reportedEvents.add(new ReportedEvent(severity, category, message));
+ System.out.println(severity + " : " + category + " : " + message);
+ }
+ };
}
@After
@@ -146,14 +160,7 @@ public class TestPersistentProvenanceRepository {
private EventReporter getEventReporter() {
- return new EventReporter() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void reportEvent(Severity severity, String category, String message) {
- System.out.println(severity + " : " + category + " : " + message);
- }
- };
+ return eventReporter;
}
@Test
@@ -1238,6 +1245,68 @@ public class TestPersistentProvenanceRepository {
assertEquals("12345678901234567890123456789012345678901234567890", retrieved.getAttributes().get("75chars"));
}
+
+ @Test
+ public void testFailureToCreateWriterDoesNotPreventSubsequentRollover() throws IOException, InterruptedException {
+ final RepositoryConfiguration config = createConfiguration();
+ config.setMaxAttributeChars(50);
+ config.setMaxEventFileLife(3, TimeUnit.SECONDS);
+
+ // Create a repo that will allow only a single writer to be created.
+ final IOException failure = new IOException("Already created writers once. Unit test causing failure.");
+ repo = new PersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS) {
+ int iterations = 0;
+
+ @Override
+ protected RecordWriter[] createWriters(RepositoryConfiguration config, long initialRecordId) throws IOException {
+ if (iterations++ == 1) {
+ throw failure;
+ } else {
+ return super.createWriters(config, initialRecordId);
+ }
+ }
+ };
+
+ // initialize with our event reporter
+ repo.initialize(getEventReporter());
+
+ // create some events in the journal files.
+ final Map attributes = new HashMap<>();
+ attributes.put("75chars", "123456789012345678901234567890123456789012345678901234567890123456789012345");
+
+ final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
+ builder.setEventTime(System.currentTimeMillis());
+ builder.setEventType(ProvenanceEventType.RECEIVE);
+ builder.setTransitUri("nifi://unit-test");
+ attributes.put("uuid", "12345678-0000-0000-0000-012345678912");
+ builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
+ builder.setComponentId("1234");
+ builder.setComponentType("dummy processor");
+
+ for (int i = 0; i < 50; i++) {
+ final ProvenanceEventRecord event = builder.build();
+ repo.registerEvent(event);
+ }
+
+ // Attempt to rollover but fail to create new writers.
+ try {
+ repo.rolloverWithLock(true);
+ Assert.fail("Expected to get IOException when calling rolloverWithLock");
+ } catch (final IOException ioe) {
+ assertTrue(ioe == failure);
+ }
+
+ // Wait for the first rollover to succeed.
+ repo.waitForRollover();
+
+ // This time when we rollover, we should not have a problem rolling over.
+ repo.rolloverWithLock(true);
+
+ // Ensure that no errors were reported.
+ assertEquals(0, reportedEvents.size());
+ }
+
+
@Test
public void testBehaviorOnOutOfMemory() throws IOException, InterruptedException {
final RepositoryConfiguration config = createConfiguration();
@@ -1343,4 +1412,28 @@ public class TestPersistentProvenanceRepository {
}
}
+
+ private static class ReportedEvent {
+ private final Severity severity;
+ private final String category;
+ private final String message;
+
+ public ReportedEvent(final Severity severity, final String category, final String message) {
+ this.severity = severity;
+ this.category = category;
+ this.message = message;
+ }
+
+ public String getCategory() {
+ return category;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+
+ public Severity getSeverity() {
+ return severity;
+ }
+ }
}