mirror of https://github.com/apache/nifi.git
Merge branch 'NIFI-893'
This commit is contained in:
commit
76b5b38cc4
|
@ -337,9 +337,13 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
|
|||
final Collection<StandardRepositoryRecord> repoRecords = checkpoint.records.values();
|
||||
context.getFlowFileRepository().updateRepository((Collection) repoRecords);
|
||||
} catch (final IOException ioe) {
|
||||
rollback();
|
||||
// if we fail to commit the session, we need to roll back
|
||||
// the checkpoints as well because none of the checkpoints
|
||||
// were ever committed.
|
||||
rollback(false, true);
|
||||
throw new ProcessException("FlowFile Repository failed to update", ioe);
|
||||
}
|
||||
|
||||
final long flowFileRepoUpdateFinishNanos = System.nanoTime();
|
||||
final long flowFileRepoUpdateNanos = flowFileRepoUpdateFinishNanos - claimRemovalFinishNanos;
|
||||
|
||||
|
@ -422,7 +426,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
|
|||
}
|
||||
} catch (final Exception e) {
|
||||
try {
|
||||
rollback();
|
||||
// if we fail to commit the session, we need to roll back
|
||||
// the checkpoints as well because none of the checkpoints
|
||||
// were ever committed.
|
||||
rollback(false, true);
|
||||
} catch (final Exception e1) {
|
||||
e.addSuppressed(e1);
|
||||
}
|
||||
|
@ -849,8 +856,23 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
|
|||
|
||||
@Override
|
||||
public void rollback(final boolean penalize) {
|
||||
rollback(penalize, false);
|
||||
}
|
||||
|
||||
private void rollback(final boolean penalize, final boolean rollbackCheckpoint) {
|
||||
deleteOnCommit.clear();
|
||||
if (records.isEmpty()) {
|
||||
|
||||
final Set<StandardRepositoryRecord> recordsToHandle = new HashSet<>();
|
||||
recordsToHandle.addAll(records.values());
|
||||
if (rollbackCheckpoint) {
|
||||
final Checkpoint existingCheckpoint = this.checkpoint;
|
||||
this.checkpoint = null;
|
||||
if (existingCheckpoint != null && existingCheckpoint.records != null) {
|
||||
recordsToHandle.addAll(existingCheckpoint.records.values());
|
||||
}
|
||||
}
|
||||
|
||||
if (recordsToHandle.isEmpty()) {
|
||||
LOG.trace("{} was rolled back, but no events were performed by this ProcessSession", this);
|
||||
acknowledgeRecords();
|
||||
return;
|
||||
|
@ -859,14 +881,14 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
|
|||
resetWriteClaims();
|
||||
resetReadClaim();
|
||||
|
||||
for (final StandardRepositoryRecord record : records.values()) {
|
||||
// remove the working claim if it's different than the original.
|
||||
for (final StandardRepositoryRecord record : recordsToHandle) {
|
||||
// remove the working claims if they are different than the originals.
|
||||
removeTemporaryClaim(record);
|
||||
}
|
||||
|
||||
final Set<RepositoryRecord> abortedRecords = new HashSet<>();
|
||||
final Set<StandardRepositoryRecord> transferRecords = new HashSet<>();
|
||||
for (final StandardRepositoryRecord record : records.values()) {
|
||||
for (final StandardRepositoryRecord record : recordsToHandle) {
|
||||
if (record.isMarkedForAbort()) {
|
||||
removeContent(record.getWorkingClaim());
|
||||
if (record.getCurrentClaim() != null && !record.getCurrentClaim().equals(record.getWorkingClaim())) {
|
||||
|
|
|
@ -829,6 +829,106 @@ public class TestStandardProcessSession {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testCommitFailureRequeuesFlowFiles() {
|
||||
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
|
||||
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
|
||||
.entryDate(System.currentTimeMillis())
|
||||
.contentClaim(new StandardContentClaim(new StandardResourceClaim("x", "x", "0", true), 0L))
|
||||
|
||||
.contentClaimOffset(0L).size(0L).build();
|
||||
flowFileQueue.put(flowFileRecord);
|
||||
|
||||
final FlowFile originalFlowFile = session.get();
|
||||
assertTrue(flowFileQueue.isActiveQueueEmpty());
|
||||
assertEquals(1, flowFileQueue.getUnacknowledgedQueueSize().getObjectCount());
|
||||
|
||||
final FlowFile modified = session.write(originalFlowFile, new OutputStreamCallback() {
|
||||
@Override
|
||||
public void process(OutputStream out) throws IOException {
|
||||
out.write("Hello".getBytes());
|
||||
}
|
||||
});
|
||||
|
||||
session.transfer(modified);
|
||||
|
||||
// instruct flowfile repo to throw IOException on update
|
||||
flowFileRepo.setFailOnUpdate(true);
|
||||
|
||||
try {
|
||||
session.commit();
|
||||
Assert.fail("Session commit completed, even though FlowFile Repo threw IOException");
|
||||
} catch (final ProcessException pe) {
|
||||
// expected behavior because FlowFile Repo will throw IOException
|
||||
}
|
||||
|
||||
assertFalse(flowFileQueue.isActiveQueueEmpty());
|
||||
assertEquals(1, flowFileQueue.size().getObjectCount());
|
||||
assertEquals(0, flowFileQueue.getUnacknowledgedQueueSize().getObjectCount());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRollbackAfterCheckpoint() {
|
||||
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
|
||||
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
|
||||
.entryDate(System.currentTimeMillis())
|
||||
.contentClaim(new StandardContentClaim(new StandardResourceClaim("x", "x", "0", true), 0L))
|
||||
|
||||
.contentClaimOffset(0L).size(0L).build();
|
||||
flowFileQueue.put(flowFileRecord);
|
||||
|
||||
final FlowFile originalFlowFile = session.get();
|
||||
assertTrue(flowFileQueue.isActiveQueueEmpty());
|
||||
assertEquals(1, flowFileQueue.getUnacknowledgedQueueSize().getObjectCount());
|
||||
|
||||
final FlowFile modified = session.write(originalFlowFile, new OutputStreamCallback() {
|
||||
@Override
|
||||
public void process(OutputStream out) throws IOException {
|
||||
out.write("Hello".getBytes());
|
||||
}
|
||||
});
|
||||
|
||||
session.transfer(modified);
|
||||
|
||||
session.checkpoint();
|
||||
assertTrue(flowFileQueue.isActiveQueueEmpty());
|
||||
|
||||
session.rollback();
|
||||
assertTrue(flowFileQueue.isActiveQueueEmpty());
|
||||
assertEquals(0, flowFileQueue.size().getObjectCount());
|
||||
assertEquals(0, flowFileQueue.getUnacknowledgedQueueSize().getObjectCount());
|
||||
|
||||
session.rollback();
|
||||
|
||||
flowFileQueue.put(flowFileRecord);
|
||||
assertFalse(flowFileQueue.isActiveQueueEmpty());
|
||||
|
||||
final FlowFile originalRound2 = session.get();
|
||||
assertTrue(flowFileQueue.isActiveQueueEmpty());
|
||||
assertEquals(1, flowFileQueue.getUnacknowledgedQueueSize().getObjectCount());
|
||||
|
||||
final FlowFile modifiedRound2 = session.write(originalRound2, new OutputStreamCallback() {
|
||||
@Override
|
||||
public void process(OutputStream out) throws IOException {
|
||||
out.write("Hello".getBytes());
|
||||
}
|
||||
});
|
||||
|
||||
session.transfer(modifiedRound2);
|
||||
|
||||
session.checkpoint();
|
||||
assertTrue(flowFileQueue.isActiveQueueEmpty());
|
||||
assertEquals(1, flowFileQueue.getUnacknowledgedQueueSize().getObjectCount());
|
||||
|
||||
session.commit();
|
||||
|
||||
// FlowFile transferred back to queue
|
||||
assertEquals(1, flowFileQueue.size().getObjectCount());
|
||||
assertEquals(0, flowFileQueue.getUnacknowledgedQueueSize().getObjectCount());
|
||||
assertFalse(flowFileQueue.isActiveQueueEmpty());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateEmitted() throws IOException {
|
||||
final FlowFile newFlowFile = session.create();
|
||||
|
@ -910,9 +1010,13 @@ public class TestStandardProcessSession {
|
|||
}
|
||||
|
||||
private static class MockFlowFileRepository implements FlowFileRepository {
|
||||
|
||||
private boolean failOnUpdate = false;
|
||||
private final AtomicLong idGenerator = new AtomicLong(0L);
|
||||
|
||||
public void setFailOnUpdate(final boolean fail) {
|
||||
this.failOnUpdate = fail;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
}
|
||||
|
@ -929,6 +1033,9 @@ public class TestStandardProcessSession {
|
|||
|
||||
@Override
|
||||
public void updateRepository(Collection<RepositoryRecord> records) throws IOException {
|
||||
if (failOnUpdate) {
|
||||
throw new IOException("FlowFile Repository told to fail on update for unit test");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue