From d543cfde2552b6ac5f66d3c723afe60142f3d625 Mon Sep 17 00:00:00 2001 From: jknulst Date: Wed, 6 Dec 2017 21:31:57 +0100 Subject: [PATCH] NIFI-4675 Lifted restriction on demarcator and kafka.key usage together. This closes #2326. --- .../nifi/processors/kafka/pubsub/PublishKafka_0_10.java | 9 ++++----- .../nifi/processors/kafka/pubsub/PublisherLease.java | 5 +---- .../nifi/processors/kafka/pubsub/PublishKafka_0_11.java | 9 ++++----- .../nifi/processors/kafka/pubsub/PublisherLease.java | 5 +---- .../nifi/processors/kafka/pubsub/PublishKafka.java | 9 ++++----- .../nifi/processors/kafka/pubsub/PublisherLease.java | 5 +---- .../nifi/processors/kafka/pubsub/PublishKafka_1_0.java | 9 ++++----- .../nifi/processors/kafka/pubsub/PublisherLease.java | 5 +---- 8 files changed, 20 insertions(+), 36 deletions(-) diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_10.java index 1c7777f2eb..c9d0f37a00 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_10.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_10.java @@ -148,8 +148,10 @@ public class PublishKafka_0_10 extends AbstractProcessor { .name("kafka-key") .displayName("Kafka Key") .description("The Key to use for the Message. " - + "If not specified, the flow file attribute 'kafka.key' is used as the message key, if it is present " - + "and we're not demarcating.") + + "If not specified, the flow file attribute 'kafka.key' is used as the message key, if it is present." + + "Beware that setting Kafka key and demarcating at the same time may potentially lead to many Kafka messages with the same key." + + "Normally this is not a problem as Kafka does not enforce or assume message and key uniqueness. Still, setting the demarcator and Kafka key at the same time poses a risk of " + + "data loss on Kafka. During a topic compaction on Kafka, messages will be deduplicated based on this key.") .required(false) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .expressionLanguageSupported(true) @@ -370,9 +372,6 @@ public class PublishKafka_0_10 extends AbstractProcessor { private byte[] getMessageKey(final FlowFile flowFile, final ProcessContext context) { - if (context.getProperty(MESSAGE_DEMARCATOR).isSet()) { - return null; - } final String uninterpretedKey; if (context.getProperty(KEY).isSet()) { diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java index 5a05a0887c..c17c33146d 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java @@ -75,10 +75,7 @@ public class PublisherLease implements Closeable { byte[] messageContent; try { while ((messageContent = demarcator.nextToken()) != null) { - // We do not want to use any key if we have a demarcator because that would result in - // the key being the same for multiple messages - final byte[] keyToUse = demarcatorBytes == null ? messageKey : null; - publish(flowFile, keyToUse, messageContent, topic, tracker); + publish(flowFile, messageKey, messageContent, topic, tracker); if (tracker.isFailed(flowFile)) { // If we have a failure, don't try to send anything else. diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_11.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_11.java index ab6b84ce1e..130954a94b 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_11.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_11.java @@ -150,8 +150,10 @@ public class PublishKafka_0_11 extends AbstractProcessor { .name("kafka-key") .displayName("Kafka Key") .description("The Key to use for the Message. " - + "If not specified, the flow file attribute 'kafka.key' is used as the message key, if it is present " - + "and we're not demarcating.") + + "If not specified, the flow file attribute 'kafka.key' is used as the message key, if it is present." + + "Beware that setting Kafka key and demarcating at the same time may potentially lead to many Kafka messages with the same key." + + "Normally this is not a problem as Kafka does not enforce or assume message and key uniqueness. Still, setting the demarcator and Kafka key at the same time poses a risk of " + + "data loss on Kafka. During a topic compaction on Kafka, messages will be deduplicated based on this key.") .required(false) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .expressionLanguageSupported(true) @@ -433,9 +435,6 @@ public class PublishKafka_0_11 extends AbstractProcessor { private byte[] getMessageKey(final FlowFile flowFile, final ProcessContext context) { - if (context.getProperty(MESSAGE_DEMARCATOR).isSet()) { - return null; - } final String uninterpretedKey; if (context.getProperty(KEY).isSet()) { diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java index d18df7f5bc..e8d744aa2d 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java @@ -117,10 +117,7 @@ public class PublisherLease implements Closeable { byte[] messageContent; try { while ((messageContent = demarcator.nextToken()) != null) { - // We do not want to use any key if we have a demarcator because that would result in - // the key being the same for multiple messages - final byte[] keyToUse = demarcatorBytes == null ? messageKey : null; - publish(flowFile, keyToUse, messageContent, topic, tracker); + publish(flowFile, messageKey, messageContent, topic, tracker); if (tracker.isFailed(flowFile)) { // If we have a failure, don't try to send anything else. diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java index af171bb38d..32d1ea7bb0 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java @@ -150,8 +150,10 @@ public class PublishKafka extends AbstractProcessor { .name("kafka-key") .displayName("Kafka Key") .description("The Key to use for the Message. " - + "If not specified, the flow file attribute 'kafka.key' is used as the message key, if it is present " - + "and we're not demarcating.") + + "If not specified, the flow file attribute 'kafka.key' is used as the message key, if it is present." + + "Beware that setting Kafka key and demarcating at the same time may potentially lead to many Kafka messages with the same key." + + "Normally this is not a problem as Kafka does not enforce or assume message and key uniqueness. Still, setting the demarcator and Kafka key at the same time poses a risk of " + + "data loss on Kafka. During a topic compaction on Kafka, messages will be deduplicated based on this key.") .required(false) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .expressionLanguageSupported(true) @@ -372,9 +374,6 @@ public class PublishKafka extends AbstractProcessor { private byte[] getMessageKey(final FlowFile flowFile, final ProcessContext context) { - if (context.getProperty(MESSAGE_DEMARCATOR).isSet()) { - return null; - } final String uninterpretedKey; if (context.getProperty(KEY).isSet()) { diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java index b67e8a8614..8fb4e67fa1 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java @@ -65,10 +65,7 @@ public class PublisherLease implements Closeable { byte[] messageContent; try { while ((messageContent = demarcator.nextToken()) != null) { - // We do not want to use any key if we have a demarcator because that would result in - // the key being the same for multiple messages - final byte[] keyToUse = demarcatorBytes == null ? messageKey : null; - publish(flowFile, keyToUse, messageContent, topic, tracker); + publish(flowFile, messageKey, messageContent, topic, tracker); if (tracker.isFailed(flowFile)) { // If we have a failure, don't try to send anything else. diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_1_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_1_0.java index 6f17bd5fa5..48f7747a80 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_1_0.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_1_0.java @@ -150,8 +150,10 @@ public class PublishKafka_1_0 extends AbstractProcessor { .name("kafka-key") .displayName("Kafka Key") .description("The Key to use for the Message. " - + "If not specified, the flow file attribute 'kafka.key' is used as the message key, if it is present " - + "and we're not demarcating.") + + "If not specified, the flow file attribute 'kafka.key' is used as the message key, if it is present." + + "Beware that setting Kafka key and demarcating at the same time may potentially lead to many Kafka messages with the same key." + + "Normally this is not a problem as Kafka does not enforce or assume message and key uniqueness. Still, setting the demarcator and Kafka key at the same time poses a risk of " + + "data loss on Kafka. During a topic compaction on Kafka, messages will be deduplicated based on this key.") .required(false) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .expressionLanguageSupported(true) @@ -433,9 +435,6 @@ public class PublishKafka_1_0 extends AbstractProcessor { private byte[] getMessageKey(final FlowFile flowFile, final ProcessContext context) { - if (context.getProperty(MESSAGE_DEMARCATOR).isSet()) { - return null; - } final String uninterpretedKey; if (context.getProperty(KEY).isSet()) { diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java index d18df7f5bc..e8d744aa2d 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java @@ -117,10 +117,7 @@ public class PublisherLease implements Closeable { byte[] messageContent; try { while ((messageContent = demarcator.nextToken()) != null) { - // We do not want to use any key if we have a demarcator because that would result in - // the key being the same for multiple messages - final byte[] keyToUse = demarcatorBytes == null ? messageKey : null; - publish(flowFile, keyToUse, messageContent, topic, tracker); + publish(flowFile, messageKey, messageContent, topic, tracker); if (tracker.isFailed(flowFile)) { // If we have a failure, don't try to send anything else.