NIFI-2834 Better Configs for Fetch and Timeout

NIFI-2834 Better Configs for Fetch and Timeout

NIFI-2834 Better Configs for Fetch and Timeout

NIFI-2834 ofSec to of Mils

This closes #1167
This commit is contained in:
Joe N 2016-10-29 11:48:20 -04:00 committed by Oleg Zhurakousky
parent 769530beae
commit b5550ffcf5
2 changed files with 36 additions and 1 deletions

View File

@ -40,6 +40,7 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StopWatch; import org.apache.nifi.util.StopWatch;
import java.io.IOException; import java.io.IOException;
import java.time.Duration;
import java.time.Instant; import java.time.Instant;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
@ -121,6 +122,20 @@ public class GetAzureEventHub extends AbstractProcessor {
.expressionLanguageSupported(false) .expressionLanguageSupported(false)
.required(false) .required(false)
.build(); .build();
static final PropertyDescriptor RECEIVER_FETCH_SIZE = new PropertyDescriptor.Builder()
.name("Partition Recivier Fetch Size")
.description("The number of events that a receiver should fetch from an EventHubs partition before returning. Default(100)")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(false)
.required(false)
.build();
static final PropertyDescriptor RECEIVER_FETCH_TIMEOUT = new PropertyDescriptor.Builder()
.name("Partiton Receiver Timeout (millseconds)")
.description("The amount of time a Partition Receiver should wait to receive the Fetch Size before returning. Default(60000)")
.addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
.expressionLanguageSupported(false)
.required(false)
.build();
static final Relationship REL_SUCCESS = new Relationship.Builder() static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success") .name("success")
@ -130,6 +145,8 @@ public class GetAzureEventHub extends AbstractProcessor {
private final ConcurrentMap<String, PartitionReceiver> partitionToReceiverMap = new ConcurrentHashMap<>(); private final ConcurrentMap<String, PartitionReceiver> partitionToReceiverMap = new ConcurrentHashMap<>();
private volatile BlockingQueue<String> partitionNames = new LinkedBlockingQueue<>(); private volatile BlockingQueue<String> partitionNames = new LinkedBlockingQueue<>();
private volatile Instant configuredEnqueueTime; private volatile Instant configuredEnqueueTime;
private volatile int receiverFetchSize;
private volatile Duration receiverFetchTimeout;
private EventHubClient eventHubClient; private EventHubClient eventHubClient;
private final static List<PropertyDescriptor> propertyDescriptors; private final static List<PropertyDescriptor> propertyDescriptors;
@ -148,6 +165,9 @@ public class GetAzureEventHub extends AbstractProcessor {
_propertyDescriptors.add(NUM_PARTITIONS); _propertyDescriptors.add(NUM_PARTITIONS);
_propertyDescriptors.add(CONSUMER_GROUP); _propertyDescriptors.add(CONSUMER_GROUP);
_propertyDescriptors.add(ENQUEUE_TIME); _propertyDescriptors.add(ENQUEUE_TIME);
_propertyDescriptors.add(RECEIVER_FETCH_SIZE);
_propertyDescriptors.add(RECEIVER_FETCH_TIMEOUT);
propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors); propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors);
Set<Relationship> _relationships = new HashSet<>(); Set<Relationship> _relationships = new HashSet<>();
@ -201,6 +221,7 @@ public class GetAzureEventHub extends AbstractProcessor {
partitionId, partitionId,
configuredEnqueueTime == null ? Instant.now() : configuredEnqueueTime).get(); configuredEnqueueTime == null ? Instant.now() : configuredEnqueueTime).get();
receiver.setReceiveTimeout(receiverFetchTimeout == null ? Duration.ofMillis(60000) : receiverFetchTimeout);
partitionToReceiverMap.put(partitionId, receiver); partitionToReceiverMap.put(partitionId, receiver);
return receiver; return receiver;
@ -222,7 +243,7 @@ public class GetAzureEventHub extends AbstractProcessor {
final PartitionReceiver receiver; final PartitionReceiver receiver;
try { try {
receiver = getReceiver(context, partitionId); receiver = getReceiver(context, partitionId);
return receiver.receive(100).get(); return receiver.receive(receiverFetchSize).get();
} catch (final IOException | ServiceBusException | ExecutionException | InterruptedException e) { } catch (final IOException | ServiceBusException | ExecutionException | InterruptedException e) {
throw new ProcessException(e); throw new ProcessException(e);
} }
@ -264,6 +285,16 @@ public class GetAzureEventHub extends AbstractProcessor {
} else { } else {
configuredEnqueueTime = null; configuredEnqueueTime = null;
} }
if(context.getProperty(RECEIVER_FETCH_SIZE).isSet()) {
receiverFetchSize = context.getProperty(RECEIVER_FETCH_SIZE).asInteger();
} else {
receiverFetchSize = 100;
}
if(context.getProperty(RECEIVER_FETCH_TIMEOUT).isSet()) {
receiverFetchTimeout = Duration.ofMillis(context.getProperty(RECEIVER_FETCH_TIMEOUT).asLong());
} else {
receiverFetchTimeout = null;
}
final String connectionString = new ConnectionStringBuilder(namespace, eventHubName, policyName, policyKey).toString(); final String connectionString = new ConnectionStringBuilder(namespace, eventHubName, policyName, policyKey).toString();
setupReceiver(connectionString); setupReceiver(connectionString);

View File

@ -64,6 +64,10 @@ public class GetAzureEventHubTest {
testRunner.assertValid(); testRunner.assertValid();
testRunner.setProperty(GetAzureEventHub.ENQUEUE_TIME,"2015-12-22T21:55:10.000Z"); testRunner.setProperty(GetAzureEventHub.ENQUEUE_TIME,"2015-12-22T21:55:10.000Z");
testRunner.assertValid(); testRunner.assertValid();
testRunner.setProperty(GetAzureEventHub.RECEIVER_FETCH_SIZE, "5");
testRunner.assertValid();
testRunner.setProperty(GetAzureEventHub.RECEIVER_FETCH_TIMEOUT,"10000");
testRunner.assertValid();
} }
@Test @Test