mirror of
https://github.com/apache/nifi.git
synced 2025-03-06 09:29:33 +00:00
NIFI-5388: enabled EL support for dynamic properties of Kafka 1.0 processors
This closes #2915 Signed-off-by: Mike Thomsen <mikerthomsen@gmail.com>
This commit is contained in:
parent
c755ed9322
commit
89186fb96d
@ -74,7 +74,8 @@ import org.apache.nifi.serialization.RecordSetWriterFactory;
|
|||||||
@DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.",
|
@DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.",
|
||||||
description = "These properties will be added on the Kafka configuration after loading any provided configuration properties."
|
description = "These properties will be added on the Kafka configuration after loading any provided configuration properties."
|
||||||
+ " In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged."
|
+ " In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged."
|
||||||
+ " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ")
|
+ " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ",
|
||||||
|
expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||||
@SeeAlso({ConsumeKafka_1_0.class, PublishKafka_1_0.class, PublishKafkaRecord_1_0.class})
|
@SeeAlso({ConsumeKafka_1_0.class, PublishKafka_1_0.class, PublishKafkaRecord_1_0.class})
|
||||||
public class ConsumeKafkaRecord_1_0 extends AbstractProcessor {
|
public class ConsumeKafkaRecord_1_0 extends AbstractProcessor {
|
||||||
|
|
||||||
@ -264,7 +265,10 @@ public class ConsumeKafkaRecord_1_0 extends AbstractProcessor {
|
|||||||
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
|
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
|
||||||
return new PropertyDescriptor.Builder()
|
return new PropertyDescriptor.Builder()
|
||||||
.description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.")
|
.description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.")
|
||||||
.name(propertyDescriptorName).addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ConsumerConfig.class)).dynamic(true)
|
.name(propertyDescriptorName)
|
||||||
|
.addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ConsumerConfig.class))
|
||||||
|
.dynamic(true)
|
||||||
|
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -70,7 +70,8 @@ import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_E
|
|||||||
@DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.",
|
@DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.",
|
||||||
description = "These properties will be added on the Kafka configuration after loading any provided configuration properties."
|
description = "These properties will be added on the Kafka configuration after loading any provided configuration properties."
|
||||||
+ " In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged."
|
+ " In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged."
|
||||||
+ " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ")
|
+ " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ",
|
||||||
|
expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||||
public class ConsumeKafka_1_0 extends AbstractProcessor {
|
public class ConsumeKafka_1_0 extends AbstractProcessor {
|
||||||
|
|
||||||
static final AllowableValue OFFSET_EARLIEST = new AllowableValue("earliest", "earliest", "Automatically reset the offset to the earliest offset");
|
static final AllowableValue OFFSET_EARLIEST = new AllowableValue("earliest", "earliest", "Automatically reset the offset to the earliest offset");
|
||||||
@ -253,7 +254,10 @@ public class ConsumeKafka_1_0 extends AbstractProcessor {
|
|||||||
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
|
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
|
||||||
return new PropertyDescriptor.Builder()
|
return new PropertyDescriptor.Builder()
|
||||||
.description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.")
|
.description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.")
|
||||||
.name(propertyDescriptorName).addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ConsumerConfig.class)).dynamic(true)
|
.name(propertyDescriptorName)
|
||||||
|
.addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ConsumerConfig.class))
|
||||||
|
.dynamic(true)
|
||||||
|
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -73,7 +73,8 @@ import org.apache.nifi.serialization.record.RecordSet;
|
|||||||
@DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.",
|
@DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.",
|
||||||
description = "These properties will be added on the Kafka configuration after loading any provided configuration properties."
|
description = "These properties will be added on the Kafka configuration after loading any provided configuration properties."
|
||||||
+ " In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged."
|
+ " In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged."
|
||||||
+ " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ")
|
+ " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ",
|
||||||
|
expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||||
@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Kafka for this FlowFile. This attribute is added only to "
|
@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Kafka for this FlowFile. This attribute is added only to "
|
||||||
+ "FlowFiles that are routed to success.")
|
+ "FlowFiles that are routed to success.")
|
||||||
@SeeAlso({PublishKafka_1_0.class, ConsumeKafka_1_0.class, ConsumeKafkaRecord_1_0.class})
|
@SeeAlso({PublishKafka_1_0.class, ConsumeKafka_1_0.class, ConsumeKafkaRecord_1_0.class})
|
||||||
@ -293,6 +294,7 @@ public class PublishKafkaRecord_1_0 extends AbstractProcessor {
|
|||||||
.name(propertyDescriptorName)
|
.name(propertyDescriptorName)
|
||||||
.addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ProducerConfig.class))
|
.addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ProducerConfig.class))
|
||||||
.dynamic(true)
|
.dynamic(true)
|
||||||
|
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -68,7 +68,8 @@ import org.apache.nifi.processor.util.StandardValidators;
|
|||||||
@DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.",
|
@DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.",
|
||||||
description = "These properties will be added on the Kafka configuration after loading any provided configuration properties."
|
description = "These properties will be added on the Kafka configuration after loading any provided configuration properties."
|
||||||
+ " In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged."
|
+ " In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged."
|
||||||
+ " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ")
|
+ " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ",
|
||||||
|
expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||||
@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Kafka for this FlowFile. This attribute is added only to "
|
@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Kafka for this FlowFile. This attribute is added only to "
|
||||||
+ "FlowFiles that are routed to success. If the <Message Demarcator> Property is not set, this will always be 1, but if the Property is set, it may "
|
+ "FlowFiles that are routed to success. If the <Message Demarcator> Property is not set, this will always be 1, but if the Property is set, it may "
|
||||||
+ "be greater than 1.")
|
+ "be greater than 1.")
|
||||||
@ -289,6 +290,7 @@ public class PublishKafka_1_0 extends AbstractProcessor {
|
|||||||
.name(propertyDescriptorName)
|
.name(propertyDescriptorName)
|
||||||
.addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ProducerConfig.class))
|
.addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ProducerConfig.class))
|
||||||
.dynamic(true)
|
.dynamic(true)
|
||||||
|
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user