NIFI-893: Ensure that if session.commit fails, previously 'checkpointed' sessions are rolled back

This commit is contained in:
Mark Payne 2015-08-27 15:17:55 -04:00
parent 4baffacc42
commit fbec28bad9
2 changed files with 136 additions and 7 deletions

View File

@ -337,9 +337,13 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final Collection<StandardRepositoryRecord> repoRecords = checkpoint.records.values();
context.getFlowFileRepository().updateRepository((Collection) repoRecords);
} catch (final IOException ioe) {
rollback();
// if we fail to commit the session, we need to roll back
// the checkpoints as well because none of the checkpoints
// were ever committed.
rollback(false, true);
throw new ProcessException("FlowFile Repository failed to update", ioe);
}
final long flowFileRepoUpdateFinishNanos = System.nanoTime();
final long flowFileRepoUpdateNanos = flowFileRepoUpdateFinishNanos - claimRemovalFinishNanos;
@ -422,7 +426,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
} catch (final Exception e) {
try {
rollback();
// if we fail to commit the session, we need to roll back
// the checkpoints as well because none of the checkpoints
// were ever committed.
rollback(false, true);
} catch (final Exception e1) {
e.addSuppressed(e1);
}
@ -849,8 +856,23 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
@Override
public void rollback(final boolean penalize) {
rollback(penalize, false);
}
private void rollback(final boolean penalize, final boolean rollbackCheckpoint) {
deleteOnCommit.clear();
if (records.isEmpty()) {
final Set<StandardRepositoryRecord> recordsToHandle = new HashSet<>();
recordsToHandle.addAll(records.values());
if (rollbackCheckpoint) {
final Checkpoint existingCheckpoint = this.checkpoint;
this.checkpoint = null;
if (existingCheckpoint != null && existingCheckpoint.records != null) {
recordsToHandle.addAll(existingCheckpoint.records.values());
}
}
if (recordsToHandle.isEmpty()) {
LOG.trace("{} was rolled back, but no events were performed by this ProcessSession", this);
acknowledgeRecords();
return;
@ -859,14 +881,14 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
resetWriteClaims();
resetReadClaim();
for (final StandardRepositoryRecord record : records.values()) {
// remove the working claim if it's different than the original.
for (final StandardRepositoryRecord record : recordsToHandle) {
// remove the working claims if they are different than the originals.
removeTemporaryClaim(record);
}
final Set<RepositoryRecord> abortedRecords = new HashSet<>();
final Set<StandardRepositoryRecord> transferRecords = new HashSet<>();
for (final StandardRepositoryRecord record : records.values()) {
for (final StandardRepositoryRecord record : recordsToHandle) {
if (record.isMarkedForAbort()) {
removeContent(record.getWorkingClaim());
if (record.getCurrentClaim() != null && !record.getCurrentClaim().equals(record.getWorkingClaim())) {

View File

@ -829,6 +829,106 @@ public class TestStandardProcessSession {
}
}
@Test
public void testCommitFailureRequeuesFlowFiles() {
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis())
.contentClaim(new StandardContentClaim(new StandardResourceClaim("x", "x", "0", true), 0L))
.contentClaimOffset(0L).size(0L).build();
flowFileQueue.put(flowFileRecord);
final FlowFile originalFlowFile = session.get();
assertTrue(flowFileQueue.isActiveQueueEmpty());
assertEquals(1, flowFileQueue.getUnacknowledgedQueueSize().getObjectCount());
final FlowFile modified = session.write(originalFlowFile, new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
out.write("Hello".getBytes());
}
});
session.transfer(modified);
// instruct flowfile repo to throw IOException on update
flowFileRepo.setFailOnUpdate(true);
try {
session.commit();
Assert.fail("Session commit completed, even though FlowFile Repo threw IOException");
} catch (final ProcessException pe) {
// expected behavior because FlowFile Repo will throw IOException
}
assertFalse(flowFileQueue.isActiveQueueEmpty());
assertEquals(1, flowFileQueue.size().getObjectCount());
assertEquals(0, flowFileQueue.getUnacknowledgedQueueSize().getObjectCount());
}
@Test
public void testRollbackAfterCheckpoint() {
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis())
.contentClaim(new StandardContentClaim(new StandardResourceClaim("x", "x", "0", true), 0L))
.contentClaimOffset(0L).size(0L).build();
flowFileQueue.put(flowFileRecord);
final FlowFile originalFlowFile = session.get();
assertTrue(flowFileQueue.isActiveQueueEmpty());
assertEquals(1, flowFileQueue.getUnacknowledgedQueueSize().getObjectCount());
final FlowFile modified = session.write(originalFlowFile, new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
out.write("Hello".getBytes());
}
});
session.transfer(modified);
session.checkpoint();
assertTrue(flowFileQueue.isActiveQueueEmpty());
session.rollback();
assertTrue(flowFileQueue.isActiveQueueEmpty());
assertEquals(0, flowFileQueue.size().getObjectCount());
assertEquals(0, flowFileQueue.getUnacknowledgedQueueSize().getObjectCount());
session.rollback();
flowFileQueue.put(flowFileRecord);
assertFalse(flowFileQueue.isActiveQueueEmpty());
final FlowFile originalRound2 = session.get();
assertTrue(flowFileQueue.isActiveQueueEmpty());
assertEquals(1, flowFileQueue.getUnacknowledgedQueueSize().getObjectCount());
final FlowFile modifiedRound2 = session.write(originalRound2, new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
out.write("Hello".getBytes());
}
});
session.transfer(modifiedRound2);
session.checkpoint();
assertTrue(flowFileQueue.isActiveQueueEmpty());
assertEquals(1, flowFileQueue.getUnacknowledgedQueueSize().getObjectCount());
session.commit();
// FlowFile transferred back to queue
assertEquals(1, flowFileQueue.size().getObjectCount());
assertEquals(0, flowFileQueue.getUnacknowledgedQueueSize().getObjectCount());
assertFalse(flowFileQueue.isActiveQueueEmpty());
}
@Test
public void testCreateEmitted() throws IOException {
final FlowFile newFlowFile = session.create();
@ -910,9 +1010,13 @@ public class TestStandardProcessSession {
}
private static class MockFlowFileRepository implements FlowFileRepository {
private boolean failOnUpdate = false;
private final AtomicLong idGenerator = new AtomicLong(0L);
public void setFailOnUpdate(final boolean fail) {
this.failOnUpdate = fail;
}
@Override
public void close() throws IOException {
}
@ -929,6 +1033,9 @@ public class TestStandardProcessSession {
@Override
public void updateRepository(Collection<RepositoryRecord> records) throws IOException {
if (failOnUpdate) {
throw new IOException("FlowFile Repository told to fail on update for unit test");
}
}
@Override