NIFI-1103: Add support for long polling in GetSQS processor

This commit is contained in:
Adam Lamar 2015-11-12 22:03:36 -07:00
parent e6086420aa
commit 428b20fc25
1 changed files with 11 additions and 1 deletions

View File

@ -103,8 +103,17 @@ public class GetSQS extends AbstractSQSProcessor {
.expressionLanguageSupported(false) .expressionLanguageSupported(false)
.build(); .build();
public static final PropertyDescriptor RECEIVE_MSG_WAIT_TIME = new PropertyDescriptor.Builder()
.name("Receive Message Wait Time")
.description("The maximum amount of time to wait on a long polling receive call. Setting this to a value of 1 second or greater will reduce the number of SQS requests and decrease fetch latency at the cost of a constantly active thread.")
.expressionLanguageSupported(false)
.required(false)
.defaultValue("0 sec")
.addValidator(StandardValidators.createTimePeriodValidator(0, TimeUnit.SECONDS, 20, TimeUnit.SECONDS)) // 20 seconds is the maximum allowed by SQS
.build();
public static final List<PropertyDescriptor> properties = Collections.unmodifiableList( public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
Arrays.asList(STATIC_QUEUE_URL, AUTO_DELETE, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, REGION, BATCH_SIZE, TIMEOUT, CHARSET, VISIBILITY_TIMEOUT)); Arrays.asList(STATIC_QUEUE_URL, AUTO_DELETE, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, REGION, BATCH_SIZE, TIMEOUT, CHARSET, VISIBILITY_TIMEOUT, RECEIVE_MSG_WAIT_TIME));
@Override @Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@ -127,6 +136,7 @@ public class GetSQS extends AbstractSQSProcessor {
request.setMaxNumberOfMessages(context.getProperty(BATCH_SIZE).asInteger()); request.setMaxNumberOfMessages(context.getProperty(BATCH_SIZE).asInteger());
request.setVisibilityTimeout(context.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue()); request.setVisibilityTimeout(context.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue());
request.setQueueUrl(queueUrl); request.setQueueUrl(queueUrl);
request.setWaitTimeSeconds(context.getProperty(RECEIVE_MSG_WAIT_TIME).asTimePeriod(TimeUnit.SECONDS).intValue());
final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue()); final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue());