NIFI-1672 Improved the Provenance Events emitted by PutKafka

This closes #355
Pierre Villard 2016-04-15 15:25:28 +02:00 committed by Oleg Zhurakousky
parent dd8c26e35c
commit 3d6e664097
2 changed files with 46 additions and 7 deletions

KafkaPublisher.java

@@ -119,6 +119,30 @@ class KafkaPublisher implements AutoCloseable {
*/
BitSet publish(SplittableMessageContext messageContext, InputStream contentStream, Integer partitionKey,
int maxBufferSize) {
+ List<Future<RecordMetadata>> sendFutures = this.split(messageContext, contentStream, partitionKey, maxBufferSize);
+ return this.publish(sendFutures);
+ }
+ /**
+ * This method splits (if required) the incoming content stream into
+ * messages to publish to the Kafka topic. See the publish method for more
+ * details.
+ *
+ * @param messageContext
+ * instance of {@link SplittableMessageContext} which holds
+ * context information about the message to be sent
+ * @param contentStream
+ * instance of open {@link InputStream} carrying the content of
+ * the message(s) to be sent to Kafka
+ * @param partitionKey
+ * the value of the partition key. Only relevant if the user wishes
+ * to provide a custom partition key instead of relying on the
+ * variety of provided {@link Partitioner}(s)
+ * @param maxBufferSize maximum message size
+ * @return the list of futures for the messages sent to Kafka
+ */
+ List<Future<RecordMetadata>> split(SplittableMessageContext messageContext, InputStream contentStream, Integer partitionKey,
+ int maxBufferSize) {
List<Future<RecordMetadata>> sendFutures = new ArrayList<>();
BitSet prevFailedSegmentIndexes = messageContext.getFailedSegments();
int segmentCounter = 0;
@@ -139,13 +163,13 @@ class KafkaPublisher implements AutoCloseable {
segmentCounter++;
}
}
- return this.processAcks(sendFutures);
+ return sendFutures;
}
/**
*
*/
- private BitSet processAcks(List<Future<RecordMetadata>> sendFutures) {
+ BitSet publish(List<Future<RecordMetadata>> sendFutures) {
int segmentCounter = 0;
BitSet failedSegments = new BitSet();
for (Future<RecordMetadata> future : sendFutures) {

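The KafkaPublisher change splits the old single-step publish into a two-step API: split produces one send future per message segment, and the new publish(sendFutures) overload (the renamed processAcks) collapses those acknowledgments into a BitSet of failed segment indexes. Below is a minimal sketch of how a caller can derive send statistics from the two-step API; the wrapper class and method name are hypothetical, while KafkaPublisher, SplittableMessageContext, split, and publish come from the diff above:

    import java.io.InputStream;
    import java.util.BitSet;
    import java.util.List;
    import java.util.concurrent.Future;
    import org.apache.kafka.clients.producer.RecordMetadata;

    // Hypothetical helper; only split() and publish() are names from the commit.
    class SendStatsSketch {
        // Returns {messagesSent, messagesToSend} for one FlowFile's content.
        static int[] sendAndCount(KafkaPublisher publisher, SplittableMessageContext messageContext,
                InputStream contentStream, Integer partitionKey, int maxRecordSize) {
            List<Future<RecordMetadata>> sendFutures =
                    publisher.split(messageContext, contentStream, partitionKey, maxRecordSize);
            BitSet failedSegments = publisher.publish(sendFutures);
            int messagesToSend = sendFutures.size();              // segments split from the stream
            int messagesSent = messagesToSend - failedSegments.cardinality(); // acknowledged sends
            return new int[] { messagesSent, messagesToSend };
        }
    }

This is the same arithmetic PutKafka uses below to build the details string for its provenance events.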
PutKafka.java

@@ -30,10 +30,12 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
+ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
+ import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -54,6 +56,7 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
+ import org.apache.nifi.util.StopWatch;
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({ "Apache", "Kafka", "Put", "Send", "Message", "PubSub" })
@@ -159,9 +162,9 @@ public class PutKafka extends AbstractProcessor {
+ "If not specified, the entire content of the FlowFile will be used as a single message. If specified, "
+ "the contents of the FlowFile will be split on this delimiter and each section sent as a separate Kafka "
+ "message. Note that if messages are delimited and some messages for a given FlowFile are transferred "
+ "successfully while others are not, the messages will be split into individual FlowFiles, such that those "
+ "messages that were successfully sent are routed to the 'success' relationship while other messages are "
+ "sent to the 'failure' relationship.")
+ "successfully while others are not, the FlowFile will be transferred to the 'failure' relationship. In "
+ "case the FlowFile is sent back to this processor, only the messages not previously transferred "
+ "successfully will be handled by the processor to be retransferred to Kafka.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
@@ -292,19 +295,31 @@ public class PutKafka extends AbstractProcessor {
final SplittableMessageContext messageContext = this.buildMessageContext(flowFile, context, session);
final Integer partitionKey = this.determinePartition(messageContext, context, flowFile);
final AtomicReference<BitSet> failedSegmentsRef = new AtomicReference<BitSet>();
+ final List<Future<RecordMetadata>> sendFutures = new ArrayList<>();
+ StopWatch timer = new StopWatch(true);
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(InputStream contentStream) throws IOException {
int maxRecordSize = context.getProperty(MAX_RECORD_SIZE).asDataSize(DataUnit.B).intValue();
- failedSegmentsRef.set(kafkaPublisher.publish(messageContext, contentStream, partitionKey, maxRecordSize));
+ sendFutures.addAll(kafkaPublisher.split(messageContext, contentStream, partitionKey, maxRecordSize));
+ failedSegmentsRef.set(kafkaPublisher.publish(sendFutures));
}
});
+ timer.stop();
+ final long duration = timer.getDuration(TimeUnit.MILLISECONDS);
+ final int messagesToSend = sendFutures.size();
+ final int messagesSent = messagesToSend - failedSegmentsRef.get().cardinality();
+ final String details = messagesSent + " message(s) out of " + messagesToSend + " sent successfully";
if (failedSegmentsRef.get().isEmpty()) {
- session.getProvenanceReporter().send(flowFile, context.getProperty(SEED_BROKERS).getValue() + "/" + messageContext.getTopicName());
+ session.getProvenanceReporter().send(flowFile, "kafka://" + context.getProperty(SEED_BROKERS).getValue() + "/" + messageContext.getTopicName(), details, duration);
flowFile = this.cleanUpFlowFileIfNecessary(flowFile, session);
session.transfer(flowFile, REL_SUCCESS);
} else {
+ if (messagesSent != 0) {
+ session.getProvenanceReporter().send(flowFile, "kafka://" + context.getProperty(SEED_BROKERS).getValue() + "/" + messageContext.getTopicName(), details, duration);
+ }
flowFile = session.putAllAttributes(flowFile, this.buildFailedFlowFileAttributes(failedSegmentsRef.get(), messageContext));
session.transfer(session.penalize(flowFile), REL_FAILURE);
}
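The retry behavior described in the delimiter property text hinges on the failed-segment BitSet: publish records the index of every unacknowledged segment, the processor stores those indexes on the FlowFile via buildFailedFlowFileAttributes, and on the next attempt split consults messageContext.getFailedSegments() to skip segments that already succeeded. The loop body falls outside the hunks shown above, so the following is only a plausible sketch of that skip logic, assuming segment indexes are stable across retries; the producer, topic name, and segment list are stand-ins:

    import java.util.ArrayList;
    import java.util.BitSet;
    import java.util.List;
    import java.util.concurrent.Future;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    // Hypothetical reconstruction of the skip logic inside split(); the real
    // loop body is not part of the hunks above.
    class SegmentSkipSketch {
        static List<Future<RecordMetadata>> sendRemaining(KafkaProducer<Integer, byte[]> producer,
                String topicName, Integer partitionKey, List<byte[]> segments, BitSet prevFailedSegments) {
            List<Future<RecordMetadata>> sendFutures = new ArrayList<>();
            int segmentCounter = 0;
            for (byte[] segment : segments) {
                // First attempt (no failure history), or a segment that failed last time.
                if (prevFailedSegments == null || prevFailedSegments.get(segmentCounter)) {
                    sendFutures.add(producer.send(
                            new ProducerRecord<>(topicName, partitionKey, segment)));
                }
                segmentCounter++;
            }
            return sendFutures;
        }
    }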