NIFI-244: Initial import of GetTwitter processor

This commit is contained in:
Mark Payne 2015-04-09 17:56:52 -04:00
parent 178c5cd287
commit e9cb3b300c
11 changed files with 1060 additions and 449 deletions

View File

@ -501,6 +501,38 @@ The following binary components are provided under the Apache Software License v
Apache License Version 2.0 http://www.apache.org/licenses/. Apache License Version 2.0 http://www.apache.org/licenses/.
(c) Daniel Lemire, http://lemire.me/en/ (c) Daniel Lemire, http://lemire.me/en/
(ASLv2) Twitter4J
The following NOTICE information applies:
Copyright 2007 Yusuke Yamamoto
Twitter4J includes software from JSON.org to parse JSON response from the Twitter API. You can see the license term at http://www.JSON.org/license.html
(ASLv2) JOAuth
The following NOTICE information applies:
JOAuth
Copyright 2010-2013 Twitter, Inc
Licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0
(ASLv2) Hosebird Client
The following NOTICE information applies:
Hosebird Client (hbc)
Copyright 2013 Twitter, Inc.
Licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0
(ASLv2) GeoIP2 Java API
The following NOTICE information applies:
GeoIP2 Java API
This software is Copyright (c) 2013 by MaxMind, Inc.
This is free software, licensed under the Apache License, Version 2.0.
(ASLv2) Google HTTP Client Library for Java
The following NOTICE information applies:
Google HTTP Client Library for Java
This is free software, licensed under the Apache License, Version 2.0.
************************ ************************
Common Development and Distribution License 1.1 Common Development and Distribution License 1.1
@ -540,6 +572,14 @@ The following binary components are provided under the Common Development and Di
(CDDL 1.0) JavaServer Pages(TM) API (javax.servlet.jsp:jsp-api:jar:2.1 - http://jsp.java.net) (CDDL 1.0) JavaServer Pages(TM) API (javax.servlet.jsp:jsp-api:jar:2.1 - http://jsp.java.net)
(CDDL 1.0) SR 250 Common Annotations For The JavaTM Platform (javax.annotation:jsr250-api:jar:1.0 - http://jcp.org/aboutJava/communityprocess/final/jsr250/index.html) (CDDL 1.0) SR 250 Common Annotations For The JavaTM Platform (javax.annotation:jsr250-api:jar:1.0 - http://jcp.org/aboutJava/communityprocess/final/jsr250/index.html)
************************
Creative Commons Attribution-ShareAlike 3.0
************************
The following binary components are provided under the Creative Commons Attribution-ShareAlike 3.0. See project link for details.
(CCAS 3.0) MaxMind DB (https://github.com/maxmind/MaxMind-DB)
************************ ************************
Eclipse Public License 1.0 Eclipse Public License 1.0
************************ ************************
@ -559,6 +599,15 @@ The following binary components are provided under the Mozilla Public License v2
(MPL 2.0) Saxon HE (net.sf.saxon:Saxon-HE:jar:9.6.0-4 - http://www.saxonica.com/) (MPL 2.0) Saxon HE (net.sf.saxon:Saxon-HE:jar:9.6.0-4 - http://www.saxonica.com/)
*****************
Mozilla Public License v1.1
*****************
The following binary components are provided under the Mozilla Public License v1.1. See project link for details.
(MPL 1.1) HAPI Base (ca.uhn.hapi:hapi-base:2.2 - http://http://hl7api.sourceforge.net/)
(MPL 1.1) HAPI Structures (ca.uhn.hapi:hapi-structures-v*:2.2 - http://http://hl7api.sourceforge.net/)
***************** *****************
Public Domain Public Domain
***************** *****************

View File

@ -1,19 +1,16 @@
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<!-- <!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
Licensed to the Apache Software Foundation (ASF) under one or more license agreements. See the NOTICE file distributed with this work for additional
contributor license agreements. See the NOTICE file distributed with information regarding copyright ownership. The ASF licenses this file to
this work for additional information regarding copyright ownership. You under the Apache License, Version 2.0 (the "License"); you may not use
The ASF licenses this file to You under the Apache License, Version 2.0 this file except in compliance with the License. You may obtain a copy of
(the "License"); you may not use this file except in compliance with the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
the License. You may obtain a copy of the License at by applicable law or agreed to in writing, software distributed under the
http://www.apache.org/licenses/LICENSE-2.0 License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
Unless required by applicable law or agreed to in writing, software OF ANY KIND, either express or implied. See the License for the specific
distributed under the License is distributed on an "AS IS" BASIS, language governing permissions and limitations under the License. -->
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
See the License for the specific language governing permissions and xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<parent> <parent>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
@ -166,6 +163,30 @@
<artifactId>nifi-kite-nar</artifactId> <artifactId>nifi-kite-nar</artifactId>
<type>nar</type> <type>nar</type>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-social-media-nar</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-hl7-nar</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-language-translation-nar</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-geo-nar</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<type>nar</type>
</dependency>
</dependencies> </dependencies>
<properties> <properties>
@ -235,7 +256,8 @@
<nifi.provenance.repository.rollover.size>100 MB</nifi.provenance.repository.rollover.size> <nifi.provenance.repository.rollover.size>100 MB</nifi.provenance.repository.rollover.size>
<nifi.provenance.repository.query.threads>2</nifi.provenance.repository.query.threads> <nifi.provenance.repository.query.threads>2</nifi.provenance.repository.query.threads>
<nifi.provenance.repository.compress.on.rollover>true</nifi.provenance.repository.compress.on.rollover> <nifi.provenance.repository.compress.on.rollover>true</nifi.provenance.repository.compress.on.rollover>
<nifi.provenance.repository.indexed.fields>EventType, FlowFileUUID, Filename, ProcessorID</nifi.provenance.repository.indexed.fields> <nifi.provenance.repository.indexed.fields>EventType, FlowFileUUID,
Filename, ProcessorID</nifi.provenance.repository.indexed.fields>
<nifi.provenance.repository.indexed.attributes /> <nifi.provenance.repository.indexed.attributes />
<nifi.provenance.repository.index.shard.size>500 MB</nifi.provenance.repository.index.shard.size> <nifi.provenance.repository.index.shard.size>500 MB</nifi.provenance.repository.index.shard.size>
<nifi.provenance.repository.always.sync>false</nifi.provenance.repository.always.sync> <nifi.provenance.repository.always.sync>false</nifi.provenance.repository.always.sync>
@ -275,7 +297,8 @@
<nifi.security.ocsp.responder.url /> <nifi.security.ocsp.responder.url />
<nifi.security.ocsp.responder.certificate /> <nifi.security.ocsp.responder.certificate />
<!-- nifi.properties: cluster common properties (cluster manager and nodes must have same values) --> <!-- nifi.properties: cluster common properties (cluster manager and nodes
must have same values) -->
<nifi.cluster.protocol.heartbeat.interval>5 sec</nifi.cluster.protocol.heartbeat.interval> <nifi.cluster.protocol.heartbeat.interval>5 sec</nifi.cluster.protocol.heartbeat.interval>
<nifi.cluster.protocol.is.secure>false</nifi.cluster.protocol.is.secure> <nifi.cluster.protocol.is.secure>false</nifi.cluster.protocol.is.secure>
<nifi.cluster.protocol.socket.timeout>30 sec</nifi.cluster.protocol.socket.timeout> <nifi.cluster.protocol.socket.timeout>30 sec</nifi.cluster.protocol.socket.timeout>
@ -287,7 +310,8 @@
<nifi.cluster.protocol.multicast.service.locator.attempts>3</nifi.cluster.protocol.multicast.service.locator.attempts> <nifi.cluster.protocol.multicast.service.locator.attempts>3</nifi.cluster.protocol.multicast.service.locator.attempts>
<nifi.cluster.protocol.multicast.service.locator.attempts.delay>1 sec</nifi.cluster.protocol.multicast.service.locator.attempts.delay> <nifi.cluster.protocol.multicast.service.locator.attempts.delay>1 sec</nifi.cluster.protocol.multicast.service.locator.attempts.delay>
<!-- nifi.properties: cluster node properties (only configure for cluster nodes) --> <!-- nifi.properties: cluster node properties (only configure for cluster
nodes) -->
<nifi.cluster.is.node>false</nifi.cluster.is.node> <nifi.cluster.is.node>false</nifi.cluster.is.node>
<nifi.cluster.node.address /> <nifi.cluster.node.address />
<nifi.cluster.node.protocol.port /> <nifi.cluster.node.protocol.port />
@ -295,7 +319,8 @@
<nifi.cluster.node.unicast.manager.address /> <nifi.cluster.node.unicast.manager.address />
<nifi.cluster.node.unicast.manager.protocol.port /> <nifi.cluster.node.unicast.manager.protocol.port />
<!-- nifi.properties: cluster manager properties (only configure for cluster manager) --> <!-- nifi.properties: cluster manager properties (only configure for cluster
manager) -->
<nifi.cluster.is.manager>false</nifi.cluster.is.manager> <nifi.cluster.is.manager>false</nifi.cluster.is.manager>
<nifi.cluster.manager.address /> <nifi.cluster.manager.address />
<nifi.cluster.manager.protocol.port /> <nifi.cluster.manager.protocol.port />
@ -352,8 +377,10 @@
<artifactId>rpm-maven-plugin</artifactId> <artifactId>rpm-maven-plugin</artifactId>
<configuration> <configuration>
<summary>Apache NiFi (incubating)</summary> <summary>Apache NiFi (incubating)</summary>
<description>Apache Nifi (incubating) is dataflow system based on the Flow-Based Programming concepts.</description> <description>Apache Nifi (incubating) is dataflow system based on
<license>Apache License, Version 2.0 and others (see included LICENSE file)</license> the Flow-Based Programming concepts.</description>
<license>Apache License, Version 2.0 and others (see included
LICENSE file)</license>
<url>http://nifi.incubator.apache.org</url> <url>http://nifi.incubator.apache.org</url>
<group>Utilities</group> <group>Utilities</group>
<prefix>/opt/nifi</prefix> <prefix>/opt/nifi</prefix>

View File

@ -36,5 +36,6 @@
<module>nifi-processor-utilities</module> <module>nifi-processor-utilities</module>
<module>nifi-write-ahead-log</module> <module>nifi-write-ahead-log</module>
<module>nifi-site-to-site-client</module> <module>nifi-site-to-site-client</module>
<module>nifi-hl7-query-language</module>
</modules> </modules>
</project> </project>

View File

@ -0,0 +1,36 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-social-media-bundle</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-social-media-nar</artifactId>
<packaging>nar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-twitter-processors</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1 @@
/target/

View File

@ -0,0 +1,60 @@
<?xml version="1.0"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-social-media-bundle</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-twitter-processors</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>hbc-twitter4j</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,360 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.twitter;
import java.io.IOException;
import java.io.OutputStream;
import java.net.MalformedURLException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.regex.Pattern;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import com.twitter.hbc.ClientBuilder;
import com.twitter.hbc.core.Client;
import com.twitter.hbc.core.Constants;
import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
import com.twitter.hbc.core.endpoint.StatusesFirehoseEndpoint;
import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint;
import com.twitter.hbc.core.endpoint.StreamingEndpoint;
import com.twitter.hbc.core.event.Event;
import com.twitter.hbc.core.processor.StringDelimitedProcessor;
import com.twitter.hbc.httpclient.auth.Authentication;
import com.twitter.hbc.httpclient.auth.OAuth1;
@SupportsBatching
@Tags({"twitter", "tweets", "social media", "status", "json"})
@CapabilityDescription("Pulls status changes from Twitter's streaming API")
@WritesAttribute(attribute="mime.type", description="Sets mime type to application/json")
public class GetTwitter extends AbstractProcessor {
static final AllowableValue ENDPOINT_SAMPLE = new AllowableValue("Sample Endpoint", "Sample Endpoint", "The endpoint that provides public data, aka a 'garden hose'");
static final AllowableValue ENDPOINT_FIREHOSE = new AllowableValue("Firehose Endpoint", "Firehose Endpoint", "The endpoint that provides access to all tweets");
static final AllowableValue ENDPOINT_FILTER = new AllowableValue("Filter Endpoint", "Filter Endpoint", "Endpoint that allows the stream to be filtered by specific terms or User IDs");
public static final PropertyDescriptor ENDPOINT = new PropertyDescriptor.Builder()
.name("Twitter Endpoint")
.description("Specifies which endpoint data should be pulled from")
.required(true)
.allowableValues(ENDPOINT_SAMPLE, ENDPOINT_FIREHOSE, ENDPOINT_FILTER)
.defaultValue(ENDPOINT_SAMPLE.getValue())
.build();
public static final PropertyDescriptor CONSUMER_KEY = new PropertyDescriptor.Builder()
.name("Consumer Key")
.description("The Consumer Key provided by Twitter")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor CONSUMER_SECRET = new PropertyDescriptor.Builder()
.name("Consumer Secret")
.description("The Consumer Secret provided by Twitter")
.required(true)
.sensitive(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor ACCESS_TOKEN = new PropertyDescriptor.Builder()
.name("Access Token")
.description("The Acces Token provided by Twitter")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor ACCESS_TOKEN_SECRET = new PropertyDescriptor.Builder()
.name("Access Token Secret")
.description("The Access Token Secret provided by Twitter")
.required(true)
.sensitive(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor LANGUAGES = new PropertyDescriptor.Builder()
.name("Languages")
.description("A comma-separated list of languages for which tweets should be fetched")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor FOLLOWING = new PropertyDescriptor.Builder()
.name("IDs to Follow")
.description("A comma-separated list of Twitter User ID's to follow. Ignored unless Endpoint is set to 'Filter Endpoint'.")
.required(false)
.addValidator(new FollowingValidator())
.build();
public static final PropertyDescriptor TERMS = new PropertyDescriptor.Builder()
.name("Terms to Filter On")
.description("A comma-separated list of terms to filter on. Ignored unless Endpoint is set to 'Filter Endpoint'. The filter works such that if any term matches, the status update will be retrieved; multiple terms separated by a space function as an 'AND'. I.e., 'it was, hello' will retrieve status updates that have either 'hello' or both 'it' AND 'was'")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("All status updates will be routed to this relationship")
.build();
private List<PropertyDescriptor> descriptors;
private Set<Relationship> relationships;
private final BlockingQueue<Event> eventQueue = new LinkedBlockingQueue<Event>(1000);
private volatile Client client;
private volatile BlockingQueue<String> messageQueue;
@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
descriptors.add(ENDPOINT);
descriptors.add(CONSUMER_KEY);
descriptors.add(CONSUMER_SECRET);
descriptors.add(ACCESS_TOKEN);
descriptors.add(ACCESS_TOKEN_SECRET);
descriptors.add(LANGUAGES);
descriptors.add(TERMS);
descriptors.add(FOLLOWING);
this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<Relationship>();
relationships.add(REL_SUCCESS);
this.relationships = Collections.unmodifiableSet(relationships);
}
@Override
public Set<Relationship> getRelationships() {
return this.relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.description("Adds a query parameter with name '" + propertyDescriptorName + "' to the Twitter query")
.required(false)
.dynamic(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
}
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>();
final String endpointName = validationContext.getProperty(ENDPOINT).getValue();
if ( ENDPOINT_FILTER.getValue().equals(endpointName) ) {
if ( !validationContext.getProperty(TERMS).isSet() && !validationContext.getProperty(FOLLOWING).isSet() ) {
results.add(new ValidationResult.Builder().input("").subject(FOLLOWING.getName()).valid(false).explanation("When using the 'Filter Endpoint', at least one of '" + TERMS.getName() + "' or '" + FOLLOWING.getName() + "' must be set").build());
}
}
return results;
}
@Override
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
// if any property is modified, the results are no longer valid. Destroy all messages in teh queue.
messageQueue.clear();
}
@OnScheduled
public void onScheduled(final ProcessContext context) throws MalformedURLException {
messageQueue = new LinkedBlockingQueue<>(100000);
final String endpointName = context.getProperty(ENDPOINT).getValue();
final Authentication oauth = new OAuth1(context.getProperty(CONSUMER_KEY).getValue(),
context.getProperty(CONSUMER_SECRET).getValue(),
context.getProperty(ACCESS_TOKEN).getValue(),
context.getProperty(ACCESS_TOKEN_SECRET).getValue());
final ClientBuilder clientBuilder = new ClientBuilder();
clientBuilder.name("GetTwitter[id=" + getIdentifier() + "]")
.authentication(oauth)
.eventMessageQueue(eventQueue)
.processor(new StringDelimitedProcessor(messageQueue));
final String languageString = context.getProperty(LANGUAGES).getValue();
final List<String> languages;
if ( languageString == null ) {
languages = null;
} else {
languages = new ArrayList<>();
for ( final String language : context.getProperty(LANGUAGES).getValue().split(",") ) {
languages.add(language.trim());
}
}
final String host;
final StreamingEndpoint streamingEndpoint;
if ( ENDPOINT_SAMPLE.getValue().equals(endpointName) ) {
host = Constants.STREAM_HOST;
final StatusesSampleEndpoint sse = new StatusesSampleEndpoint();
streamingEndpoint = sse;
if ( languages != null ) {
sse.languages(languages);
}
} else if ( ENDPOINT_FIREHOSE.getValue().equals(endpointName) ) {
host = Constants.STREAM_HOST;
final StatusesFirehoseEndpoint firehoseEndpoint = new StatusesFirehoseEndpoint();
streamingEndpoint = firehoseEndpoint;
if ( languages != null ) {
firehoseEndpoint.languages(languages);
}
} else if ( ENDPOINT_FILTER.getValue().equals(endpointName) ) {
host = Constants.STREAM_HOST;
final StatusesFilterEndpoint filterEndpoint = new StatusesFilterEndpoint();
final String followingString = context.getProperty(FOLLOWING).getValue();
final List<Long> followingIds;
if ( followingString == null ) {
followingIds = Collections.emptyList();
} else {
followingIds = new ArrayList<>();
for ( final String split : followingString.split(",") ) {
final Long id = Long.parseLong(split.trim());
followingIds.add(id);
}
}
final String termString = context.getProperty(TERMS).getValue();
final List<String> terms;
if ( termString == null ) {
terms = Collections.emptyList();
} else {
terms = new ArrayList<>();
for ( final String split : termString.split(",") ) {
terms.add(split.trim());
}
}
if ( !terms.isEmpty() ) {
filterEndpoint.trackTerms(terms);
}
if ( !followingIds.isEmpty() ) {
filterEndpoint.followings(followingIds);
}
if ( languages != null ) {
filterEndpoint.languages(languages);
}
streamingEndpoint = filterEndpoint;
} else {
throw new AssertionError("Endpoint was invalid value: " + endpointName);
}
clientBuilder.hosts(host).endpoint(streamingEndpoint);
client = clientBuilder.build();
client.connect();
}
@OnStopped
public void shutdownClient() {
if ( client != null ) {
client.stop();
}
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final Event event = eventQueue.poll();
if ( event != null ) {
switch (event.getEventType()) {
case STOPPED_BY_ERROR:
getLogger().error("Received error {}: {} due to {}. Will not attempt to reconnect", new Object[] {event.getEventType(), event.getMessage(), event.getUnderlyingException()});
break;
case CONNECTION_ERROR:
case HTTP_ERROR:
getLogger().error("Received error {}: {}. Will attempt to reconnect", new Object[] {event.getEventType(), event.getMessage()});
client.reconnect();
break;
default:
break;
}
}
final String tweet = messageQueue.poll();
if ( tweet == null ) {
context.yield();
return;
}
FlowFile flowFile = session.create();
flowFile = session.write(flowFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
out.write(tweet.getBytes(StandardCharsets.UTF_8));
}
});
final Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
attributes.put(CoreAttributes.FILENAME.key(), flowFile.getAttribute(CoreAttributes.FILENAME.key()) + ".json");
flowFile = session.putAllAttributes(flowFile, attributes);
session.transfer(flowFile, REL_SUCCESS);
session.getProvenanceReporter().receive(flowFile, Constants.STREAM_HOST + client.getEndpoint().getURI().toString());
}
private static class FollowingValidator implements Validator {
private static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+");
@Override
public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
final String[] splits = input.split(",");
for ( final String split : splits ) {
if ( !NUMBER_PATTERN.matcher(split.trim()).matches() ) {
return new ValidationResult.Builder().input(input).subject(subject).valid(false).explanation("Must be comma-separted list of User ID's").build();
}
}
return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
}
}
}

View File

@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.processors.twitter.GetTwitter

View File

@ -0,0 +1,33 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-bundles</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-social-media-bundle</artifactId>
<packaging>pom</packaging>
<modules>
<module>nifi-twitter-processors</module>
<module>nifi-social-media-nar</module>
</modules>
</project>

View File

@ -35,6 +35,10 @@
<module>nifi-update-attribute-bundle</module> <module>nifi-update-attribute-bundle</module>
<module>nifi-kafka-bundle</module> <module>nifi-kafka-bundle</module>
<module>nifi-kite-bundle</module> <module>nifi-kite-bundle</module>
<module>nifi-social-media-bundle</module>
<module>nifi-geo-bundle</module>
<module>nifi-hl7-bundle</module>
<module>nifi-language-translation-bundle</module>
</modules> </modules>
<dependencyManagement> <dependencyManagement>
<dependencies> <dependencies>

View File

@ -798,6 +798,30 @@
<version>0.1.0-incubating-SNAPSHOT</version> <version>0.1.0-incubating-SNAPSHOT</version>
<type>nar</type> <type>nar</type>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-social-media-nar</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-hl7-nar</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-language-translation-nar</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-geo-nar</artifactId>
<version>0.1.0-incubating-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-properties</artifactId> <artifactId>nifi-properties</artifactId>