From 3cca0465508b8b821678ca77f547a3c2589667d5 Mon Sep 17 00:00:00 2001 From: Chris Mangold Date: Mon, 17 Aug 2015 11:09:10 -0400 Subject: [PATCH] NIFI-855 Add location bounding bix filter to twitter processor. Signed-off-by: Bryan Bende --- .../nifi/processors/twitter/GetTwitter.java | 93 ++++++++++++++++++- 1 file changed, 88 insertions(+), 5 deletions(-) diff --git a/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java index f24b086a1d..d4874921f9 100644 --- a/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java +++ b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java @@ -57,6 +57,8 @@ import org.apache.nifi.processor.util.StandardValidators; import com.twitter.hbc.ClientBuilder; import com.twitter.hbc.core.Client; import com.twitter.hbc.core.Constants; +import com.twitter.hbc.core.endpoint.Location.Coordinate ; +import com.twitter.hbc.core.endpoint.Location ; import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint; import com.twitter.hbc.core.endpoint.StatusesFirehoseEndpoint; import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint; @@ -98,7 +100,7 @@ public class GetTwitter extends AbstractProcessor { .build(); public static final PropertyDescriptor ACCESS_TOKEN = new PropertyDescriptor.Builder() .name("Access Token") - .description("The Acces Token provided by Twitter") + .description("The Access Token provided by Twitter") .required(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); @@ -121,6 +123,13 @@ public class GetTwitter extends AbstractProcessor { .required(false) .addValidator(new FollowingValidator()) .build(); + public static final PropertyDescriptor LOCATIONS = new PropertyDescriptor.Builder() + .name("Location to Filter") + .description(" Bounding box for filtering tweets. Enter SW and NE corners in field (longitude, latitude, longitude, latitude). Example" + + "-77.55661, 39.25831, -77.14325, 39.5374") + .addValidator(new LocationValidator() ) + .required(false) + .build(); public static final PropertyDescriptor TERMS = new PropertyDescriptor.Builder() .name("Terms to Filter On") .description("A comma-separated list of terms to filter on. Ignored unless Endpoint is set to 'Filter Endpoint'." @@ -155,6 +164,7 @@ public class GetTwitter extends AbstractProcessor { descriptors.add(LANGUAGES); descriptors.add(TERMS); descriptors.add(FOLLOWING); + descriptors.add(LOCATIONS); this.descriptors = Collections.unmodifiableList(descriptors); final Set relationships = new HashSet<>(); @@ -189,9 +199,10 @@ public class GetTwitter extends AbstractProcessor { final String endpointName = validationContext.getProperty(ENDPOINT).getValue(); if (ENDPOINT_FILTER.getValue().equals(endpointName)) { - if (!validationContext.getProperty(TERMS).isSet() && !validationContext.getProperty(FOLLOWING).isSet()) { + if (!validationContext.getProperty(TERMS).isSet() && !validationContext.getProperty(FOLLOWING).isSet() && !validationContext.getProperty(LOCATIONS).isSet()) { results.add(new ValidationResult.Builder().input("").subject(FOLLOWING.getName()) - .valid(false).explanation("When using the 'Filter Endpoint', at least one of '" + TERMS.getName() + "' or '" + FOLLOWING.getName() + "' must be set").build()); + .valid(false).explanation("When using the 'Filter Endpoint', at least one of '" + TERMS.getName() + "' or '" + FOLLOWING.getName() + "'" + + "' or '" + LOCATIONS.getName() + " must be set").build()); } } @@ -206,6 +217,8 @@ public class GetTwitter extends AbstractProcessor { @OnScheduled public void onScheduled(final ProcessContext context) throws MalformedURLException { + messageQueue = new LinkedBlockingQueue<>(100000); + final String endpointName = context.getProperty(ENDPOINT).getValue(); final Authentication oauth = new OAuth1(context.getProperty(CONSUMER_KEY).getValue(), context.getProperty(CONSUMER_SECRET).getValue(), @@ -284,7 +297,34 @@ public class GetTwitter extends AbstractProcessor { if (languages != null) { filterEndpoint.languages(languages); } - streamingEndpoint = filterEndpoint; + + final String locationString = context.getProperty(LOCATIONS).getValue(); + final String[] corSplit = locationString.split(","); + + final List locationIds ; + + double swLon = Double.parseDouble( corSplit[0] ) ; + double neLon = Double.parseDouble( corSplit[2] ) ; + double swLat = Double.parseDouble( corSplit[1] ) ; + double neLat = Double.parseDouble( corSplit[3] ) ; + + Coordinate sw = new Coordinate( swLon, swLat ) ; + Coordinate ne = new Coordinate( neLon, neLat ) ; + Location bbox = new Location ( sw, ne ) ; + + if ( locationString == null ) { + locationIds = Collections.emptyList(); + } else { + locationIds = new ArrayList<>(); + locationIds.add( bbox ); + } + + if ( !locationIds.isEmpty() ) { + filterEndpoint.locations(locationIds); + } + + streamingEndpoint = filterEndpoint ; + } else { throw new AssertionError("Endpoint was invalid value: " + endpointName); } @@ -359,4 +399,47 @@ public class GetTwitter extends AbstractProcessor { } } -} + + private static class LocationValidator implements Validator { + + private static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+"); + + @Override + public ValidationResult validate(final String subject, final String input, final ValidationContext context) { + final String[] splits = input.split(","); + if ( splits.length != 4 ) { + + return new ValidationResult.Builder().input(input).subject(subject).valid(false).explanation("Must be comma-separated list of coordinates, SW and NE corners of your bounding box.").build(); + + } else { + + try { + // Validate longitude coordinates + double swLon = Double.parseDouble(splits[0]); + double neLon = Double.parseDouble(splits[2]); + if (swLon < neLon) { + + // Validate latitude coordinates + double swLat = Double.parseDouble(splits[1]); + double neLat = Double.parseDouble(splits[3]); + if (swLat < neLat) { + return new ValidationResult.Builder().subject(subject).input(input).valid(true).build(); + + } else { + return new ValidationResult.Builder().input(input).subject(subject).valid(false).explanation("SW Latitude must be less than NE Latitude.").build(); + } + + } else { + return new ValidationResult.Builder().input(input).subject(subject).valid(false).explanation("SW Longitude must be less than NE Longitude.").build(); + } + } catch ( Exception e ) { + return new ValidationResult.Builder().input(input).subject(subject).valid(false).explanation("Bounding box location parse failure.").build(); + } + + } + + } + + } + +} \ No newline at end of file