NIFI-3091: Ensure that we set the appropriate size on FlowFiles when modifying them

This closes #1267

Signed-off-by: jpercivall <JPercivall@apache.org>
This commit is contained in:
Mark Payne 2016-11-23 11:09:42 -05:00 committed by jpercivall
parent 91ff810dba
commit 7ff14f7191
2 changed files with 61 additions and 1 deletions

View File

@ -2580,7 +2580,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
cnfeThrown = true;
throw cnfe;
} finally {
this.bytesWritten += countingOut.getBytesWritten();
writtenToFlowFile = countingOut.getBytesWritten();
this.bytesWritten += writtenToFlowFile;
this.bytesRead += countingIn.getBytesRead();
recursionSet.remove(source);

View File

@ -276,6 +276,65 @@ public class TestStandardProcessSession {
assertEquals(0, numClaims);
}
@Test
public void testModifyContentWithStreamCallbackHasCorrectSize() throws IOException {
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.id(1000L)
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis())
.build();
flowFileQueue.put(flowFileRecord);
FlowFile original = session.get();
assertNotNull(original);
FlowFile child = session.write(original, (in, out) -> out.write("hello".getBytes()));
session.transfer(child);
session.commit();
final FlowFileRecord onQueue = flowFileQueue.poll(Collections.emptySet());
assertEquals(5, onQueue.getSize());
}
@Test
public void testModifyContentWithOutputStreamCallbackHasCorrectSize() throws IOException {
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.id(1000L)
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis())
.build();
flowFileQueue.put(flowFileRecord);
FlowFile original = session.get();
assertNotNull(original);
FlowFile child = session.write(original, out -> out.write("hello".getBytes()));
session.transfer(child);
session.commit();
final FlowFileRecord onQueue = flowFileQueue.poll(Collections.emptySet());
assertEquals(5, onQueue.getSize());
}
@Test
public void testModifyContentWithAppendHasCorrectSize() throws IOException {
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.id(1000L)
.addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
.entryDate(System.currentTimeMillis())
.build();
flowFileQueue.put(flowFileRecord);
FlowFile original = session.get();
assertNotNull(original);
FlowFile child = session.append(original, out -> out.write("hello".getBytes()));
session.transfer(child);
session.commit();
final FlowFileRecord onQueue = flowFileQueue.poll(Collections.emptySet());
assertEquals(5, onQueue.getSize());
}
@Test
public void testModifyContentThenRollback() throws IOException {
assertEquals(0, contentRepo.getExistingClaims().size());