diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java index 0455fe92ee..12ea1bae67 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/GetAzureEventHub.java @@ -40,6 +40,7 @@ import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.util.StopWatch; import java.io.IOException; +import java.time.Duration; import java.time.Instant; import java.util.ArrayList; import java.util.Collections; @@ -121,6 +122,20 @@ public class GetAzureEventHub extends AbstractProcessor { .expressionLanguageSupported(false) .required(false) .build(); + static final PropertyDescriptor RECEIVER_FETCH_SIZE = new PropertyDescriptor.Builder() + .name("Partition Recivier Fetch Size") + .description("The number of events that a receiver should fetch from an EventHubs partition before returning. Default(100)") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(false) + .required(false) + .build(); + static final PropertyDescriptor RECEIVER_FETCH_TIMEOUT = new PropertyDescriptor.Builder() + .name("Partiton Receiver Timeout (millseconds)") + .description("The amount of time a Partition Receiver should wait to receive the Fetch Size before returning. Default(60000)") + .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR) + .expressionLanguageSupported(false) + .required(false) + .build(); static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") @@ -130,6 +145,8 @@ public class GetAzureEventHub extends AbstractProcessor { private final ConcurrentMap partitionToReceiverMap = new ConcurrentHashMap<>(); private volatile BlockingQueue partitionNames = new LinkedBlockingQueue<>(); private volatile Instant configuredEnqueueTime; + private volatile int receiverFetchSize; + private volatile Duration receiverFetchTimeout; private EventHubClient eventHubClient; private final static List propertyDescriptors; @@ -148,6 +165,9 @@ public class GetAzureEventHub extends AbstractProcessor { _propertyDescriptors.add(NUM_PARTITIONS); _propertyDescriptors.add(CONSUMER_GROUP); _propertyDescriptors.add(ENQUEUE_TIME); + _propertyDescriptors.add(RECEIVER_FETCH_SIZE); + _propertyDescriptors.add(RECEIVER_FETCH_TIMEOUT); + propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors); Set _relationships = new HashSet<>(); @@ -201,6 +221,7 @@ public class GetAzureEventHub extends AbstractProcessor { partitionId, configuredEnqueueTime == null ? Instant.now() : configuredEnqueueTime).get(); + receiver.setReceiveTimeout(receiverFetchTimeout == null ? Duration.ofMillis(60000) : receiverFetchTimeout); partitionToReceiverMap.put(partitionId, receiver); return receiver; @@ -222,7 +243,7 @@ public class GetAzureEventHub extends AbstractProcessor { final PartitionReceiver receiver; try { receiver = getReceiver(context, partitionId); - return receiver.receive(100).get(); + return receiver.receive(receiverFetchSize).get(); } catch (final IOException | ServiceBusException | ExecutionException | InterruptedException e) { throw new ProcessException(e); } @@ -264,6 +285,16 @@ public class GetAzureEventHub extends AbstractProcessor { } else { configuredEnqueueTime = null; } + if(context.getProperty(RECEIVER_FETCH_SIZE).isSet()) { + receiverFetchSize = context.getProperty(RECEIVER_FETCH_SIZE).asInteger(); + } else { + receiverFetchSize = 100; + } + if(context.getProperty(RECEIVER_FETCH_TIMEOUT).isSet()) { + receiverFetchTimeout = Duration.ofMillis(context.getProperty(RECEIVER_FETCH_TIMEOUT).asLong()); + } else { + receiverFetchTimeout = null; + } final String connectionString = new ConnectionStringBuilder(namespace, eventHubName, policyName, policyKey).toString(); setupReceiver(connectionString); diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java index a63458fd5b..951384a965 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/org/apache/nifi/processors/azure/eventhub/GetAzureEventHubTest.java @@ -64,6 +64,10 @@ public class GetAzureEventHubTest { testRunner.assertValid(); testRunner.setProperty(GetAzureEventHub.ENQUEUE_TIME,"2015-12-22T21:55:10.000Z"); testRunner.assertValid(); + testRunner.setProperty(GetAzureEventHub.RECEIVER_FETCH_SIZE, "5"); + testRunner.assertValid(); + testRunner.setProperty(GetAzureEventHub.RECEIVER_FETCH_TIMEOUT,"10000"); + testRunner.assertValid(); } @Test