NIFI-6853 - flowFileEvent combineCounters hashmap overwritten

- Added unit test to verify behavior of contribution

This closes #3875.

Signed-off-by: Mark Payne <markap14@hotmail.com>
This commit is contained in:
Iván Rodriguez 2019-11-06 23:33:13 -03:00 committed by Mark Payne
parent 43bc6c6ed9
commit 65ba4a2d93
2 changed files with 38 additions and 3 deletions

View File

@ -564,7 +564,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final Map<String, Long> combined = new HashMap<>();
combined.putAll(first);
combined.putAll(second);
second.forEach((key, value) -> combined.merge(key, value, Long::sum));
return combined;
}
@ -3399,12 +3399,16 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
if (this.countersOnCommit.isEmpty()) {
this.countersOnCommit.putAll(session.countersOnCommit);
} else {
session.countersOnCommit.forEach((key, value) -> this.countersOnCommit.merge(key, value, (v1, v2) -> v1 + v2));
session.countersOnCommit.forEach((key, value) -> this.countersOnCommit.merge(key, value, Long::sum));
}
}
if (session.immediateCounters != null) {
this.immediateCounters.putAll(session.immediateCounters);
if (this.immediateCounters.isEmpty()) {
this.immediateCounters.putAll(session.immediateCounters);
} else {
session.immediateCounters.forEach((key, value) -> this.immediateCounters.merge(key, value, Long::sum));
}
}
this.deleteOnCommit.putAll(session.deleteOnCommit);

View File

@ -342,6 +342,37 @@ public class TestStandardProcessSession {
assertEquals(2, bCounters);
}
@Test
public void testCombineCounters() {
final Relationship relationship = new Relationship.Builder().name("A").build();
FlowFile flowFile = session.create();
session.transfer(flowFile, relationship);
session.adjustCounter("a", 1, false);
session.adjustCounter("b", 3, false);
session.adjustCounter("a", 3, true);
session.adjustCounter("b", 5, true);
session.checkpoint();
flowFile = session.create();
session.transfer(flowFile, relationship);
session.adjustCounter("a", 1, true);
session.adjustCounter("b", 2, true);
session.commit();
context.getFlowFileEventRepository().reportTransferEvents(10L).getReportEntries().forEach((k, v) -> {
v.getCounters().forEach((key, value) -> {
if (key.equals("a")) {
assertEquals(5L, (long) value);
}
if (key.equals("b")) {
assertEquals(10L, (long) value);
}
});
});
}
@Test
public void testReadCountCorrectWhenSkippingWithReadCallback() throws IOException {
final byte[] content = "This and that and the other.".getBytes(StandardCharsets.UTF_8);