NIFI-855 Adding support for filtering on multiple locations

This commit is contained in:
Bryan Bende 2015-08-19 20:56:42 -04:00
parent 3cca046550
commit 706edeb01f
4 changed files with 296 additions and 51 deletions

View File

@ -124,9 +124,11 @@ public class GetTwitter extends AbstractProcessor {
.addValidator(new FollowingValidator())
.build();
public static final PropertyDescriptor LOCATIONS = new PropertyDescriptor.Builder()
.name("Location to Filter")
.description(" Bounding box for filtering tweets. Enter SW and NE corners in field (longitude, latitude, longitude, latitude). Example" +
"-77.55661, 39.25831, -77.14325, 39.5374")
.name("Locations to Filter On")
.description("A comma-separated list of coordinates specifying one or more bounding boxes to filter on."
+ "Each bounding box is specified by a pair of coordinates in the format: swLon,swLat,neLon,neLat. "
+ "Multiple bounding boxes can be specified as such: swLon1,swLat1,neLon1,neLat1,swLon2,swLat2,neLon2,neLat2."
+ "Ignored unless Endpoint is set to 'Filter Endpoint'.")
.addValidator(new LocationValidator() )
.required(false)
.build();
@ -299,28 +301,15 @@ public class GetTwitter extends AbstractProcessor {
}
final String locationString = context.getProperty(LOCATIONS).getValue();
final String[] corSplit = locationString.split(",");
final List<Location> locationIds ;
double swLon = Double.parseDouble( corSplit[0] ) ;
double neLon = Double.parseDouble( corSplit[2] ) ;
double swLat = Double.parseDouble( corSplit[1] ) ;
double neLat = Double.parseDouble( corSplit[3] ) ;
Coordinate sw = new Coordinate( swLon, swLat ) ;
Coordinate ne = new Coordinate( neLon, neLat ) ;
Location bbox = new Location ( sw, ne ) ;
if ( locationString == null ) {
locationIds = Collections.emptyList();
final List<Location> locations;
if (locationString == null) {
locations = Collections.emptyList();
} else {
locationIds = new ArrayList<>();
locationIds.add( bbox );
locations = LocationUtil.parseLocations(locationString);
}
if ( !locationIds.isEmpty() ) {
filterEndpoint.locations(locationIds);
if (!locations.isEmpty()) {
filterEndpoint.locations(locations);
}
streamingEndpoint = filterEndpoint ;
@ -402,42 +391,47 @@ public class GetTwitter extends AbstractProcessor {
private static class LocationValidator implements Validator {
private static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+");
@Override
public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
final String[] splits = input.split(",");
if ( splits.length != 4 ) {
return new ValidationResult.Builder().input(input).subject(subject).valid(false).explanation("Must be comma-separated list of coordinates, SW and NE corners of your bounding box.").build();
} else {
try {
// Validate longitude coordinates
double swLon = Double.parseDouble(splits[0]);
double neLon = Double.parseDouble(splits[2]);
if (swLon < neLon) {
final List<Location> locations = LocationUtil.parseLocations(input);
for (final Location location : locations) {
final Coordinate sw = location.southwestCoordinate();
final Coordinate ne = location.northeastCoordinate();
if (sw.longitude() > ne.longitude()) {
return new ValidationResult.Builder().input(input).subject(subject).valid(false)
.explanation("SW Longitude (" + sw.longitude() + ") must be less than NE Longitude ("
+ ne.longitude() + ").").build();
}
if (sw.longitude() == ne.longitude()) {
return new ValidationResult.Builder().input(input).subject(subject).valid(false)
.explanation("SW Longitude (" + sw.longitude() + ") can not be equal to NE Longitude ("
+ ne.longitude() + ").").build();
}
if (sw.latitude() > ne.latitude()) {
return new ValidationResult.Builder().input(input).subject(subject).valid(false)
.explanation("SW Latitude (" + sw.latitude() + ") must be less than NE Latitude ("
+ ne.latitude() + ").").build();
}
if (sw.latitude() == ne.latitude()) {
return new ValidationResult.Builder().input(input).subject(subject).valid(false)
.explanation("SW Latitude (" + sw.latitude() + ") can not be equal to NE Latitude ("
+ ne.latitude() + ").").build();
}
}
// Validate latitude coordinates
double swLat = Double.parseDouble(splits[1]);
double neLat = Double.parseDouble(splits[3]);
if (swLat < neLat) {
return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
} else {
return new ValidationResult.Builder().input(input).subject(subject).valid(false).explanation("SW Latitude must be less than NE Latitude.").build();
} catch (IllegalStateException e) {
return new ValidationResult.Builder()
.input(input).subject(subject).valid(false)
.explanation("Must be a comma-separated list of longitude,latitude pairs specifying one or more bounding boxes.")
.build();
}
} else {
return new ValidationResult.Builder().input(input).subject(subject).valid(false).explanation("SW Longitude must be less than NE Longitude.").build();
}
} catch ( Exception e ) {
return new ValidationResult.Builder().input(input).subject(subject).valid(false).explanation("Bounding box location parse failure.").build();
}
}
}
}

View File

@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.twitter;
import com.twitter.hbc.core.endpoint.Location;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* Utility for parsing locations to be used with the Twitter API:
*
* https://dev.twitter.com/streaming/overview/request-parameters#locations
*
*/
public class LocationUtil {
static final String LON = "[-+]?\\d{1,3}(?:[.]\\d+)?";
static final String LAT = "[-+]?\\d{1,2}(?:[.]\\d+)?";
static final String LON_LAT = LON + ",\\s*" + LAT;
static final String LOCATION = LON_LAT + ",\\s*" + LON_LAT;
/**
* Used to find locations one at a time after knowing the locations String is valid.
*/
static final Pattern LOCATION_PATTERN = Pattern.compile(LOCATION);
/**
* One or more locations separated by a comma, exampple: lon,lat,lon,lat,lon,lat,lon,lat
*/
static final Pattern LOCATIONS_PATTERN = Pattern.compile("(?:" + LOCATION + ")(?:,\\s*" + LOCATION + ")*");
/**
*
* @param input a comma-separated list of longitude,latitude pairs specifying a set of bounding boxes,
* with the southwest corner of the bounding box coming first
*
* @return a list of the Location instances represented by the provided input
*/
public static List<Location> parseLocations(final String input) {
final List<Location> locations = new ArrayList<>();
final Matcher locationsMatcher = LOCATIONS_PATTERN.matcher(input);
if (locationsMatcher.matches()) {
Matcher locationMatcher = LOCATION_PATTERN.matcher(input);
while (locationMatcher.find()) {
final String location = locationMatcher.group();
locations.add(parseLocation(location));
}
} else {
throw new IllegalStateException("The provided location string was invalid.");
}
return locations;
}
/**
*
* @param location a comma-separated list of longitude,latitude pairs specifying one location
*
* @return the Location instance for the provided input
*/
public static Location parseLocation(final String location) {
final String[] corSplit = location.split(",");
final double swLon = Double.parseDouble(corSplit[0]) ;
final double swLat = Double.parseDouble(corSplit[1]) ;
final double neLon = Double.parseDouble(corSplit[2]) ;
final double neLat = Double.parseDouble(corSplit[3]) ;
Location.Coordinate sw = new Location.Coordinate(swLon, swLat) ;
Location.Coordinate ne = new Location.Coordinate(neLon, neLat) ;
return new Location(sw, ne) ;
}
}

View File

@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.twitter;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
public class TestGetTwitter {
@Test
public void testLocationValidatorWithValidLocations() {
final TestRunner runner = TestRunners.newTestRunner(GetTwitter.class);
runner.setProperty(GetTwitter.ENDPOINT, GetTwitter.ENDPOINT_FILTER);
runner.setProperty(GetTwitter.CONSUMER_KEY, "consumerKey");
runner.setProperty(GetTwitter.CONSUMER_SECRET, "consumerSecret");
runner.setProperty(GetTwitter.ACCESS_TOKEN, "acessToken");
runner.setProperty(GetTwitter.ACCESS_TOKEN_SECRET, "accessTokenSecret");
runner.setProperty(GetTwitter.LOCATIONS, "-122.75,36.8,-121.75,37.8,-74,40,-73,41");
runner.assertValid();
}
@Test
public void testLocationValidatorWithEqualLatitudes() {
final TestRunner runner = TestRunners.newTestRunner(GetTwitter.class);
runner.setProperty(GetTwitter.ENDPOINT, GetTwitter.ENDPOINT_FILTER);
runner.setProperty(GetTwitter.CONSUMER_KEY, "consumerKey");
runner.setProperty(GetTwitter.CONSUMER_SECRET, "consumerSecret");
runner.setProperty(GetTwitter.ACCESS_TOKEN, "acessToken");
runner.setProperty(GetTwitter.ACCESS_TOKEN_SECRET, "accessTokenSecret");
runner.setProperty(GetTwitter.LOCATIONS, "-122.75,36.8,-121.75,37.8,-74,40,-73,40");
runner.assertNotValid();
}
@Test
public void testLocationValidatorWithEqualLongitudes() {
final TestRunner runner = TestRunners.newTestRunner(GetTwitter.class);
runner.setProperty(GetTwitter.ENDPOINT, GetTwitter.ENDPOINT_FILTER);
runner.setProperty(GetTwitter.CONSUMER_KEY, "consumerKey");
runner.setProperty(GetTwitter.CONSUMER_SECRET, "consumerSecret");
runner.setProperty(GetTwitter.ACCESS_TOKEN, "acessToken");
runner.setProperty(GetTwitter.ACCESS_TOKEN_SECRET, "accessTokenSecret");
runner.setProperty(GetTwitter.LOCATIONS, "-122.75,36.8,-121.75,37.8,-74,40,-74,41");
runner.assertNotValid();
}
@Test
public void testLocationValidatorWithSWLatGreaterThanNELat() {
final TestRunner runner = TestRunners.newTestRunner(GetTwitter.class);
runner.setProperty(GetTwitter.ENDPOINT, GetTwitter.ENDPOINT_FILTER);
runner.setProperty(GetTwitter.CONSUMER_KEY, "consumerKey");
runner.setProperty(GetTwitter.CONSUMER_SECRET, "consumerSecret");
runner.setProperty(GetTwitter.ACCESS_TOKEN, "acessToken");
runner.setProperty(GetTwitter.ACCESS_TOKEN_SECRET, "accessTokenSecret");
runner.setProperty(GetTwitter.LOCATIONS, "-122.75,36.8,-121.75,37.8,-74,40,-73,39");
runner.assertNotValid();
}
@Test
public void testLocationValidatorWithSWLonGreaterThanNELon() {
final TestRunner runner = TestRunners.newTestRunner(GetTwitter.class);
runner.setProperty(GetTwitter.ENDPOINT, GetTwitter.ENDPOINT_FILTER);
runner.setProperty(GetTwitter.CONSUMER_KEY, "consumerKey");
runner.setProperty(GetTwitter.CONSUMER_SECRET, "consumerSecret");
runner.setProperty(GetTwitter.ACCESS_TOKEN, "acessToken");
runner.setProperty(GetTwitter.ACCESS_TOKEN_SECRET, "accessTokenSecret");
runner.setProperty(GetTwitter.LOCATIONS, "-122.75,36.8,-121.75,37.8,-74,40,-75,41");
runner.assertNotValid();
}
}

View File

@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.twitter;
import com.twitter.hbc.core.endpoint.Location;
import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
public class TestLocationUtil {
@Test
public void testParseLocationsSingle() {
final String swLon = "-122.75";
final String swLat = "36.8";
final String neLon = "-121.75";
final String neLat = "37.8";
final String locationString = swLon + "," + swLat + "," + neLon + "," + neLat;
List<Location> locations = LocationUtil.parseLocations(locationString);
Assert.assertEquals(1, locations.size());
Location location = locations.get(0);
Assert.assertEquals(new Double(location.southwestCoordinate().longitude()), Double.valueOf(swLon));
Assert.assertEquals(new Double(location.southwestCoordinate().latitude()), Double.valueOf(swLat));
Assert.assertEquals(new Double(location.northeastCoordinate().longitude()), Double.valueOf(neLon));
Assert.assertEquals(new Double(location.northeastCoordinate().latitude()), Double.valueOf(neLat));
}
@Test
public void testParseLocationsMultiple() {
final Location location1 = new Location(new Location.Coordinate(-122.75, 36.8), new Location.Coordinate(-121.75,37.8));
final Location location2 = new Location(new Location.Coordinate(-74, 40), new Location.Coordinate(-73, 41));
final Location location3 = new Location(new Location.Coordinate(-64, 30), new Location.Coordinate(-63, 31));
final Location location4 = new Location(new Location.Coordinate(-54, 20), new Location.Coordinate(-53, 21));
final List<Location> expectedLocations = Arrays.asList(location1, location2, location3, location4);
final String locationString = "-122.75,36.8,-121.75,37.8,-74,40,-73,41,-64,30,-63,31,-54,20,-53,21";
List<Location> locations = LocationUtil.parseLocations(locationString);
Assert.assertEquals(expectedLocations.size(), locations.size());
for (Location expectedLocation : expectedLocations) {
boolean found = false;
for (Location location : locations) {
if (location.northeastCoordinate().longitude() == expectedLocation.northeastCoordinate().longitude()
&& location.northeastCoordinate().latitude() == expectedLocation.northeastCoordinate().latitude()
&& location.southwestCoordinate().longitude() == expectedLocation.southwestCoordinate().longitude()
&& location.southwestCoordinate().latitude() == expectedLocation.southwestCoordinate().latitude()) {
found = true;
break;
}
}
Assert.assertTrue(found);
}
}
}