NIFI-855 Add location bounding bix filter to twitter processor.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Chris Mangold 2015-08-17 11:09:10 -04:00 committed by Bryan Bende
parent 4f92193159
commit 3cca046550
1 changed files with 88 additions and 5 deletions

View File

@ -57,6 +57,8 @@ import org.apache.nifi.processor.util.StandardValidators;
import com.twitter.hbc.ClientBuilder; import com.twitter.hbc.ClientBuilder;
import com.twitter.hbc.core.Client; import com.twitter.hbc.core.Client;
import com.twitter.hbc.core.Constants; import com.twitter.hbc.core.Constants;
import com.twitter.hbc.core.endpoint.Location.Coordinate ;
import com.twitter.hbc.core.endpoint.Location ;
import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint; import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
import com.twitter.hbc.core.endpoint.StatusesFirehoseEndpoint; import com.twitter.hbc.core.endpoint.StatusesFirehoseEndpoint;
import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint; import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint;
@ -98,7 +100,7 @@ public class GetTwitter extends AbstractProcessor {
.build(); .build();
public static final PropertyDescriptor ACCESS_TOKEN = new PropertyDescriptor.Builder() public static final PropertyDescriptor ACCESS_TOKEN = new PropertyDescriptor.Builder()
.name("Access Token") .name("Access Token")
.description("The Acces Token provided by Twitter") .description("The Access Token provided by Twitter")
.required(true) .required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build(); .build();
@ -121,6 +123,13 @@ public class GetTwitter extends AbstractProcessor {
.required(false) .required(false)
.addValidator(new FollowingValidator()) .addValidator(new FollowingValidator())
.build(); .build();
public static final PropertyDescriptor LOCATIONS = new PropertyDescriptor.Builder()
.name("Location to Filter")
.description(" Bounding box for filtering tweets. Enter SW and NE corners in field (longitude, latitude, longitude, latitude). Example" +
"-77.55661, 39.25831, -77.14325, 39.5374")
.addValidator(new LocationValidator() )
.required(false)
.build();
public static final PropertyDescriptor TERMS = new PropertyDescriptor.Builder() public static final PropertyDescriptor TERMS = new PropertyDescriptor.Builder()
.name("Terms to Filter On") .name("Terms to Filter On")
.description("A comma-separated list of terms to filter on. Ignored unless Endpoint is set to 'Filter Endpoint'." .description("A comma-separated list of terms to filter on. Ignored unless Endpoint is set to 'Filter Endpoint'."
@ -155,6 +164,7 @@ public class GetTwitter extends AbstractProcessor {
descriptors.add(LANGUAGES); descriptors.add(LANGUAGES);
descriptors.add(TERMS); descriptors.add(TERMS);
descriptors.add(FOLLOWING); descriptors.add(FOLLOWING);
descriptors.add(LOCATIONS);
this.descriptors = Collections.unmodifiableList(descriptors); this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<>(); final Set<Relationship> relationships = new HashSet<>();
@ -189,9 +199,10 @@ public class GetTwitter extends AbstractProcessor {
final String endpointName = validationContext.getProperty(ENDPOINT).getValue(); final String endpointName = validationContext.getProperty(ENDPOINT).getValue();
if (ENDPOINT_FILTER.getValue().equals(endpointName)) { if (ENDPOINT_FILTER.getValue().equals(endpointName)) {
if (!validationContext.getProperty(TERMS).isSet() && !validationContext.getProperty(FOLLOWING).isSet()) { if (!validationContext.getProperty(TERMS).isSet() && !validationContext.getProperty(FOLLOWING).isSet() && !validationContext.getProperty(LOCATIONS).isSet()) {
results.add(new ValidationResult.Builder().input("").subject(FOLLOWING.getName()) results.add(new ValidationResult.Builder().input("").subject(FOLLOWING.getName())
.valid(false).explanation("When using the 'Filter Endpoint', at least one of '" + TERMS.getName() + "' or '" + FOLLOWING.getName() + "' must be set").build()); .valid(false).explanation("When using the 'Filter Endpoint', at least one of '" + TERMS.getName() + "' or '" + FOLLOWING.getName() + "'" +
"' or '" + LOCATIONS.getName() + " must be set").build());
} }
} }
@ -206,6 +217,8 @@ public class GetTwitter extends AbstractProcessor {
@OnScheduled @OnScheduled
public void onScheduled(final ProcessContext context) throws MalformedURLException { public void onScheduled(final ProcessContext context) throws MalformedURLException {
messageQueue = new LinkedBlockingQueue<>(100000);
final String endpointName = context.getProperty(ENDPOINT).getValue(); final String endpointName = context.getProperty(ENDPOINT).getValue();
final Authentication oauth = new OAuth1(context.getProperty(CONSUMER_KEY).getValue(), final Authentication oauth = new OAuth1(context.getProperty(CONSUMER_KEY).getValue(),
context.getProperty(CONSUMER_SECRET).getValue(), context.getProperty(CONSUMER_SECRET).getValue(),
@ -284,7 +297,34 @@ public class GetTwitter extends AbstractProcessor {
if (languages != null) { if (languages != null) {
filterEndpoint.languages(languages); filterEndpoint.languages(languages);
} }
final String locationString = context.getProperty(LOCATIONS).getValue();
final String[] corSplit = locationString.split(",");
final List<Location> locationIds ;
double swLon = Double.parseDouble( corSplit[0] ) ;
double neLon = Double.parseDouble( corSplit[2] ) ;
double swLat = Double.parseDouble( corSplit[1] ) ;
double neLat = Double.parseDouble( corSplit[3] ) ;
Coordinate sw = new Coordinate( swLon, swLat ) ;
Coordinate ne = new Coordinate( neLon, neLat ) ;
Location bbox = new Location ( sw, ne ) ;
if ( locationString == null ) {
locationIds = Collections.emptyList();
} else {
locationIds = new ArrayList<>();
locationIds.add( bbox );
}
if ( !locationIds.isEmpty() ) {
filterEndpoint.locations(locationIds);
}
streamingEndpoint = filterEndpoint ; streamingEndpoint = filterEndpoint ;
} else { } else {
throw new AssertionError("Endpoint was invalid value: " + endpointName); throw new AssertionError("Endpoint was invalid value: " + endpointName);
} }
@ -359,4 +399,47 @@ public class GetTwitter extends AbstractProcessor {
} }
} }
private static class LocationValidator implements Validator {
private static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+");
@Override
public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
final String[] splits = input.split(",");
if ( splits.length != 4 ) {
return new ValidationResult.Builder().input(input).subject(subject).valid(false).explanation("Must be comma-separated list of coordinates, SW and NE corners of your bounding box.").build();
} else {
try {
// Validate longitude coordinates
double swLon = Double.parseDouble(splits[0]);
double neLon = Double.parseDouble(splits[2]);
if (swLon < neLon) {
// Validate latitude coordinates
double swLat = Double.parseDouble(splits[1]);
double neLat = Double.parseDouble(splits[3]);
if (swLat < neLat) {
return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
} else {
return new ValidationResult.Builder().input(input).subject(subject).valid(false).explanation("SW Latitude must be less than NE Latitude.").build();
}
} else {
return new ValidationResult.Builder().input(input).subject(subject).valid(false).explanation("SW Longitude must be less than NE Longitude.").build();
}
} catch ( Exception e ) {
return new ValidationResult.Builder().input(input).subject(subject).valid(false).explanation("Bounding box location parse failure.").build();
}
}
}
}
} }