diff --git a/nifi/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/pom.xml b/nifi/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/pom.xml
index 45af0cec5f..4768dbc2f1 100644
--- a/nifi/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/pom.xml
+++ b/nifi/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/pom.xml
@@ -35,11 +35,11 @@
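<artifactId>nifi-processor-utils</artifactId>
- <dependency>
- <groupId>com.twitter</groupId>
- <artifactId>hbc-twitter4j</artifactId>
- <version>2.2.0</version>
- </dependency>
+ <dependency>
+ <groupId>com.twitter</groupId>
+ <artifactId>hbc-twitter4j</artifactId>
+ <version>2.2.0</version>
+ </dependency>
<groupId>org.apache.nifi</groupId>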
diff --git a/nifi/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java b/nifi/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java
index 45b1ae1a13..a0568676c2 100644
--- a/nifi/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java
+++ b/nifi/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java
@@ -69,82 +69,84 @@ import com.twitter.hbc.httpclient.auth.OAuth1;
@SupportsBatching
@Tags({"twitter", "tweets", "social media", "status", "json"})
@CapabilityDescription("Pulls status changes from Twitter's streaming API")
-@WritesAttribute(attribute="mime.type", description="Sets mime type to application/json")
+@WritesAttribute(attribute = "mime.type", description = "Sets mime type to application/json")
public class GetTwitter extends AbstractProcessor {
- static final AllowableValue ENDPOINT_SAMPLE = new AllowableValue("Sample Endpoint", "Sample Endpoint", "The endpoint that provides public data, aka a 'garden hose'");
- static final AllowableValue ENDPOINT_FIREHOSE = new AllowableValue("Firehose Endpoint", "Firehose Endpoint", "The endpoint that provides access to all tweets");
- static final AllowableValue ENDPOINT_FILTER = new AllowableValue("Filter Endpoint", "Filter Endpoint", "Endpoint that allows the stream to be filtered by specific terms or User IDs");
-
- public static final PropertyDescriptor ENDPOINT = new PropertyDescriptor.Builder()
- .name("Twitter Endpoint")
- .description("Specifies which endpoint data should be pulled from")
- .required(true)
- .allowableValues(ENDPOINT_SAMPLE, ENDPOINT_FIREHOSE, ENDPOINT_FILTER)
- .defaultValue(ENDPOINT_SAMPLE.getValue())
- .build();
+ static final AllowableValue ENDPOINT_SAMPLE = new AllowableValue("Sample Endpoint", "Sample Endpoint", "The endpoint that provides public data, aka a 'garden hose'");
+ static final AllowableValue ENDPOINT_FIREHOSE = new AllowableValue("Firehose Endpoint", "Firehose Endpoint", "The endpoint that provides access to all tweets");
+ static final AllowableValue ENDPOINT_FILTER = new AllowableValue("Filter Endpoint", "Filter Endpoint", "Endpoint that allows the stream to be filtered by specific terms or User IDs");
+
+ public static final PropertyDescriptor ENDPOINT = new PropertyDescriptor.Builder()
+ .name("Twitter Endpoint")
+ .description("Specifies which endpoint data should be pulled from")
+ .required(true)
+ .allowableValues(ENDPOINT_SAMPLE, ENDPOINT_FIREHOSE, ENDPOINT_FILTER)
+ .defaultValue(ENDPOINT_SAMPLE.getValue())
+ .build();
public static final PropertyDescriptor CONSUMER_KEY = new PropertyDescriptor.Builder()
- .name("Consumer Key")
- .description("The Consumer Key provided by Twitter")
- .required(true)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
+ .name("Consumer Key")
+ .description("The Consumer Key provided by Twitter")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
public static final PropertyDescriptor CONSUMER_SECRET = new PropertyDescriptor.Builder()
- .name("Consumer Secret")
- .description("The Consumer Secret provided by Twitter")
- .required(true)
- .sensitive(true)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
+ .name("Consumer Secret")
+ .description("The Consumer Secret provided by Twitter")
+ .required(true)
+ .sensitive(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
public static final PropertyDescriptor ACCESS_TOKEN = new PropertyDescriptor.Builder()
- .name("Access Token")
- .description("The Acces Token provided by Twitter")
- .required(true)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
+ .name("Access Token")
+ .description("The Acces Token provided by Twitter")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
public static final PropertyDescriptor ACCESS_TOKEN_SECRET = new PropertyDescriptor.Builder()
- .name("Access Token Secret")
- .description("The Access Token Secret provided by Twitter")
- .required(true)
- .sensitive(true)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
+ .name("Access Token Secret")
+ .description("The Access Token Secret provided by Twitter")
+ .required(true)
+ .sensitive(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
public static final PropertyDescriptor LANGUAGES = new PropertyDescriptor.Builder()
- .name("Languages")
- .description("A comma-separated list of languages for which tweets should be fetched")
- .required(false)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
+ .name("Languages")
+ .description("A comma-separated list of languages for which tweets should be fetched")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
public static final PropertyDescriptor FOLLOWING = new PropertyDescriptor.Builder()
- .name("IDs to Follow")
- .description("A comma-separated list of Twitter User ID's to follow. Ignored unless Endpoint is set to 'Filter Endpoint'.")
- .required(false)
- .addValidator(new FollowingValidator())
- .build();
+ .name("IDs to Follow")
+ .description("A comma-separated list of Twitter User ID's to follow. Ignored unless Endpoint is set to 'Filter Endpoint'.")
+ .required(false)
+ .addValidator(new FollowingValidator())
+ .build();
public static final PropertyDescriptor TERMS = new PropertyDescriptor.Builder()
- .name("Terms to Filter On")
- .description("A comma-separated list of terms to filter on. Ignored unless Endpoint is set to 'Filter Endpoint'. The filter works such that if any term matches, the status update will be retrieved; multiple terms separated by a space function as an 'AND'. I.e., 'it was, hello' will retrieve status updates that have either 'hello' or both 'it' AND 'was'")
- .required(false)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
-
-
+ .name("Terms to Filter On")
+ .description("A comma-separated list of terms to filter on. Ignored unless Endpoint is set to 'Filter Endpoint'."
+ + " The filter works such that if any term matches, the status update will be retrieved; multiple terms"
+ + " separated by a space function as an 'AND'. I.e., 'it was, hello' will retrieve status updates that"
+ + " have either 'hello' or both 'it' AND 'was'")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
public static final Relationship REL_SUCCESS = new Relationship.Builder()
- .name("success")
- .description("All status updates will be routed to this relationship")
- .build();
+ .name("success")
+ .description("All status updates will be routed to this relationship")
+ .build();
private List descriptors;
private Set relationships;
- private final BlockingQueue eventQueue = new LinkedBlockingQueue(1000);
-
+ private final BlockingQueue eventQueue = new LinkedBlockingQueue<>(1000);
+
private volatile Client client;
private volatile BlockingQueue messageQueue;
@Override
protected void init(final ProcessorInitializationContext context) {
- final List descriptors = new ArrayList();
+ final List descriptors = new ArrayList<>();
descriptors.add(ENDPOINT);
descriptors.add(CONSUMER_KEY);
descriptors.add(CONSUMER_SECRET);
@@ -155,7 +157,7 @@ public class GetTwitter extends AbstractProcessor {
descriptors.add(FOLLOWING);
this.descriptors = Collections.unmodifiableList(descriptors);
- final Set relationships = new HashSet();
+ final Set relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
this.relationships = Collections.unmodifiableSet(relationships);
}
@@ -169,192 +171,194 @@ public class GetTwitter extends AbstractProcessor {
public final List getSupportedPropertyDescriptors() {
return descriptors;
}
-
+
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
- return new PropertyDescriptor.Builder()
- .name(propertyDescriptorName)
- .description("Adds a query parameter with name '" + propertyDescriptorName + "' to the Twitter query")
- .required(false)
- .dynamic(true)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
+ return new PropertyDescriptor.Builder()
+ .name(propertyDescriptorName)
+ .description("Adds a query parameter with name '" + propertyDescriptorName + "' to the Twitter query")
+ .required(false)
+ .dynamic(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
}
-
+
@Override
protected Collection customValidate(final ValidationContext validationContext) {
- final List results = new ArrayList<>();
- final String endpointName = validationContext.getProperty(ENDPOINT).getValue();
-
- if ( ENDPOINT_FILTER.getValue().equals(endpointName) ) {
- if ( !validationContext.getProperty(TERMS).isSet() && !validationContext.getProperty(FOLLOWING).isSet() ) {
- results.add(new ValidationResult.Builder().input("").subject(FOLLOWING.getName()).valid(false).explanation("When using the 'Filter Endpoint', at least one of '" + TERMS.getName() + "' or '" + FOLLOWING.getName() + "' must be set").build());
- }
- }
-
- return results;
+ final List results = new ArrayList<>();
+ final String endpointName = validationContext.getProperty(ENDPOINT).getValue();
+
+ if (ENDPOINT_FILTER.getValue().equals(endpointName)) {
+ if (!validationContext.getProperty(TERMS).isSet() && !validationContext.getProperty(FOLLOWING).isSet()) {
+ results.add(new ValidationResult.Builder().input("").subject(FOLLOWING.getName())
+ .valid(false).explanation("When using the 'Filter Endpoint', at least one of '" + TERMS.getName() + "' or '" + FOLLOWING.getName() + "' must be set").build());
+ }
+ }
+
+ return results;
}
-
+
@Override
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
- // if any property is modified, the results are no longer valid. Destroy all messages in teh queue.
- messageQueue.clear();
+ // if any property is modified, the results are no longer valid. Destroy all messages in the queue.
+ messageQueue.clear();
}
@OnScheduled
public void onScheduled(final ProcessContext context) throws MalformedURLException {
- messageQueue = new LinkedBlockingQueue<>(100000);
-
- final String endpointName = context.getProperty(ENDPOINT).getValue();
- final Authentication oauth = new OAuth1(context.getProperty(CONSUMER_KEY).getValue(),
- context.getProperty(CONSUMER_SECRET).getValue(),
- context.getProperty(ACCESS_TOKEN).getValue(),
- context.getProperty(ACCESS_TOKEN_SECRET).getValue());
+ messageQueue = new LinkedBlockingQueue<>(100000);
- final ClientBuilder clientBuilder = new ClientBuilder();
- clientBuilder.name("GetTwitter[id=" + getIdentifier() + "]")
- .authentication(oauth)
- .eventMessageQueue(eventQueue)
- .processor(new StringDelimitedProcessor(messageQueue));
+ final String endpointName = context.getProperty(ENDPOINT).getValue();
+ final Authentication oauth = new OAuth1(context.getProperty(CONSUMER_KEY).getValue(),
+ context.getProperty(CONSUMER_SECRET).getValue(),
+ context.getProperty(ACCESS_TOKEN).getValue(),
+ context.getProperty(ACCESS_TOKEN_SECRET).getValue());
- final String languageString = context.getProperty(LANGUAGES).getValue();
- final List languages;
- if ( languageString == null ) {
- languages = null;
- } else {
- languages = new ArrayList<>();
- for ( final String language : context.getProperty(LANGUAGES).getValue().split(",") ) {
- languages.add(language.trim());
- }
- }
-
- final String host;
- final StreamingEndpoint streamingEndpoint;
- if ( ENDPOINT_SAMPLE.getValue().equals(endpointName) ) {
- host = Constants.STREAM_HOST;
- final StatusesSampleEndpoint sse = new StatusesSampleEndpoint();
- streamingEndpoint = sse;
- if ( languages != null ) {
- sse.languages(languages);
- }
- } else if ( ENDPOINT_FIREHOSE.getValue().equals(endpointName) ) {
- host = Constants.STREAM_HOST;
- final StatusesFirehoseEndpoint firehoseEndpoint = new StatusesFirehoseEndpoint();
- streamingEndpoint = firehoseEndpoint;
- if ( languages != null ) {
- firehoseEndpoint.languages(languages);
- }
- } else if ( ENDPOINT_FILTER.getValue().equals(endpointName) ) {
- host = Constants.STREAM_HOST;
- final StatusesFilterEndpoint filterEndpoint = new StatusesFilterEndpoint();
-
- final String followingString = context.getProperty(FOLLOWING).getValue();
- final List followingIds;
- if ( followingString == null ) {
- followingIds = Collections.emptyList();
- } else {
- followingIds = new ArrayList<>();
-
- for ( final String split : followingString.split(",") ) {
- final Long id = Long.parseLong(split.trim());
- followingIds.add(id);
- }
- }
-
- final String termString = context.getProperty(TERMS).getValue();
- final List terms;
- if ( termString == null ) {
- terms = Collections.emptyList();
- } else {
- terms = new ArrayList<>();
- for ( final String split : termString.split(",") ) {
- terms.add(split.trim());
- }
- }
-
- if ( !terms.isEmpty() ) {
- filterEndpoint.trackTerms(terms);
- }
-
- if ( !followingIds.isEmpty() ) {
- filterEndpoint.followings(followingIds);
- }
-
- if ( languages != null ) {
- filterEndpoint.languages(languages);
- }
- streamingEndpoint = filterEndpoint;
- } else {
- throw new AssertionError("Endpoint was invalid value: " + endpointName);
- }
+ final ClientBuilder clientBuilder = new ClientBuilder();
+ clientBuilder.name("GetTwitter[id=" + getIdentifier() + "]")
+ .authentication(oauth)
+ .eventMessageQueue(eventQueue)
+ .processor(new StringDelimitedProcessor(messageQueue));
- clientBuilder.hosts(host).endpoint(streamingEndpoint);
- client = clientBuilder.build();
- client.connect();
+ final String languageString = context.getProperty(LANGUAGES).getValue();
+ final List languages;
+ if (languageString == null) {
+ languages = null;
+ } else {
+ languages = new ArrayList<>();
+ for (final String language : context.getProperty(LANGUAGES).getValue().split(",")) {
+ languages.add(language.trim());
+ }
+ }
+
+ final String host;
+ final StreamingEndpoint streamingEndpoint;
+ if (ENDPOINT_SAMPLE.getValue().equals(endpointName)) {
+ host = Constants.STREAM_HOST;
+ final StatusesSampleEndpoint sse = new StatusesSampleEndpoint();
+ streamingEndpoint = sse;
+ if (languages != null) {
+ sse.languages(languages);
+ }
+ } else if (ENDPOINT_FIREHOSE.getValue().equals(endpointName)) {
+ host = Constants.STREAM_HOST;
+ final StatusesFirehoseEndpoint firehoseEndpoint = new StatusesFirehoseEndpoint();
+ streamingEndpoint = firehoseEndpoint;
+ if (languages != null) {
+ firehoseEndpoint.languages(languages);
+ }
+ } else if (ENDPOINT_FILTER.getValue().equals(endpointName)) {
+ host = Constants.STREAM_HOST;
+ final StatusesFilterEndpoint filterEndpoint = new StatusesFilterEndpoint();
+
+ final String followingString = context.getProperty(FOLLOWING).getValue();
+ final List followingIds;
+ if (followingString == null) {
+ followingIds = Collections.emptyList();
+ } else {
+ followingIds = new ArrayList<>();
+
+ for (final String split : followingString.split(",")) {
+ final Long id = Long.parseLong(split.trim());
+ followingIds.add(id);
+ }
+ }
+
+ final String termString = context.getProperty(TERMS).getValue();
+ final List terms;
+ if (termString == null) {
+ terms = Collections.emptyList();
+ } else {
+ terms = new ArrayList<>();
+ for (final String split : termString.split(",")) {
+ terms.add(split.trim());
+ }
+ }
+
+ if (!terms.isEmpty()) {
+ filterEndpoint.trackTerms(terms);
+ }
+
+ if (!followingIds.isEmpty()) {
+ filterEndpoint.followings(followingIds);
+ }
+
+ if (languages != null) {
+ filterEndpoint.languages(languages);
+ }
+ streamingEndpoint = filterEndpoint;
+ } else {
+ throw new AssertionError("Endpoint was invalid value: " + endpointName);
+ }
+
+ clientBuilder.hosts(host).endpoint(streamingEndpoint);
+ client = clientBuilder.build();
+ client.connect();
}
@OnStopped
public void shutdownClient() {
- if ( client != null ) {
- client.stop();
- }
+ if (client != null) {
+ client.stop();
+ }
}
-
+
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
- final Event event = eventQueue.poll();
- if ( event != null ) {
- switch (event.getEventType()) {
- case STOPPED_BY_ERROR:
- getLogger().error("Received error {}: {} due to {}. Will not attempt to reconnect", new Object[] {event.getEventType(), event.getMessage(), event.getUnderlyingException()});
- break;
- case CONNECTION_ERROR:
- case HTTP_ERROR:
- getLogger().error("Received error {}: {}. Will attempt to reconnect", new Object[] {event.getEventType(), event.getMessage()});
- client.reconnect();
- break;
- default:
- break;
- }
- }
-
- final String tweet = messageQueue.poll();
- if ( tweet == null ) {
- context.yield();
- return;
- }
-
- FlowFile flowFile = session.create();
- flowFile = session.write(flowFile, new OutputStreamCallback() {
- @Override
- public void process(final OutputStream out) throws IOException {
- out.write(tweet.getBytes(StandardCharsets.UTF_8));
- }
- });
-
- final Map attributes = new HashMap<>();
- attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
- attributes.put(CoreAttributes.FILENAME.key(), flowFile.getAttribute(CoreAttributes.FILENAME.key()) + ".json");
- flowFile = session.putAllAttributes(flowFile, attributes);
-
- session.transfer(flowFile, REL_SUCCESS);
- session.getProvenanceReporter().receive(flowFile, Constants.STREAM_HOST + client.getEndpoint().getURI().toString());
+ final Event event = eventQueue.poll();
+ if (event != null) {
+ switch (event.getEventType()) {
+ case STOPPED_BY_ERROR:
+ getLogger().error("Received error {}: {} due to {}. Will not attempt to reconnect", new Object[]{event.getEventType(), event.getMessage(), event.getUnderlyingException()});
+ break;
+ case CONNECTION_ERROR:
+ case HTTP_ERROR:
+ getLogger().error("Received error {}: {}. Will attempt to reconnect", new Object[]{event.getEventType(), event.getMessage()});
+ client.reconnect();
+ break;
+ default:
+ break;
+ }
+ }
+
+ final String tweet = messageQueue.poll();
+ if (tweet == null) {
+ context.yield();
+ return;
+ }
+
+ FlowFile flowFile = session.create();
+ flowFile = session.write(flowFile, new OutputStreamCallback() {
+ @Override
+ public void process(final OutputStream out) throws IOException {
+ out.write(tweet.getBytes(StandardCharsets.UTF_8));
+ }
+ });
+
+ final Map attributes = new HashMap<>();
+ attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
+ attributes.put(CoreAttributes.FILENAME.key(), flowFile.getAttribute(CoreAttributes.FILENAME.key()) + ".json");
+ flowFile = session.putAllAttributes(flowFile, attributes);
+
+ session.transfer(flowFile, REL_SUCCESS);
+ session.getProvenanceReporter().receive(flowFile, Constants.STREAM_HOST + client.getEndpoint().getURI().toString());
}
private static class FollowingValidator implements Validator {
- private static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+");
-
- @Override
- public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
- final String[] splits = input.split(",");
- for ( final String split : splits ) {
- if ( !NUMBER_PATTERN.matcher(split.trim()).matches() ) {
- return new ValidationResult.Builder().input(input).subject(subject).valid(false).explanation("Must be comma-separted list of User ID's").build();
- }
- }
-
- return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
- }
-
+
+ private static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+");
+
+ @Override
+ public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+ final String[] splits = input.split(",");
+ for (final String split : splits) {
+ if (!NUMBER_PATTERN.matcher(split.trim()).matches()) {
+ return new ValidationResult.Builder().input(input).subject(subject).valid(false).explanation("Must be comma-separted list of User ID's").build();
+ }
+ }
+
+ return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
+ }
+
}
}