NIFI-10199: PublishKafka InFlightMessageTracker should not log batch level errors on each record

Currently, when a batch level error occurs (e.g. delivery timeout), the same error is logged for each record of the batch, needlessly flooding the logs.
InFlightMessageTracker now only logs an exception when a flow file encounters it for the first time.

This closes #6185
Signed-off-by: Paul Grey <greyp@apache.org>
This commit is contained in:
Daniel Urban 2022-07-08 10:39:56 +02:00 committed by Paul Grey
parent 70b73f63fd
commit 387ae7d323
No known key found for this signature in database
GPG Key ID: 8DDF32B9C7EE39D0
3 changed files with 27 additions and 3 deletions

View File

@ -17,6 +17,7 @@
package org.apache.nifi.processors.kafka.pubsub;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
@ -29,6 +30,7 @@ import org.apache.nifi.logging.ComponentLog;
public class InFlightMessageTracker {
private final ConcurrentMap<FlowFile, Counts> messageCountsByFlowFile = new ConcurrentHashMap<>();
private final ConcurrentMap<FlowFile, Exception> failures = new ConcurrentHashMap<>();
private final ConcurrentMap<FlowFile, Set<Exception>> encounteredFailures = new ConcurrentHashMap<>();
private final Object progressMutex = new Object();
private final ComponentLog logger;
@ -70,7 +72,12 @@ public class InFlightMessageTracker {
public void fail(final FlowFile flowFile, final Exception exception) {
failures.putIfAbsent(flowFile, exception);
logger.error("Failed to send " + flowFile + " to Kafka", exception);
boolean newException = encounteredFailures
.computeIfAbsent(flowFile, (k) -> ConcurrentHashMap.newKeySet())
.add(exception);
if (newException) {
logger.error("Failed to send {} to Kafka", flowFile, exception);
}
synchronized (progressMutex) {
progressMutex.notify();
@ -88,6 +95,7 @@ public class InFlightMessageTracker {
public void reset() {
messageCountsByFlowFile.clear();
failures.clear();
encounteredFailures.clear();
}
public PublishResult failOutstanding(final Exception exception) {

View File

@ -17,6 +17,7 @@
package org.apache.nifi.processors.kafka.pubsub;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
@ -29,6 +30,7 @@ import org.apache.nifi.logging.ComponentLog;
public class InFlightMessageTracker {
private final ConcurrentMap<FlowFile, Counts> messageCountsByFlowFile = new ConcurrentHashMap<>();
private final ConcurrentMap<FlowFile, Exception> failures = new ConcurrentHashMap<>();
private final ConcurrentMap<FlowFile, Set<Exception>> encounteredFailures = new ConcurrentHashMap<>();
private final Object progressMutex = new Object();
private final ComponentLog logger;
@ -70,7 +72,12 @@ public class InFlightMessageTracker {
public void fail(final FlowFile flowFile, final Exception exception) {
failures.putIfAbsent(flowFile, exception);
logger.error("Failed to send " + flowFile + " to Kafka", exception);
boolean newException = encounteredFailures
.computeIfAbsent(flowFile, (k) -> ConcurrentHashMap.newKeySet())
.add(exception);
if (newException) {
logger.error("Failed to send {} to Kafka", flowFile, exception);
}
synchronized (progressMutex) {
progressMutex.notify();
@ -88,6 +95,7 @@ public class InFlightMessageTracker {
public void reset() {
messageCountsByFlowFile.clear();
failures.clear();
encounteredFailures.clear();
}
public PublishResult failOutstanding(final Exception exception) {

View File

@ -17,6 +17,7 @@
package org.apache.nifi.processors.kafka.pubsub;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
@ -29,6 +30,7 @@ import org.apache.nifi.logging.ComponentLog;
public class InFlightMessageTracker {
private final ConcurrentMap<FlowFile, Counts> messageCountsByFlowFile = new ConcurrentHashMap<>();
private final ConcurrentMap<FlowFile, Exception> failures = new ConcurrentHashMap<>();
private final ConcurrentMap<FlowFile, Set<Exception>> encounteredFailures = new ConcurrentHashMap<>();
private final Object progressMutex = new Object();
private final ComponentLog logger;
@ -70,7 +72,12 @@ public class InFlightMessageTracker {
public void fail(final FlowFile flowFile, final Exception exception) {
failures.putIfAbsent(flowFile, exception);
logger.error("Failed to send {} to Kafka", flowFile, exception);
boolean newException = encounteredFailures
.computeIfAbsent(flowFile, (k) -> ConcurrentHashMap.newKeySet())
.add(exception);
if (newException) {
logger.error("Failed to send {} to Kafka", flowFile, exception);
}
synchronized (progressMutex) {
progressMutex.notify();
@ -88,6 +95,7 @@ public class InFlightMessageTracker {
public void reset() {
messageCountsByFlowFile.clear();
failures.clear();
encounteredFailures.clear();
}
public PublishResult failOutstanding(final Exception exception) {