diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java index a5c6b15891..ff75453bc0 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java @@ -74,7 +74,8 @@ import java.util.regex.Pattern; @WritesAttribute(attribute = KafkaFlowFileAttribute.KAFKA_OFFSET, description = "The offset of the message in the partition of the topic."), @WritesAttribute(attribute = KafkaFlowFileAttribute.KAFKA_TIMESTAMP, description = "The timestamp of the message in the partition of the topic."), @WritesAttribute(attribute = KafkaFlowFileAttribute.KAFKA_PARTITION, description = "The partition of the topic the message or message bundle is from"), - @WritesAttribute(attribute = KafkaFlowFileAttribute.KAFKA_TOPIC, description = "The topic the message or message bundle is from") + @WritesAttribute(attribute = KafkaFlowFileAttribute.KAFKA_TOPIC, description = "The topic the message or message bundle is from"), + @WritesAttribute(attribute = KafkaFlowFileAttribute.KAFKA_TOMBSTONE, description = "Set to true if the consumed message is a tombstone message") }) @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) @DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.", diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java index 698ecc36a5..b197d97c41 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java @@ -474,6 +474,8 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe final byte[] value = record.value(); if (value != null) { flowFile = session.write(flowFile, out -> out.write(value)); + } else { + flowFile = session.putAttribute(flowFile, KafkaFlowFileAttribute.KAFKA_TOMBSTONE, Boolean.TRUE.toString()); } flowFile = session.putAllAttributes(flowFile, getAttributes(record)); tracker.updateFlowFile(flowFile); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java index b6b84ce1e0..9a8e5971ab 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java @@ -27,6 +27,7 @@ import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.ReadsAttribute; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; @@ -87,6 +88,8 @@ import static org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute.KAFK + " In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged." + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ", expressionLanguageScope = ExpressionLanguageScope.ENVIRONMENT) +@ReadsAttribute(attribute = KafkaFlowFileAttribute.KAFKA_TOMBSTONE, description = "If this attribute is set to 'true', if the processor is not configured " + + "with a demarcator and if the FlowFile's content is null, then a tombstone message with zero bytes will be sent to Kafka.") @WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Kafka for this FlowFile. This attribute is added only to " + "FlowFiles that are routed to success. If the Property is not set, this will always be 1, but if the Property is set, it may " + "be greater than 1.") diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java index 959e37ad96..2781ea8478 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java @@ -50,6 +50,7 @@ import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.nifi.components.ConfigVerificationResult; import org.apache.nifi.components.ConfigVerificationResult.Outcome; import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute; import org.apache.nifi.kafka.shared.property.PublishStrategy; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.schema.access.SchemaNotFoundException; @@ -159,9 +160,13 @@ public class PublisherLease implements Closeable { tracker.fail(flowFile, new TokenTooLargeException("A message in the stream exceeds the maximum allowed message size of " + maxMessageSize + " bytes.")); return; } - // Send FlowFile content as it is, to support sending 0 byte message. - messageContent = new byte[(int) flowFile.getSize()]; - StreamUtils.fillBuffer(flowFileContent, messageContent); + if (Boolean.TRUE.toString().equals(flowFile.getAttribute(KafkaFlowFileAttribute.KAFKA_TOMBSTONE)) && flowFile.getSize() == 0) { + messageContent = null; + } else { + // Send FlowFile content as it is, to support sending 0 byte message. + messageContent = new byte[(int) flowFile.getSize()]; + StreamUtils.fillBuffer(flowFileContent, messageContent); + } publish(flowFile, messageKey, messageContent, topic, tracker, partition); return; } diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/attribute/KafkaFlowFileAttribute.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/attribute/KafkaFlowFileAttribute.java index 991f3b7a9e..15aa16ed76 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/attribute/KafkaFlowFileAttribute.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/attribute/KafkaFlowFileAttribute.java @@ -39,4 +39,6 @@ public interface KafkaFlowFileAttribute { String KAFKA_CONSUMER_GROUP_ID = "kafka.consumer.id"; String KAFKA_CONSUMER_OFFSETS_COMMITTED = "kafka.consumer.offsets.committed"; + + String KAFKA_TOMBSTONE = "kafka.tombstone"; }