NIFI-12371 Support tombstone messages in non-record Kafka processors

This closes #8076

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Pierre Villard 2023-11-10 20:08:59 +01:00 committed by exceptionfactory
parent 4f399c9bb9
commit ee2368e0ae
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
5 changed files with 17 additions and 4 deletions

View File

@ -74,7 +74,8 @@ import java.util.regex.Pattern;
@WritesAttribute(attribute = KafkaFlowFileAttribute.KAFKA_OFFSET, description = "The offset of the message in the partition of the topic."),
@WritesAttribute(attribute = KafkaFlowFileAttribute.KAFKA_TIMESTAMP, description = "The timestamp of the message in the partition of the topic."),
@WritesAttribute(attribute = KafkaFlowFileAttribute.KAFKA_PARTITION, description = "The partition of the topic the message or message bundle is from"),
@WritesAttribute(attribute = KafkaFlowFileAttribute.KAFKA_TOPIC, description = "The topic the message or message bundle is from")
@WritesAttribute(attribute = KafkaFlowFileAttribute.KAFKA_TOPIC, description = "The topic the message or message bundle is from"),
@WritesAttribute(attribute = KafkaFlowFileAttribute.KAFKA_TOMBSTONE, description = "Set to true if the consumed message is a tombstone message")
})
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.",

View File

@ -474,6 +474,8 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
final byte[] value = record.value();
if (value != null) {
flowFile = session.write(flowFile, out -> out.write(value));
} else {
flowFile = session.putAttribute(flowFile, KafkaFlowFileAttribute.KAFKA_TOMBSTONE, Boolean.TRUE.toString());
}
flowFile = session.putAllAttributes(flowFile, getAttributes(record));
tracker.updateFlowFile(flowFile);

View File

@ -27,6 +27,7 @@ import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
@ -87,6 +88,8 @@ import static org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute.KAFK
+ " In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged."
+ " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ",
expressionLanguageScope = ExpressionLanguageScope.ENVIRONMENT)
@ReadsAttribute(attribute = KafkaFlowFileAttribute.KAFKA_TOMBSTONE, description = "If this attribute is set to 'true', if the processor is not configured "
+ "with a demarcator and if the FlowFile's content is null, then a tombstone message with zero bytes will be sent to Kafka.")
@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Kafka for this FlowFile. This attribute is added only to "
+ "FlowFiles that are routed to success. If the <Message Demarcator> Property is not set, this will always be 1, but if the Property is set, it may "
+ "be greater than 1.")

View File

@ -50,6 +50,7 @@ import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.ConfigVerificationResult.Outcome;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute;
import org.apache.nifi.kafka.shared.property.PublishStrategy;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
@ -159,9 +160,13 @@ public class PublisherLease implements Closeable {
tracker.fail(flowFile, new TokenTooLargeException("A message in the stream exceeds the maximum allowed message size of " + maxMessageSize + " bytes."));
return;
}
// Send FlowFile content as it is, to support sending 0 byte message.
messageContent = new byte[(int) flowFile.getSize()];
StreamUtils.fillBuffer(flowFileContent, messageContent);
if (Boolean.TRUE.toString().equals(flowFile.getAttribute(KafkaFlowFileAttribute.KAFKA_TOMBSTONE)) && flowFile.getSize() == 0) {
messageContent = null;
} else {
// Send FlowFile content as it is, to support sending 0 byte message.
messageContent = new byte[(int) flowFile.getSize()];
StreamUtils.fillBuffer(flowFileContent, messageContent);
}
publish(flowFile, messageKey, messageContent, topic, tracker, partition);
return;
}

View File

@ -39,4 +39,6 @@ public interface KafkaFlowFileAttribute {
String KAFKA_CONSUMER_GROUP_ID = "kafka.consumer.id";
String KAFKA_CONSUMER_OFFSETS_COMMITTED = "kafka.consumer.offsets.committed";
String KAFKA_TOMBSTONE = "kafka.tombstone";
}