From c958deb5b01bf26c856b5d6a2b93cee87163c10f Mon Sep 17 00:00:00 2001
From: Mark Payne
Date: Tue, 29 Oct 2019 12:23:49 -0400
Subject: [PATCH] NIFI-6822: Ensure that when we manage a Map of ID -> Count,
 that we properly merge those maps during a checkpoint instead of overwriting
 existing values

---
 .../repository/StandardProcessSession.java | 58 +++++++++++++------
 .../metrics/StandardFlowFileEvent.java     | 28 +++++++++
 .../TestStandardProcessSession.java        | 48 +++++++++++++--
 3 files changed, 111 insertions(+), 23 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index c930b36a70..da7a6ee0f0 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -86,6 +86,8 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
@@ -3365,7 +3367,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
 
         private final Map<Long, StandardRepositoryRecord> records = new ConcurrentHashMap<>();
         private final Map<String, StandardFlowFileEvent> connectionCounts = new ConcurrentHashMap<>();
-        private final Map<FlowFileQueue, Set<FlowFileRecord>> unacknowledgedFlowFiles = new ConcurrentHashMap<>();
 
         private Map<String, Long> countersOnCommit = new HashMap<>();
         private Map<String, Long> immediateCounters = new HashMap<>();
@@ -3392,24 +3393,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             this.reportedEvents.addAll(session.provenanceReporter.getEvents());
 
             this.records.putAll(session.records);
-            this.connectionCounts.putAll(session.connectionCounts);
-            this.unacknowledgedFlowFiles.putAll(session.unacknowledgedFlowFiles);
 
-            if (session.countersOnCommit != null) {
-                if (this.countersOnCommit.isEmpty()) {
-                    this.countersOnCommit.putAll(session.countersOnCommit);
-                } else {
-                    session.countersOnCommit.forEach((key, value) -> this.countersOnCommit.merge(key, value, Long::sum));
-                }
-            }
-
-            if (session.immediateCounters != null) {
-                if (this.immediateCounters.isEmpty()) {
-                    this.immediateCounters.putAll(session.immediateCounters);
-                } else {
-                    session.immediateCounters.forEach((key, value) -> this.immediateCounters.merge(key, value, Long::sum));
-                }
-            }
+            mergeMapsWithMutableValue(this.connectionCounts, session.connectionCounts, (destination, toMerge) -> destination.add(toMerge));
+            mergeMaps(this.countersOnCommit, session.countersOnCommit, Long::sum);
+            mergeMaps(this.immediateCounters, session.immediateCounters, Long::sum);
 
             this.deleteOnCommit.putAll(session.deleteOnCommit);
             this.removedFlowFiles.addAll(session.removedFlowFiles);
@@ -3425,6 +3412,41 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             this.contentSizeOut += session.contentSizeOut;
         }
 
+        private <K, V> void mergeMaps(final Map<K, V> destination, final Map<K, V> toMerge, final BiFunction<V, V, V> merger) {
+            if (toMerge == null) {
+                return;
+            }
+
+            if (destination.isEmpty()) {
+                destination.putAll(toMerge);
+            } else {
+                toMerge.forEach((key, value) -> destination.merge(key, value, merger));
+            }
+        }
+
+        private <K, V> void mergeMapsWithMutableValue(final Map<K, V> destination, final Map<K, V> toMerge, final BiConsumer<V, V> merger) {
+            if (toMerge == null) {
+                return;
+            }
+
+            if (destination.isEmpty()) {
+                destination.putAll(toMerge);
+                return;
+            }
+
+            for (final Map.Entry<K, V> entry : toMerge.entrySet()) {
+                final K key = entry.getKey();
+                final V value = entry.getValue();
+
+                final V destinationValue = destination.get(key);
+                if (destinationValue == null) {
+                    destination.put(key, value);
+                } else {
+                    merger.accept(destinationValue, value);
+                }
+            }
+        }
+
         private StandardRepositoryRecord getRecord(final FlowFile flowFile) {
             return records.get(flowFile.getId());
         }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/StandardFlowFileEvent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/StandardFlowFileEvent.java
index fc00675140..a6e12c9765 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/StandardFlowFileEvent.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/StandardFlowFileEvent.java
@@ -18,6 +18,7 @@ package org.apache.nifi.controller.repository.metrics;
 
 import org.apache.nifi.controller.repository.FlowFileEvent;
 
+import java.util.HashMap;
 import java.util.Map;
 
 public final class StandardFlowFileEvent implements FlowFileEvent, Cloneable {
@@ -194,4 +195,31 @@ public final class StandardFlowFileEvent implements FlowFileEvent, Cloneable {
     public void setCounters(final Map<String, Long> counters) {
         this.counters = counters;
     }
+
+    public void add(final FlowFileEvent event) {
+        flowFilesIn += event.getFlowFilesIn();
+        flowFilesOut += event.getFlowFilesOut();
+        flowFilesRemoved += event.getFlowFilesRemoved();
+        contentSizeIn += event.getContentSizeIn();
+        contentSizeOut += event.getContentSizeOut();
+        contentSizeRemoved += event.getContentSizeRemoved();
+        bytesRead += event.getBytesRead();
+        bytesWritten += event.getBytesWritten();
+        processingNanos += event.getProcessingNanoseconds();
+        aggregateLineageMillis += event.getAggregateLineageMillis();
+        flowFilesReceived += event.getFlowFilesReceived();
+        bytesReceived += event.getBytesReceived();
+        flowFilesSent += event.getFlowFilesSent();
+        bytesSent += event.getBytesSent();
+        invocations += event.getInvocations();
+
+        final Map<String, Long> eventCounters = event.getCounters();
+        if (eventCounters != null) {
+            if (counters == null) {
+                counters = new HashMap<>();
+            }
+
+            eventCounters.forEach((k, v) -> counters.merge(k, v, Long::sum));
+        }
+    }
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index 445a48a30f..5e3b1b6461 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -115,7 +115,7 @@ public class TestStandardProcessSession {
     private ProvenanceEventRepository provenanceRepo;
     private MockFlowFileRepository flowFileRepo;
     private CounterRepository counterRepository;
-    private FlowFileEventRepository flowFileEventRepo;
+    private FlowFileEventRepository flowFileEventRepository;
     private final Relationship FAKE_RELATIONSHIP = new Relationship.Builder().name("FAKE").build();
 
     private static StandardResourceClaimManager resourceClaimManager;
@@ -157,7 +157,7 @@ public class TestStandardProcessSession {
         resourceClaimManager = new StandardResourceClaimManager();
         System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, TestStandardProcessSession.class.getResource("/conf/nifi.properties").getFile());
 
-        flowFileEventRepo = new RingBufferEventRepository(1);
+        flowFileEventRepository = new RingBufferEventRepository(1);
         counterRepository = new StandardCounterRepository();
         provenanceRepo = new MockProvenanceRepository();
@@ -198,7 +198,7 @@ public class TestStandardProcessSession {
         contentRepo.initialize(new StandardResourceClaimManager());
         flowFileRepo = new MockFlowFileRepository(contentRepo);
 
-        context = new RepositoryContext(connectable, new AtomicLong(0L), contentRepo, flowFileRepo, flowFileEventRepo, counterRepository, provenanceRepo);
+        context = new RepositoryContext(connectable, new AtomicLong(0L), contentRepo, flowFileRepo, flowFileEventRepository, counterRepository, provenanceRepo);
         session = new StandardProcessSession(context, () -> false);
     }
 
@@ -399,7 +399,7 @@ public class TestStandardProcessSession {
         session.transfer(flowFile);
         session.commit();
 
-        final RepositoryStatusReport report = flowFileEventRepo.reportTransferEvents(0L);
+        final RepositoryStatusReport report = flowFileEventRepository.reportTransferEvents(0L);
         final long bytesRead = report.getReportEntry("connectable-1").getBytesRead();
         assertEquals(1, bytesRead);
     }
@@ -430,11 +430,50 @@ public class TestStandardProcessSession {
         session.transfer(flowFile);
         session.commit();
 
-        final RepositoryStatusReport report = flowFileEventRepo.reportTransferEvents(0L);
+        final RepositoryStatusReport report = flowFileEventRepository.reportTransferEvents(0L);
         final long bytesRead = report.getReportEntry("connectable-1").getBytesRead();
         assertEquals(1, bytesRead);
     }
 
+    @Test
+    public void testCheckpointMergesMaps() {
+        for (int i=0; i < 2; i++) {
+            final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
+                .id(i)
+                .entryDate(System.currentTimeMillis())
+                .size(0L)
+                .build();
+
+            flowFileQueue.put(flowFileRecord);
+        }
+
+        final Relationship relationship = new Relationship.Builder().name("A").build();
+
+        for (int i=0; i < 2; i++) {
+            FlowFile ff1 = session.get();
+            assertNotNull(ff1);
+            session.transfer(ff1, relationship);
+            session.adjustCounter("counter", 1, false);
+            session.adjustCounter("counter", 1, true);
+            session.checkpoint();
+        }
+
+        session.commit();
+
+        final RepositoryStatusReport report = flowFileEventRepository.reportTransferEvents(0L);
+        final FlowFileEvent queueFlowFileEvent = report.getReportEntry("conn-uuid");
+        assertNotNull(queueFlowFileEvent);
+        assertEquals(2, queueFlowFileEvent.getFlowFilesOut());
+        assertEquals(0L, queueFlowFileEvent.getContentSizeOut());
+
+        final FlowFileEvent componentFlowFileEvent = report.getReportEntry("connectable-1");
+        final Map<String, Long> counters = componentFlowFileEvent.getCounters();
+        assertNotNull(counters);
+        assertEquals(1, counters.size());
+        assertTrue(counters.containsKey("counter"));
+        assertEquals(4L, counters.get("counter").longValue()); // increment twice for each FlowFile, once immediate, once not.
+    }
+
     @Test
     public void testHandlingOfMultipleFlowFilesWithSameId() {
         // Add two FlowFiles with the same ID
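
The standalone sketch below is not part of the patch; it is a minimal illustration of the merge semantics the commit introduces for Checkpoint: when two checkpoints have recorded a value for the same key, the values are combined (here with Long::sum) instead of the later map overwriting the earlier one via putAll. The class and variable names are invented for the example and do not exist in the NiFi codebase.

import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

public class CheckpointMergeSketch {

    // Mirrors the patch's mergeMaps(...): if the destination already has entries,
    // combine colliding values with the supplied merger instead of replacing them.
    static <K, V> void mergeMaps(final Map<K, V> destination, final Map<K, V> toMerge, final BiFunction<V, V, V> merger) {
        if (toMerge == null) {
            return;
        }

        if (destination.isEmpty()) {
            destination.putAll(toMerge);
        } else {
            toMerge.forEach((key, value) -> destination.merge(key, value, merger));
        }
    }

    public static void main(final String[] args) {
        // Counter values accumulated by an earlier checkpoint.
        final Map<String, Long> checkpointed = new HashMap<>();
        checkpointed.put("counter", 2L);

        // Counter values accumulated by the session since that checkpoint.
        final Map<String, Long> current = new HashMap<>();
        current.put("counter", 2L);

        // Old behavior: putAll replaces the existing value, so the earlier increments are lost.
        final Map<String, Long> overwritten = new HashMap<>(checkpointed);
        overwritten.putAll(current);
        System.out.println(overwritten.get("counter")); // prints 2

        // New behavior: colliding values are summed, matching the test's expected total of 4.
        mergeMaps(checkpointed, current, Long::sum);
        System.out.println(checkpointed.get("counter")); // prints 4
    }
}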