NIFI-11246 Document differences in GetAzureEventHub and ConsumeAzureEventHub

This closes #7006

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Robert Kalmar 2023-03-03 09:19:14 +01:00 committed by exceptionfactory
parent 169b53feaa
commit c7c1a245a9
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
2 changed files with 14 additions and 11 deletions

View File

@ -86,7 +86,9 @@ import java.util.regex.Pattern;
import static org.apache.commons.lang3.StringUtils.defaultIfBlank;
@Tags({"azure", "microsoft", "cloud", "eventhub", "events", "streaming", "streams"})
@CapabilityDescription("Receives messages from Azure Event Hubs, writing the contents of the message to the content of the FlowFile.")
@CapabilityDescription("Receives messages from Microsoft Azure Event Hubs with checkpointing to ensure consistent event processing. "
+ "Checkpoint tracking avoids consuming a message multiple times and enables reliable resumption of processing in the event of intermittent network failures. "
+ "Checkpoint tracking requires external storage and provides the preferred approach to consuming messages from Azure Event Hubs.")
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@TriggerSerially
@WritesAttributes({

View File

@ -45,6 +45,7 @@ import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
@ -64,8 +65,9 @@ import org.apache.nifi.util.StopWatch;
import org.apache.nifi.processors.azure.eventhub.utils.AzureEventHubUtils;
@Tags({"azure", "microsoft", "cloud", "eventhub", "events", "streaming", "streams"})
@CapabilityDescription("Receives messages from Microsoft Azure Event Hubs, writing the contents of the Azure message to the content of the FlowFile. "
+ "Note: Please be aware that this processor creates a thread pool of 4 threads for Event Hub Client. They will be extra threads other than the concurrent tasks scheduled for this processor.")
@CapabilityDescription("Receives messages from Microsoft Azure Event Hubs without reliable checkpoint tracking. "
+ "ConsumeAzureEventHub offers the recommended approach to receiving messages from Azure Event Hubs. "
+ "This processor creates a thread pool for connections to Azure Event Hubs.")
@InputRequirement(Requirement.INPUT_FORBIDDEN)
@WritesAttributes({
@WritesAttribute(attribute = "eventhub.enqueued.timestamp", description = "The time (in milliseconds since epoch, UTC) at which the message was enqueued in the event hub"),
@ -75,8 +77,11 @@ import org.apache.nifi.processors.azure.eventhub.utils.AzureEventHubUtils;
@WritesAttribute(attribute = "eventhub.partition", description = "The name of the event hub partition from which the message was pulled"),
@WritesAttribute(attribute = "eventhub.property.*", description = "The application properties of this message. IE: 'application' would be 'eventhub.property.application'")
})
@SeeAlso(ConsumeAzureEventHub.class)
public class GetAzureEventHub extends AbstractProcessor {
private static final String TRANSIT_URI_FORMAT_STRING = "amqps://%s/%s/ConsumerGroups/%s/Partitions/%s";
private static final Duration DEFAULT_FETCH_TIMEOUT = Duration.ofSeconds(60);
private static final int DEFAULT_FETCH_SIZE = 100;
static final PropertyDescriptor EVENT_HUB_NAME = new PropertyDescriptor.Builder()
.name("Event Hub Name")
@ -91,7 +96,7 @@ public class GetAzureEventHub extends AbstractProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.required(true)
.build();
static final PropertyDescriptor SERVICE_BUS_ENDPOINT =AzureEventHubUtils.SERVICE_BUS_ENDPOINT;
static final PropertyDescriptor SERVICE_BUS_ENDPOINT = AzureEventHubUtils.SERVICE_BUS_ENDPOINT;
static final PropertyDescriptor ACCESS_POLICY = new PropertyDescriptor.Builder()
.name("Shared Access Policy Name")
.description("The name of the shared access policy. This policy must have Listen claims.")
@ -99,7 +104,7 @@ public class GetAzureEventHub extends AbstractProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.required(false)
.build();
static final PropertyDescriptor POLICY_PRIMARY_KEY = AzureEventHubUtils.POLICY_PRIMARY_KEY;
static final PropertyDescriptor POLICY_PRIMARY_KEY = AzureEventHubUtils.POLICY_PRIMARY_KEY;
static final PropertyDescriptor USE_MANAGED_IDENTITY = AzureEventHubUtils.USE_MANAGED_IDENTITY;
@Deprecated
@ -132,16 +137,15 @@ public class GetAzureEventHub extends AbstractProcessor {
static final PropertyDescriptor RECEIVER_FETCH_SIZE = new PropertyDescriptor.Builder()
.name("Partition Recivier Fetch Size")
.displayName("Partition Receiver Fetch Size")
.description("The number of events that a receiver should fetch from an Event Hubs partition before returning. The default is 100")
.description("The number of events that a receiver should fetch from an Event Hubs partition before returning. The default is " + DEFAULT_FETCH_SIZE)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.required(false)
.build();
static final PropertyDescriptor RECEIVER_FETCH_TIMEOUT = new PropertyDescriptor.Builder()
.name("Partiton Receiver Timeout (millseconds)")
.name("Partition Receiver Timeout (millseconds)")
.displayName("Partition Receiver Timeout")
.description("The amount of time in milliseconds a Partition Receiver should wait to receive the Fetch Size before returning. The default is 60000")
.description("The amount of time in milliseconds a Partition Receiver should wait to receive the Fetch Size before returning. The default is " + DEFAULT_FETCH_TIMEOUT.toMillis())
.addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.required(false)
@ -172,9 +176,6 @@ public class GetAzureEventHub extends AbstractProcessor {
relationships = Collections.singleton(REL_SUCCESS);
}
private static final Duration DEFAULT_FETCH_TIMEOUT = Duration.ofSeconds(60);
private static final int DEFAULT_FETCH_SIZE = 100;
private final Map<String, EventPosition> partitionEventPositions = new ConcurrentHashMap<>();
private volatile BlockingQueue<String> partitionIds = new LinkedBlockingQueue<>();