NIFI-4675 Lifted restriction on demarcator and kafka.key usage together. This closes #2326.

This commit is contained in:
jknulst 2017-12-06 21:31:57 +01:00 committed by Mark Payne
parent 48ae4be015
commit d543cfde25
8 changed files with 20 additions and 36 deletions

View File

@ -148,8 +148,10 @@ public class PublishKafka_0_10 extends AbstractProcessor {
.name("kafka-key")
.displayName("Kafka Key")
.description("The Key to use for the Message. "
+ "If not specified, the flow file attribute 'kafka.key' is used as the message key, if it is present "
+ "and we're not demarcating.")
+ "If not specified, the flow file attribute 'kafka.key' is used as the message key, if it is present. "
+ "Beware that setting Kafka key and demarcating at the same time may potentially lead to many Kafka messages with the same key. "
+ "Normally this is not a problem as Kafka does not enforce or assume message and key uniqueness. Still, setting the demarcator and Kafka key at the same time poses a risk of "
+ "data loss on Kafka. During a topic compaction on Kafka, messages will be deduplicated based on this key.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
@ -370,9 +372,6 @@ public class PublishKafka_0_10 extends AbstractProcessor {
private byte[] getMessageKey(final FlowFile flowFile, final ProcessContext context) {
if (context.getProperty(MESSAGE_DEMARCATOR).isSet()) {
return null;
}
final String uninterpretedKey;
if (context.getProperty(KEY).isSet()) {

View File

@ -75,10 +75,7 @@ public class PublisherLease implements Closeable {
byte[] messageContent;
try {
while ((messageContent = demarcator.nextToken()) != null) {
// We do not want to use any key if we have a demarcator because that would result in
// the key being the same for multiple messages
final byte[] keyToUse = demarcatorBytes == null ? messageKey : null;
publish(flowFile, keyToUse, messageContent, topic, tracker);
publish(flowFile, messageKey, messageContent, topic, tracker);
if (tracker.isFailed(flowFile)) {
// If we have a failure, don't try to send anything else.

View File

@ -150,8 +150,10 @@ public class PublishKafka_0_11 extends AbstractProcessor {
.name("kafka-key")
.displayName("Kafka Key")
.description("The Key to use for the Message. "
+ "If not specified, the flow file attribute 'kafka.key' is used as the message key, if it is present "
+ "and we're not demarcating.")
+ "If not specified, the flow file attribute 'kafka.key' is used as the message key, if it is present. "
+ "Beware that setting Kafka key and demarcating at the same time may potentially lead to many Kafka messages with the same key. "
+ "Normally this is not a problem as Kafka does not enforce or assume message and key uniqueness. Still, setting the demarcator and Kafka key at the same time poses a risk of "
+ "data loss on Kafka. During a topic compaction on Kafka, messages will be deduplicated based on this key.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
@ -433,9 +435,6 @@ public class PublishKafka_0_11 extends AbstractProcessor {
private byte[] getMessageKey(final FlowFile flowFile, final ProcessContext context) {
if (context.getProperty(MESSAGE_DEMARCATOR).isSet()) {
return null;
}
final String uninterpretedKey;
if (context.getProperty(KEY).isSet()) {

View File

@ -117,10 +117,7 @@ public class PublisherLease implements Closeable {
byte[] messageContent;
try {
while ((messageContent = demarcator.nextToken()) != null) {
// We do not want to use any key if we have a demarcator because that would result in
// the key being the same for multiple messages
final byte[] keyToUse = demarcatorBytes == null ? messageKey : null;
publish(flowFile, keyToUse, messageContent, topic, tracker);
publish(flowFile, messageKey, messageContent, topic, tracker);
if (tracker.isFailed(flowFile)) {
// If we have a failure, don't try to send anything else.

View File

@ -150,8 +150,10 @@ public class PublishKafka extends AbstractProcessor {
.name("kafka-key")
.displayName("Kafka Key")
.description("The Key to use for the Message. "
+ "If not specified, the flow file attribute 'kafka.key' is used as the message key, if it is present "
+ "and we're not demarcating.")
+ "If not specified, the flow file attribute 'kafka.key' is used as the message key, if it is present. "
+ "Beware that setting Kafka key and demarcating at the same time may potentially lead to many Kafka messages with the same key. "
+ "Normally this is not a problem as Kafka does not enforce or assume message and key uniqueness. Still, setting the demarcator and Kafka key at the same time poses a risk of "
+ "data loss on Kafka. During a topic compaction on Kafka, messages will be deduplicated based on this key.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
@ -372,9 +374,6 @@ public class PublishKafka extends AbstractProcessor {
private byte[] getMessageKey(final FlowFile flowFile, final ProcessContext context) {
if (context.getProperty(MESSAGE_DEMARCATOR).isSet()) {
return null;
}
final String uninterpretedKey;
if (context.getProperty(KEY).isSet()) {

View File

@ -65,10 +65,7 @@ public class PublisherLease implements Closeable {
byte[] messageContent;
try {
while ((messageContent = demarcator.nextToken()) != null) {
// We do not want to use any key if we have a demarcator because that would result in
// the key being the same for multiple messages
final byte[] keyToUse = demarcatorBytes == null ? messageKey : null;
publish(flowFile, keyToUse, messageContent, topic, tracker);
publish(flowFile, messageKey, messageContent, topic, tracker);
if (tracker.isFailed(flowFile)) {
// If we have a failure, don't try to send anything else.

View File

@ -150,8 +150,10 @@ public class PublishKafka_1_0 extends AbstractProcessor {
.name("kafka-key")
.displayName("Kafka Key")
.description("The Key to use for the Message. "
+ "If not specified, the flow file attribute 'kafka.key' is used as the message key, if it is present "
+ "and we're not demarcating.")
+ "If not specified, the flow file attribute 'kafka.key' is used as the message key, if it is present. "
+ "Beware that setting Kafka key and demarcating at the same time may potentially lead to many Kafka messages with the same key. "
+ "Normally this is not a problem as Kafka does not enforce or assume message and key uniqueness. Still, setting the demarcator and Kafka key at the same time poses a risk of "
+ "data loss on Kafka. During a topic compaction on Kafka, messages will be deduplicated based on this key.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
@ -433,9 +435,6 @@ public class PublishKafka_1_0 extends AbstractProcessor {
private byte[] getMessageKey(final FlowFile flowFile, final ProcessContext context) {
if (context.getProperty(MESSAGE_DEMARCATOR).isSet()) {
return null;
}
final String uninterpretedKey;
if (context.getProperty(KEY).isSet()) {

View File

@ -117,10 +117,7 @@ public class PublisherLease implements Closeable {
byte[] messageContent;
try {
while ((messageContent = demarcator.nextToken()) != null) {
// We do not want to use any key if we have a demarcator because that would result in
// the key being the same for multiple messages
final byte[] keyToUse = demarcatorBytes == null ? messageKey : null;
publish(flowFile, keyToUse, messageContent, topic, tracker);
publish(flowFile, messageKey, messageContent, topic, tracker);
if (tracker.isFailed(flowFile)) {
// If we have a failure, don't try to send anything else.