Merge pull request #3853 from markap14/NIFI-6822

NIFI-6822: Ensure that when we manage a Map of ID -> Count, that we p…
This commit is contained in:
markap14 2020-01-06 15:01:42 -05:00 committed by GitHub
commit e4bdc79ea6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 111 additions and 23 deletions

View File

@ -86,6 +86,8 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@ -3365,7 +3367,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
private final Map<Long, StandardRepositoryRecord> records = new ConcurrentHashMap<>();
private final Map<String, StandardFlowFileEvent> connectionCounts = new ConcurrentHashMap<>();
private final Map<FlowFileQueue, Set<FlowFileRecord>> unacknowledgedFlowFiles = new ConcurrentHashMap<>();
private Map<String, Long> countersOnCommit = new HashMap<>();
private Map<String, Long> immediateCounters = new HashMap<>();
@ -3392,24 +3393,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
this.reportedEvents.addAll(session.provenanceReporter.getEvents());
this.records.putAll(session.records);
this.connectionCounts.putAll(session.connectionCounts);
this.unacknowledgedFlowFiles.putAll(session.unacknowledgedFlowFiles);
if (session.countersOnCommit != null) {
if (this.countersOnCommit.isEmpty()) {
this.countersOnCommit.putAll(session.countersOnCommit);
} else {
session.countersOnCommit.forEach((key, value) -> this.countersOnCommit.merge(key, value, Long::sum));
}
}
if (session.immediateCounters != null) {
if (this.immediateCounters.isEmpty()) {
this.immediateCounters.putAll(session.immediateCounters);
} else {
session.immediateCounters.forEach((key, value) -> this.immediateCounters.merge(key, value, Long::sum));
}
}
mergeMapsWithMutableValue(this.connectionCounts, session.connectionCounts, (destination, toMerge) -> destination.add(toMerge));
mergeMaps(this.countersOnCommit, session.countersOnCommit, Long::sum);
mergeMaps(this.immediateCounters, session.immediateCounters, Long::sum);
this.deleteOnCommit.putAll(session.deleteOnCommit);
this.removedFlowFiles.addAll(session.removedFlowFiles);
@ -3425,6 +3412,41 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
this.contentSizeOut += session.contentSizeOut;
}
private <K, V> void mergeMaps(final Map<K, V> destination, final Map<K, V> toMerge, final BiFunction<? super V, ? super V, ? extends V> merger) {
if (toMerge == null) {
return;
}
if (destination.isEmpty()) {
destination.putAll(toMerge);
} else {
toMerge.forEach((key, value) -> destination.merge(key, value, merger));
}
}
private <K, V> void mergeMapsWithMutableValue(final Map<K, V> destination, final Map<K, V> toMerge, final BiConsumer<? super V, ? super V> merger) {
if (toMerge == null) {
return;
}
if (destination.isEmpty()) {
destination.putAll(toMerge);
return;
}
for (final Map.Entry<K, V> entry : toMerge.entrySet()) {
final K key = entry.getKey();
final V value = entry.getValue();
final V destinationValue = destination.get(key);
if (destinationValue == null) {
destination.put(key, value);
} else {
merger.accept(destinationValue, value);
}
}
}
private StandardRepositoryRecord getRecord(final FlowFile flowFile) {
return records.get(flowFile.getId());
}

View File

