NIFI-5953 Manage GetTwitter connection retries on '420 Enhance Your Calm' exceptions

NIFI-5953 Manage GetTwitter connection retries on '420 Enhance Your Calm' exceptions

Update "Max Client Error Retries" parameter name.
reintriduce client.reconnect() on HTTP_ERROR 420

This closes #3276.

Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
Kourge 2019-01-28 14:10:54 +01:00 committed by Koji Kawamura
parent fb9f2af04a
commit cded30b3d2
2 changed files with 19 additions and 0 deletions

View File

@ -85,6 +85,17 @@ public class GetTwitter extends AbstractProcessor {
.allowableValues(ENDPOINT_SAMPLE, ENDPOINT_FIREHOSE, ENDPOINT_FILTER)
.defaultValue(ENDPOINT_SAMPLE.getValue())
.build();
public static final PropertyDescriptor MAX_CLIENT_ERROR_RETRIES = new PropertyDescriptor.Builder()
.name("max-client-error-retries")
.displayName("Max Client Error Retries")
.description("The maximum number of retries to attempt when client experience retryable connection errors."
+ " Client continues attempting to reconnect using an exponential back-off pattern until it successfully reconnects"
+ " or until it reaches the retry limit."
+" It is recommended to raise this value when client is getting rate limited by Twitter API. Default value is 5.")
.required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("5")
.build();
public static final PropertyDescriptor CONSUMER_KEY = new PropertyDescriptor.Builder()
.name("Consumer Key")
.description("The Consumer Key provided by Twitter")
@ -161,6 +172,7 @@ public class GetTwitter extends AbstractProcessor {
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(ENDPOINT);
descriptors.add(MAX_CLIENT_ERROR_RETRIES);
descriptors.add(CONSUMER_KEY);
descriptors.add(CONSUMER_SECRET);
descriptors.add(ACCESS_TOKEN);
@ -222,6 +234,7 @@ public class GetTwitter extends AbstractProcessor {
@OnScheduled
public void onScheduled(final ProcessContext context) {
final String endpointName = context.getProperty(ENDPOINT).getValue();
final int maxRetries = context.getProperty(MAX_CLIENT_ERROR_RETRIES).asInteger().intValue();
final Authentication oauth = new OAuth1(context.getProperty(CONSUMER_KEY).getValue(),
context.getProperty(CONSUMER_SECRET).getValue(),
context.getProperty(ACCESS_TOKEN).getValue(),
@ -319,6 +332,7 @@ public class GetTwitter extends AbstractProcessor {
}
clientBuilder.hosts(host).endpoint(streamingEndpoint);
clientBuilder.retries(maxRetries);
client = clientBuilder.build();
client.connect();
}

View File

@ -26,6 +26,7 @@ public class TestGetTwitter {
public void testLocationValidatorWithValidLocations() {
final TestRunner runner = TestRunners.newTestRunner(GetTwitter.class);
runner.setProperty(GetTwitter.ENDPOINT, GetTwitter.ENDPOINT_FILTER);
runner.setProperty(GetTwitter.MAX_CLIENT_ERROR_RETRIES, "5");
runner.setProperty(GetTwitter.CONSUMER_KEY, "consumerKey");
runner.setProperty(GetTwitter.CONSUMER_SECRET, "consumerSecret");
runner.setProperty(GetTwitter.ACCESS_TOKEN, "accessToken");
@ -38,6 +39,7 @@ public class TestGetTwitter {
public void testLocationValidatorWithEqualLatitudes() {
final TestRunner runner = TestRunners.newTestRunner(GetTwitter.class);
runner.setProperty(GetTwitter.ENDPOINT, GetTwitter.ENDPOINT_FILTER);
runner.setProperty(GetTwitter.MAX_CLIENT_ERROR_RETRIES, "5");
runner.setProperty(GetTwitter.CONSUMER_KEY, "consumerKey");
runner.setProperty(GetTwitter.CONSUMER_SECRET, "consumerSecret");
runner.setProperty(GetTwitter.ACCESS_TOKEN, "accessToken");
@ -50,6 +52,7 @@ public class TestGetTwitter {
public void testLocationValidatorWithEqualLongitudes() {
final TestRunner runner = TestRunners.newTestRunner(GetTwitter.class);
runner.setProperty(GetTwitter.ENDPOINT, GetTwitter.ENDPOINT_FILTER);
runner.setProperty(GetTwitter.MAX_CLIENT_ERROR_RETRIES, "5");
runner.setProperty(GetTwitter.CONSUMER_KEY, "consumerKey");
runner.setProperty(GetTwitter.CONSUMER_SECRET, "consumerSecret");
runner.setProperty(GetTwitter.ACCESS_TOKEN, "accessToken");
@ -62,6 +65,7 @@ public class TestGetTwitter {
public void testLocationValidatorWithSWLatGreaterThanNELat() {
final TestRunner runner = TestRunners.newTestRunner(GetTwitter.class);
runner.setProperty(GetTwitter.ENDPOINT, GetTwitter.ENDPOINT_FILTER);
runner.setProperty(GetTwitter.MAX_CLIENT_ERROR_RETRIES, "5");
runner.setProperty(GetTwitter.CONSUMER_KEY, "consumerKey");
runner.setProperty(GetTwitter.CONSUMER_SECRET, "consumerSecret");
runner.setProperty(GetTwitter.ACCESS_TOKEN, "accessToken");
@ -74,6 +78,7 @@ public class TestGetTwitter {
public void testLocationValidatorWithSWLonGreaterThanNELon() {
final TestRunner runner = TestRunners.newTestRunner(GetTwitter.class);
runner.setProperty(GetTwitter.ENDPOINT, GetTwitter.ENDPOINT_FILTER);
runner.setProperty(GetTwitter.MAX_CLIENT_ERROR_RETRIES, "5");
runner.setProperty(GetTwitter.CONSUMER_KEY, "consumerKey");
runner.setProperty(GetTwitter.CONSUMER_SECRET, "consumerSecret");
runner.setProperty(GetTwitter.ACCESS_TOKEN, "accessToken");