From cded30b3d2dc0497cf3d06051f55ca304c744250 Mon Sep 17 00:00:00 2001 From: Kourge <27643232+kourge-ch@users.noreply.github.com> Date: Mon, 28 Jan 2019 14:10:54 +0100 Subject: [PATCH] NIFI-5953 Manage GetTwitter connection retries on '420 Enhance Your Calm' exceptions NIFI-5953 Manage GetTwitter connection retries on '420 Enhance Your Calm' exceptions Update "Max Client Error Retries" parameter name. reintriduce client.reconnect() on HTTP_ERROR 420 This closes #3276. Signed-off-by: Koji Kawamura --- .../apache/nifi/processors/twitter/GetTwitter.java | 14 ++++++++++++++ .../nifi/processors/twitter/TestGetTwitter.java | 5 +++++ 2 files changed, 19 insertions(+) diff --git a/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java index f0a8b0cfaf..dafa9d0f33 100644 --- a/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java +++ b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java @@ -85,6 +85,17 @@ public class GetTwitter extends AbstractProcessor { .allowableValues(ENDPOINT_SAMPLE, ENDPOINT_FIREHOSE, ENDPOINT_FILTER) .defaultValue(ENDPOINT_SAMPLE.getValue()) .build(); + public static final PropertyDescriptor MAX_CLIENT_ERROR_RETRIES = new PropertyDescriptor.Builder() + .name("max-client-error-retries") + .displayName("Max Client Error Retries") + .description("The maximum number of retries to attempt when client experience retryable connection errors." + + " Client continues attempting to reconnect using an exponential back-off pattern until it successfully reconnects" + + " or until it reaches the retry limit." + +" It is recommended to raise this value when client is getting rate limited by Twitter API. Default value is 5.") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("5") + .build(); public static final PropertyDescriptor CONSUMER_KEY = new PropertyDescriptor.Builder() .name("Consumer Key") .description("The Consumer Key provided by Twitter") @@ -161,6 +172,7 @@ public class GetTwitter extends AbstractProcessor { protected void init(final ProcessorInitializationContext context) { final List descriptors = new ArrayList<>(); descriptors.add(ENDPOINT); + descriptors.add(MAX_CLIENT_ERROR_RETRIES); descriptors.add(CONSUMER_KEY); descriptors.add(CONSUMER_SECRET); descriptors.add(ACCESS_TOKEN); @@ -222,6 +234,7 @@ public class GetTwitter extends AbstractProcessor { @OnScheduled public void onScheduled(final ProcessContext context) { final String endpointName = context.getProperty(ENDPOINT).getValue(); + final int maxRetries = context.getProperty(MAX_CLIENT_ERROR_RETRIES).asInteger().intValue(); final Authentication oauth = new OAuth1(context.getProperty(CONSUMER_KEY).getValue(), context.getProperty(CONSUMER_SECRET).getValue(), context.getProperty(ACCESS_TOKEN).getValue(), @@ -319,6 +332,7 @@ public class GetTwitter extends AbstractProcessor { } clientBuilder.hosts(host).endpoint(streamingEndpoint); + clientBuilder.retries(maxRetries); client = clientBuilder.build(); client.connect(); } diff --git a/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/test/java/org/apache/nifi/processors/twitter/TestGetTwitter.java b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/test/java/org/apache/nifi/processors/twitter/TestGetTwitter.java index e56be67e3d..6124f3415a 100644 --- a/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/test/java/org/apache/nifi/processors/twitter/TestGetTwitter.java +++ b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/test/java/org/apache/nifi/processors/twitter/TestGetTwitter.java @@ -26,6 +26,7 @@ public class TestGetTwitter { public void testLocationValidatorWithValidLocations() { final TestRunner runner = TestRunners.newTestRunner(GetTwitter.class); runner.setProperty(GetTwitter.ENDPOINT, GetTwitter.ENDPOINT_FILTER); + runner.setProperty(GetTwitter.MAX_CLIENT_ERROR_RETRIES, "5"); runner.setProperty(GetTwitter.CONSUMER_KEY, "consumerKey"); runner.setProperty(GetTwitter.CONSUMER_SECRET, "consumerSecret"); runner.setProperty(GetTwitter.ACCESS_TOKEN, "accessToken"); @@ -38,6 +39,7 @@ public class TestGetTwitter { public void testLocationValidatorWithEqualLatitudes() { final TestRunner runner = TestRunners.newTestRunner(GetTwitter.class); runner.setProperty(GetTwitter.ENDPOINT, GetTwitter.ENDPOINT_FILTER); + runner.setProperty(GetTwitter.MAX_CLIENT_ERROR_RETRIES, "5"); runner.setProperty(GetTwitter.CONSUMER_KEY, "consumerKey"); runner.setProperty(GetTwitter.CONSUMER_SECRET, "consumerSecret"); runner.setProperty(GetTwitter.ACCESS_TOKEN, "accessToken"); @@ -50,6 +52,7 @@ public class TestGetTwitter { public void testLocationValidatorWithEqualLongitudes() { final TestRunner runner = TestRunners.newTestRunner(GetTwitter.class); runner.setProperty(GetTwitter.ENDPOINT, GetTwitter.ENDPOINT_FILTER); + runner.setProperty(GetTwitter.MAX_CLIENT_ERROR_RETRIES, "5"); runner.setProperty(GetTwitter.CONSUMER_KEY, "consumerKey"); runner.setProperty(GetTwitter.CONSUMER_SECRET, "consumerSecret"); runner.setProperty(GetTwitter.ACCESS_TOKEN, "accessToken"); @@ -62,6 +65,7 @@ public class TestGetTwitter { public void testLocationValidatorWithSWLatGreaterThanNELat() { final TestRunner runner = TestRunners.newTestRunner(GetTwitter.class); runner.setProperty(GetTwitter.ENDPOINT, GetTwitter.ENDPOINT_FILTER); + runner.setProperty(GetTwitter.MAX_CLIENT_ERROR_RETRIES, "5"); runner.setProperty(GetTwitter.CONSUMER_KEY, "consumerKey"); runner.setProperty(GetTwitter.CONSUMER_SECRET, "consumerSecret"); runner.setProperty(GetTwitter.ACCESS_TOKEN, "accessToken"); @@ -74,6 +78,7 @@ public class TestGetTwitter { public void testLocationValidatorWithSWLonGreaterThanNELon() { final TestRunner runner = TestRunners.newTestRunner(GetTwitter.class); runner.setProperty(GetTwitter.ENDPOINT, GetTwitter.ENDPOINT_FILTER); + runner.setProperty(GetTwitter.MAX_CLIENT_ERROR_RETRIES, "5"); runner.setProperty(GetTwitter.CONSUMER_KEY, "consumerKey"); runner.setProperty(GetTwitter.CONSUMER_SECRET, "consumerSecret"); runner.setProperty(GetTwitter.ACCESS_TOKEN, "accessToken");