diff --git a/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java
index d4874921f9..a78b1122f5 100644
--- a/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java
+++ b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java
@@ -124,9 +124,11 @@ public class GetTwitter extends AbstractProcessor {
             .addValidator(new FollowingValidator())
             .build();
     public static final PropertyDescriptor LOCATIONS = new PropertyDescriptor.Builder()
-            .name("Location to Filter")
-            .description(" Bounding box for filtering tweets. Enter SW and NE corners in field (longitude, latitude, longitude, latitude). Example" +
-                    "-77.55661, 39.25831, -77.14325, 39.5374")
+            .name("Locations to Filter On")
+            .description("A comma-separated list of coordinates specifying one or more bounding boxes to filter on. "
+                    + "Each bounding box is specified by a pair of coordinates in the format: swLon,swLat,neLon,neLat. "
+                    + "Multiple bounding boxes can be specified as such: swLon1,swLat1,neLon1,neLat1,swLon2,swLat2,neLon2,neLat2. "
+                    + "Ignored unless Endpoint is set to 'Filter Endpoint'.")
             .addValidator(new LocationValidator() )
             .required(false)
             .build();
@@ -299,28 +301,15 @@ public class GetTwitter extends AbstractProcessor {
             }

             final String locationString = context.getProperty(LOCATIONS).getValue();
-            final String[] corSplit = locationString.split(",");
-
-            final List<Location> locationIds ;
-
-            double swLon = Double.parseDouble( corSplit[0] ) ;
-            double neLon = Double.parseDouble( corSplit[2] ) ;
-            double swLat = Double.parseDouble( corSplit[1] ) ;
-            double neLat = Double.parseDouble( corSplit[3] ) ;
-
-            Coordinate sw = new Coordinate( swLon, swLat ) ;
-            Coordinate ne = new Coordinate( neLon, neLat ) ;
-            Location bbox = new Location ( sw, ne ) ;
-
-            if ( locationString == null ) {
-                locationIds = Collections.emptyList();
+            final List<Location> locations;
+            if (locationString == null) {
+                locations = Collections.emptyList();
             } else {
-                locationIds = new ArrayList<>();
-                locationIds.add( bbox );
+                locations = LocationUtil.parseLocations(locationString);
             }

-            if ( !locationIds.isEmpty() ) {
-                filterEndpoint.locations(locationIds);
+            if (!locations.isEmpty()) {
+                filterEndpoint.locations(locations);
             }

             streamingEndpoint = filterEndpoint ;
@@ -402,42 +391,49 @@ public class GetTwitter extends AbstractProcessor {

     private static class LocationValidator implements Validator {

-        private static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+");
-
         @Override
         public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
-            final String[] splits = input.split(",");
-            if ( splits.length != 4 ) {
+            try {
+                final List<Location> locations = LocationUtil.parseLocations(input);
+                // Reject the first box whose SW corner is not strictly less than its NE corner in both longitude and latitude.
+                for (final Location location : locations) {
+                    final Coordinate sw = location.southwestCoordinate();
+                    final Coordinate ne = location.northeastCoordinate();

-                return new ValidationResult.Builder().input(input).subject(subject).valid(false).explanation("Must be comma-separated list of coordinates, SW and NE corners of your bounding box.").build();
-
-            } else {
-
-                try {
-                    // Validate longitude coordinates
-                    double swLon = Double.parseDouble(splits[0]);
-                    double neLon = Double.parseDouble(splits[2]);
-                    if (swLon < neLon) {
-
-                        // Validate latitude coordinates
-                        double swLat = Double.parseDouble(splits[1]);
-                        double neLat = Double.parseDouble(splits[3]);
-                        if (swLat < neLat) {
-                            return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
-
-                        } else {
-                            return new ValidationResult.Builder().input(input).subject(subject).valid(false).explanation("SW Latitude must be less than NE Latitude.").build();
-                        }
-
-                    } else {
-                        return new ValidationResult.Builder().input(input).subject(subject).valid(false).explanation("SW Longitude must be less than NE Longitude.").build();
+                    if (sw.longitude() > ne.longitude()) {
+                        return new ValidationResult.Builder().input(input).subject(subject).valid(false)
+                                .explanation("SW Longitude (" + sw.longitude() + ") must be less than NE Longitude ("
+                                        + ne.longitude() + ").").build();
+                    }
+
+                    if (sw.longitude() == ne.longitude()) {
+                        return new ValidationResult.Builder().input(input).subject(subject).valid(false)
+                                .explanation("SW Longitude (" + sw.longitude() + ") can not be equal to NE Longitude ("
+                                        + ne.longitude() + ").").build();
+                    }
+
+                    if (sw.latitude() > ne.latitude()) {
+                        return new ValidationResult.Builder().input(input).subject(subject).valid(false)
+                                .explanation("SW Latitude (" + sw.latitude() + ") must be less than NE Latitude ("
+                                        + ne.latitude() + ").").build();
+                    }
+
+                    if (sw.latitude() == ne.latitude()) {
+                        return new ValidationResult.Builder().input(input).subject(subject).valid(false)
+                                .explanation("SW Latitude (" + sw.latitude() + ") can not be equal to NE Latitude ("
+                                        + ne.latitude() + ").").build();
                     }

-                } catch ( Exception e ) {
-                    return new ValidationResult.Builder().input(input).subject(subject).valid(false).explanation("Bounding box location parse failure.").build();
                 }

-            }
+                return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
+            } catch (IllegalStateException e) {
+                // LocationUtil.parseLocations signals a malformed location string with IllegalStateException.
+                return new ValidationResult.Builder()
+                        .input(input).subject(subject).valid(false)
+                        .explanation("Must be a comma-separated list of longitude,latitude pairs specifying one or more bounding boxes.")
+                        .build();
+            }
         }
     }

diff --git a/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/LocationUtil.java b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/LocationUtil.java
new file mode 100644
index 0000000000..63e881aba5
--- /dev/null
+++ b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/LocationUtil.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.twitter;
+
+import com.twitter.hbc.core.endpoint.Location;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Utility for parsing locations to be used with the Twitter API:
+ *
+ * https://dev.twitter.com/streaming/overview/request-parameters#locations
+ * Coordinates are checked syntactically only (digit counts), not against geographic ranges.
+ */
+public class LocationUtil {
+
+    static final String LON = "[-+]?\\d{1,3}(?:[.]\\d+)?";
+    static final String LAT = "[-+]?\\d{1,2}(?:[.]\\d+)?";
+    static final String LON_LAT = LON + ",\\s*" + LAT;
+    static final String LOCATION = LON_LAT + ",\\s*" + LON_LAT;
+
+    /**
+     * Used to find locations one at a time after knowing the locations String is valid.
+     */
+    static final Pattern LOCATION_PATTERN = Pattern.compile(LOCATION);
+
+    /**
+     * One or more locations separated by a comma, example: lon,lat,lon,lat,lon,lat,lon,lat
+     */
+    static final Pattern LOCATIONS_PATTERN = Pattern.compile("(?:" + LOCATION + ")(?:,\\s*" + LOCATION + ")*");
+
+
+    /**
+     * Parses one or more bounding boxes out of a single comma-separated string.
+     *
+     * @param input a comma-separated list of longitude,latitude pairs specifying a set of bounding boxes, with the southwest corner of each bounding box coming first
+     * @return a list of the Location instances represented by the provided input
+     * @throws IllegalStateException if the input does not match {@link #LOCATIONS_PATTERN} as a whole
+     */
+    public static List<Location> parseLocations(final String input) {
+        final List<Location> locations = new ArrayList<>();
+        final Matcher locationsMatcher = LOCATIONS_PATTERN.matcher(input);
+        if (locationsMatcher.matches()) {
+            final Matcher locationMatcher = LOCATION_PATTERN.matcher(input);
+            while (locationMatcher.find()) {
+                // the input matched as a whole, so find() walks it one bounding box at a time
+                locations.add(parseLocation(locationMatcher.group()));
+            }
+        } else {
+            throw new IllegalStateException("The provided location string was invalid.");
+        }
+
+        return locations;
+    }
+
+    /**
+     * Parses a single bounding box.
+     *
+     * @param location a comma-separated list of longitude,latitude pairs specifying one location, in the order swLon,swLat,neLon,neLat
+     * @return the Location instance for the provided input
+     */
+    public static Location parseLocation(final String location) {
+        final String[] corSplit = location.split(",");
+
+        final double swLon = Double.parseDouble(corSplit[0]);
+        final double swLat = Double.parseDouble(corSplit[1]);
+
+        final double neLon = Double.parseDouble(corSplit[2]);
+        final double neLat = Double.parseDouble(corSplit[3]);
+
+        final Location.Coordinate sw = new Location.Coordinate(swLon, swLat);
+        final Location.Coordinate ne = new Location.Coordinate(neLon, neLat);
+        return new Location(sw, ne);
+    }
+
+}
diff --git a/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/test/java/org/apache/nifi/processors/twitter/TestGetTwitter.java b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/test/java/org/apache/nifi/processors/twitter/TestGetTwitter.java
new file mode 100644
index 0000000000..132fa178ba
--- /dev/null
+++ b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/test/java/org/apache/nifi/processors/twitter/TestGetTwitter.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.twitter;
+
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+
+public class TestGetTwitter {
+
+    @Test
+    public void testLocationValidatorWithValidLocations() {
+        final TestRunner testRunner = TestRunners.newTestRunner(GetTwitter.class);
+        testRunner.setProperty(GetTwitter.ENDPOINT, GetTwitter.ENDPOINT_FILTER);
+        testRunner.setProperty(GetTwitter.CONSUMER_KEY, "consumerKey");
+        testRunner.setProperty(GetTwitter.CONSUMER_SECRET, "consumerSecret");
+        testRunner.setProperty(GetTwitter.ACCESS_TOKEN, "acessToken");
+        testRunner.setProperty(GetTwitter.ACCESS_TOKEN_SECRET, "accessTokenSecret");
+        testRunner.setProperty(GetTwitter.LOCATIONS, "-122.75,36.8,-121.75,37.8,-74,40,-73,41"); // two boxes, each with SW < NE on both axes
+        testRunner.assertValid();
+    }
+
+    @Test
+    public void testLocationValidatorWithEqualLatitudes() {
+        final TestRunner testRunner = TestRunners.newTestRunner(GetTwitter.class);
+        testRunner.setProperty(GetTwitter.ENDPOINT, GetTwitter.ENDPOINT_FILTER);
+        testRunner.setProperty(GetTwitter.CONSUMER_KEY, "consumerKey");
+        testRunner.setProperty(GetTwitter.CONSUMER_SECRET, "consumerSecret");
+        testRunner.setProperty(GetTwitter.ACCESS_TOKEN, "acessToken");
+        testRunner.setProperty(GetTwitter.ACCESS_TOKEN_SECRET, "accessTokenSecret");
+        testRunner.setProperty(GetTwitter.LOCATIONS, "-122.75,36.8,-121.75,37.8,-74,40,-73,40"); // second box: swLat 40 == neLat 40
+        testRunner.assertNotValid();
+    }
+
+    @Test
+    public void testLocationValidatorWithEqualLongitudes() {
+        final TestRunner testRunner = TestRunners.newTestRunner(GetTwitter.class);
+        testRunner.setProperty(GetTwitter.ENDPOINT, GetTwitter.ENDPOINT_FILTER);
+        testRunner.setProperty(GetTwitter.CONSUMER_KEY, "consumerKey");
+        testRunner.setProperty(GetTwitter.CONSUMER_SECRET, "consumerSecret");
+        testRunner.setProperty(GetTwitter.ACCESS_TOKEN, "acessToken");
+        testRunner.setProperty(GetTwitter.ACCESS_TOKEN_SECRET, "accessTokenSecret");
+        testRunner.setProperty(GetTwitter.LOCATIONS, "-122.75,36.8,-121.75,37.8,-74,40,-74,41"); // second box: swLon -74 == neLon -74
+        testRunner.assertNotValid();
+    }
+
+    @Test
+    public void testLocationValidatorWithSWLatGreaterThanNELat() {
+        final TestRunner testRunner = TestRunners.newTestRunner(GetTwitter.class);
+        testRunner.setProperty(GetTwitter.ENDPOINT, GetTwitter.ENDPOINT_FILTER);
+        testRunner.setProperty(GetTwitter.CONSUMER_KEY, "consumerKey");
+        testRunner.setProperty(GetTwitter.CONSUMER_SECRET, "consumerSecret");
+        testRunner.setProperty(GetTwitter.ACCESS_TOKEN, "acessToken");
+        testRunner.setProperty(GetTwitter.ACCESS_TOKEN_SECRET, "accessTokenSecret");
+        testRunner.setProperty(GetTwitter.LOCATIONS, "-122.75,36.8,-121.75,37.8,-74,40,-73,39"); // second box: swLat 40 > neLat 39
+        testRunner.assertNotValid();
+    }
+
+    @Test
+    public void testLocationValidatorWithSWLonGreaterThanNELon() {
+        final TestRunner testRunner = TestRunners.newTestRunner(GetTwitter.class);
+        testRunner.setProperty(GetTwitter.ENDPOINT, GetTwitter.ENDPOINT_FILTER);
+        testRunner.setProperty(GetTwitter.CONSUMER_KEY, "consumerKey");
+        testRunner.setProperty(GetTwitter.CONSUMER_SECRET, "consumerSecret");
+        testRunner.setProperty(GetTwitter.ACCESS_TOKEN, "acessToken");
+        testRunner.setProperty(GetTwitter.ACCESS_TOKEN_SECRET, "accessTokenSecret");
+        testRunner.setProperty(GetTwitter.LOCATIONS, "-122.75,36.8,-121.75,37.8,-74,40,-75,41"); // second box: swLon -74 > neLon -75
+        testRunner.assertNotValid();
+    }
+}
diff --git a/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/test/java/org/apache/nifi/processors/twitter/TestLocationUtil.java b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/test/java/org/apache/nifi/processors/twitter/TestLocationUtil.java
new file mode 100644
index 0000000000..4d21613a5c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/test/java/org/apache/nifi/processors/twitter/TestLocationUtil.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.twitter;
+
+import com.twitter.hbc.core.endpoint.Location;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class TestLocationUtil {
+
+    @Test
+    public void testParseLocationsSingle() {
+        final String swLon = "-122.75";
+        final String swLat = "36.8";
+        final String neLon = "-121.75";
+        final String neLat = "37.8";
+
+        final String locationString = swLon + "," + swLat + "," + neLon + "," + neLat;
+        final List<Location> locations = LocationUtil.parseLocations(locationString);
+        Assert.assertEquals(1, locations.size());
+
+        final Location location = locations.get(0);
+        Assert.assertEquals(Double.parseDouble(swLon), location.southwestCoordinate().longitude(), 0.0);
+        Assert.assertEquals(Double.parseDouble(swLat), location.southwestCoordinate().latitude(), 0.0);
+        Assert.assertEquals(Double.parseDouble(neLon), location.northeastCoordinate().longitude(), 0.0);
+        Assert.assertEquals(Double.parseDouble(neLat), location.northeastCoordinate().latitude(), 0.0);
+    }
+
+    @Test
+    public void testParseLocationsMultiple() {
+        final Location location1 = new Location(new Location.Coordinate(-122.75, 36.8), new Location.Coordinate(-121.75,37.8));
+        final Location location2 = new Location(new Location.Coordinate(-74, 40), new Location.Coordinate(-73, 41));
+        final Location location3 = new Location(new Location.Coordinate(-64, 30), new Location.Coordinate(-63, 31));
+        final Location location4 = new Location(new Location.Coordinate(-54, 20), new Location.Coordinate(-53, 21));
+
+        final List<Location> expectedLocations = Arrays.asList(location1, location2, location3, location4);
+
+        final String locationString = "-122.75,36.8,-121.75,37.8,-74,40,-73,41,-64,30,-63,31,-54,20,-53,21";
+        final List<Location> locations = LocationUtil.parseLocations(locationString);
+        Assert.assertEquals(expectedLocations.size(), locations.size());
+
+        // Order-insensitive check: each expected box must match some parsed box on all four coordinates.
+        for (Location expectedLocation : expectedLocations) {
+            boolean found = false;
+            for (Location location : locations) {
+                if (location.northeastCoordinate().longitude() == expectedLocation.northeastCoordinate().longitude()
+                        && location.northeastCoordinate().latitude() == expectedLocation.northeastCoordinate().latitude()
+                        && location.southwestCoordinate().longitude() == expectedLocation.southwestCoordinate().longitude()
+                        && location.southwestCoordinate().latitude() == expectedLocation.southwestCoordinate().latitude()) {
+                    found = true;
+                    break;
+                }
+            }
+            Assert.assertTrue(found);
+        }
+    }
+}