diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java index 5a7656d378..f354f69be4 100644 --- a/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java +++ b/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java @@ -722,19 +722,41 @@ public final class MinimalLockingWriteAheadLog implements WriteAheadRepositor try { final DataOutputStream out = dataOut; if (out != null) { - out.close(); + try { + out.close(); + } catch (final IOException ioe) { + dataOut = null; + fileOut = null; + + blackList(); + throw ioe; + } } final Path editPath = getNewEditPath(); final FileOutputStream fos = new FileOutputStream(editPath.toFile()); - final DataOutputStream outStream = new DataOutputStream(new BufferedOutputStream(fos)); - outStream.writeUTF(MinimalLockingWriteAheadLog.class.getName()); - outStream.writeInt(writeAheadLogVersion); - outStream.writeUTF(serde.getClass().getName()); - outStream.writeInt(serde.getVersion()); - outStream.flush(); - dataOut = outStream; - fileOut = fos; + try { + final DataOutputStream outStream = new DataOutputStream(new BufferedOutputStream(fos)); + outStream.writeUTF(MinimalLockingWriteAheadLog.class.getName()); + outStream.writeInt(writeAheadLogVersion); + outStream.writeUTF(serde.getClass().getName()); + outStream.writeInt(serde.getVersion()); + outStream.flush(); + dataOut = outStream; + fileOut = fos; + } catch (final IOException ioe) { + logger.error("Failed to create new journal for {} due to {}", new Object[] {this, ioe.toString()}, ioe); + try { + fos.close(); + } catch (final IOException innerIOE) { + } + + dataOut = null; + fileOut = null; + blackList(); + + throw ioe; + } currentJournalFilename = editPath.toFile().getName(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java index 7a85274af9..d4d595b8b1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java @@ -159,7 +159,18 @@ public class ContinuallyRunProcessorTask implements Callable { } finally { try { if (batch) { - rawSession.commit(); + try { + rawSession.commit(); + } catch (final Exception e) { + final ProcessorLog procLog = new SimpleProcessLogger(procNode.getIdentifier(), procNode.getProcessor()); + procLog.error("Failed to commit session {} due to {}; rolling back", new Object[] { rawSession, e.toString() }, e); + + try { + rawSession.rollback(true); + } catch (final Exception e1) { + procLog.error("Failed to roll back session {} due to {}", new Object[] { rawSession, e.toString() }, e); + } + } } final long processingNanos = System.nanoTime() - startNanos; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ExpireFlowFiles.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ExpireFlowFiles.java index 7d8bcece47..159f58f392 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ExpireFlowFiles.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ExpireFlowFiles.java @@ -32,11 +32,14 @@ import org.apache.nifi.controller.scheduling.ProcessContextFactory; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.util.FormatUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This task runs through all Connectable Components and goes through its incoming queues, polling for FlowFiles and accepting none. This causes the desired side effect of expiring old FlowFiles. */ public class ExpireFlowFiles implements Runnable { + private static final Logger logger = LoggerFactory.getLogger(ExpireFlowFiles.class); private final FlowController flowController; private final ProcessContextFactory contextFactory; @@ -49,7 +52,11 @@ public class ExpireFlowFiles implements Runnable { @Override public void run() { final ProcessGroup rootGroup = flowController.getGroup(flowController.getRootGroupId()); - expireFlowFiles(rootGroup); + try { + expireFlowFiles(rootGroup); + } catch (final Exception e) { + logger.error("Failed to expire FlowFiles due to {}", e.toString(), e); + } } private StandardProcessSession createSession(final Connectable connectable) {