From 0255c0c4ed9f1f7e9efa2ca6c3084839f956871a Mon Sep 17 00:00:00 2001 From: exceptionfactory Date: Wed, 15 Feb 2023 09:18:00 -0600 Subject: [PATCH] NIFI-11185 Removed GetTwitter Processor This closes #6958 Signed-off-by: Mike Thomsen --- .../nifi-twitter-processors/pom.xml | 32 +- .../nifi/processors/twitter/GetTwitter.java | 487 ------------------ .../nifi/processors/twitter/LocationUtil.java | 93 ---- .../org.apache.nifi.processor.Processor | 2 - .../processors/twitter/TestGetTwitter.java | 273 ---------- .../processors/twitter/TestLocationUtil.java | 76 --- 6 files changed, 2 insertions(+), 961 deletions(-) delete mode 100644 nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java delete mode 100644 nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/LocationUtil.java delete mode 100644 nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/test/java/org/apache/nifi/processors/twitter/TestGetTwitter.java delete mode 100644 nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/test/java/org/apache/nifi/processors/twitter/TestLocationUtil.java diff --git a/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/pom.xml b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/pom.xml index 8b5767571c..7624a7dafe 100644 --- a/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/pom.xml +++ b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/pom.xml @@ -44,46 +44,18 @@ org.apache.oltu.oauth2 org.apache.oltu.oauth2.client - - - - com.twitter - hbc-core - 2.2.0 - - com.google.code.findbugs - jsr305 - - - org.twitter4j - twitter4j-core - - - commons-logging - commons-logging + com.twitter + joauth - - org.slf4j - jcl-over-slf4j - - - com.github.stephenc.findbugs - findbugs-annotations - 1.3.9-1 - org.apache.nifi nifi-mock 2.0.0-SNAPSHOT test - - com.squareup.okhttp3 - okhttp - com.squareup.okhttp3 mockwebserver diff --git a/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java deleted file mode 100644 index 0e17106a14..0000000000 --- a/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java +++ /dev/null @@ -1,487 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.twitter; - -import com.twitter.hbc.ClientBuilder; -import com.twitter.hbc.core.Client; -import com.twitter.hbc.core.Constants; -import com.twitter.hbc.core.endpoint.Location; -import com.twitter.hbc.core.endpoint.Location.Coordinate; -import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint; -import com.twitter.hbc.core.endpoint.StatusesFirehoseEndpoint; -import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint; -import com.twitter.hbc.core.endpoint.StreamingEndpoint; -import com.twitter.hbc.core.event.Event; -import com.twitter.hbc.core.processor.StringDelimitedProcessor; -import com.twitter.hbc.httpclient.auth.Authentication; -import com.twitter.hbc.httpclient.auth.OAuth1; -import org.apache.nifi.annotation.behavior.DynamicProperty; -import org.apache.nifi.annotation.behavior.InputRequirement; -import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; -import org.apache.nifi.annotation.behavior.SupportsBatching; -import org.apache.nifi.annotation.behavior.WritesAttribute; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.DeprecationNotice; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.annotation.lifecycle.OnStopped; -import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange; -import org.apache.nifi.annotation.notification.PrimaryNodeState; -import org.apache.nifi.components.AllowableValue; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.ValidationContext; -import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.components.Validator; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.flowfile.attributes.CoreAttributes; -import org.apache.nifi.processor.AbstractProcessor; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.ProcessorInitializationContext; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.io.OutputStreamCallback; -import org.apache.nifi.processor.util.StandardValidators; - -import java.io.IOException; -import java.io.OutputStream; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.regex.Pattern; - -@Deprecated -@DeprecationNotice(alternatives = {ConsumeTwitter.class}, reason = "GetTwitter relies on the Twitter Hosebird client, which is not maintained. This processor will be removed in future releases.") -@SupportsBatching -@InputRequirement(Requirement.INPUT_FORBIDDEN) -@Tags({"twitter", "tweets", "social media", "status", "json"}) -@CapabilityDescription("Pulls status changes from Twitter's streaming API. In versions starting with 1.9.0, the Consumer Key and Access Token are marked as sensitive according to https://developer.twitter.com/en/docs/basics/authentication/guides/securing-keys-and-tokens") -@WritesAttribute(attribute = "mime.type", description = "Sets mime type to application/json") -@DynamicProperty(name="The name of a query parameter to add to the Twitter query", value="The value of a query parameter to add to the Twitter query", - description="Allows users to specify the name/value of a query parameter to add to the Twitter query") -public class GetTwitter extends AbstractProcessor { - - static final AllowableValue ENDPOINT_SAMPLE = new AllowableValue("Sample Endpoint", "Sample Endpoint", "The endpoint that provides public data, aka a 'garden hose'"); - static final AllowableValue ENDPOINT_FIREHOSE = new AllowableValue("Firehose Endpoint", "Firehose Endpoint", "The endpoint that provides access to all tweets"); - static final AllowableValue ENDPOINT_FILTER = new AllowableValue("Filter Endpoint", "Filter Endpoint", "Endpoint that allows the stream to be filtered by specific terms or User IDs"); - - public static final PropertyDescriptor ENDPOINT = new PropertyDescriptor.Builder() - .name("Twitter Endpoint") - .description("Specifies which endpoint data should be pulled from") - .required(true) - .allowableValues(ENDPOINT_SAMPLE, ENDPOINT_FIREHOSE, ENDPOINT_FILTER) - .defaultValue(ENDPOINT_SAMPLE.getValue()) - .build(); - public static final PropertyDescriptor MAX_CLIENT_ERROR_RETRIES = new PropertyDescriptor.Builder() - .name("max-client-error-retries") - .displayName("Max Client Error Retries") - .description("The maximum number of retries to attempt when client experience retryable connection errors." - + " Client continues attempting to reconnect using an exponential back-off pattern until it successfully reconnects" - + " or until it reaches the retry limit." - +" It is recommended to raise this value when client is getting rate limited by Twitter API. Default value is 5.") - .required(true) - .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) - .defaultValue("5") - .build(); - public static final PropertyDescriptor CONSUMER_KEY = new PropertyDescriptor.Builder() - .name("Consumer Key") - .description("The Consumer Key provided by Twitter") - .required(true) - .sensitive(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - public static final PropertyDescriptor CONSUMER_SECRET = new PropertyDescriptor.Builder() - .name("Consumer Secret") - .description("The Consumer Secret provided by Twitter") - .required(true) - .sensitive(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - public static final PropertyDescriptor ACCESS_TOKEN = new PropertyDescriptor.Builder() - .name("Access Token") - .description("The Access Token provided by Twitter") - .required(true) - .sensitive(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - public static final PropertyDescriptor ACCESS_TOKEN_SECRET = new PropertyDescriptor.Builder() - .name("Access Token Secret") - .description("The Access Token Secret provided by Twitter") - .required(true) - .sensitive(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - public static final PropertyDescriptor LANGUAGES = new PropertyDescriptor.Builder() - .name("Languages") - .description("A comma-separated list of languages for which tweets should be fetched") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - public static final PropertyDescriptor FOLLOWING = new PropertyDescriptor.Builder() - .name("IDs to Follow") - .description("A comma-separated list of Twitter User ID's to follow. Ignored unless Endpoint is set to 'Filter Endpoint'.") - .required(false) - .addValidator(new FollowingValidator()) - .build(); - public static final PropertyDescriptor LOCATIONS = new PropertyDescriptor.Builder() - .name("Locations to Filter On") - .description("A comma-separated list of coordinates specifying one or more bounding boxes to filter on." - + "Each bounding box is specified by a pair of coordinates in the format: swLon,swLat,neLon,neLat. " - + "Multiple bounding boxes can be specified as such: swLon1,swLat1,neLon1,neLat1,swLon2,swLat2,neLon2,neLat2." - + "Ignored unless Endpoint is set to 'Filter Endpoint'.") - .addValidator(new LocationValidator() ) - .required(false) - .build(); - public static final PropertyDescriptor TERMS = new PropertyDescriptor.Builder() - .name("Terms to Filter On") - .description("A comma-separated list of terms to filter on. Ignored unless Endpoint is set to 'Filter Endpoint'." - + " The filter works such that if any term matches, the status update will be retrieved; multiple terms" - + " separated by a space function as an 'AND'. I.e., 'it was, hello' will retrieve status updates that" - + " have either 'hello' or both 'it' AND 'was'") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - - public static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("All status updates will be routed to this relationship") - .build(); - - private List descriptors; - private Set relationships; - - private final BlockingQueue eventQueue = new LinkedBlockingQueue<>(1000); - - private volatile ClientBuilder clientBuilder; - private volatile Client client; - private volatile BlockingQueue messageQueue = new LinkedBlockingQueue<>(5000); - - @Override - protected void init(final ProcessorInitializationContext context) { - final List descriptors = new ArrayList<>(); - descriptors.add(ENDPOINT); - descriptors.add(MAX_CLIENT_ERROR_RETRIES); - descriptors.add(CONSUMER_KEY); - descriptors.add(CONSUMER_SECRET); - descriptors.add(ACCESS_TOKEN); - descriptors.add(ACCESS_TOKEN_SECRET); - descriptors.add(LANGUAGES); - descriptors.add(TERMS); - descriptors.add(FOLLOWING); - descriptors.add(LOCATIONS); - this.descriptors = Collections.unmodifiableList(descriptors); - - final Set relationships = new HashSet<>(); - relationships.add(REL_SUCCESS); - this.relationships = Collections.unmodifiableSet(relationships); - } - - @Override - public Set getRelationships() { - return this.relationships; - } - - @Override - public final List getSupportedPropertyDescriptors() { - return descriptors; - } - - @Override - protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { - return new PropertyDescriptor.Builder() - .name(propertyDescriptorName) - .description("Adds a query parameter with name '" + propertyDescriptorName + "' to the Twitter query") - .required(false) - .dynamic(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - } - - @Override - protected Collection customValidate(final ValidationContext validationContext) { - final List results = new ArrayList<>(); - final String endpointName = validationContext.getProperty(ENDPOINT).getValue(); - - if (ENDPOINT_FILTER.getValue().equals(endpointName)) { - if (!validationContext.getProperty(TERMS).isSet() && !validationContext.getProperty(FOLLOWING).isSet() && !validationContext.getProperty(LOCATIONS).isSet()) { - results.add(new ValidationResult.Builder().input("").subject(FOLLOWING.getName()) - .valid(false).explanation("When using the 'Filter Endpoint', at least one of '" + TERMS.getName() + "' or '" + FOLLOWING.getName() + "'" + - "' or '" + LOCATIONS.getName() + " must be set").build()); - } - } - - return results; - } - - @Override - public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { - // if any property is modified, the results are no longer valid. Destroy all messages in the queue. - messageQueue.clear(); - } - - @OnScheduled - public void onScheduled(final ProcessContext context) { - final String endpointName = context.getProperty(ENDPOINT).getValue(); - final int maxRetries = context.getProperty(MAX_CLIENT_ERROR_RETRIES).asInteger().intValue(); - final Authentication oauth = new OAuth1(context.getProperty(CONSUMER_KEY).getValue(), - context.getProperty(CONSUMER_SECRET).getValue(), - context.getProperty(ACCESS_TOKEN).getValue(), - context.getProperty(ACCESS_TOKEN_SECRET).getValue()); - - final ClientBuilder clientBuilder = new ClientBuilder(); - clientBuilder.name("GetTwitter[id=" + getIdentifier() + "]") - .authentication(oauth) - .eventMessageQueue(eventQueue) - .processor(new StringDelimitedProcessor(messageQueue)); - - final String languageString = context.getProperty(LANGUAGES).getValue(); - final List languages; - if (languageString == null) { - languages = null; - } else { - languages = new ArrayList<>(); - for (final String language : context.getProperty(LANGUAGES).getValue().split(",")) { - languages.add(language.trim()); - } - } - - final String host; - final StreamingEndpoint streamingEndpoint; - if (ENDPOINT_SAMPLE.getValue().equals(endpointName)) { - host = Constants.STREAM_HOST; - final StatusesSampleEndpoint sse = new StatusesSampleEndpoint(); - streamingEndpoint = sse; - if (languages != null) { - sse.languages(languages); - } - } else if (ENDPOINT_FIREHOSE.getValue().equals(endpointName)) { - host = Constants.STREAM_HOST; - final StatusesFirehoseEndpoint firehoseEndpoint = new StatusesFirehoseEndpoint(); - streamingEndpoint = firehoseEndpoint; - if (languages != null) { - firehoseEndpoint.languages(languages); - } - } else if (ENDPOINT_FILTER.getValue().equals(endpointName)) { - host = Constants.STREAM_HOST; - final StatusesFilterEndpoint filterEndpoint = new StatusesFilterEndpoint(); - - final String followingString = context.getProperty(FOLLOWING).getValue(); - final List followingIds; - if (followingString == null) { - followingIds = Collections.emptyList(); - } else { - followingIds = new ArrayList<>(); - - for (final String split : followingString.split(",")) { - final Long id = Long.parseLong(split.trim()); - followingIds.add(id); - } - } - - final String termString = context.getProperty(TERMS).getValue(); - final List terms; - if (termString == null) { - terms = Collections.emptyList(); - } else { - terms = new ArrayList<>(); - for (final String split : termString.split(",")) { - terms.add(split.trim()); - } - } - - if (!terms.isEmpty()) { - filterEndpoint.trackTerms(terms); - } - - if (!followingIds.isEmpty()) { - filterEndpoint.followings(followingIds); - } - - if (languages != null) { - filterEndpoint.languages(languages); - } - - final String locationString = context.getProperty(LOCATIONS).getValue(); - final List locations; - if (locationString == null) { - locations = Collections.emptyList(); - } else { - locations = LocationUtil.parseLocations(locationString); - } - - if (!locations.isEmpty()) { - filterEndpoint.locations(locations); - } - - streamingEndpoint = filterEndpoint ; - - } else { - throw new AssertionError("Endpoint was invalid value: " + endpointName); - } - - clientBuilder.hosts(host).endpoint(streamingEndpoint); - clientBuilder.retries(maxRetries); - this.clientBuilder = clientBuilder; - } - - public synchronized void connectNewClient() { - if (client == null || client.isDone()) { - client = clientBuilder.build(); - try { - client.connect(); - } catch (Exception e) { - client.stop(); - } - } - } - - @OnStopped - public void shutdownClient() { - if (client != null) { - client.stop(); - } - } - - @OnPrimaryNodeStateChange - public void onPrimaryNodeChange(final PrimaryNodeState newState) { - if (newState == PrimaryNodeState.PRIMARY_NODE_REVOKED) { - shutdownClient(); - } - } - - @Override - public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - if (client == null || client.isDone()) { - connectNewClient(); - if (client.isDone()) { - context.yield(); - return; - } - } - final Event event = eventQueue.poll(); - if (event != null) { - switch (event.getEventType()) { - case STOPPED_BY_ERROR: - getLogger().error("Received error {}: {} due to {}. Will not attempt to reconnect", new Object[]{event.getEventType(), event.getMessage(), event.getUnderlyingException()}); - break; - case CONNECTION_ERROR: - case HTTP_ERROR: - getLogger().error("Received error {}: {}. Will attempt to reconnect", new Object[]{event.getEventType(), event.getMessage()}); - client.reconnect(); - break; - default: - break; - } - } - - final String tweet = messageQueue.poll(); - if (tweet == null) { - context.yield(); - return; - } - - FlowFile flowFile = session.create(); - flowFile = session.write(flowFile, new OutputStreamCallback() { - @Override - public void process(final OutputStream out) throws IOException { - out.write(tweet.getBytes(StandardCharsets.UTF_8)); - } - }); - - final Map attributes = new HashMap<>(); - attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json"); - attributes.put(CoreAttributes.FILENAME.key(), flowFile.getAttribute(CoreAttributes.FILENAME.key()) + ".json"); - flowFile = session.putAllAttributes(flowFile, attributes); - - session.transfer(flowFile, REL_SUCCESS); - session.getProvenanceReporter().receive(flowFile, Constants.STREAM_HOST + client.getEndpoint().getURI()); - } - - private static class FollowingValidator implements Validator { - - private static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+"); - - @Override - public ValidationResult validate(final String subject, final String input, final ValidationContext context) { - final String[] splits = input.split(","); - for (final String split : splits) { - if (!NUMBER_PATTERN.matcher(split.trim()).matches()) { - return new ValidationResult.Builder().input(input).subject(subject).valid(false).explanation("Must be comma-separted list of User ID's").build(); - } - } - - return new ValidationResult.Builder().subject(subject).input(input).valid(true).build(); - } - - } - - private static class LocationValidator implements Validator { - - @Override - public ValidationResult validate(final String subject, final String input, final ValidationContext context) { - try { - final List locations = LocationUtil.parseLocations(input); - for (final Location location : locations) { - final Coordinate sw = location.southwestCoordinate(); - final Coordinate ne = location.northeastCoordinate(); - - if (sw.longitude() > ne.longitude()) { - return new ValidationResult.Builder().input(input).subject(subject).valid(false) - .explanation("SW Longitude (" + sw.longitude() + ") must be less than NE Longitude (" - + ne.longitude() + ").").build(); - } - - if (sw.longitude() == ne.longitude()) { - return new ValidationResult.Builder().input(input).subject(subject).valid(false) - .explanation("SW Longitude (" + sw.longitude() + ") can not be equal to NE Longitude (" - + ne.longitude() + ").").build(); - } - - if (sw.latitude() > ne.latitude()) { - return new ValidationResult.Builder().input(input).subject(subject).valid(false) - .explanation("SW Latitude (" + sw.latitude() + ") must be less than NE Latitude (" - + ne.latitude() + ").").build(); - } - - if (sw.latitude() == ne.latitude()) { - return new ValidationResult.Builder().input(input).subject(subject).valid(false) - .explanation("SW Latitude (" + sw.latitude() + ") can not be equal to NE Latitude (" - + ne.latitude() + ").").build(); - } - } - - return new ValidationResult.Builder().subject(subject).input(input).valid(true).build(); - - } catch (IllegalStateException e) { - return new ValidationResult.Builder() - .input(input).subject(subject).valid(false) - .explanation("Must be a comma-separated list of longitude,latitude pairs specifying one or more bounding boxes.") - .build(); - } - } - - } - -} diff --git a/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/LocationUtil.java b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/LocationUtil.java deleted file mode 100644 index 6882b60e0c..0000000000 --- a/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/LocationUtil.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.twitter; - -import com.twitter.hbc.core.endpoint.Location; - -import java.util.ArrayList; -import java.util.List; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -/** - * Utility for parsing locations to be used with the Twitter API: - * - * https://dev.twitter.com/streaming/overview/request-parameters#locations - * - */ -public class LocationUtil { - - static final String LON = "[-+]?\\d{1,3}(?:[.]\\d+)?"; - static final String LAT = "[-+]?\\d{1,2}(?:[.]\\d+)?"; - static final String LON_LAT = LON + ",\\s*" + LAT; - static final String LOCATION = LON_LAT + ",\\s*" + LON_LAT; - - /** - * Used to find locations one at a time after knowing the locations String is valid. - */ - static final Pattern LOCATION_PATTERN = Pattern.compile(LOCATION); - - /** - * One or more locations separated by a comma, example: lon,lat,lon,lat,lon,lat,lon,lat - */ - static final Pattern LOCATIONS_PATTERN = Pattern.compile("(?:" + LOCATION + ")(?:,\\s*" + LOCATION + ")*"); - - - /** - * - * @param input a comma-separated list of longitude,latitude pairs specifying a set of bounding boxes, - * with the southwest corner of the bounding box coming first - * - * @return a list of the Location instances represented by the provided input - */ - public static List parseLocations(final String input) { - final List locations = new ArrayList<>(); - final Matcher locationsMatcher = LOCATIONS_PATTERN.matcher(input); - if (locationsMatcher.matches()) { - Matcher locationMatcher = LOCATION_PATTERN.matcher(input); - while (locationMatcher.find()) { - final String location = locationMatcher.group(); - locations.add(parseLocation(location)); - } - } else { - throw new IllegalStateException("The provided location string was invalid."); - } - - return locations; - } - - /** - * - * @param location a comma-separated list of longitude,latitude pairs specifying one location - * - * @return the Location instance for the provided input - */ - public static Location parseLocation(final String location) { - final String[] corSplit = location.split(","); - - final double swLon = Double.parseDouble(corSplit[0]) ; - final double swLat = Double.parseDouble(corSplit[1]) ; - - final double neLon = Double.parseDouble(corSplit[2]) ; - final double neLat = Double.parseDouble(corSplit[3]) ; - - Location.Coordinate sw = new Location.Coordinate(swLon, swLat) ; - Location.Coordinate ne = new Location.Coordinate(neLon, neLat) ; - return new Location(sw, ne) ; - } - -} diff --git a/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 1f90feb7e5..27a2fe6e7a 100644 --- a/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -12,6 +12,4 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -org.apache.nifi.processors.twitter.GetTwitter org.apache.nifi.processors.twitter.ConsumeTwitter diff --git a/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/test/java/org/apache/nifi/processors/twitter/TestGetTwitter.java b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/test/java/org/apache/nifi/processors/twitter/TestGetTwitter.java deleted file mode 100644 index d7dcb31e95..0000000000 --- a/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/test/java/org/apache/nifi/processors/twitter/TestGetTwitter.java +++ /dev/null @@ -1,273 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.twitter; -import com.twitter.hbc.core.Client; -import com.twitter.hbc.core.endpoint.StreamingEndpoint; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.InjectMocks; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.mockito.junit.jupiter.MockitoExtension; -import java.util.concurrent.BlockingQueue; - - -@ExtendWith(MockitoExtension.class) -public class TestGetTwitter { - private TestRunner runner; - @Mock - Client client = Mockito.mock(Client.class); - @Mock - BlockingQueue messageQueue = Mockito.mock(BlockingQueue.class); - @InjectMocks - GetTwitter getTwitter = new GetTwitter(); - - @BeforeEach - public void init() { - runner = TestRunners.newTestRunner(getTwitter); - } - - - @Test - public void testLocationValidatorWithValidLocations() { - runner.setProperty(GetTwitter.ENDPOINT, GetTwitter.ENDPOINT_FILTER); - runner.setProperty(GetTwitter.MAX_CLIENT_ERROR_RETRIES, "5"); - runner.setProperty(GetTwitter.CONSUMER_KEY, "consumerKey"); - runner.setProperty(GetTwitter.CONSUMER_SECRET, "consumerSecret"); - runner.setProperty(GetTwitter.ACCESS_TOKEN, "accessToken"); - runner.setProperty(GetTwitter.ACCESS_TOKEN_SECRET, "accessTokenSecret"); - runner.setProperty(GetTwitter.LOCATIONS, "-122.75,36.8,-121.75,37.8,-74,40,-73,41"); - runner.assertValid(); - } - - @Test - public void testLocationValidatorWithEqualLatitudes() { - runner.setProperty(GetTwitter.ENDPOINT, GetTwitter.ENDPOINT_FILTER); - runner.setProperty(GetTwitter.MAX_CLIENT_ERROR_RETRIES, "5"); - runner.setProperty(GetTwitter.CONSUMER_KEY, "consumerKey"); - runner.setProperty(GetTwitter.CONSUMER_SECRET, "consumerSecret"); - runner.setProperty(GetTwitter.ACCESS_TOKEN, "accessToken"); - runner.setProperty(GetTwitter.ACCESS_TOKEN_SECRET, "accessTokenSecret"); - runner.setProperty(GetTwitter.LOCATIONS, "-122.75,36.8,-121.75,37.8,-74,40,-73,40"); - runner.assertNotValid(); - } - - @Test - public void testLocationValidatorWithEqualLongitudes() { - runner.setProperty(GetTwitter.ENDPOINT, GetTwitter.ENDPOINT_FILTER); - runner.setProperty(GetTwitter.MAX_CLIENT_ERROR_RETRIES, "5"); - runner.setProperty(GetTwitter.CONSUMER_KEY, "consumerKey"); - runner.setProperty(GetTwitter.CONSUMER_SECRET, "consumerSecret"); - runner.setProperty(GetTwitter.ACCESS_TOKEN, "accessToken"); - runner.setProperty(GetTwitter.ACCESS_TOKEN_SECRET, "accessTokenSecret"); - runner.setProperty(GetTwitter.LOCATIONS, "-122.75,36.8,-121.75,37.8,-74,40,-74,41"); - runner.assertNotValid(); - } - - @Test - public void testLocationValidatorWithSWLatGreaterThanNELat() { - runner.setProperty(GetTwitter.ENDPOINT, GetTwitter.ENDPOINT_FILTER); - runner.setProperty(GetTwitter.MAX_CLIENT_ERROR_RETRIES, "5"); - runner.setProperty(GetTwitter.CONSUMER_KEY, "consumerKey"); - runner.setProperty(GetTwitter.CONSUMER_SECRET, "consumerSecret"); - runner.setProperty(GetTwitter.ACCESS_TOKEN, "accessToken"); - runner.setProperty(GetTwitter.ACCESS_TOKEN_SECRET, "accessTokenSecret"); - runner.setProperty(GetTwitter.LOCATIONS, "-122.75,36.8,-121.75,37.8,-74,40,-73,39"); - runner.assertNotValid(); - } - - @Test - public void testLocationValidatorWithSWLonGreaterThanNELon() { - runner.setProperty(GetTwitter.ENDPOINT, GetTwitter.ENDPOINT_FILTER); - runner.setProperty(GetTwitter.MAX_CLIENT_ERROR_RETRIES, "5"); - runner.setProperty(GetTwitter.CONSUMER_KEY, "consumerKey"); - runner.setProperty(GetTwitter.CONSUMER_SECRET, "consumerSecret"); - runner.setProperty(GetTwitter.ACCESS_TOKEN, "accessToken"); - runner.setProperty(GetTwitter.ACCESS_TOKEN_SECRET, "accessTokenSecret"); - runner.setProperty(GetTwitter.LOCATIONS, "-122.75,36.8,-121.75,37.8,-74,40,-75,41"); - runner.assertNotValid(); - } - - - // To test getSupportedDynamicPropertyDescriptor - @Test - public void testValidGetSupportedDynamicPropertyDescriptor() { - runner.setProperty(GetTwitter.MAX_CLIENT_ERROR_RETRIES, "5"); - runner.setProperty(GetTwitter.CONSUMER_KEY, "consumerKey"); - runner.setProperty(GetTwitter.CONSUMER_SECRET, "consumerSecret"); - runner.setProperty(GetTwitter.ACCESS_TOKEN, "accessToken"); - runner.setProperty(GetTwitter.ACCESS_TOKEN_SECRET, "accessTokenSecret"); - PropertyDescriptor dynamicProperty = new PropertyDescriptor.Builder() - .name("foo") - .description("Adds a query parameter with name '" + "foo" + "' to the Twitter query") - .required(false) - .dynamic(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - runner.setProperty(dynamicProperty, "{\"a\": \"a\"}"); - runner.assertValid(); - } - - - // To test customValidate - lines 222 to 224 - @Test - public void testCustomValidatorWithoutTermsFollowingLocation() { - runner.setProperty(GetTwitter.ENDPOINT, GetTwitter.ENDPOINT_FILTER); - runner.setProperty(GetTwitter.MAX_CLIENT_ERROR_RETRIES, "5"); - runner.setProperty(GetTwitter.CONSUMER_KEY, "consumerKey"); - runner.setProperty(GetTwitter.CONSUMER_SECRET, "consumerSecret"); - runner.setProperty(GetTwitter.ACCESS_TOKEN, "accessToken"); - runner.setProperty(GetTwitter.ACCESS_TOKEN_SECRET, "accessTokenSecret"); - runner.assertNotValid(); - } - - // To test onScheduled using ENDPOINT_SAMPLE and language - // Mocking Client and messageQueue instead to avoid make calls to the Twitter service - @Test - public void testRunsOnSchedulerEndpointSampleAndLanguage() { - Mockito.when(messageQueue.poll()).thenReturn("Hello World!"); - StreamingEndpoint streamep = Mockito.mock(StreamingEndpoint.class); - Mockito.when(client.getEndpoint()).thenReturn(streamep); - runner.setProperty(GetTwitter.ENDPOINT, GetTwitter.ENDPOINT_SAMPLE); - runner.setProperty(GetTwitter.MAX_CLIENT_ERROR_RETRIES, "5"); - runner.setProperty(GetTwitter.CONSUMER_KEY, "consumerKey"); - runner.setProperty(GetTwitter.CONSUMER_SECRET, "consumerSecret"); - runner.setProperty(GetTwitter.ACCESS_TOKEN, "accessToken"); - runner.setProperty(GetTwitter.ACCESS_TOKEN_SECRET, "accessTokenSecret"); - runner.setProperty(GetTwitter.LANGUAGES, "en, pt, it"); - runner.assertValid(); - runner.run(); - } - - // To test onScheduled using ENDPOINT_SAMPLE - // Mocking Client and messageQueue instead to avoid make calls to the Twitter service - @Test - public void testRunsOnSchedulerEndpointSample() { - Mockito.when(messageQueue.poll()).thenReturn("Hello World!"); - StreamingEndpoint streamep = Mockito.mock(StreamingEndpoint.class); - Mockito.when(client.getEndpoint()).thenReturn(streamep); - runner.setProperty(GetTwitter.ENDPOINT, GetTwitter.ENDPOINT_SAMPLE); - runner.setProperty(GetTwitter.MAX_CLIENT_ERROR_RETRIES, "5"); - runner.setProperty(GetTwitter.CONSUMER_KEY, "consumerKey"); - runner.setProperty(GetTwitter.CONSUMER_SECRET, "consumerSecret"); - runner.setProperty(GetTwitter.ACCESS_TOKEN, "accessToken"); - runner.setProperty(GetTwitter.ACCESS_TOKEN_SECRET, "accessTokenSecret"); - runner.assertValid(); - runner.run(1); - } - - - // To test onScheduled using ENDPOINT_FILTER with valid locations, and language list - // Mocking Client and messageQueue instead to avoid make calls to the Twitter service - @Test - public void testRunsOnSchedulerEndpointFilterAndLanguage() { - Mockito.when(messageQueue.poll()).thenReturn("Hello World!"); - StreamingEndpoint streamep = Mockito.mock(StreamingEndpoint.class); - Mockito.when(client.getEndpoint()).thenReturn(streamep); - runner.setProperty(GetTwitter.ENDPOINT, GetTwitter.ENDPOINT_FILTER); - runner.setProperty(GetTwitter.MAX_CLIENT_ERROR_RETRIES, "5"); - runner.setProperty(GetTwitter.CONSUMER_KEY, "consumerKey"); - runner.setProperty(GetTwitter.CONSUMER_SECRET, "consumerSecret"); - runner.setProperty(GetTwitter.ACCESS_TOKEN, "accessToken"); - runner.setProperty(GetTwitter.ACCESS_TOKEN_SECRET, "accessTokenSecret"); - runner.setProperty(GetTwitter.LOCATIONS, "-122.75,36.8,-121.75,37.8,-74,40,-73,41"); - runner.setProperty(GetTwitter.LANGUAGES, "en, pt, it"); - runner.assertValid(); - runner.run(1); - } - - // To test onScheduled using ENDPOINT_FILTER with valid TERMS and no language, and no location - // Mocking Client and messageQueue instead to avoid make calls to the Twitter service - @Test - public void testRunsOnSchedulerEndpointFilterAndTerms() { - Mockito.when(messageQueue.poll()).thenReturn("Hello World!"); - StreamingEndpoint streamep = Mockito.mock(StreamingEndpoint.class); - Mockito.when(client.getEndpoint()).thenReturn(streamep); - runner.setProperty(GetTwitter.ENDPOINT, GetTwitter.ENDPOINT_FILTER); - runner.setProperty(GetTwitter.MAX_CLIENT_ERROR_RETRIES, "5"); - runner.setProperty(GetTwitter.CONSUMER_KEY, "consumerKey"); - runner.setProperty(GetTwitter.CONSUMER_SECRET, "consumerSecret"); - runner.setProperty(GetTwitter.ACCESS_TOKEN, "accessToken"); - runner.setProperty(GetTwitter.ACCESS_TOKEN_SECRET, "accessTokenSecret"); - runner.setProperty(GetTwitter.TERMS, "any thing we want to filter"); - runner.assertValid(); - runner.run(1); - } - - // To test onScheduled using ENDPOINT_FILTER with IDs to follow - // Mocking Client and messageQueue instead to avoid make calls to the Twitter service - @Test - public void testRunsOnSchedulerEndpointFilterAndID() { - Mockito.when(messageQueue.poll()).thenReturn("Hello World!"); - StreamingEndpoint streamep = Mockito.mock(StreamingEndpoint.class); - Mockito.when(client.getEndpoint()).thenReturn(streamep); - runner.setProperty(GetTwitter.ENDPOINT, GetTwitter.ENDPOINT_FILTER); - runner.setProperty(GetTwitter.MAX_CLIENT_ERROR_RETRIES, "5"); - runner.setProperty(GetTwitter.CONSUMER_KEY, "consumerKey"); - runner.setProperty(GetTwitter.CONSUMER_SECRET, "consumerSecret"); - runner.setProperty(GetTwitter.ACCESS_TOKEN, "accessToken"); - runner.setProperty(GetTwitter.ACCESS_TOKEN_SECRET, "accessTokenSecret"); - String followingIds = " 4265731,\n" + - " 27674040,\n" + - " 26123649,\n" + - " 9576402,\n" + - " 821958,\n" + - " 7852612,\n" + - " 819797\n"; - runner.setProperty(GetTwitter.FOLLOWING, followingIds); - runner.assertValid(); - runner.run(1); - } - - // To test onScheduled using ENDPOINT_FIREHOUSE and languages list - // Mocking Client and messageQueue instead to avoid make calls to the Twitter service - @Test - public void testRunsOnSchedulerEndpointFirehouseAndLanguage() { - Mockito.when(messageQueue.poll()).thenReturn("Hello World!"); - StreamingEndpoint streamep = Mockito.mock(StreamingEndpoint.class); - Mockito.when(client.getEndpoint()).thenReturn(streamep); - runner.setProperty(GetTwitter.ENDPOINT, GetTwitter.ENDPOINT_FIREHOSE); - runner.setProperty(GetTwitter.MAX_CLIENT_ERROR_RETRIES, "5"); - runner.setProperty(GetTwitter.CONSUMER_KEY, "consumerKey"); - runner.setProperty(GetTwitter.CONSUMER_SECRET, "consumerSecret"); - runner.setProperty(GetTwitter.ACCESS_TOKEN, "accessToken"); - runner.setProperty(GetTwitter.ACCESS_TOKEN_SECRET, "accessTokenSecret"); - runner.setProperty(GetTwitter.LANGUAGES, "en, pt, it"); - runner.assertValid(); - runner.run(1); - } - - // To test FollowingValidator for Invalid Following - not number - // and test catch invalid location values - @Test - public void testCustomValidatorInvalidFollowingLocation() { - runner.setProperty(GetTwitter.ENDPOINT, GetTwitter.ENDPOINT_FILTER); - runner.setProperty(GetTwitter.MAX_CLIENT_ERROR_RETRIES, "5"); - runner.setProperty(GetTwitter.CONSUMER_KEY, "consumerKey"); - runner.setProperty(GetTwitter.CONSUMER_SECRET, "consumerSecret"); - runner.setProperty(GetTwitter.ACCESS_TOKEN, "accessToken"); - runner.setProperty(GetTwitter.ACCESS_TOKEN_SECRET, "accessTokenSecret"); - runner.setProperty(GetTwitter.FOLLOWING, "invalid id value"); - runner.setProperty(GetTwitter.LOCATIONS, "invalid location value"); - runner.assertNotValid(); - } -} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/test/java/org/apache/nifi/processors/twitter/TestLocationUtil.java b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/test/java/org/apache/nifi/processors/twitter/TestLocationUtil.java deleted file mode 100644 index cb4538f29c..0000000000 --- a/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/test/java/org/apache/nifi/processors/twitter/TestLocationUtil.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.twitter; - -import com.twitter.hbc.core.endpoint.Location; -import org.junit.jupiter.api.Test; - -import java.util.Arrays; -import java.util.List; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; - -public class TestLocationUtil { - - @Test - public void testParseLocationsSingle() { - final String swLon = "-122.75"; - final String swLat = "36.8"; - final String neLon = "-121.75"; - final String neLat = "37.8"; - - final String locationString = swLon + "," + swLat + "," + neLon + "," + neLat; - List locations = LocationUtil.parseLocations(locationString); - assertEquals(1, locations.size()); - - Location location = locations.get(0); - assertEquals(new Double(location.southwestCoordinate().longitude()), Double.valueOf(swLon)); - assertEquals(new Double(location.southwestCoordinate().latitude()), Double.valueOf(swLat)); - assertEquals(new Double(location.northeastCoordinate().longitude()), Double.valueOf(neLon)); - assertEquals(new Double(location.northeastCoordinate().latitude()), Double.valueOf(neLat)); - } - - @Test - public void testParseLocationsMultiple() { - final Location location1 = new Location(new Location.Coordinate(-122.75, 36.8), new Location.Coordinate(-121.75,37.8)); - final Location location2 = new Location(new Location.Coordinate(-74, 40), new Location.Coordinate(-73, 41)); - final Location location3 = new Location(new Location.Coordinate(-64, 30), new Location.Coordinate(-63, 31)); - final Location location4 = new Location(new Location.Coordinate(-54, 20), new Location.Coordinate(-53, 21)); - - final List expectedLocations = Arrays.asList(location1, location2, location3, location4); - - final String locationString = "-122.75,36.8,-121.75,37.8,-74,40,-73,41,-64,30,-63,31,-54,20,-53,21"; - List locations = LocationUtil.parseLocations(locationString); - assertEquals(expectedLocations.size(), locations.size()); - - for (Location expectedLocation : expectedLocations) { - boolean found = false; - for (Location location : locations) { - if (location.northeastCoordinate().longitude() == expectedLocation.northeastCoordinate().longitude() - && location.northeastCoordinate().latitude() == expectedLocation.northeastCoordinate().latitude() - && location.southwestCoordinate().longitude() == expectedLocation.southwestCoordinate().longitude() - && location.southwestCoordinate().latitude() == expectedLocation.southwestCoordinate().latitude()) { - found = true; - break; - } - } - assertTrue(found); - } - - } -}