diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
index bcf10a4240..afb2cc6fa6 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
@@ -119,6 +119,30 @@ class KafkaPublisher implements AutoCloseable {
      */
     BitSet publish(SplittableMessageContext messageContext, InputStream contentStream, Integer partitionKey, int maxBufferSize) {
+        List<Future<RecordMetadata>> sendFutures = this.split(messageContext, contentStream, partitionKey, maxBufferSize);
+        return this.publish(sendFutures);
+    }
+
+    /**
+     * This method splits (if required) the incoming content stream into
+     * messages to publish to the Kafka topic. See the publish method for more
+     * details.
+     *
+     * @param messageContext
+     *            instance of {@link SplittableMessageContext} which holds
+     *            context information about the message to be sent
+     * @param contentStream
+     *            instance of open {@link InputStream} carrying the content of
+     *            the message(s) to be sent to Kafka
+     * @param partitionKey
+     *            the value of the partition key. Only relevant if the user
+     *            wishes to provide a custom partition key instead of relying
+     *            on the variety of provided {@link Partitioner}(s)
+     * @param maxBufferSize maximum message size
+     * @return the list of send futures, one per message to publish
+     */
+    List<Future<RecordMetadata>> split(SplittableMessageContext messageContext, InputStream contentStream, Integer partitionKey,
+            int maxBufferSize) {
         List<Future<RecordMetadata>> sendFutures = new ArrayList<>();
         BitSet prevFailedSegmentIndexes = messageContext.getFailedSegments();
         int segmentCounter = 0;
@@ -139,13 +163,13 @@ class KafkaPublisher implements AutoCloseable {
                 segmentCounter++;
             }
         }
-        return this.processAcks(sendFutures);
+        return sendFutures;
     }
 
     /**
      *
      */
-    private BitSet processAcks(List<Future<RecordMetadata>> sendFutures) {
+    BitSet publish(List<Future<RecordMetadata>> sendFutures) {
         int segmentCounter = 0;
         BitSet failedSegments = new BitSet();
         for (Future<RecordMetadata> future : sendFutures) {
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
index 3b5eb4f52a..2cf024589b 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
@@ -30,10 +30,12 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
 
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -54,6 +56,7 @@ import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StopWatch;
 
 @InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({ "Apache", "Kafka", "Put", "Send", "Message", "PubSub" })
@@ -159,9 +162,9 @@ public class PutKafka extends AbstractProcessor {
             + "If not specified, the entire content of the FlowFile will be used as a single message. If specified, "
             + "the contents of the FlowFile will be split on this delimiter and each section sent as a separate Kafka "
             + "message. Note that if messages are delimited and some messages for a given FlowFile are transferred "
-            + "successfully while others are not, the messages will be split into individual FlowFiles, such that those "
-            + "messages that were successfully sent are routed to the 'success' relationship while other messages are "
-            + "sent to the 'failure' relationship.")
+            + "successfully while others are not, the FlowFile will be transferred to the 'failure' relationship. If "
+            + "the FlowFile is then routed back to this processor, only the messages that were not previously sent "
+            + "successfully will be retransmitted to Kafka.")
             .required(false)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .expressionLanguageSupported(true)
@@ -292,19 +295,31 @@ public class PutKafka extends AbstractProcessor {
             final SplittableMessageContext messageContext = this.buildMessageContext(flowFile, context, session);
             final Integer partitionKey = this.determinePartition(messageContext, context, flowFile);
             final AtomicReference<BitSet> failedSegmentsRef = new AtomicReference<>();
+            final List<Future<RecordMetadata>> sendFutures = new ArrayList<>();
+
+            StopWatch timer = new StopWatch(true);
             session.read(flowFile, new InputStreamCallback() {
                 @Override
                 public void process(InputStream contentStream) throws IOException {
                     int maxRecordSize = context.getProperty(MAX_RECORD_SIZE).asDataSize(DataUnit.B).intValue();
-                    failedSegmentsRef.set(kafkaPublisher.publish(messageContext, contentStream, partitionKey, maxRecordSize));
+                    sendFutures.addAll(kafkaPublisher.split(messageContext, contentStream, partitionKey, maxRecordSize));
+                    failedSegmentsRef.set(kafkaPublisher.publish(sendFutures));
                 }
             });
+            timer.stop();
+            final long duration = timer.getDuration(TimeUnit.MILLISECONDS);
+            final int messagesToSend = sendFutures.size();
+            final int messagesSent = messagesToSend - failedSegmentsRef.get().cardinality();
+            final String details = messagesSent + " message(s) over " + messagesToSend + " sent successfully";
             if (failedSegmentsRef.get().isEmpty()) {
-                session.getProvenanceReporter().send(flowFile, context.getProperty(SEED_BROKERS).getValue() + "/" + messageContext.getTopicName());
+                session.getProvenanceReporter().send(flowFile, "kafka://" + context.getProperty(SEED_BROKERS).getValue() + "/" + messageContext.getTopicName(), details, duration);
                 flowFile = this.cleanUpFlowFileIfNecessary(flowFile, session);
                 session.transfer(flowFile, REL_SUCCESS);
             } else {
+                if (messagesSent != 0) {
+                    session.getProvenanceReporter().send(flowFile, "kafka://" + context.getProperty(SEED_BROKERS).getValue() + "/" + messageContext.getTopicName(), details, duration);
+                }
                 flowFile = session.putAllAttributes(flowFile, this.buildFailedFlowFileAttributes(failedSegmentsRef.get(), messageContext));
                 session.transfer(session.penalize(flowFile), REL_FAILURE);
             }
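
Reviewer note: the refactor splits the old one-shot `publish(...)` into `split(...)` (build and fire the sends, return the futures) and `publish(List<Future<RecordMetadata>>)` (the renamed `processAcks`, which resolves the acks into a `BitSet` of failed segment indexes). Below is a minimal sketch of how the two phases compose, using the variables already in scope in `PutKafka`; the try/catch body is an assumption inferred from the loop header visible in the hunk, not copied from NiFi:

```java
// Sketch: two-phase send. split() returns one future per Kafka message;
// publish() (shown inlined here) blocks on each ack and records failures.
List<Future<RecordMetadata>> sendFutures =
        kafkaPublisher.split(messageContext, contentStream, partitionKey, maxRecordSize);

BitSet failedSegments = new BitSet();
int segmentCounter = 0;
for (Future<RecordMetadata> future : sendFutures) {
    try {
        future.get();                        // wait for the broker's ack
    } catch (InterruptedException | ExecutionException e) {
        failedSegments.set(segmentCounter);  // flag this segment for retry
    }
    segmentCounter++;
}
```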
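
The retry behavior promised by the updated MESSAGE_DELIMITER description hinges on `split(...)` consulting `messageContext.getFailedSegments()`: on the first attempt it is null and every segment is sent, while on a retry only the flagged indexes are resent. The loop body sits outside the hunk, so the following is a plausible sketch rather than the actual implementation; `kafkaProducer`, `keyBytes`, and `messageBytes` are hypothetical names:

```java
// Assumed skip logic inside split(): send a segment only on the first attempt
// or when its index was flagged as failed on the previous attempt.
BitSet prevFailedSegmentIndexes = messageContext.getFailedSegments();
if (prevFailedSegmentIndexes == null || prevFailedSegmentIndexes.get(segmentCounter)) {
    ProducerRecord<byte[], byte[]> message = new ProducerRecord<>(
            messageContext.getTopicName(), partitionKey, keyBytes, messageBytes);
    sendFutures.add(kafkaProducer.send(message));  // returns Future<RecordMetadata>
}
segmentCounter++;
```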
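
The new provenance `details` string in `PutKafka` is plain `BitSet` arithmetic: `cardinality()` counts the failed segments, and subtracting it from `sendFutures.size()` yields the success count. Note that the partial-success branch (`messagesSent != 0`) now emits a SEND provenance event even when the FlowFile is routed to 'failure', so the messages that did reach Kafka are still recorded. A self-contained demonstration of the arithmetic:

```java
import java.util.BitSet;

public class FailedSegmentsDemo {
    public static void main(String[] args) {
        int messagesToSend = 5;           // sendFutures.size() in PutKafka
        BitSet failedSegments = new BitSet();
        failedSegments.set(1);            // pretend segments 1 and 3 were
        failedSegments.set(3);            // never acknowledged by the broker

        int messagesSent = messagesToSend - failedSegments.cardinality();
        System.out.println(messagesSent + " message(s) over " + messagesToSend
                + " sent successfully");  // prints: 3 message(s) over 5 sent successfully
    }
}
```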