From e9cb3b300c6b5e223744925cee1d0c59dd97d29a Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Thu, 9 Apr 2015 17:56:52 -0400 Subject: [PATCH] NIFI-244: Initial import of GetTwitter processor --- nifi/nifi-assembly/NOTICE | 49 + nifi/nifi-assembly/pom.xml | 925 +++++++++--------- nifi/nifi-commons/pom.xml | 1 + .../nifi-social-media-nar/pom.xml | 36 + .../nifi-twitter-processors/.gitignore | 1 + .../nifi-twitter-processors/pom.xml | 60 ++ .../nifi/processors/twitter/GetTwitter.java | 360 +++++++ .../org.apache.nifi.processor.Processor | 16 + .../nifi-social-media-bundle/pom.xml | 33 + nifi/nifi-nar-bundles/pom.xml | 4 + nifi/pom.xml | 24 + 11 files changed, 1060 insertions(+), 449 deletions(-) create mode 100644 nifi/nifi-nar-bundles/nifi-social-media-bundle/nifi-social-media-nar/pom.xml create mode 100644 nifi/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/.gitignore create mode 100644 nifi/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/pom.xml create mode 100644 nifi/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java create mode 100644 nifi/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor create mode 100644 nifi/nifi-nar-bundles/nifi-social-media-bundle/pom.xml diff --git a/nifi/nifi-assembly/NOTICE b/nifi/nifi-assembly/NOTICE index 8d7db8d7ec..d95e2ffed4 100644 --- a/nifi/nifi-assembly/NOTICE +++ b/nifi/nifi-assembly/NOTICE @@ -501,6 +501,38 @@ The following binary components are provided under the Apache Software License v Apache License Version 2.0 http://www.apache.org/licenses/. (c) Daniel Lemire, http://lemire.me/en/ + (ASLv2) Twitter4J + The following NOTICE information applies: + Copyright 2007 Yusuke Yamamoto + + Twitter4J includes software from JSON.org to parse JSON response from the Twitter API. 
You can see the license term at http://www.JSON.org/license.html + + (ASLv2) JOAuth + The following NOTICE information applies: + JOAuth + Copyright 2010-2013 Twitter, Inc + + Licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0 + + (ASLv2) Hosebird Client + The following NOTICE information applies: + Hosebird Client (hbc) + Copyright 2013 Twitter, Inc. + + Licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0 + + (ASLv2) GeoIP2 Java API + The following NOTICE information applies: + GeoIP2 Java API + This software is Copyright (c) 2013 by MaxMind, Inc. + + This is free software, licensed under the Apache License, Version 2.0. + + (ASLv2) Google HTTP Client Library for Java + The following NOTICE information applies: + Google HTTP Client Library for Java + + This is free software, licensed under the Apache License, Version 2.0. ************************ Common Development and Distribution License 1.1 @@ -540,6 +572,14 @@ The following binary components are provided under the Common Development and Di (CDDL 1.0) JavaServer Pages(TM) API (javax.servlet.jsp:jsp-api:jar:2.1 - http://jsp.java.net) (CDDL 1.0) SR 250 Common Annotations For The JavaTM Platform (javax.annotation:jsr250-api:jar:1.0 - http://jcp.org/aboutJava/communityprocess/final/jsr250/index.html) +************************ +Creative Commons Attribution-ShareAlike 3.0 +************************ + +The following binary components are provided under the Creative Commons Attribution-ShareAlike 3.0. See project link for details. 
+ + (CCAS 3.0) MaxMind DB (https://github.com/maxmind/MaxMind-DB) + ************************ Eclipse Public License 1.0 ************************ @@ -559,6 +599,15 @@ The following binary components are provided under the Mozilla Public License v2 (MPL 2.0) Saxon HE (net.sf.saxon:Saxon-HE:jar:9.6.0-4 - http://www.saxonica.com/) +***************** +Mozilla Public License v1.1 +***************** + +The following binary components are provided under the Mozilla Public License v1.1. See project link for details. + + (MPL 1.1) HAPI Base (ca.uhn.hapi:hapi-base:2.2 - http://hl7api.sourceforge.net/) + (MPL 1.1) HAPI Structures (ca.uhn.hapi:hapi-structures-v*:2.2 - http://hl7api.sourceforge.net/) + ***************** Public Domain ***************** diff --git a/nifi/nifi-assembly/pom.xml b/nifi/nifi-assembly/pom.xml index a26f214c87..13ffba802d 100644 --- a/nifi/nifi-assembly/pom.xml +++ b/nifi/nifi-assembly/pom.xml @@ -1,457 +1,484 @@ - - - 4.0.0 - - org.apache.nifi - nifi - 0.1.0-incubating-SNAPSHOT - - nifi-assembly - pom - This is the assembly Apache NiFi (incubating) - - - - maven-assembly-plugin - - nifi-${project.version} - false - - - - make shared resource - - single - - package - - - src/main/assembly/dependencies.xml - - posix - - - - - - - - - ch.qos.logback - logback-classic - compile - - - org.slf4j - jcl-over-slf4j - compile - - - org.slf4j - jul-to-slf4j - compile - - - org.slf4j - log4j-over-slf4j - compile - - - org.slf4j - slf4j-api - compile - - - org.apache.nifi - nifi-api - - - org.apache.nifi - nifi-runtime - - - org.apache.nifi - nifi-bootstrap - - - org.apache.nifi - nifi-resources - resources - runtime - zip - - - org.apache.nifi - nifi-docs - resources - runtime - zip - - - org.apache.nifi - nifi-framework-nar - nar - - - org.apache.nifi - nifi-provenance-repository-nar - nar - - - org.apache.nifi - nifi-standard-services-api-nar - nar - - - org.apache.nifi - nifi-ssl-context-service-nar - nar - - - org.apache.nifi - 
nifi-distributed-cache-services-nar - nar - - - org.apache.nifi - nifi-standard-nar - nar - - - org.apache.nifi - nifi-jetty-bundle - nar - - - org.apache.nifi - nifi-update-attribute-nar - nar - - - org.apache.nifi - nifi-hadoop-libraries-nar - nar - - - org.apache.nifi - nifi-hadoop-nar - nar - - - org.apache.nifi - nifi-kafka-nar - nar - + + + 4.0.0 + + org.apache.nifi + nifi + 0.1.0-incubating-SNAPSHOT + + nifi-assembly + pom + This is the assembly Apache NiFi (incubating) + + + + maven-assembly-plugin + + nifi-${project.version} + false + + + + make shared resource + + single + + package + + + src/main/assembly/dependencies.xml + + posix + + + + + + + - org.apache.nifi - nifi-http-context-map-nar - nar - + ch.qos.logback + logback-classic + compile + - org.apache.nifi - nifi-kite-nar - nar - - - - - - 256 - 512 - 128 - 128 - 10m - 10 - - - ${project.version} - true - 10 sec - 500 ms - 30 sec - 10 millis + org.slf4j + jcl-over-slf4j + compile + + + org.slf4j + jul-to-slf4j + compile + + + org.slf4j + log4j-over-slf4j + compile + + + org.slf4j + slf4j-api + compile + + + org.apache.nifi + nifi-api + + + org.apache.nifi + nifi-runtime + + + org.apache.nifi + nifi-bootstrap + + + org.apache.nifi + nifi-resources + resources + runtime + zip + + + org.apache.nifi + nifi-docs + resources + runtime + zip + + + org.apache.nifi + nifi-framework-nar + nar + + + org.apache.nifi + nifi-provenance-repository-nar + nar + + + org.apache.nifi + nifi-standard-services-api-nar + nar + + + org.apache.nifi + nifi-ssl-context-service-nar + nar + + + org.apache.nifi + nifi-distributed-cache-services-nar + nar + + + org.apache.nifi + nifi-standard-nar + nar + + + org.apache.nifi + nifi-jetty-bundle + nar + + + org.apache.nifi + nifi-update-attribute-nar + nar + + + org.apache.nifi + nifi-hadoop-libraries-nar + nar + + + org.apache.nifi + nifi-hadoop-nar + nar + + + org.apache.nifi + nifi-kafka-nar + nar + + + org.apache.nifi + nifi-http-context-map-nar + nar + + + org.apache.nifi + 
nifi-kite-nar + nar + + + org.apache.nifi + nifi-social-media-nar + 0.1.0-incubating-SNAPSHOT + nar + + + org.apache.nifi + nifi-hl7-nar + 0.1.0-incubating-SNAPSHOT + nar + + + org.apache.nifi + nifi-language-translation-nar + 0.1.0-incubating-SNAPSHOT + nar + + + org.apache.nifi + nifi-geo-nar + 0.1.0-incubating-SNAPSHOT + nar + + - ./conf/flow.xml.gz - ./conf/archive/ - ./conf/authority-providers.xml - ./conf/templates - ./database_repository + + + 256 + 512 + 128 + 128 + 10m + 10 - org.apache.nifi.controller.repository.WriteAheadFlowFileRepository - ./flowfile_repository - 256 - 2 mins - false - org.apache.nifi.controller.FileSystemSwapManager - 20000 - 5 sec - 1 - 5 sec - 4 - - org.apache.nifi.controller.repository.FileSystemRepository - 10 MB - 100 - ./content_repository - - - false - false - - - - - 30 sec - ./lib - ./work/nar/ - ./work/docs/components - - PBEWITHMD5AND256BITAES-CBC-OPENSSL - BC - ;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE + + ${project.version} + true + 10 sec + 500 ms + 30 sec + 10 millis - 9990 - - - org.apache.nifi.provenance.PersistentProvenanceRepository - ./provenance_repository - 24 hours - 1 GB - 5 mins - 100 MB - 2 - true - EventType, FlowFileUUID, Filename, ProcessorID - - 500 MB - false - 16 - - - 100000 - - - org.apache.nifi.controller.status.history.VolatileComponentStatusRepository - 288 - 5 mins - - - ./lib - - 8080 - - - ./work/jetty - 200 - - - - - - - - - - - ./conf/authorized-users.xml - 24 hours - file-provider - - - - - - - 5 sec - false - 30 sec - 45 sec - false - - - 500 ms - 3 - 1 sec + ./conf/flow.xml.gz + ./conf/archive/ + ./conf/authority-providers.xml + ./conf/templates + ./database_repository - - false - - - 2 - - - - - false - - - - 10 - 30 sec - 30 sec - 10 - 5 sec - 10 - 0 sec - - - - rpm - - false - - - - - maven-dependency-plugin - - - unpack-shared-resources - - unpack-dependencies - - generate-resources - - ${project.build.directory}/generated-resources - nifi-resources - org.apache.nifi - false - 
- - - unpack-docs - - unpack-dependencies - - generate-resources - - ${project.build.directory}/generated-docs - nifi-docs - org.apache.nifi - false - - - - - - org.codehaus.mojo - rpm-maven-plugin - - Apache NiFi (incubating) - Apache Nifi (incubating) is dataflow system based on the Flow-Based Programming concepts. - Apache License, Version 2.0 and others (see included LICENSE file) - http://nifi.incubator.apache.org - Utilities - /opt/nifi - - _use_internal_dependency_generator 0 - - 750 - 640 - root - root - - - - build-bin-rpm - - attached-rpm - - - bin - - nifi - - - - /opt/nifi/nifi-${project.version} - - - /opt/nifi/nifi-${project.version} - - - ./LICENSE - - - ./NOTICE - - - ../DISCLAIMER - - - ./README.md - README - - - - - /opt/nifi/nifi-${project.version}/bin - 750 - - - ${project.build.directory}/generated-resources/bin/nifi.sh - nifi.sh - true - - - - - /opt/nifi/nifi-${project.version}/conf - true - - - ${project.build.directory}/generated-resources/conf - true - - - - - /opt/nifi/nifi-${project.version}/lib - - - org.apache.nifi:nifi-bootstrap - org.apache.nifi:nifi-resources - org.apache.nifi:nifi-docs - - - - - /opt/nifi/nifi-${project.version}/lib/bootstrap - - - org.apache.nifi:nifi-bootstrap - - - - - /opt/nifi/nifi-${project.version}/docs - - - ${project.build.directory}/generated-docs - - - - - - - - - - - - + org.apache.nifi.controller.repository.WriteAheadFlowFileRepository + ./flowfile_repository + 256 + 2 mins + false + org.apache.nifi.controller.FileSystemSwapManager + 20000 + 5 sec + 1 + 5 sec + 4 + + org.apache.nifi.controller.repository.FileSystemRepository + 10 MB + 100 + ./content_repository + + + false + false + + + + + 30 sec + ./lib + ./work/nar/ + ./work/docs/components + + PBEWITHMD5AND256BITAES-CBC-OPENSSL + BC + ;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE + + 9990 + + + org.apache.nifi.provenance.PersistentProvenanceRepository + ./provenance_repository + 24 hours + 1 GB + 5 mins + 100 MB + 2 + true + EventType, 
FlowFileUUID, + Filename, ProcessorID + + 500 MB + false + 16 + + + 100000 + + + org.apache.nifi.controller.status.history.VolatileComponentStatusRepository + 288 + 5 mins + + + ./lib + + 8080 + + + ./work/jetty + 200 + + + + + + + + + + + ./conf/authorized-users.xml + 24 hours + file-provider + + + + + + + 5 sec + false + 30 sec + 45 sec + false + + + 500 ms + 3 + 1 sec + + + false + + + 2 + + + + + false + + + + 10 + 30 sec + 30 sec + 10 + 5 sec + 10 + 0 sec + + + + rpm + + false + + + + + maven-dependency-plugin + + + unpack-shared-resources + + unpack-dependencies + + generate-resources + + ${project.build.directory}/generated-resources + nifi-resources + org.apache.nifi + false + + + + unpack-docs + + unpack-dependencies + + generate-resources + + ${project.build.directory}/generated-docs + nifi-docs + org.apache.nifi + false + + + + + + org.codehaus.mojo + rpm-maven-plugin + + Apache NiFi (incubating) + Apache Nifi (incubating) is dataflow system based on + the Flow-Based Programming concepts. 
+ Apache License, Version 2.0 and others (see included + LICENSE file) + http://nifi.incubator.apache.org + Utilities + /opt/nifi + + _use_internal_dependency_generator 0 + + 750 + 640 + root + root + + + + build-bin-rpm + + attached-rpm + + + bin + + nifi + + + + /opt/nifi/nifi-${project.version} + + + /opt/nifi/nifi-${project.version} + + + ./LICENSE + + + ./NOTICE + + + ../DISCLAIMER + + + ./README.md + README + + + + + /opt/nifi/nifi-${project.version}/bin + 750 + + + ${project.build.directory}/generated-resources/bin/nifi.sh + nifi.sh + true + + + + + /opt/nifi/nifi-${project.version}/conf + true + + + ${project.build.directory}/generated-resources/conf + true + + + + + /opt/nifi/nifi-${project.version}/lib + + + org.apache.nifi:nifi-bootstrap + org.apache.nifi:nifi-resources + org.apache.nifi:nifi-docs + + + + + /opt/nifi/nifi-${project.version}/lib/bootstrap + + + org.apache.nifi:nifi-bootstrap + + + + + /opt/nifi/nifi-${project.version}/docs + + + ${project.build.directory}/generated-docs + + + + + + + + + + + + diff --git a/nifi/nifi-commons/pom.xml b/nifi/nifi-commons/pom.xml index 53580549bc..1d2ce46418 100644 --- a/nifi/nifi-commons/pom.xml +++ b/nifi/nifi-commons/pom.xml @@ -36,5 +36,6 @@ nifi-processor-utilities nifi-write-ahead-log nifi-site-to-site-client + nifi-hl7-query-language diff --git a/nifi/nifi-nar-bundles/nifi-social-media-bundle/nifi-social-media-nar/pom.xml b/nifi/nifi-nar-bundles/nifi-social-media-bundle/nifi-social-media-nar/pom.xml new file mode 100644 index 0000000000..6da74dd68a --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-social-media-bundle/nifi-social-media-nar/pom.xml @@ -0,0 +1,36 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-social-media-bundle + 0.1.0-incubating-SNAPSHOT + + + nifi-social-media-nar + nar + + + + org.apache.nifi + nifi-twitter-processors + 0.1.0-incubating-SNAPSHOT + + + + diff --git a/nifi/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/.gitignore 
b/nifi/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/.gitignore new file mode 100644 index 0000000000..b83d22266a --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/.gitignore @@ -0,0 +1 @@ +/target/ diff --git a/nifi/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/pom.xml b/nifi/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/pom.xml new file mode 100644 index 0000000000..45af0cec5f --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/pom.xml @@ -0,0 +1,60 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-social-media-bundle + 0.1.0-incubating-SNAPSHOT + + + nifi-twitter-processors + jar + + + + org.apache.nifi + nifi-api + + + org.apache.nifi + nifi-processor-utils + + + + com.twitter + hbc-twitter4j + 2.2.0 + + + + org.apache.nifi + nifi-mock + test + + + org.slf4j + slf4j-simple + test + + + junit + junit + test + + + diff --git a/nifi/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java b/nifi/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java new file mode 100644 index 0000000000..45b1ae1a13 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java @@ -0,0 +1,360 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.twitter; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.MalformedURLException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.regex.Pattern; + +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import 
org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import com.twitter.hbc.ClientBuilder;
+import com.twitter.hbc.core.Client;
+import com.twitter.hbc.core.Constants;
+import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
+import com.twitter.hbc.core.endpoint.StatusesFirehoseEndpoint;
+import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint;
+import com.twitter.hbc.core.endpoint.StreamingEndpoint;
+import com.twitter.hbc.core.event.Event;
+import com.twitter.hbc.core.processor.StringDelimitedProcessor;
+import com.twitter.hbc.httpclient.auth.Authentication;
+import com.twitter.hbc.httpclient.auth.OAuth1;
+
+/**
+ * Source processor that pulls status updates from one of Twitter's streaming endpoints
+ * (sample, firehose or filter) using the Hosebird client and emits each message as its
+ * own FlowFile on the 'success' relationship.
+ *
+ * The Hosebird {@link Client} is built and connected in {@link #onScheduled(ProcessContext)}
+ * and stopped in {@link #shutdownClient()}. The client is handed two queues: one that
+ * receives raw messages and one that receives connection events. Each call to
+ * {@link #onTrigger(ProcessContext, ProcessSession)} handles at most one event and at most
+ * one message.
+ */
+@SupportsBatching
+@Tags({"twitter", "tweets", "social media", "status", "json"})
+@CapabilityDescription("Pulls status changes from Twitter's streaming API")
+@WritesAttribute(attribute="mime.type", description="Sets mime type to application/json")
+public class GetTwitter extends AbstractProcessor {
+
+    static final AllowableValue ENDPOINT_SAMPLE = new AllowableValue("Sample Endpoint", "Sample Endpoint", "The endpoint that provides public data, aka a 'garden hose'");
+    static final AllowableValue ENDPOINT_FIREHOSE = new AllowableValue("Firehose Endpoint", "Firehose Endpoint", "The endpoint that provides access to all tweets");
+    static final AllowableValue ENDPOINT_FILTER = new AllowableValue("Filter Endpoint", "Filter Endpoint", "Endpoint that allows the stream to be filtered by specific terms or User IDs");
+
+    public static final PropertyDescriptor ENDPOINT = new PropertyDescriptor.Builder()
+        .name("Twitter Endpoint")
+        .description("Specifies which endpoint data should be pulled from")
+        .required(true)
+        .allowableValues(ENDPOINT_SAMPLE, ENDPOINT_FIREHOSE, ENDPOINT_FILTER)
+        .defaultValue(ENDPOINT_SAMPLE.getValue())
+        .build();
+    public static final PropertyDescriptor CONSUMER_KEY = new PropertyDescriptor.Builder()
+        .name("Consumer Key")
+        .description("The Consumer Key provided by Twitter")
+        .required(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .build();
+    public static final PropertyDescriptor CONSUMER_SECRET = new PropertyDescriptor.Builder()
+        .name("Consumer Secret")
+        .description("The Consumer Secret provided by Twitter")
+        .required(true)
+        .sensitive(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .build();
+    public static final PropertyDescriptor ACCESS_TOKEN = new PropertyDescriptor.Builder()
+        .name("Access Token")
+        .description("The Access Token provided by Twitter")
+        .required(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .build();
+    public static final PropertyDescriptor ACCESS_TOKEN_SECRET = new PropertyDescriptor.Builder()
+        .name("Access Token Secret")
+        .description("The Access Token Secret provided by Twitter")
+        .required(true)
+        .sensitive(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .build();
+    public static final PropertyDescriptor LANGUAGES = new PropertyDescriptor.Builder()
+        .name("Languages")
+        .description("A comma-separated list of languages for which tweets should be fetched")
+        .required(false)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .build();
+    public static final PropertyDescriptor FOLLOWING = new PropertyDescriptor.Builder()
+        .name("IDs to Follow")
+        .description("A comma-separated list of Twitter User ID's to follow. Ignored unless Endpoint is set to 'Filter Endpoint'.")
+        .required(false)
+        .addValidator(new FollowingValidator())
+        .build();
+    public static final PropertyDescriptor TERMS = new PropertyDescriptor.Builder()
+        .name("Terms to Filter On")
+        .description("A comma-separated list of terms to filter on. Ignored unless Endpoint is set to 'Filter Endpoint'. The filter works such that if any term matches, the status update will be retrieved; multiple terms separated by a space function as an 'AND'. I.e., 'it was, hello' will retrieve status updates that have either 'hello' or both 'it' AND 'was'")
+        .required(false)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .build();
+
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+        .name("success")
+        .description("All status updates will be routed to this relationship")
+        .build();
+
+    private List<PropertyDescriptor> descriptors;
+    private Set<Relationship> relationships;
+
+    // Connection events reported by the Hosebird client; drained one per onTrigger call.
+    private final BlockingQueue<Event> eventQueue = new LinkedBlockingQueue<>(1000);
+
+    // Both fields are (re)assigned in onScheduled; they are null until the first schedule.
+    private volatile Client client;
+    private volatile BlockingQueue<String> messageQueue;
+
+    /**
+     * Builds the immutable lists of supported properties and relationships.
+     */
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(ENDPOINT);
+        descriptors.add(CONSUMER_KEY);
+        descriptors.add(CONSUMER_SECRET);
+        descriptors.add(ACCESS_TOKEN);
+        descriptors.add(ACCESS_TOKEN_SECRET);
+        descriptors.add(LANGUAGES);
+        descriptors.add(TERMS);
+        descriptors.add(FOLLOWING);
+        this.descriptors = Collections.unmodifiableList(descriptors);
+
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
+
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
+
+    /**
+     * Accepts any dynamic property name, requiring only a non-empty value.
+     *
+     * NOTE(review): the description promises a query parameter, but nothing in this class
+     * reads dynamic properties when the endpoint is built in onScheduled - confirm whether
+     * they are meant to be applied to the streaming endpoint.
+     */
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+            .name(propertyDescriptorName)
+            .description("Adds a query parameter with name '" + propertyDescriptorName + "' to the Twitter query")
+            .required(false)
+            .dynamic(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    }
+
+    /**
+     * Cross-property validation: the Filter Endpoint needs at least one of
+     * 'Terms to Filter On' or 'IDs to Follow' to be set.
+     */
+    @Override
+    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+        final List<ValidationResult> results = new ArrayList<>();
+        final String endpointName = validationContext.getProperty(ENDPOINT).getValue();
+
+        if ( ENDPOINT_FILTER.getValue().equals(endpointName) ) {
+            if ( !validationContext.getProperty(TERMS).isSet() && !validationContext.getProperty(FOLLOWING).isSet() ) {
+                results.add(new ValidationResult.Builder().input("").subject(FOLLOWING.getName()).valid(false).explanation("When using the 'Filter Endpoint', at least one of '" + TERMS.getName() + "' or '" + FOLLOWING.getName() + "' must be set").build());
+            }
+        }
+
+        return results;
+    }
+
+    /**
+     * Discards any queued messages, since they were fetched under the previous configuration.
+     */
+    @Override
+    public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
+        // if any property is modified, the results are no longer valid. Destroy all messages in the queue.
+        // The queue is only created in onScheduled, so it is still null when a property is
+        // edited before the processor has ever been scheduled.
+        final BlockingQueue<String> queue = messageQueue;
+        if ( queue != null ) {
+            queue.clear();
+        }
+    }
+
+    /**
+     * Creates a fresh message queue, builds the Hosebird client for the configured endpoint
+     * and credentials, and connects it.
+     *
+     * @param context provides the configured property values
+     * @throws MalformedURLException declared for the scheduling contract; kept for compatibility
+     */
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) throws MalformedURLException {
+        messageQueue = new LinkedBlockingQueue<>(100000);
+
+        final String endpointName = context.getProperty(ENDPOINT).getValue();
+        final Authentication oauth = new OAuth1(context.getProperty(CONSUMER_KEY).getValue(),
+            context.getProperty(CONSUMER_SECRET).getValue(),
+            context.getProperty(ACCESS_TOKEN).getValue(),
+            context.getProperty(ACCESS_TOKEN_SECRET).getValue());
+
+        final ClientBuilder clientBuilder = new ClientBuilder();
+        clientBuilder.name("GetTwitter[id=" + getIdentifier() + "]")
+            .authentication(oauth)
+            .eventMessageQueue(eventQueue)
+            .processor(new StringDelimitedProcessor(messageQueue));
+
+        // Empty when the property is unset or contains only separators; in that case no
+        // language restriction is applied to the endpoint.
+        final List<String> languages = parseCommaSeparated(context.getProperty(LANGUAGES).getValue());
+
+        final StreamingEndpoint streamingEndpoint;
+        if ( ENDPOINT_SAMPLE.getValue().equals(endpointName) ) {
+            final StatusesSampleEndpoint sampleEndpoint = new StatusesSampleEndpoint();
+            if ( !languages.isEmpty() ) {
+                sampleEndpoint.languages(languages);
+            }
+            streamingEndpoint = sampleEndpoint;
+        } else if ( ENDPOINT_FIREHOSE.getValue().equals(endpointName) ) {
+            final StatusesFirehoseEndpoint firehoseEndpoint = new StatusesFirehoseEndpoint();
+            if ( !languages.isEmpty() ) {
+                firehoseEndpoint.languages(languages);
+            }
+            streamingEndpoint = firehoseEndpoint;
+        } else if ( ENDPOINT_FILTER.getValue().equals(endpointName) ) {
+            final StatusesFilterEndpoint filterEndpoint = new StatusesFilterEndpoint();
+
+            final List<Long> followingIds = new ArrayList<>();
+            for ( final String id : parseCommaSeparated(context.getProperty(FOLLOWING).getValue()) ) {
+                followingIds.add(Long.valueOf(id));
+            }
+
+            final List<String> terms = parseCommaSeparated(context.getProperty(TERMS).getValue());
+
+            if ( !terms.isEmpty() ) {
+                filterEndpoint.trackTerms(terms);
+            }
+
+            if ( !followingIds.isEmpty() ) {
+                filterEndpoint.followings(followingIds);
+            }
+
+            if ( !languages.isEmpty() ) {
+                filterEndpoint.languages(languages);
+            }
+            streamingEndpoint = filterEndpoint;
+        } else {
+            throw new AssertionError("Endpoint was invalid value: " + endpointName);
+        }
+
+        // Every supported endpoint is served from the same streaming host.
+        clientBuilder.hosts(Constants.STREAM_HOST).endpoint(streamingEndpoint);
+        client = clientBuilder.build();
+        client.connect();
+    }
+
+    /**
+     * Stops the Hosebird client, if one was created.
+     */
+    @OnStopped
+    public void shutdownClient() {
+        if ( client != null ) {
+            client.stop();
+        }
+    }
+
+    /**
+     * Handles at most one pending client event, then turns at most one queued message into
+     * a FlowFile routed to 'success'. Yields when no message is available.
+     */
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final Event event = eventQueue.poll();
+        if ( event != null ) {
+            switch (event.getEventType()) {
+                case STOPPED_BY_ERROR:
+                    getLogger().error("Received error {}: {} due to {}. Will not attempt to reconnect", new Object[] {event.getEventType(), event.getMessage(), event.getUnderlyingException()});
+                    break;
+                case CONNECTION_ERROR:
+                case HTTP_ERROR:
+                    getLogger().error("Received error {}: {}. Will attempt to reconnect", new Object[] {event.getEventType(), event.getMessage()});
+                    client.reconnect();
+                    break;
+                default:
+                    break;
+            }
+        }
+
+        final String tweet = messageQueue.poll();
+        if ( tweet == null ) {
+            context.yield();
+            return;
+        }
+
+        FlowFile flowFile = session.create();
+        flowFile = session.write(flowFile, new OutputStreamCallback() {
+            @Override
+            public void process(final OutputStream out) throws IOException {
+                out.write(tweet.getBytes(StandardCharsets.UTF_8));
+            }
+        });
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
+        attributes.put(CoreAttributes.FILENAME.key(), flowFile.getAttribute(CoreAttributes.FILENAME.key()) + ".json");
+        flowFile = session.putAllAttributes(flowFile, attributes);
+
+        session.transfer(flowFile, REL_SUCCESS);
+        session.getProvenanceReporter().receive(flowFile, Constants.STREAM_HOST + client.getEndpoint().getURI().toString());
+    }
+
+    /**
+     * Splits a comma-separated property value into trimmed, non-empty entries.
+     *
+     * @param value the raw property value; may be null when the property is not set
+     * @return the entries in order of appearance; empty (never null) when value is null or
+     *         contains no non-blank entries
+     */
+    private static List<String> parseCommaSeparated(final String value) {
+        if ( value == null ) {
+            return Collections.emptyList();
+        }
+
+        final List<String> entries = new ArrayList<>();
+        for ( final String split : value.split(",") ) {
+            final String trimmed = split.trim();
+            if ( !trimmed.isEmpty() ) {
+                entries.add(trimmed);
+            }
+        }
+        return entries;
+    }
+
+    /**
+     * Validates that a value is a comma-separated list of numeric user IDs, each of which
+     * fits in a long (onScheduled converts every entry to a Long).
+     */
+    private static class FollowingValidator implements Validator {
+        private static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+");
+
+        @Override
+        public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+            final String[] splits = input.split(",");
+            for ( final String split : splits ) {
+                final String candidate = split.trim();
+                if ( !NUMBER_PATTERN.matcher(candidate).matches() || !fitsInLong(candidate) ) {
+                    return new ValidationResult.Builder().input(input).subject(subject).valid(false).explanation("Must be comma-separated list of User ID's").build();
+                }
+            }
+
+            return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
+        }
+
+        // A digit run longer than Long.MAX_VALUE matches the pattern but cannot be parsed.
+        private static boolean fitsInLong(final String digits) {
+            try {
+                Long.parseLong(digits);
+                return true;
+            } catch (final NumberFormatException nfe) {
+                return false;
+            }
+        }
+
+    }
+}
diff --git
a/nifi/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor new file mode 100644 index 0000000000..9504a1128e --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +org.apache.nifi.processors.twitter.GetTwitter \ No newline at end of file diff --git a/nifi/nifi-nar-bundles/nifi-social-media-bundle/pom.xml b/nifi/nifi-nar-bundles/nifi-social-media-bundle/pom.xml new file mode 100644 index 0000000000..5aadbce05e --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-social-media-bundle/pom.xml @@ -0,0 +1,33 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-nar-bundles + 0.1.0-incubating-SNAPSHOT + + + nifi-social-media-bundle + pom + + + nifi-twitter-processors + nifi-social-media-nar + + + diff --git a/nifi/nifi-nar-bundles/pom.xml b/nifi/nifi-nar-bundles/pom.xml index e7c122d5e8..50a940721a 100644 --- a/nifi/nifi-nar-bundles/pom.xml +++ b/nifi/nifi-nar-bundles/pom.xml @@ -35,6 +35,10 @@ nifi-update-attribute-bundle nifi-kafka-bundle nifi-kite-bundle + nifi-social-media-bundle + nifi-geo-bundle + nifi-hl7-bundle + nifi-language-translation-bundle diff --git a/nifi/pom.xml b/nifi/pom.xml index 2e2346a6f5..9b8bfb441d 100644 --- a/nifi/pom.xml +++ b/nifi/pom.xml @@ -798,6 +798,30 @@ 0.1.0-incubating-SNAPSHOT nar + + org.apache.nifi + nifi-social-media-nar + 0.1.0-incubating-SNAPSHOT + nar + + + org.apache.nifi + nifi-hl7-nar + 0.1.0-incubating-SNAPSHOT + nar + + + org.apache.nifi + nifi-language-translation-nar + 0.1.0-incubating-SNAPSHOT + nar + + + org.apache.nifi + nifi-geo-nar + 0.1.0-incubating-SNAPSHOT + nar + org.apache.nifi nifi-properties