NIFI-9755 Added ConsumeTwitter Processor

This closes #5947

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Emilio Setiadarma 2022-03-03 14:40:42 -08:00 committed by exceptionfactory
parent 3699d42116
commit e20aa0ea2a
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
8 changed files with 767 additions and 1 deletions

View File

@ -3098,3 +3098,26 @@ which is available under an MIT license.
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
IN THE SOFTWARE.
The binary distribution of this product bundles 'ScribeJava OAuth Library'
which is available under an MIT license.
Copyright (c) 2010 Pablo Fernandez
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

View File

@ -44,3 +44,12 @@ The following binary components are provided under the Apache Software License v
The following NOTICE information applies:
Hosebird Client (hbc)
Copyright 2013 Twitter, Inc.
************************
The MIT License
************************
(MIT) ScribeJava OAuth Library
The following NOTICE information applies:
ScribeJava OAuth Library
Copyright (c) 2010 Pablo Fernandez

View File

@ -35,6 +35,17 @@
<artifactId>nifi-utils</artifactId> <artifactId>nifi-utils</artifactId>
<version>1.17.0-SNAPSHOT</version> <version>1.17.0-SNAPSHOT</version>
</dependency> </dependency>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>twitter-api-java-sdk</artifactId>
<version>1.1.4</version>
<exclusions>
<exclusion>
<groupId>org.apache.oltu.oauth2</groupId>
<artifactId>org.apache.oltu.oauth2.client</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency> <dependency>
<groupId>com.twitter</groupId> <groupId>com.twitter</groupId>
<artifactId>hbc-core</artifactId> <artifactId>hbc-core</artifactId>
@ -69,5 +80,14 @@
<version>1.17.0-SNAPSHOT</version> <version>1.17.0-SNAPSHOT</version>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>mockwebserver</artifactId>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -0,0 +1,355 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.twitter;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import java.nio.charset.StandardCharsets;
import java.util.Set;
import java.util.List;
import java.util.HashSet;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Source processor that drains Tweets queued by a background {@link TweetStreamService}
 * and writes them to FlowFiles as a JSON array.
 *
 * <p>The service is created and started in {@link #onScheduled(ProcessContext)} and stopped in
 * {@link #onStopped()}. Each {@link #onTrigger(ProcessContext, ProcessSession)} call writes up to
 * "Batch Size" queued Tweets into a single FlowFile routed to {@link #REL_SUCCESS}.
 */
@PrimaryNodeOnly
@SupportsBatching
@InputRequirement(Requirement.INPUT_FORBIDDEN)
@Tags({"twitter", "tweets", "social media", "status", "json"})
@CapabilityDescription("Streams tweets from Twitter's streaming API v2. The stream provides a sample stream or a search "
        + "stream based on previously uploaded rules. This processor also provides a pass through for certain fields of the "
        + "tweet to be returned as part of the response. See "
        + "https://developer.twitter.com/en/docs/twitter-api/data-dictionary/introduction for more information regarding the "
        + "Tweet object model.")
@WritesAttributes({
        @WritesAttribute(attribute = "mime.type", description = "The MIME Type set to application/json"),
        @WritesAttribute(attribute = "tweets", description = "The number of Tweets in the FlowFile"),
})
public class ConsumeTwitter extends AbstractProcessor {

    static final AllowableValue ENDPOINT_SAMPLE = new AllowableValue(StreamEndpoint.SAMPLE_ENDPOINT.getEndpointName(),
            "Sample Stream",
            "Streams about one percent of all Tweets. " +
                    "https://developer.twitter.com/en/docs/twitter-api/tweets/volume-streams/api-reference/get-tweets-sample-stream");

    static final AllowableValue ENDPOINT_SEARCH = new AllowableValue(StreamEndpoint.SEARCH_ENDPOINT.getEndpointName(),
            "Search Stream",
            "The search stream produces Tweets that match filtering rules configured on Twitter services. " +
                    "At least one well-formed filtering rule must be configured. " +
                    "https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/api-reference/get-tweets-search-stream");

    public static final PropertyDescriptor ENDPOINT = new PropertyDescriptor.Builder()
            .name("stream-endpoint")
            .displayName("Stream Endpoint")
            .description("The source from which the processor will consume Tweets.")
            .required(true)
            .allowableValues(ENDPOINT_SAMPLE, ENDPOINT_SEARCH)
            .defaultValue(ENDPOINT_SAMPLE.getValue())
            .build();

    public static final PropertyDescriptor BASE_PATH = new PropertyDescriptor.Builder()
            .name("base-path")
            .displayName("Base Path")
            .description("The base path that the processor will use for making HTTP requests. " +
                    "The default value should be sufficient for most use cases.")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .defaultValue("https://api.twitter.com")
            .build();

    public static final PropertyDescriptor BEARER_TOKEN = new PropertyDescriptor.Builder()
            .name("bearer-token")
            .displayName("Bearer Token")
            .description("The Bearer Token provided by Twitter.")
            .required(true)
            .sensitive(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    public static final PropertyDescriptor QUEUE_SIZE = new PropertyDescriptor.Builder()
            .name("queue-size")
            .displayName("Queue Size")
            .description("Maximum size of internal queue for streamed messages")
            .required(true)
            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
            .defaultValue("10000")
            .build();

    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
            .name("batch-size")
            .displayName("Batch Size")
            .description("The maximum size of the number of Tweets to be written to a single FlowFile. " +
                    "Will write fewer Tweets based on the number available in the queue at the time of processor invocation.")
            .required(true)
            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
            .defaultValue("1000")
            .build();

    // The description pieces below end with a space so the concatenated text keeps its word breaks.
    public static final PropertyDescriptor BACKOFF_ATTEMPTS = new PropertyDescriptor.Builder()
            .name("backoff-attempts")
            .displayName("Backoff Attempts")
            .description("The number of reconnection tries the processor will attempt in the event of " +
                    "a disconnection of the stream for any reason, before throwing an exception. To start a stream after " +
                    "this exception occurs and the connection is fixed, please stop and restart the processor. If the value " +
                    "of this property is 0, then backoff will never occur and the processor will always need to be restarted " +
                    "if the stream fails.")
            .required(true)
            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
            .defaultValue("5")
            .build();

    public static final PropertyDescriptor BACKOFF_TIME = new PropertyDescriptor.Builder()
            .name("backoff-time")
            .displayName("Backoff Time")
            .description("The duration to backoff before requesting a new stream if " +
                    "the current one fails for any reason. Will increase by factor of 2 every time a restart fails")
            .required(true)
            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
            .defaultValue("1 mins")
            .build();

    public static final PropertyDescriptor MAXIMUM_BACKOFF_TIME = new PropertyDescriptor.Builder()
            .name("maximum-backoff-time")
            .displayName("Maximum Backoff Time")
            .description("The maximum duration to backoff to start attempting a new stream. " +
                    "It is recommended that this number be much higher than the 'Backoff Time' property")
            .required(true)
            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
            .defaultValue("5 mins")
            .build();

    public static final PropertyDescriptor CONNECT_TIMEOUT = new PropertyDescriptor.Builder()
            .name("connect-timeout")
            .displayName("Connect Timeout")
            .description("The maximum time in which client should establish a connection with the " +
                    "Twitter API before a time out. Setting the value to 0 disables connection timeouts.")
            .required(true)
            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
            .defaultValue("10 secs")
            .build();

    public static final PropertyDescriptor READ_TIMEOUT = new PropertyDescriptor.Builder()
            .name("read-timeout")
            .displayName("Read Timeout")
            .description("The maximum time of inactivity between receiving tweets from Twitter through " +
                    "the API before a timeout. Setting the value to 0 disables read timeouts.")
            .required(true)
            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
            .defaultValue("10 secs")
            .build();

    public static final PropertyDescriptor BACKFILL_MINUTES = new PropertyDescriptor.Builder()
            .name("backfill-minutes")
            .displayName("Backfill Minutes")
            .description("The number of minutes (up to 5 minutes) of streaming data to be requested after a " +
                    "disconnect. Only available for project with academic research access. See " +
                    "https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/integrate/" +
                    "recovery-and-redundancy-features")
            .required(true)
            .defaultValue("0")
            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
            .build();

    public static final PropertyDescriptor TWEET_FIELDS = new PropertyDescriptor.Builder()
            .name("tweet-fields")
            .displayName("Tweet Fields")
            .description("A comma-separated list of tweet fields to be returned as part of the tweet. Refer to " +
                    "https://developer.twitter.com/en/docs/twitter-api/data-dictionary/object-model/tweet " +
                    "for proper usage. Possible field values include: " +
                    "attachments, author_id, context_annotations, conversation_id, created_at, entities, geo, id, " +
                    "in_reply_to_user_id, lang, non_public_metrics, organic_metrics, possibly_sensitive, promoted_metrics, " +
                    "public_metrics, referenced_tweets, reply_settings, source, text, withheld")
            .required(false)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    public static final PropertyDescriptor USER_FIELDS = new PropertyDescriptor.Builder()
            .name("user-fields")
            .displayName("User Fields")
            .description("A comma-separated list of user fields to be returned as part of the tweet. Refer to " +
                    "https://developer.twitter.com/en/docs/twitter-api/data-dictionary/object-model/user " +
                    "for proper usage. Possible field values include: " +
                    "created_at, description, entities, id, location, name, pinned_tweet_id, profile_image_url, " +
                    "protected, public_metrics, url, username, verified, withheld")
            .required(false)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    public static final PropertyDescriptor MEDIA_FIELDS = new PropertyDescriptor.Builder()
            .name("media-fields")
            .displayName("Media Fields")
            .description("A comma-separated list of media fields to be returned as part of the tweet. Refer to " +
                    "https://developer.twitter.com/en/docs/twitter-api/data-dictionary/object-model/media " +
                    "for proper usage. Possible field values include: " +
                    "alt_text, duration_ms, height, media_key, non_public_metrics, organic_metrics, preview_image_url, " +
                    "promoted_metrics, public_metrics, type, url, width")
            .required(false)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    public static final PropertyDescriptor POLL_FIELDS = new PropertyDescriptor.Builder()
            .name("poll-fields")
            .displayName("Poll Fields")
            .description("A comma-separated list of poll fields to be returned as part of the tweet. Refer to " +
                    "https://developer.twitter.com/en/docs/twitter-api/data-dictionary/object-model/poll " +
                    "for proper usage. Possible field values include: " +
                    "duration_minutes, end_datetime, id, options, voting_status")
            .required(false)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    public static final PropertyDescriptor PLACE_FIELDS = new PropertyDescriptor.Builder()
            .name("place-fields")
            .displayName("Place Fields")
            .description("A comma-separated list of place fields to be returned as part of the tweet. Refer to " +
                    "https://developer.twitter.com/en/docs/twitter-api/data-dictionary/object-model/place " +
                    "for proper usage. Possible field values include: " +
                    "contained_within, country, country_code, full_name, geo, id, name, place_type")
            .required(false)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    public static final PropertyDescriptor EXPANSIONS = new PropertyDescriptor.Builder()
            .name("expansions")
            .displayName("Expansions")
            .description("A comma-separated list of expansions for objects in the returned tweet. See " +
                    "https://developer.twitter.com/en/docs/twitter-api/expansions " +
                    "for proper usage. Possible field values include: " +
                    "author_id, referenced_tweets.id, referenced_tweets.id.author_id, entities.mentions.username, " +
                    "attachments.poll_ids, attachments.media_keys, in_reply_to_user_id, geo.place_id")
            .required(false)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("FlowFiles containing an array of one or more Tweets")
            .build();

    private List<PropertyDescriptor> descriptors;
    private Set<Relationship> relationships;

    // Both fields are assigned by the lifecycle callbacks and read by onTrigger, so they are
    // volatile to make the latest assignment visible to whichever thread reads them.
    private volatile TweetStreamService tweetStreamService;
    private volatile BlockingQueue<String> messageQueue;

    /**
     * Builds the immutable property and relationship collections exposed by this processor.
     *
     * @param context initialization context supplied by the framework (unused here)
     */
    @Override
    protected void init(ProcessorInitializationContext context) {
        final List<PropertyDescriptor> descriptors = new ArrayList<>();
        descriptors.add(ENDPOINT);
        descriptors.add(BASE_PATH);
        descriptors.add(BEARER_TOKEN);
        descriptors.add(QUEUE_SIZE);
        descriptors.add(BATCH_SIZE);
        descriptors.add(BACKOFF_ATTEMPTS);
        descriptors.add(BACKOFF_TIME);
        descriptors.add(MAXIMUM_BACKOFF_TIME);
        descriptors.add(CONNECT_TIMEOUT);
        descriptors.add(READ_TIMEOUT);
        descriptors.add(BACKFILL_MINUTES);
        descriptors.add(TWEET_FIELDS);
        descriptors.add(USER_FIELDS);
        descriptors.add(MEDIA_FIELDS);
        descriptors.add(POLL_FIELDS);
        descriptors.add(PLACE_FIELDS);
        descriptors.add(EXPANSIONS);
        this.descriptors = Collections.unmodifiableList(descriptors);

        final Set<Relationship> relationships = new HashSet<>();
        relationships.add(REL_SUCCESS);
        this.relationships = Collections.unmodifiableSet(relationships);
    }

    /**
     * @return the unmodifiable set of relationships built in {@code init}
     */
    @Override
    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    /**
     * @return the unmodifiable list of supported properties built in {@code init}
     */
    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return this.descriptors;
    }

    /**
     * Creates a bounded queue sized by "Queue Size" and starts the streaming service that fills it.
     *
     * @param context process context providing the configured property values
     */
    @OnScheduled
    public void onScheduled(final ProcessContext context) {
        messageQueue = new LinkedBlockingQueue<>(context.getProperty(QUEUE_SIZE).asInteger());
        tweetStreamService = new TweetStreamService(context, messageQueue, getLogger());
        tweetStreamService.start();
    }

    /**
     * Writes up to "Batch Size" queued Tweets into one FlowFile as a JSON array, or yields when
     * the queue is empty.
     *
     * @param context process context providing the configured property values
     * @param session session used to create, write and transfer the FlowFile
     * @throws ProcessException if the session operations fail
     */
    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        final String firstTweet = messageQueue.poll();
        if (firstTweet == null) {
            // nothing streamed yet: avoid creating an empty FlowFile
            context.yield();
            return;
        }

        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
        // starts at 1 because the first Tweet has already been taken from the queue
        final AtomicInteger tweetCount = new AtomicInteger(1);

        FlowFile flowFile = session.create();
        flowFile = session.write(flowFile, out -> {
            out.write('[');
            out.write(firstTweet.getBytes(StandardCharsets.UTF_8));
            String tweet;
            while (tweetCount.get() < batchSize && (tweet = messageQueue.poll()) != null) {
                out.write(',');
                out.write(tweet.getBytes(StandardCharsets.UTF_8));
                tweetCount.getAndIncrement();
            }
            out.write(']');
        });

        final Map<String, String> attributes = new HashMap<>();
        attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
        attributes.put(CoreAttributes.FILENAME.key(), String.format("%s.json", UUID.randomUUID()));
        attributes.put("tweets", Integer.toString(tweetCount.get()));
        flowFile = session.putAllAttributes(flowFile, attributes);

        session.transfer(flowFile, REL_SUCCESS);

        final String endpointName = context.getProperty(ENDPOINT).getValue();
        final String transitUri = tweetStreamService.getTransitUri(endpointName);
        session.getProvenanceReporter().receive(flowFile, transitUri);
    }

    /**
     * Stops the streaming service, if one was started, and discards any Tweets still queued.
     */
    @OnStopped
    public void onStopped() {
        final TweetStreamService service = tweetStreamService;
        if (service != null) {
            service.stop();
        }
        tweetStreamService = null;
        emptyQueue();
    }

    /**
     * Discards all queued Tweets. Does nothing when the queue was never created, so stopping a
     * processor that was never successfully scheduled cannot fail with a NullPointerException.
     */
    private void emptyQueue() {
        final BlockingQueue<String> queue = messageQueue;
        if (queue != null) {
            queue.clear();
        }
    }
}

View File

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.twitter;
/**
 * Twitter API v2 streaming endpoints, each pairing a display name with its request path.
 */
public enum StreamEndpoint {
    SAMPLE_ENDPOINT("Sample Endpoint", "/2/tweets/sample/stream"),
    SEARCH_ENDPOINT("Search Endpoint", "/2/tweets/search/stream");

    // Enum constants are shared singletons, so their state is immutable.
    private final String endpointName;
    private final String path;

    StreamEndpoint(final String endpointName, final String path) {
        this.endpointName = endpointName;
        this.path = path;
    }

    /**
     * @return the name identifying this endpoint
     */
    public String getEndpointName() {
        return this.endpointName;
    }

    /**
     * @return the request path of this endpoint, beginning with a slash
     */
    public String getPath() {
        return this.path;
    }
}

View File

@ -0,0 +1,235 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.twitter;
import com.twitter.clientlib.ApiClient;
import com.twitter.clientlib.ApiException;
import com.twitter.clientlib.TwitterCredentialsBearer;
import com.twitter.clientlib.api.TwitterApi;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
/**
 * Opens a Twitter API v2 stream on a dedicated single-thread executor and puts every non-empty
 * line it receives onto the queue supplied by the caller. Failed or finished streams are
 * reopened with an exponentially growing delay until the configured number of attempts is used up.
 */
public class TweetStreamService {
    private final BlockingQueue<String> queue;
    private final ComponentLog logger;
    private final ScheduledExecutorService executorService;

    private final Set<String> tweetFields;
    private final Set<String> userFields;
    private final Set<String> mediaFields;
    private final Set<String> pollFields;
    private final Set<String> placeFields;
    private final Set<String> expansions;
    private final int backfillMinutes;
    private final TwitterApi api;

    // Assigned on the executor thread and closed by stop() on the caller's thread.
    private volatile InputStream stream;

    // Backoff state is only touched by tasks running on the single executor thread.
    private final int backoffAttempts;
    private final long backoffTime;
    private final long maximumBackoff;
    private long backoffMultiplier;
    private int attemptCounter;

    private final StreamEndpoint endpoint;

    /**
     * Reads the stream configuration from the processor properties and prepares the API client.
     * No connection is made until {@link #start()} is called.
     *
     * @param context process context providing the ConsumeTwitter property values
     * @param queue queue that receives one entry per streamed Tweet
     * @param logger logger used for connection and stream events
     * @throws NullPointerException if any argument is null
     */
    public TweetStreamService(final ProcessContext context, final BlockingQueue<String> queue, final ComponentLog logger) {
        Objects.requireNonNull(context, "context");
        Objects.requireNonNull(queue, "queue");
        Objects.requireNonNull(logger, "logger");

        this.queue = queue;
        this.logger = logger;

        final String endpointName = context.getProperty(ConsumeTwitter.ENDPOINT).getValue();
        if (ConsumeTwitter.ENDPOINT_SAMPLE.getValue().equals(endpointName)) {
            this.endpoint = StreamEndpoint.SAMPLE_ENDPOINT;
        } else {
            this.endpoint = StreamEndpoint.SEARCH_ENDPOINT;
        }

        this.tweetFields = parseCommaSeparatedProperties(context, ConsumeTwitter.TWEET_FIELDS);
        this.userFields = parseCommaSeparatedProperties(context, ConsumeTwitter.USER_FIELDS);
        this.mediaFields = parseCommaSeparatedProperties(context, ConsumeTwitter.MEDIA_FIELDS);
        this.pollFields = parseCommaSeparatedProperties(context, ConsumeTwitter.POLL_FIELDS);
        this.placeFields = parseCommaSeparatedProperties(context, ConsumeTwitter.PLACE_FIELDS);
        this.expansions = parseCommaSeparatedProperties(context, ConsumeTwitter.EXPANSIONS);
        this.backfillMinutes = context.getProperty(ConsumeTwitter.BACKFILL_MINUTES).asInteger();

        this.backoffMultiplier = 1L;
        this.backoffAttempts = context.getProperty(ConsumeTwitter.BACKOFF_ATTEMPTS).asInteger();
        this.attemptCounter = 0;
        this.backoffTime = context.getProperty(ConsumeTwitter.BACKOFF_TIME).asTimePeriod(TimeUnit.SECONDS);
        this.maximumBackoff = context.getProperty(ConsumeTwitter.MAXIMUM_BACKOFF_TIME).asTimePeriod(TimeUnit.SECONDS);

        ApiClient client = new ApiClient();
        final int connectTimeout = context.getProperty(ConsumeTwitter.CONNECT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
        final int readTimeout = context.getProperty(ConsumeTwitter.READ_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
        client = client.setConnectTimeout(connectTimeout);
        client = client.setReadTimeout(readTimeout);
        api = new TwitterApi(client);

        final TwitterCredentialsBearer bearer = new TwitterCredentialsBearer(context.getProperty(ConsumeTwitter.BEARER_TOKEN).getValue());
        api.setTwitterCredentials(bearer);

        final String basePath = context.getProperty(ConsumeTwitter.BASE_PATH).getValue();
        api.getApiClient().setBasePath(basePath);

        final ThreadFactory threadFactory = new BasicThreadFactory.Builder().namingPattern(ConsumeTwitter.class.getSimpleName()).build();
        this.executorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
    }

    /**
     * Resolves the URI reported for provenance: the configured base path followed by the path of
     * the endpoint whose name matches.
     *
     * @param endpoint endpoint name as returned by {@link StreamEndpoint#getEndpointName()}
     * @return base path plus endpoint path, or the base path alone when the name is not recognized
     */
    public String getTransitUri(final String endpoint) {
        final String basePath = api.getApiClient().getBasePath();
        for (final StreamEndpoint candidate : StreamEndpoint.values()) {
            if (candidate.getEndpointName().equals(endpoint)) {
                return basePath + candidate.getPath();
            }
        }
        logger.warn("Unrecognized endpoint in getTransitUri. Returning basePath");
        return basePath;
    }

    /**
     * This method would be called when we would like the stream to get started. This method will spin off a thread that
     * will continue to queue tweets on to the given queue passed in the constructor. The thread will continue
     * to run until {@code stop} is called.
     */
    public void start() {
        executorService.execute(new TweetStreamStarter());
    }

    /**
     * This method would be called when we would like the stream to get stopped. The
     * executorService will be shut down and the stream will be closed.
     */
    public void stop() {
        // Shut the executor down first: the reader sees the closed stream afterwards, finds the
        // executor shut down and therefore does not try to schedule a reconnect.
        executorService.shutdownNow();
        closeStream();
    }

    /** Closes the current response stream, if any, logging rather than propagating failures. */
    private void closeStream() {
        final InputStream current = stream;
        if (current != null) {
            try {
                current.close();
            } catch (IOException e) {
                logger.error("Closing response stream failed", e);
            }
        }
    }

    /**
     * Computes the next reconnect delay in seconds: the base backoff time scaled by the current
     * multiplier, never more than the configured maximum.
     */
    private long calculateBackoffDelay() {
        // Saturate instead of multiplying when the product would exceed the maximum (or overflow)
        if (backoffTime > 0 && backoffMultiplier > maximumBackoff / backoffTime) {
            return maximumBackoff;
        }
        return Math.min(backoffMultiplier * backoffTime, maximumBackoff);
    }

    /**
     * Schedules another connection attempt after the current backoff delay and doubles the
     * multiplier for the attempt after that. Does nothing once the service has been stopped.
     *
     * @throws ProcessException when the configured number of attempts has already been used
     */
    private void scheduleStartStreamWithBackoff() {
        if (executorService.isShutdown()) {
            // stop() was called: a reconnect would only be rejected by the executor
            return;
        }

        // use exponential(by factor of 2) backoff in scheduling the next TweetStreamStarter
        if (attemptCounter >= backoffAttempts) {
            // An exception thrown by a task submitted through schedule() is captured by its
            // Future and never reported, so log the failure before throwing.
            logger.error("Connection failed after maximum attempts [{}]", attemptCounter);
            throw new ProcessException(String.format("Connection failed after maximum attempts [%d]", attemptCounter));
        }
        attemptCounter += 1;
        long delay = calculateBackoffDelay();
        // stop doubling before the multiplier itself would overflow
        if (backoffMultiplier <= Long.MAX_VALUE / 2) {
            backoffMultiplier *= 2;
        }
        logger.info("Scheduling new stream connection after delay [{} s]", delay);
        executorService.schedule(new TweetStreamStarter(), delay, TimeUnit.SECONDS);
    }

    /** Returns the backoff state to its initial values. */
    private void resetBackoff() {
        attemptCounter = 0;
        backoffMultiplier = 1L;
    }

    /** Opens the configured stream and hands it to a {@link TweetStreamHandler}; backs off on failure. */
    private class TweetStreamStarter implements Runnable {
        @Override
        public void run() {
            try {
                if (endpoint == StreamEndpoint.SAMPLE_ENDPOINT) {
                    stream = api.tweets().sampleStream(expansions, tweetFields, userFields, mediaFields, placeFields, pollFields, backfillMinutes);
                } else {
                    stream = api.tweets().searchStream(expansions, tweetFields, userFields, mediaFields, placeFields, pollFields, backfillMinutes);
                }
                if (executorService.isShutdown()) {
                    // stop() ran while the connection was being opened: release it instead of leaking it
                    closeStream();
                    return;
                }
                executorService.execute(new TweetStreamHandler());
            } catch (final ApiException e) {
                stream = null;
                logger.warn("Twitter Stream [{}] API connection failed: HTTP {}", endpoint.getEndpointName(), e.getCode(), e);
                scheduleStartStreamWithBackoff();
            } catch (final Exception e) {
                stream = null;
                logger.warn("Twitter Stream [{}] connection failed", endpoint.getEndpointName(), e);
                scheduleStartStreamWithBackoff();
            }
        }
    }

    /** Reads the open stream line by line into the queue, then schedules a reconnect when it ends. */
    private class TweetStreamHandler implements Runnable {
        @Override
        public void run() {
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8))) {
                String tweetRecord = reader.readLine();
                while (tweetRecord != null) {
                    // Skip empty lines received from the Twitter Stream
                    if (tweetRecord.isEmpty()) {
                        tweetRecord = reader.readLine();
                        continue;
                    }

                    queue.put(tweetRecord);

                    // reset backoff multiplier upon successful receipt of a tweet
                    resetBackoff();

                    tweetRecord = reader.readLine();
                }
            } catch (final IOException e) {
                logger.info("Stream is closed or has stopped", e);
            } catch (final InterruptedException e) {
                // restore the flag so the executor can observe the interruption
                Thread.currentThread().interrupt();
                logger.info("Interrupted while adding Tweet to queue", e);
                return;
            }

            logger.info("Stream processing completed");
            scheduleStartStreamWithBackoff();
        }
    }

    /**
     * Splits a comma-separated property value into a set of trimmed, non-empty entries.
     *
     * @param context process context providing the property value
     * @param property descriptor of the property to read
     * @return the parsed entries, or {@code null} (the value passed to the stream calls for an
     *         unrequested field group) when the property is unset or holds no non-empty entry
     */
    private Set<String> parseCommaSeparatedProperties(final ProcessContext context, final PropertyDescriptor property) {
        if (!context.getProperty(property).isSet()) {
            return null;
        }

        final Set<String> fields = new HashSet<>();
        final String fieldsString = context.getProperty(property).getValue();
        for (final String field : fieldsString.split(",")) {
            final String trimmed = field.trim();
            // "a,,b" or "a, " would otherwise request an empty field name
            if (!trimmed.isEmpty()) {
                fields.add(trimmed);
            }
        }
        return fields.isEmpty() ? null : fields;
    }
}

View File

@ -14,3 +14,4 @@
# limitations under the License. # limitations under the License.
org.apache.nifi.processors.twitter.GetTwitter org.apache.nifi.processors.twitter.GetTwitter
org.apache.nifi.processors.twitter.ConsumeTwitter

View File

@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.twitter;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
/**
 * Runs {@link ConsumeTwitter} against a local {@link MockWebServer} standing in for the Twitter API.
 */
public class TestConsumeTwitter {
    private static final String SAMPLE_TWEET = "{\"data\":{\"id\":\"123\",\"text\":\"This is a sample tweet and is not real!\"}}";
    private static final String EXPECTED_TWEET = "[{\"data\":{\"id\":\"123\",\"text\":\"This is a sample tweet and is not real!\"}}]";

    private MockWebServer mockWebServer;
    private TestRunner runner;

    /** Creates a fresh mock server and points the processor's Base Path at it. */
    @BeforeEach
    public void setRunnerAndAPI() {
        mockWebServer = new MockWebServer();

        runner = TestRunners.newTestRunner(ConsumeTwitter.class);

        runner.setProperty(ConsumeTwitter.BEARER_TOKEN, "BEARER_TOKEN");
        final String basePath = mockWebServer.url("").toString();
        runner.setProperty(ConsumeTwitter.BASE_PATH, basePath);
    }

    @AfterEach
    public void shutdownServerAndAPI() throws IOException {
        mockWebServer.shutdown();
    }

    /** A single streamed Tweet must arrive as one FlowFile holding a one-element JSON array. */
    @Test
    public void testReceiveSingleTweetInStream() {
        MockResponse response = new MockResponse()
                .setResponseCode(200)
                .setBody(SAMPLE_TWEET)
                .addHeader("Content-Type", "application/json");
        mockWebServer.enqueue(response);

        runner.setProperty(ConsumeTwitter.ENDPOINT, ConsumeTwitter.ENDPOINT_SAMPLE);
        runner.setProperty(ConsumeTwitter.QUEUE_SIZE, "10000");
        runner.setProperty(ConsumeTwitter.BATCH_SIZE, "10");
        runner.setProperty(ConsumeTwitter.BACKFILL_MINUTES, "0");

        runner.assertValid();

        // TweetStreamService queues tweets on its own thread and might not be done within one run of the processor
        final int maxTries = 3;
        final long runSchedule = 250;
        runner.setRunSchedule(runSchedule);
        runner.run(maxTries, false, true);
        runner.stop();

        // there should only be a single FlowFile containing a tweet
        runner.assertTransferCount(ConsumeTwitter.REL_SUCCESS, 1);
        MockFlowFile flowFile = runner.getFlowFilesForRelationship(ConsumeTwitter.REL_SUCCESS).get(0);
        flowFile.assertContentEquals(EXPECTED_TWEET);
        flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json");
        flowFile.assertAttributeEquals("tweets", "1");
    }
}