@ -18,6 +18,7 @@ package org.apache.nifi.controller.repository.metrics;
import org.apache.nifi.controller.repository.FlowFileEvent;
import java.util.HashMap;
import java.util.Map;
public final class StandardFlowFileEvent implements FlowFileEvent, Cloneable {
@ -194,4 +195,31 @@ public final class StandardFlowFileEvent implements FlowFileEvent, Cloneable {
public void setCounters(final Map<String, Long> counters) {
this.counters = counters;
}
public void add(final FlowFileEvent event) {
flowFilesIn += event.getFlowFilesIn();
flowFilesOut += event.getFlowFilesOut();
flowFilesRemoved += event.getFlowFilesRemoved();
contentSizeIn += event.getContentSizeIn();
contentSizeOut += event.getContentSizeOut();
contentSizeRemoved += event.getContentSizeRemoved();
bytesRead += event.getBytesRead();
bytesWritten += event.getBytesWritten();
processingNanos += event.getProcessingNanoseconds();
aggregateLineageMillis += event.getAggregateLineageMillis();
flowFilesReceived += event.getFlowFilesReceived();
bytesReceived += event.getBytesReceived();
flowFilesSent += event.getFlowFilesSent();
bytesSent += event.getBytesSent();
invocations += event.getInvocations();
final Map<String, Long> eventCounters = event.getCounters();
if (eventCounters != null) {
if (counters == null) {
counters = new HashMap<>();
}
eventCounters.forEach((k, v) -> counters.merge(k, v, Long::sum));
}
}
}

View File

@ -115,7 +115,7 @@ public class TestStandardProcessSession {
private ProvenanceEventRepository provenanceRepo;
private MockFlowFileRepository flowFileRepo;
private CounterRepository counterRepository;
private FlowFileEventRepository flowFileEventRepo;
private FlowFileEventRepository flowFileEventRepository;
private final Relationship FAKE_RELATIONSHIP = new Relationship.Builder().name("FAKE").build();
private static StandardResourceClaimManager resourceClaimManager;
@ -157,7 +157,7 @@ public class TestStandardProcessSession {
resourceClaimManager = new StandardResourceClaimManager();
System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, TestStandardProcessSession.class.getResource("/conf/nifi.properties").getFile());
flowFileEventRepo = new RingBufferEventRepository(1);
flowFileEventRepository = new RingBufferEventRepository(1);
counterRepository = new StandardCounterRepository();
provenanceRepo = new MockProvenanceRepository();
@ -198,7 +198,7 @@ public class TestStandardProcessSession {
contentRepo.initialize(new StandardResourceClaimManager());
flowFileRepo = new MockFlowFileRepository(contentRepo);
context = new RepositoryContext(connectable, new AtomicLong(0L), contentRepo, flowFileRepo, flowFileEventRepo, counterRepository, provenanceRepo);
context = new RepositoryContext(connectable, new AtomicLong(0L), contentRepo, flowFileRepo, flowFileEventRepository, counterRepository, provenanceRepo);
session = new StandardProcessSession(context, () -> false);
}
@ -399,7 +399,7 @@ public class TestStandardProcessSession {
session.transfer(flowFile);
session.commit();
final RepositoryStatusReport report = flowFileEventRepo.reportTransferEvents(0L);
final RepositoryStatusReport report = flowFileEventRepository.reportTransferEvents(0L);
final long bytesRead = report.getReportEntry("connectable-1").getBytesRead();
assertEquals(1, bytesRead);
}
@ -430,11 +430,49 @@ public class TestStandardProcessSession {
session.transfer(flowFile);
session.commit();
final RepositoryStatusReport report = flowFileEventRepo.reportTransferEvents(0L);
final RepositoryStatusReport report = flowFileEventRepository.reportTransferEvents(0L);
final long bytesRead = report.getReportEntry("connectable-1").getBytesRead();
assertEquals(1, bytesRead);
}
public void testCheckpointMergesMaps() {
for (int i=0; i < 2; i++) {
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.id(i)
.entryDate(System.currentTimeMillis())
.size(0L)
.build();
flowFileQueue.put(flowFileRecord);
}
final Relationship relationship = new Relationship.Builder().name("A").build();
for (int i=0; i < 2; i++) {
FlowFile ff1 = session.get();
assertNotNull(ff1);
session.transfer(ff1, relationship);
session.adjustCounter("counter", 1, false);
session.adjustCounter("counter", 1, true);
session.checkpoint();
}
session.commit();
final RepositoryStatusReport report = flowFileEventRepository.reportTransferEvents(0L);
final FlowFileEvent queueFlowFileEvent = report.getReportEntry("conn-uuid");
assertNotNull(queueFlowFileEvent);
assertEquals(2, queueFlowFileEvent.getFlowFilesOut());
assertEquals(0L, queueFlowFileEvent.getContentSizeOut());
final FlowFileEvent componentFlowFileEvent = report.getReportEntry("connectable-1");
final Map<String, Long> counters = componentFlowFileEvent.getCounters();
assertNotNull(counters);
assertEquals(1, counters.size());
assertTrue(counters.containsKey("counter"));
assertEquals(4L, counters.get("counter").longValue()); // increment twice for each FlowFile, once immediate, once not.
}
@Test
public void testHandlingOfMultipleFlowFilesWithSameId() {
// Add two FlowFiles with the same ID