diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index 0e08325531..3ba7e4eab7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -337,9 +337,13 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final Collection repoRecords = checkpoint.records.values(); context.getFlowFileRepository().updateRepository((Collection) repoRecords); } catch (final IOException ioe) { - rollback(); + // if we fail to commit the session, we need to roll back + // the checkpoints as well because none of the checkpoints + // were ever committed. + rollback(false, true); throw new ProcessException("FlowFile Repository failed to update", ioe); } + final long flowFileRepoUpdateFinishNanos = System.nanoTime(); final long flowFileRepoUpdateNanos = flowFileRepoUpdateFinishNanos - claimRemovalFinishNanos; @@ -422,7 +426,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } } catch (final Exception e) { try { - rollback(); + // if we fail to commit the session, we need to roll back + // the checkpoints as well because none of the checkpoints + // were ever committed. + rollback(false, true); } catch (final Exception e1) { e.addSuppressed(e1); } @@ -849,8 +856,23 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE @Override public void rollback(final boolean penalize) { + rollback(penalize, false); + } + + private void rollback(final boolean penalize, final boolean rollbackCheckpoint) { deleteOnCommit.clear(); - if (records.isEmpty()) { + + final Set recordsToHandle = new HashSet<>(); + recordsToHandle.addAll(records.values()); + if (rollbackCheckpoint) { + final Checkpoint existingCheckpoint = this.checkpoint; + this.checkpoint = null; + if (existingCheckpoint != null && existingCheckpoint.records != null) { + recordsToHandle.addAll(existingCheckpoint.records.values()); + } + } + + if (recordsToHandle.isEmpty()) { LOG.trace("{} was rolled back, but no events were performed by this ProcessSession", this); acknowledgeRecords(); return; @@ -859,14 +881,14 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE resetWriteClaims(); resetReadClaim(); - for (final StandardRepositoryRecord record : records.values()) { - // remove the working claim if it's different than the original. + for (final StandardRepositoryRecord record : recordsToHandle) { + // remove the working claims if they are different than the originals. removeTemporaryClaim(record); } final Set abortedRecords = new HashSet<>(); final Set transferRecords = new HashSet<>(); - for (final StandardRepositoryRecord record : records.values()) { + for (final StandardRepositoryRecord record : recordsToHandle) { if (record.isMarkedForAbort()) { removeContent(record.getWorkingClaim()); if (record.getCurrentClaim() != null && !record.getCurrentClaim().equals(record.getWorkingClaim())) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java index ba3414805b..68aa389053 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java @@ -829,6 +829,106 @@ public class TestStandardProcessSession { } } + + @Test + public void testCommitFailureRequeuesFlowFiles() { + final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .contentClaim(new StandardContentClaim(new StandardResourceClaim("x", "x", "0", true), 0L)) + + .contentClaimOffset(0L).size(0L).build(); + flowFileQueue.put(flowFileRecord); + + final FlowFile originalFlowFile = session.get(); + assertTrue(flowFileQueue.isActiveQueueEmpty()); + assertEquals(1, flowFileQueue.getUnacknowledgedQueueSize().getObjectCount()); + + final FlowFile modified = session.write(originalFlowFile, new OutputStreamCallback() { + @Override + public void process(OutputStream out) throws IOException { + out.write("Hello".getBytes()); + } + }); + + session.transfer(modified); + + // instruct flowfile repo to throw IOException on update + flowFileRepo.setFailOnUpdate(true); + + try { + session.commit(); + Assert.fail("Session commit completed, even though FlowFile Repo threw IOException"); + } catch (final ProcessException pe) { + // expected behavior because FlowFile Repo will throw IOException + } + + assertFalse(flowFileQueue.isActiveQueueEmpty()); + assertEquals(1, flowFileQueue.size().getObjectCount()); + assertEquals(0, flowFileQueue.getUnacknowledgedQueueSize().getObjectCount()); + } + + @Test + public void testRollbackAfterCheckpoint() { + final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .contentClaim(new StandardContentClaim(new StandardResourceClaim("x", "x", "0", true), 0L)) + + .contentClaimOffset(0L).size(0L).build(); + flowFileQueue.put(flowFileRecord); + + final FlowFile originalFlowFile = session.get(); + assertTrue(flowFileQueue.isActiveQueueEmpty()); + assertEquals(1, flowFileQueue.getUnacknowledgedQueueSize().getObjectCount()); + + final FlowFile modified = session.write(originalFlowFile, new OutputStreamCallback() { + @Override + public void process(OutputStream out) throws IOException { + out.write("Hello".getBytes()); + } + }); + + session.transfer(modified); + + session.checkpoint(); + assertTrue(flowFileQueue.isActiveQueueEmpty()); + + session.rollback(); + assertTrue(flowFileQueue.isActiveQueueEmpty()); + assertEquals(0, flowFileQueue.size().getObjectCount()); + assertEquals(0, flowFileQueue.getUnacknowledgedQueueSize().getObjectCount()); + + session.rollback(); + + flowFileQueue.put(flowFileRecord); + assertFalse(flowFileQueue.isActiveQueueEmpty()); + + final FlowFile originalRound2 = session.get(); + assertTrue(flowFileQueue.isActiveQueueEmpty()); + assertEquals(1, flowFileQueue.getUnacknowledgedQueueSize().getObjectCount()); + + final FlowFile modifiedRound2 = session.write(originalRound2, new OutputStreamCallback() { + @Override + public void process(OutputStream out) throws IOException { + out.write("Hello".getBytes()); + } + }); + + session.transfer(modifiedRound2); + + session.checkpoint(); + assertTrue(flowFileQueue.isActiveQueueEmpty()); + assertEquals(1, flowFileQueue.getUnacknowledgedQueueSize().getObjectCount()); + + session.commit(); + + // FlowFile transferred back to queue + assertEquals(1, flowFileQueue.size().getObjectCount()); + assertEquals(0, flowFileQueue.getUnacknowledgedQueueSize().getObjectCount()); + assertFalse(flowFileQueue.isActiveQueueEmpty()); + } + @Test public void testCreateEmitted() throws IOException { final FlowFile newFlowFile = session.create(); @@ -910,9 +1010,13 @@ public class TestStandardProcessSession { } private static class MockFlowFileRepository implements FlowFileRepository { - + private boolean failOnUpdate = false; private final AtomicLong idGenerator = new AtomicLong(0L); + public void setFailOnUpdate(final boolean fail) { + this.failOnUpdate = fail; + } + @Override public void close() throws IOException { } @@ -929,6 +1033,9 @@ public class TestStandardProcessSession { @Override public void updateRepository(Collection records) throws IOException { + if (failOnUpdate) { + throw new IOException("FlowFile Repository told to fail on update for unit test"); + } } @Override