From e20aa0ea2ae114288f78911ded16e6b111426534 Mon Sep 17 00:00:00 2001 From: Emilio Setiadarma Date: Thu, 3 Mar 2022 14:40:42 -0800 Subject: [PATCH] NIFI-9755 Added ConsumeTwitter Processor This closes #5947 Signed-off-by: David Handermann --- nifi-assembly/LICENSE | 23 ++ .../src/main/resources/META-INF/NOTICE | 9 + .../nifi-twitter-processors/pom.xml | 20 + .../processors/twitter/ConsumeTwitter.java | 355 ++++++++++++++++++ .../processors/twitter/StreamEndpoint.java | 38 ++ .../twitter/TweetStreamService.java | 235 ++++++++++++ .../org.apache.nifi.processor.Processor | 3 +- .../twitter/TestConsumeTwitter.java | 85 +++++ 8 files changed, 767 insertions(+), 1 deletion(-) create mode 100644 nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/ConsumeTwitter.java create mode 100644 nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/StreamEndpoint.java create mode 100644 nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/TweetStreamService.java create mode 100644 nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/test/java/org/apache/nifi/processors/twitter/TestConsumeTwitter.java diff --git a/nifi-assembly/LICENSE b/nifi-assembly/LICENSE index 7336f3425d..4ecd35df8a 100644 --- a/nifi-assembly/LICENSE +++ b/nifi-assembly/LICENSE @@ -3098,3 +3098,26 @@ which is available under an MIT license. LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +The binary distribution of this product bundles 'ScribeJava OAuth Library' +which is available under an MIT license. 
+ + Copyright (c) 2010 Pablo Fernandez + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in + all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + THE SOFTWARE. \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-social-media-bundle/nifi-social-media-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-social-media-bundle/nifi-social-media-nar/src/main/resources/META-INF/NOTICE index 19e5518f70..e4dc42efd5 100644 --- a/nifi-nar-bundles/nifi-social-media-bundle/nifi-social-media-nar/src/main/resources/META-INF/NOTICE +++ b/nifi-nar-bundles/nifi-social-media-bundle/nifi-social-media-nar/src/main/resources/META-INF/NOTICE @@ -44,3 +44,12 @@ The following binary components are provided under the Apache Software License v The following NOTICE information applies: Hosebird Client (hbc) Copyright 2013 Twitter, Inc. 
+ +************************ +The MIT License +************************ + + (MIT) ScribeJava OAuth Library + The following NOTICE information applies: + ScribeJava OAuth Library + Copyright (c) 2010 Pablo Fernandez diff --git a/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/pom.xml b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/pom.xml index 3fa6eff380..b6b39bdf1f 100644 --- a/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/pom.xml +++ b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/pom.xml @@ -35,6 +35,17 @@ nifi-utils 1.17.0-SNAPSHOT + + com.twitter + twitter-api-java-sdk + 1.1.4 + + + org.apache.oltu.oauth2 + org.apache.oltu.oauth2.client + + + com.twitter hbc-core @@ -69,5 +80,14 @@ 1.17.0-SNAPSHOT test + + com.squareup.okhttp3 + okhttp + + + com.squareup.okhttp3 + mockwebserver + test + diff --git a/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/ConsumeTwitter.java b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/ConsumeTwitter.java new file mode 100644 index 0000000000..041fd80dba --- /dev/null +++ b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/ConsumeTwitter.java @@ -0,0 +1,355 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.twitter; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.PrimaryNodeOnly; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import java.nio.charset.StandardCharsets; +import java.util.Set; +import java.util.List; +import java.util.HashSet; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Map; +import java.util.HashMap; +import java.util.UUID; +import java.util.concurrent.BlockingQueue; +import 
java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; + +@PrimaryNodeOnly +@SupportsBatching +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@Tags({"twitter", "tweets", "social media", "status", "json"}) +@CapabilityDescription("Streams tweets from Twitter's streaming API v2. The stream provides a sample stream or a search " + + "stream based on previously uploaded rules. This processor also provides a pass through for certain fields of the " + + "tweet to be returned as part of the response. See " + + "https://developer.twitter.com/en/docs/twitter-api/data-dictionary/introduction for more information regarding the " + + "Tweet object model.") +@WritesAttributes({ + @WritesAttribute(attribute = "mime.type", description = "The MIME Type set to application/json"), + @WritesAttribute(attribute = "tweets", description = "The number of Tweets in the FlowFile"), +}) +public class ConsumeTwitter extends AbstractProcessor { + + static final AllowableValue ENDPOINT_SAMPLE = new AllowableValue(StreamEndpoint.SAMPLE_ENDPOINT.getEndpointName(), + "Sample Stream", + "Streams about one percent of all Tweets. " + + "https://developer.twitter.com/en/docs/twitter-api/tweets/volume-streams/api-reference/get-tweets-sample-stream"); + static final AllowableValue ENDPOINT_SEARCH = new AllowableValue(StreamEndpoint.SEARCH_ENDPOINT.getEndpointName(), + "Search Stream", + "The search stream produces Tweets that match filtering rules configured on Twitter services. " + + "At least one well-formed filtering rule must be configured. 
" + + "https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/api-reference/get-tweets-search-stream"); + + public static final PropertyDescriptor ENDPOINT = new PropertyDescriptor.Builder() + .name("stream-endpoint") + .displayName("Stream Endpoint") + .description("The source from which the processor will consume Tweets.") + .required(true) + .allowableValues(ENDPOINT_SAMPLE, ENDPOINT_SEARCH) + .defaultValue(ENDPOINT_SAMPLE.getValue()) + .build(); + public static final PropertyDescriptor BASE_PATH = new PropertyDescriptor.Builder() + .name("base-path") + .displayName("Base Path") + .description("The base path that the processor will use for making HTTP requests. " + + "The default value should be sufficient for most use cases.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .defaultValue("https://api.twitter.com") + .build(); + public static final PropertyDescriptor BEARER_TOKEN = new PropertyDescriptor.Builder() + .name("bearer-token") + .displayName("Bearer Token") + .description("The Bearer Token provided by Twitter.") + .required(true) + .sensitive(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor QUEUE_SIZE = new PropertyDescriptor.Builder() + .name("queue-size") + .displayName("Queue Size") + .description("Maximum size of internal queue for streamed messages") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("10000") + .build(); + public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() + .name("batch-size") + .displayName("Batch Size") + .description("The maximum size of the number of Tweets to be written to a single FlowFile. 
" + + "Will write fewer Tweets based on the number available in the queue at the time of processor invocation.") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("1000") + .build(); + public static final PropertyDescriptor BACKOFF_ATTEMPTS = new PropertyDescriptor.Builder() + .name("backoff-attempts") + .displayName("Backoff Attempts") + .description("The number of reconnection tries the processor will attempt in the event of " + + "a disconnection of the stream for any reason, before throwing an exception. To start a stream after " + + "this exception occur and the connection is fixed, please stop and restart the processor. If the value" + + "of this property is 0, then backoff will never occur and the processor will always need to be restarted" + + "if the stream fails.") + .required(true) + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .defaultValue("5") + .build(); + public static final PropertyDescriptor BACKOFF_TIME = new PropertyDescriptor.Builder() + .name("backoff-time") + .displayName("Backoff Time") + .description("The duration to backoff before requesting a new stream if" + + "the current one fails for any reason. Will increase by factor of 2 every time a restart fails") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("1 mins") + .build(); + public static final PropertyDescriptor MAXIMUM_BACKOFF_TIME = new PropertyDescriptor.Builder() + .name("maximum-backoff-time") + .displayName("Maximum Backoff Time") + .description("The maximum duration to backoff to start attempting a new stream." 
+ + "It is recommended that this number be much higher than the 'Backoff Time' property") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("5 mins") + .build(); + public static final PropertyDescriptor CONNECT_TIMEOUT = new PropertyDescriptor.Builder() + .name("connect-timeout") + .displayName("Connect Timeout") + .description("The maximum time in which client should establish a connection with the " + + "Twitter API before a time out. Setting the value to 0 disables connection timeouts.") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("10 secs") + .build(); + public static final PropertyDescriptor READ_TIMEOUT = new PropertyDescriptor.Builder() + .name("read-timeout") + .displayName("Read Timeout") + .description("The maximum time of inactivity between receiving tweets from Twitter through " + + "the API before a timeout. Setting the value to 0 disables read timeouts.") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("10 secs") + .build(); + public static final PropertyDescriptor BACKFILL_MINUTES = new PropertyDescriptor.Builder() + .name("backfill-minutes") + .displayName("Backfill Minutes") + .description("The number of minutes (up to 5 minutes) of streaming data to be requested after a " + + "disconnect. Only available for project with academic research access. See " + + "https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/integrate/" + + "recovery-and-redundancy-features") + .required(true) + .defaultValue("0") + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .build(); + public static final PropertyDescriptor TWEET_FIELDS = new PropertyDescriptor.Builder() + .name("tweet-fields") + .displayName("Tweet Fields") + .description("A comma-separated list of tweet fields to be returned as part of the tweet. 
Refer to " + + "https://developer.twitter.com/en/docs/twitter-api/data-dictionary/object-model/tweet " + + "for proper usage. Possible field values include: " + + "attachments, author_id, context_annotations, conversation_id, created_at, entities, geo, id, " + + "in_reply_to_user_id, lang, non_public_metrics, organic_metrics, possibly_sensitive, promoted_metrics, " + + "public_metrics, referenced_tweets, reply_settings, source, text, withheld") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor USER_FIELDS = new PropertyDescriptor.Builder() + .name("user-fields") + .displayName("User Fields") + .description("A comma-separated list of user fields to be returned as part of the tweet. Refer to " + + "https://developer.twitter.com/en/docs/twitter-api/data-dictionary/object-model/user " + + "for proper usage. Possible field values include: " + + "created_at, description, entities, id, location, name, pinned_tweet_id, profile_image_url, " + + "protected, public_metrics, url, username, verified, withheld") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor MEDIA_FIELDS = new PropertyDescriptor.Builder() + .name("media-fields") + .displayName("Media Fields") + .description("A comma-separated list of media fields to be returned as part of the tweet. Refer to " + + "https://developer.twitter.com/en/docs/twitter-api/data-dictionary/object-model/media " + + "for proper usage. 
Possible field values include: " + + "alt_text, duration_ms, height, media_key, non_public_metrics, organic_metrics, preview_image_url, " + + "promoted_metrics, public_metrics, type, url, width") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor POLL_FIELDS = new PropertyDescriptor.Builder() + .name("poll-fields") + .displayName("Poll Fields") + .description("A comma-separated list of poll fields to be returned as part of the tweet. Refer to " + + "https://developer.twitter.com/en/docs/twitter-api/data-dictionary/object-model/poll " + + "for proper usage. Possible field values include: " + + "duration_minutes, end_datetime, id, options, voting_status") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor PLACE_FIELDS = new PropertyDescriptor.Builder() + .name("place-fields") + .displayName("Place Fields") + .description("A comma-separated list of place fields to be returned as part of the tweet. Refer to " + + "https://developer.twitter.com/en/docs/twitter-api/data-dictionary/object-model/place " + + "for proper usage. Possible field values include: " + + "contained_within, country, country_code, full_name, geo, id, name, place_type") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor EXPANSIONS = new PropertyDescriptor.Builder() + .name("expansions") + .displayName("Expansions") + .description("A comma-separated list of expansions for objects in the returned tweet. See " + + "https://developer.twitter.com/en/docs/twitter-api/expansions " + + "for proper usage. 
Possible field values include: " + + "author_id, referenced_tweets.id, referenced_tweets.id.author_id, entities.mentions.username, " + + "attachments.poll_ids, attachments.media_keys ,in_reply_to_user_id, geo.place_id") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("FlowFiles containing an array of one or more Tweets") + .build(); + + private List descriptors; + private Set relationships; + + private TweetStreamService tweetStreamService; + + private volatile BlockingQueue messageQueue; + + @Override + protected void init(ProcessorInitializationContext context) { + final List descriptors = new ArrayList<>(); + descriptors.add(ENDPOINT); + descriptors.add(BASE_PATH); + descriptors.add(BEARER_TOKEN); + descriptors.add(QUEUE_SIZE); + descriptors.add(BATCH_SIZE); + descriptors.add(BACKOFF_ATTEMPTS); + descriptors.add(BACKOFF_TIME); + descriptors.add(MAXIMUM_BACKOFF_TIME); + descriptors.add(CONNECT_TIMEOUT); + descriptors.add(READ_TIMEOUT); + descriptors.add(BACKFILL_MINUTES); + descriptors.add(TWEET_FIELDS); + descriptors.add(USER_FIELDS); + descriptors.add(MEDIA_FIELDS); + descriptors.add(POLL_FIELDS); + descriptors.add(PLACE_FIELDS); + descriptors.add(EXPANSIONS); + + this.descriptors = Collections.unmodifiableList(descriptors); + + final Set relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + this.relationships = Collections.unmodifiableSet(relationships); + } + + @Override + public Set getRelationships() { + return this.relationships; + } + + @Override + public final List getSupportedPropertyDescriptors() { + return this.descriptors; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) { + messageQueue = new LinkedBlockingQueue<>(context.getProperty(QUEUE_SIZE).asInteger()); + + tweetStreamService = new TweetStreamService(context, messageQueue, getLogger()); + 
tweetStreamService.start(); + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + final String firstTweet = messageQueue.poll(); + if (firstTweet == null) { + context.yield(); + return; + } + + final AtomicInteger tweetCount = new AtomicInteger(1); + FlowFile flowFile = session.create(); + flowFile = session.write(flowFile, out -> { + final int batchSize = context.getProperty(BATCH_SIZE).asInteger(); + String tweet = firstTweet; + out.write('['); + out.write(tweet.getBytes(StandardCharsets.UTF_8)); + while (tweetCount.get() < batchSize && (tweet = messageQueue.poll()) != null) { + out.write(','); + out.write(tweet.getBytes(StandardCharsets.UTF_8)); + tweetCount.getAndIncrement(); + } + out.write(']'); + }); + + final Map attributes = new HashMap<>(); + attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json"); + attributes.put(CoreAttributes.FILENAME.key(), String.format("%s.json", UUID.randomUUID())); + attributes.put("tweets", Integer.toString(tweetCount.get())); + flowFile = session.putAllAttributes(flowFile, attributes); + + session.transfer(flowFile, REL_SUCCESS); + + final String endpointName = context.getProperty(ENDPOINT).getValue(); + final String transitUri = tweetStreamService.getTransitUri(endpointName); + + session.getProvenanceReporter().receive(flowFile, transitUri); + } + + @OnStopped + public void onStopped() { + if (tweetStreamService != null) { + tweetStreamService.stop(); + } + tweetStreamService = null; + emptyQueue(); + } + + private void emptyQueue() { + while (!messageQueue.isEmpty()) { + messageQueue.poll(); + } + } +} diff --git a/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/StreamEndpoint.java b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/StreamEndpoint.java new file mode 100644 index 0000000000..96b0df1d19 --- /dev/null +++ 
b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/StreamEndpoint.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.twitter; + +public enum StreamEndpoint { + SAMPLE_ENDPOINT("Sample Endpoint", "/2/tweets/sample/stream"), + SEARCH_ENDPOINT("Search Endpoint", "/2/tweets/search/stream"); + + private String endpointName; + private String path; + + StreamEndpoint(final String endpointName, final String path) { + this.endpointName = endpointName; + this.path = path; + } + + public String getEndpointName() { + return this.endpointName; + } + + public String getPath() { + return this.path; + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/TweetStreamService.java b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/TweetStreamService.java new file mode 100644 index 0000000000..f304614e71 --- /dev/null +++ 
b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/TweetStreamService.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.twitter; + +import com.twitter.clientlib.ApiClient; +import com.twitter.clientlib.ApiException; +import com.twitter.clientlib.TwitterCredentialsBearer; +import com.twitter.clientlib.api.TwitterApi; +import org.apache.commons.lang3.concurrent.BasicThreadFactory; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.exception.ProcessException; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +public class TweetStreamService { + private 
final BlockingQueue queue; + private final ComponentLog logger; + + private final ScheduledExecutorService executorService; + + private final Set tweetFields; + private final Set userFields; + private final Set mediaFields; + private final Set pollFields; + private final Set placeFields; + private final Set expansions; + private final int backfillMinutes; + private final TwitterApi api; + private InputStream stream; + + private final int backoffAttempts; + private final long backoffTime; + private final long maximumBackoff; + + private long backoffMultiplier; + private int attemptCounter; + + private final StreamEndpoint endpoint; + + public TweetStreamService(final ProcessContext context, final BlockingQueue queue, final ComponentLog logger) { + Objects.requireNonNull(context); + Objects.requireNonNull(queue); + Objects.requireNonNull(logger); + + this.queue = queue; + this.logger = logger; + + final String endpointName = context.getProperty(ConsumeTwitter.ENDPOINT).getValue(); + if (ConsumeTwitter.ENDPOINT_SAMPLE.getValue().equals(endpointName)) { + this.endpoint = StreamEndpoint.SAMPLE_ENDPOINT; + } else { + this.endpoint = StreamEndpoint.SEARCH_ENDPOINT; + } + + this.tweetFields = parseCommaSeparatedProperties(context, ConsumeTwitter.TWEET_FIELDS); + this.userFields = parseCommaSeparatedProperties(context, ConsumeTwitter.USER_FIELDS); + this.mediaFields = parseCommaSeparatedProperties(context, ConsumeTwitter.MEDIA_FIELDS); + this.pollFields = parseCommaSeparatedProperties(context, ConsumeTwitter.POLL_FIELDS); + this.placeFields = parseCommaSeparatedProperties(context, ConsumeTwitter.PLACE_FIELDS); + this.expansions = parseCommaSeparatedProperties(context, ConsumeTwitter.EXPANSIONS); + this.backfillMinutes = context.getProperty(ConsumeTwitter.BACKFILL_MINUTES).asInteger(); + + this.backoffMultiplier = 1L; + this.backoffAttempts = context.getProperty(ConsumeTwitter.BACKOFF_ATTEMPTS).asInteger(); + this.attemptCounter = 0; + this.backoffTime = 
context.getProperty(ConsumeTwitter.BACKOFF_TIME).asTimePeriod(TimeUnit.SECONDS); + this.maximumBackoff = context.getProperty(ConsumeTwitter.MAXIMUM_BACKOFF_TIME).asTimePeriod(TimeUnit.SECONDS); + + ApiClient client = new ApiClient(); + final int connectTimeout = context.getProperty(ConsumeTwitter.CONNECT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(); + final int readTimeout = context.getProperty(ConsumeTwitter.READ_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(); + client = client.setConnectTimeout(connectTimeout); + client = client.setReadTimeout(readTimeout); + api = new TwitterApi(client); + + final TwitterCredentialsBearer bearer = new TwitterCredentialsBearer(context.getProperty(ConsumeTwitter.BEARER_TOKEN).getValue()); + api.setTwitterCredentials(bearer); + + final String basePath = context.getProperty(ConsumeTwitter.BASE_PATH).getValue(); + api.getApiClient().setBasePath(basePath); + + final ThreadFactory threadFactory = new BasicThreadFactory.Builder().namingPattern(ConsumeTwitter.class.getSimpleName()).build(); + this.executorService = Executors.newSingleThreadScheduledExecutor(threadFactory); + } + + public String getTransitUri(final String endpoint) { + if (endpoint.equals(StreamEndpoint.SAMPLE_ENDPOINT.getEndpointName())) { + return api.getApiClient().getBasePath() + StreamEndpoint.SAMPLE_ENDPOINT.getPath(); + } else if (endpoint.equals(StreamEndpoint.SEARCH_ENDPOINT.getEndpointName())) { + return api.getApiClient().getBasePath() + StreamEndpoint.SEARCH_ENDPOINT.getPath(); + } else { + logger.warn("Unrecognized endpoint in getTransitUri. Returning basePath"); + return api.getApiClient().getBasePath(); + } + } + + /** + * This method would be called when we would like the stream to get started. This method will spin off a thread that + * will continue to queue tweets on to the given queue passed in the constructor. The thread will continue + * to run until {@code stop} is called. 
+ */ + public void start() { + executorService.execute(new TweetStreamStarter()); + } + + /** + * This method would be called when we would like the stream to get stopped. The stream will be closed and the + * executorService will be shut down. + */ + public void stop() { + if (stream != null) { + try { + stream.close(); + } catch (IOException e) { + logger.error("Closing response stream failed", e); + } + } + + executorService.shutdownNow(); + } + + private Long calculateBackoffDelay() { + long backoff = backoffMultiplier * backoffTime; + return Math.min(backoff, maximumBackoff); + } + + private void scheduleStartStreamWithBackoff() { + // use exponential(by factor of 2) backoff in scheduling the next TweetStreamStarter + if (attemptCounter >= backoffAttempts) { + throw new ProcessException(String.format("Connection failed after maximum attempts [%d]", attemptCounter)); + } + attemptCounter += 1; + long delay = calculateBackoffDelay(); + backoffMultiplier *= 2; + logger.info("Scheduling new stream connection after delay [{} s]", delay); + executorService.schedule(new TweetStreamStarter(), delay, TimeUnit.SECONDS); + } + + private void resetBackoff() { + attemptCounter = 0; + backoffMultiplier = 1L; + } + + private class TweetStreamStarter implements Runnable { + @Override + public void run() { + try { + if (endpoint.equals(StreamEndpoint.SAMPLE_ENDPOINT)) { + stream = api.tweets().sampleStream(expansions, tweetFields, userFields, mediaFields, placeFields, pollFields, backfillMinutes); + } else { + stream = api.tweets().searchStream(expansions, tweetFields, userFields, mediaFields, placeFields, pollFields, backfillMinutes); + } + executorService.execute(new TweetStreamHandler()); + } catch (final ApiException e) { + stream = null; + logger.warn("Twitter Stream [{}] API connection failed: HTTP {}", endpoint.getEndpointName(), e.getCode(), e); + scheduleStartStreamWithBackoff(); + } catch (final Exception e) { + stream = null; + logger.warn("Twitter Stream [{}] 
connection failed", endpoint.getEndpointName(), e); + scheduleStartStreamWithBackoff(); + } + } + } + + private class TweetStreamHandler implements Runnable { + @Override + public void run() { + try (BufferedReader reader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8))) { + String tweetRecord = reader.readLine(); + while (tweetRecord != null) { + // Skip empty lines received from the Twitter Stream + if (tweetRecord.isEmpty()) { + tweetRecord = reader.readLine(); + continue; + } + + queue.put(tweetRecord); + + // reset backoff multiplier upon successful receipt of a tweet + resetBackoff(); + + tweetRecord = reader.readLine(); + } + } catch (final IOException e) { + logger.info("Stream is closed or has stopped", e); + } catch (final InterruptedException e) { + logger.info("Interrupted while adding Tweet to queue", e); + return; + } + logger.info("Stream processing completed"); + scheduleStartStreamWithBackoff(); + } + } + + private Set parseCommaSeparatedProperties(final ProcessContext context, final PropertyDescriptor property) { + Set fields = null; + if (context.getProperty(property).isSet()) { + fields = new HashSet<>(); + final String fieldsString = context.getProperty(property).getValue(); + for (final String field: fieldsString.split(",")) { + fields.add(field.trim()); + } + } + return fields; + } +} diff --git a/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 9504a1128e..1f90feb7e5 100644 --- a/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -13,4 +13,5 @@ # See 
the License for the specific language governing permissions and # limitations under the License. -org.apache.nifi.processors.twitter.GetTwitter \ No newline at end of file +org.apache.nifi.processors.twitter.GetTwitter +org.apache.nifi.processors.twitter.ConsumeTwitter diff --git a/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/test/java/org/apache/nifi/processors/twitter/TestConsumeTwitter.java b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/test/java/org/apache/nifi/processors/twitter/TestConsumeTwitter.java new file mode 100644 index 0000000000..56d2a2c14a --- /dev/null +++ b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/test/java/org/apache/nifi/processors/twitter/TestConsumeTwitter.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.nifi.processors.twitter;
+
+import okhttp3.mockwebserver.MockResponse;
+import okhttp3.mockwebserver.MockWebServer;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+
+/**
+ * Runs ConsumeTwitter against a local MockWebServer and checks the FlowFile produced for a single tweet.
+ */
+public class TestConsumeTwitter {
+    private static final String SAMPLE_TWEET = "{\"data\":{\"id\":\"123\",\"text\":\"This is a sample tweet and is not real!\"}}";
+
+    // Expected FlowFile content: the sample tweet wrapped in a JSON array
+    private static final String EXPECTED_TWEET = "[" + SAMPLE_TWEET + "]";
+
+    private MockWebServer mockWebServer;
+
+    private TestRunner runner;
+
+    /**
+     * Creates the mock server and a ConsumeTwitter runner whose base path points at that server.
+     */
+    @BeforeEach
+    public void setRunnerAndAPI() {
+        mockWebServer = new MockWebServer();
+
+        runner = TestRunners.newTestRunner(ConsumeTwitter.class);
+
+        runner.setProperty(ConsumeTwitter.BEARER_TOKEN, "BEARER_TOKEN");
+        final String basePath = mockWebServer.url("").toString();
+        runner.setProperty(ConsumeTwitter.BASE_PATH, basePath);
+    }
+
+    /**
+     * Shuts down the mock server after each test.
+     */
+    @AfterEach
+    public void shutdownServerAndAPI() throws IOException {
+        mockWebServer.shutdown();
+    }
+
+    /**
+     * Serves one sample tweet and expects exactly one FlowFile on the success relationship
+     * containing that tweet, with the JSON MIME type and a "tweets" attribute of 1.
+     */
+    @Test
+    public void testReceiveSingleTweetInStream() throws InterruptedException {
+        final MockResponse response = new MockResponse()
+                .setResponseCode(200)
+                .setBody(SAMPLE_TWEET)
+                .addHeader("Content-Type", "application/json");
+        mockWebServer.enqueue(response);
+
+        runner.setProperty(ConsumeTwitter.ENDPOINT, ConsumeTwitter.ENDPOINT_SAMPLE);
+        runner.setProperty(ConsumeTwitter.QUEUE_SIZE, "10000");
+        runner.setProperty(ConsumeTwitter.BATCH_SIZE, "10");
+        runner.setProperty(ConsumeTwitter.BACKFILL_MINUTES, "0");
+
+        runner.assertValid();
+
+        // the TweetStreamService class spins up another thread and might not be done queueing tweets in one run of the processor
+        final int maxTries = 3;
+        final long runSchedule = 250;
+        runner.setRunSchedule(runSchedule);
+        runner.run(maxTries, false, true);
+        runner.stop();
+
+        // there should only be a single FlowFile containing a tweet
+        runner.assertTransferCount(ConsumeTwitter.REL_SUCCESS, 1);
+        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ConsumeTwitter.REL_SUCCESS).get(0);
+        flowFile.assertContentEquals(EXPECTED_TWEET);
+        flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json");
+        flowFile.assertAttributeEquals("tweets", "1");
+    }
+}