From 428b20fc2553ac6ae56867630fa10bd3ac355582 Mon Sep 17 00:00:00 2001 From: Adam Lamar Date: Thu, 12 Nov 2015 22:03:36 -0700 Subject: [PATCH] NIFI-1103: Add support for long polling in GetSQS processor --- .../org/apache/nifi/processors/aws/sqs/GetSQS.java | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java index 10b17e9f48..b73dd39017 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java @@ -103,8 +103,17 @@ public class GetSQS extends AbstractSQSProcessor { .expressionLanguageSupported(false) .build(); + public static final PropertyDescriptor RECEIVE_MSG_WAIT_TIME = new PropertyDescriptor.Builder() + .name("Receive Message Wait Time") + .description("The maximum amount of time to wait on a long polling receive call. Setting this to a value of 1 second or greater will reduce the number of SQS requests and decrease fetch latency at the cost of a constantly active thread.") + .expressionLanguageSupported(false) + .required(false) + .defaultValue("0 sec") + .addValidator(StandardValidators.createTimePeriodValidator(0, TimeUnit.SECONDS, 20, TimeUnit.SECONDS)) // 20 seconds is the maximum allowed by SQS + .build(); + public static final List properties = Collections.unmodifiableList( - Arrays.asList(STATIC_QUEUE_URL, AUTO_DELETE, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, REGION, BATCH_SIZE, TIMEOUT, CHARSET, VISIBILITY_TIMEOUT)); + Arrays.asList(STATIC_QUEUE_URL, AUTO_DELETE, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, REGION, BATCH_SIZE, TIMEOUT, CHARSET, VISIBILITY_TIMEOUT, RECEIVE_MSG_WAIT_TIME)); @Override protected List getSupportedPropertyDescriptors() { @@ -127,6 +136,7 @@ public class GetSQS extends AbstractSQSProcessor { request.setMaxNumberOfMessages(context.getProperty(BATCH_SIZE).asInteger()); request.setVisibilityTimeout(context.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue()); request.setQueueUrl(queueUrl); + request.setWaitTimeSeconds(context.getProperty(RECEIVE_MSG_WAIT_TIME).asTimePeriod(TimeUnit.SECONDS).intValue()); final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue());