NIFI-4012 Azure Event Hub UI typos and cleanup

This closes #3749.

Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
Shayne Burgess 2019-09-18 13:54:59 -07:00 committed by Koji Kawamura
parent 1dfbc97c07
commit c72a5618c0
No known key found for this signature in database
GPG Key ID: 36136B0EC89E4758
3 changed files with 52 additions and 48 deletions

View File

@ -78,15 +78,15 @@ import java.util.concurrent.TimeUnit;
import static org.apache.nifi.util.StringUtils.isEmpty;
@Tags({"azure", "microsoft", "cloud", "eventhub", "events", "streaming", "streams"})
@CapabilityDescription("Receives messages from a Microsoft Azure Event Hub, writing the contents of the Azure message to the content of the FlowFile.")
@CapabilityDescription("Receives messages from Azure Event Hubs, writing the contents of the message to the content of the FlowFile.")
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@TriggerSerially
@WritesAttributes({
@WritesAttribute(attribute = "eventhub.enqueued.timestamp", description = "The time (in milliseconds since epoch, UTC) at which the message was enqueued in the Azure Event Hub"),
@WritesAttribute(attribute = "eventhub.enqueued.timestamp", description = "The time (in milliseconds since epoch, UTC) at which the message was enqueued in the event hub"),
@WritesAttribute(attribute = "eventhub.offset", description = "The offset into the partition at which the message was stored"),
@WritesAttribute(attribute = "eventhub.sequence", description = "The Azure Sequence number associated with the message"),
@WritesAttribute(attribute = "eventhub.name", description = "The name of the Event Hub from which the message was pulled"),
@WritesAttribute(attribute = "eventhub.partition", description = "The name of the Azure Partition from which the message was pulled")
@WritesAttribute(attribute = "eventhub.sequence", description = "The sequence number associated with the message"),
@WritesAttribute(attribute = "eventhub.name", description = "The name of the event hub from which the message was pulled"),
@WritesAttribute(attribute = "eventhub.partition", description = "The name of the partition from which the message was pulled")
})
public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
@ -95,7 +95,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
static final PropertyDescriptor NAMESPACE = new PropertyDescriptor.Builder()
.name("event-hub-namespace")
.displayName("Event Hub Namespace")
.description("The Azure Namespace that the Event Hub is assigned to. This is generally equal to <Event Hub Name>-ns.")
.description("The namespace that the Azure Event Hubs is assigned to. This is generally equal to <Event Hub Names>-ns.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.required(true)
@ -103,7 +103,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
static final PropertyDescriptor EVENT_HUB_NAME = new PropertyDescriptor.Builder()
.name("event-hub-name")
.displayName("Event Hub Name")
.description("The name of the Azure Event Hub to pull messages from.")
.description("The name of the event hub to pull messages from.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.required(true)
@ -112,7 +112,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
static final PropertyDescriptor ACCESS_POLICY_NAME = new PropertyDescriptor.Builder()
.name("event-hub-shared-access-policy-name")
.displayName("Shared Access Policy Name")
.description("The name of the Event Hub Shared Access Policy. This Policy must have Listen permissions.")
.description("The name of the shared access policy. This policy must have Listen claims.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.required(true)
@ -120,7 +120,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
static final PropertyDescriptor POLICY_PRIMARY_KEY = new PropertyDescriptor.Builder()
.name("event-hub-shared-access-policy-primary-key")
.displayName("Shared Access Policy Primary Key")
.description("The primary key of the Event Hub Shared Access Policy.")
.description("The primary key of the shared access policy.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.sensitive(true)
@ -128,8 +128,8 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
.build();
static final PropertyDescriptor CONSUMER_GROUP = new PropertyDescriptor.Builder()
.name("event-hub-consumer-group")
.displayName("Event Hub Consumer Group")
.description("The name of the Event Hub Consumer Group to use.")
.displayName("Consumer Group")
.description("The name of the consumer group to use.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.defaultValue("$Default")
@ -137,8 +137,8 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
.build();
static final PropertyDescriptor CONSUMER_HOSTNAME = new PropertyDescriptor.Builder()
.name("event-hub-consumer-hostname")
.displayName("Event Hub Consumer Hostname")
.description("The hostname of this Event Hub Consumer instance." +
.displayName("Consumer Hostname")
.description("The hostname of this event hub consumer instance." +
" If not specified, an unique identifier is generated in 'nifi-<UUID>' format.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
@ -149,7 +149,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
.name("record-reader")
.displayName("Record Reader")
.description("The Record Reader to use for reading received messages." +
" The Event Hub name can be referred by Expression Language '${eventhub.name}' to access a schema.")
" The event hub name can be referred by Expression Language '${eventhub.name}' to access a schema.")
.identifiesControllerService(RecordReaderFactory.class)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.required(false)
@ -158,7 +158,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
.name("record-writer")
.displayName("Record Writer")
.description("The Record Writer to use for serializing Records to an output FlowFile." +
" The Event Hub name can be referred by Expression Language '${eventhub.name}' to access a schema." +
" The event hub name can be referred by Expression Language '${eventhub.name}' to access a schema." +
" If not specified, each message will create a FlowFile.")
.identifiesControllerService(RecordSetWriterFactory.class)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
@ -181,11 +181,11 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
static final PropertyDescriptor PREFETCH_COUNT = new PropertyDescriptor.Builder()
.name("event-hub-prefetch-count")
.displayName("Prefetch Count")
.defaultValue("The number of messages to fetch from Event Hub before processing." +
.defaultValue("The number of messages to fetch from the event hub before processing." +
" This parameter affects throughput." +
" The more prefetch count, the better throughput in general, but consumes more resources (RAM)." +
" NOTE: Even though Event Hub client API provides this option," +
" actual number of messages can be pre-fetched is depend on the Event Hub server implementation." +
" NOTE: Even though the event hub client API provides this option," +
" actual number of messages can be pre-fetched is depend on the Event Hubs server implementation." +
" It is reported that only one event is received at a time in certain situation." +
" https://github.com/Azure/azure-event-hubs-java/issues/125")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
@ -198,9 +198,9 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
.displayName("Batch Size")
.description("The number of messages to process within a NiFi session." +
" This parameter affects throughput and consistency." +
" NiFi commits its session and Event Hub checkpoint after processing this number of messages." +
" If NiFi session is committed, but failed to create an Event Hub checkpoint," +
" then it is possible that the same messages to be retrieved again." +
" NiFi commits its session and Event Hubs checkpoints after processing this number of messages." +
" If NiFi session is committed, but fails to create an Event Hubs checkpoint," +
" then it is possible that the same messages will be received again." +
" The higher number, the higher throughput, but possibly less consistent.")
.defaultValue("10")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
@ -219,7 +219,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
static final PropertyDescriptor STORAGE_ACCOUNT_NAME = new PropertyDescriptor.Builder()
.name("storage-account-name")
.displayName("Storage Account Name")
.description("Name of the Azure Storage account to store Event Hub Consumer Group state.")
.description("Name of the Azure Storage account to store event hub consumer group state.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.required(true)
@ -227,7 +227,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
static final PropertyDescriptor STORAGE_ACCOUNT_KEY = new PropertyDescriptor.Builder()
.name("storage-account-key")
.displayName("Storage Account Key")
.description("The Azure Storage account key to store Event Hub Consumer Group state.")
.description("The Azure Storage account key to store event hub consumer group state.")
.sensitive(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
@ -236,8 +236,8 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
static final PropertyDescriptor STORAGE_CONTAINER_NAME = new PropertyDescriptor.Builder()
.name("storage-container-name")
.displayName("Storage Container Name")
.description("Name of the Azure Storage Container to store Event Hub Consumer Group state." +
" If not specified, Event Hub name is used.")
.description("Name of the Azure Storage container to store the event hub consumer group state." +
" If not specified, event hub name is used.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.required(false)
@ -250,7 +250,7 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
static final Relationship REL_PARSE_FAILURE = new Relationship.Builder()
.name("parse.failure")
.description("If a message from Event Hub cannot be parsed using the configured Record Reader" +
.description("If a message from event hub cannot be parsed using the configured Record Reader" +
" or failed to be written by the configured Record Writer," +
" the contents of the message will be routed to this Relationship as its own individual FlowFile.")
.build();

View File

@ -65,33 +65,33 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StopWatch;
@Tags({"azure", "microsoft", "cloud", "eventhub", "events", "streaming", "streams"})
@CapabilityDescription("Receives messages from a Microsoft Azure Event Hub, writing the contents of the Azure message to the content of the FlowFile. "
@CapabilityDescription("Receives messages from Microsoft Azure Event Hubs, writing the contents of the Azure message to the content of the FlowFile. "
+ "Note: Please be aware that this processor creates a thread pool of 4 threads for Event Hub Client. They will be extra threads other than the concurrent tasks scheduled for this processor.")
@InputRequirement(Requirement.INPUT_FORBIDDEN)
@WritesAttributes({
@WritesAttribute(attribute = "eventhub.enqueued.timestamp", description = "The time (in milliseconds since epoch, UTC) at which the message was enqueued in the Azure Event Hub"),
@WritesAttribute(attribute = "eventhub.enqueued.timestamp", description = "The time (in milliseconds since epoch, UTC) at which the message was enqueued in the event hub"),
@WritesAttribute(attribute = "eventhub.offset", description = "The offset into the partition at which the message was stored"),
@WritesAttribute(attribute = "eventhub.sequence", description = "The Azure Sequence number associated with the message"),
@WritesAttribute(attribute = "eventhub.name", description = "The name of the Event Hub from which the message was pulled"),
@WritesAttribute(attribute = "eventhub.partition", description = "The name of the Azure Partition from which the message was pulled")
@WritesAttribute(attribute = "eventhub.sequence", description = "The Azure sequence number associated with the message"),
@WritesAttribute(attribute = "eventhub.name", description = "The name of the event hub from which the message was pulled"),
@WritesAttribute(attribute = "eventhub.partition", description = "The name of the event hub partition from which the message was pulled")
})
public class GetAzureEventHub extends AbstractProcessor {
static final PropertyDescriptor EVENT_HUB_NAME = new PropertyDescriptor.Builder()
.name("Event Hub Name")
.description("The name of the Azure Event Hub to pull messages from")
.description("The name of the event hub to pull messages from")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(true)
.build();
static final PropertyDescriptor NAMESPACE = new PropertyDescriptor.Builder()
.name("Event Hub Namespace")
.description("The Azure Namespace that the Event Hub is assigned to. This is generally equal to <Event Hub Name>-ns")
.description("The namespace that the event hub is assigned to. This is generally equal to <Event Hubs Name>-ns")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.required(true)
.build();
static final PropertyDescriptor SERVICE_BUS_ENDPOINT = new PropertyDescriptor.Builder()
.name("Service Bus Endpoint")
.description("To support Namespaces in non-standard Host URIs ( not .servicebus.windows.net, ie .servicebus.chinacloudapi.cn) select from the drop down acceptable options ")
.description("To support namespaces in non-standard Host URIs ( not .servicebus.windows.net, ie .servicebus.chinacloudapi.cn) select from the drop down acceptable options ")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.allowableValues(".servicebus.windows.net",".servicebus.chinacloudapi.cn")
@ -100,14 +100,14 @@ public class GetAzureEventHub extends AbstractProcessor {
.build();
static final PropertyDescriptor ACCESS_POLICY = new PropertyDescriptor.Builder()
.name("Shared Access Policy Name")
.description("The name of the Event Hub Shared Access Policy. This Policy must have Listen permissions.")
.description("The name of the shared access policy. This policy must have Listen claims.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.required(true)
.build();
static final PropertyDescriptor POLICY_PRIMARY_KEY = new PropertyDescriptor.Builder()
.name("Shared Access Policy Primary Key")
.description("The primary key of the Event Hub Shared Access Policy")
.description("The primary key of the shared access policy")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.sensitive(true)
@ -116,7 +116,7 @@ public class GetAzureEventHub extends AbstractProcessor {
static final PropertyDescriptor NUM_PARTITIONS = new PropertyDescriptor.Builder()
.name("Number of Event Hub Partitions")
.description("The number of partitions that the Event Hub has. Only this number of partitions will be used, "
.description("The number of partitions that the event hub has. Only this number of partitions will be used, "
+ "so it is important to ensure that if the number of partitions changes that this value be updated. Otherwise, some messages may not be consumed.")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
@ -124,7 +124,8 @@ public class GetAzureEventHub extends AbstractProcessor {
.build();
static final PropertyDescriptor CONSUMER_GROUP = new PropertyDescriptor.Builder()
.name("Event Hub Consumer Group")
.description("The name of the Event Hub Consumer Group to use when pulling events")
.displayName("Consumer Group")
.description("The name of the consumer group to use when pulling events")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.defaultValue("$Default")
@ -133,21 +134,24 @@ public class GetAzureEventHub extends AbstractProcessor {
static final PropertyDescriptor ENQUEUE_TIME = new PropertyDescriptor.Builder()
.name("Event Hub Message Enqueue Time")
.displayName("Message Enqueue Time")
.description("A timestamp (ISO-8601 Instant) formatted as YYYY-MM-DDThhmmss.sssZ (2016-01-01T01:01:01.000Z) from which messages "
+ "should have been enqueued in the EventHub to start reading from")
+ "should have been enqueued in the Event Hub to start reading from")
.addValidator(StandardValidators.ISO8601_INSTANT_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.required(false)
.build();
static final PropertyDescriptor RECEIVER_FETCH_SIZE = new PropertyDescriptor.Builder()
.name("Partition Recivier Fetch Size")
.description("The number of events that a receiver should fetch from an EventHubs partition before returning. Default(100)")
.displayName("Partition Receiver Fetch Size")
.description("The number of events that a receiver should fetch from an Event Hubs partition before returning. Default(100)")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.required(false)
.build();
static final PropertyDescriptor RECEIVER_FETCH_TIMEOUT = new PropertyDescriptor.Builder()
.name("Partiton Receiver Timeout (millseconds)")
.name("Partition Receiver Timeout (millseconds)")
.description("The amount of time a Partition Receiver should wait to receive the Fetch Size before returning. Default(60000)")
.addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
@ -156,7 +160,7 @@ public class GetAzureEventHub extends AbstractProcessor {
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("Any FlowFile that is successfully received from the Azure Event Hub will be transferred to this Relationship.")
.description("Any FlowFile that is successfully received from the event hub will be transferred to this Relationship.")
.build();
private final ConcurrentMap<String, PartitionReceiver> partitionToReceiverMap = new ConcurrentHashMap<>();

View File

@ -59,34 +59,34 @@ import org.apache.nifi.util.StopWatch;
@SupportsBatching
@Tags({"microsoft", "azure", "cloud", "eventhub", "events", "streams", "streaming"})
@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Sends the contents of a FlowFile to a Windows Azure Event Hub. Note: the content of the FlowFile will be buffered into memory before being sent, "
@CapabilityDescription("Sends the contents of a FlowFile to Windows Azure Event Hubs. Note: the content of the FlowFile will be buffered into memory before being sent, "
+ "so care should be taken to avoid sending FlowFiles to this Processor that exceed the amount of Java Heap Space available. "
+ "Also please be aware that this processor creates a thread pool of 4 threads for Event Hub Client. They will be extra threads other than the concurrent tasks scheduled for this processor.")
@SystemResourceConsideration(resource = SystemResource.MEMORY)
public class PutAzureEventHub extends AbstractProcessor {
static final PropertyDescriptor EVENT_HUB_NAME = new PropertyDescriptor.Builder()
.name("Event Hub Name")
.description("The name of the Azure Event Hub to send to")
.description("The name of the event hub to send to")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(true)
.build();
static final PropertyDescriptor NAMESPACE = new PropertyDescriptor.Builder()
.name("Event Hub Namespace")
.description("The Azure Namespace that the Event Hub is assigned to. This is generally equal to <Event Hub Name>-ns")
.description("The namespace that the event hub is assigned to. This is generally equal to <Event Hubs Name>-ns")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.required(true)
.build();
static final PropertyDescriptor ACCESS_POLICY = new PropertyDescriptor.Builder()
.name("Shared Access Policy Name")
.description("The name of the Event Hub Shared Access Policy. This Policy must have Send permissions.")
.description("The name of the shared access policy. This policy must have Send claims.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.required(true)
.build();
static final PropertyDescriptor POLICY_PRIMARY_KEY = new PropertyDescriptor.Builder()
.name("Shared Access Policy Primary Key")
.description("The primary key of the Event Hub Shared Access Policy")
.description("The primary key of the shared access policy")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.sensitive(true)
@ -95,11 +95,11 @@ public class PutAzureEventHub extends AbstractProcessor {
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("Any FlowFile that is successfully sent to the Azure Event Hub will be transferred to this Relationship.")
.description("Any FlowFile that is successfully sent to the event hubs will be transferred to this Relationship.")
.build();
static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("Any FlowFile that could not be sent to the Azure Event Hub will be transferred to this Relationship.")
.description("Any FlowFile that could not be sent to the event hub will be transferred to this Relationship.")
.build();
private volatile BlockingQueue<EventHubClient> senderQueue = new LinkedBlockingQueue<>();