This commit is contained in:
joewitt 2015-04-25 08:52:47 -04:00
parent 90ae022e54
commit 269e25e993
2 changed files with 229 additions and 225 deletions

View File

@ -35,11 +35,11 @@
<artifactId>nifi-processor-utils</artifactId>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>hbc-twitter4j</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>hbc-twitter4j</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>

View File

@ -69,82 +69,84 @@ import com.twitter.hbc.httpclient.auth.OAuth1;
@SupportsBatching
@Tags({"twitter", "tweets", "social media", "status", "json"})
@CapabilityDescription("Pulls status changes from Twitter's streaming API")
@WritesAttribute(attribute="mime.type", description="Sets mime type to application/json")
@WritesAttribute(attribute = "mime.type", description = "Sets mime type to application/json")
public class GetTwitter extends AbstractProcessor {
static final AllowableValue ENDPOINT_SAMPLE = new AllowableValue("Sample Endpoint", "Sample Endpoint", "The endpoint that provides public data, aka a 'garden hose'");
static final AllowableValue ENDPOINT_FIREHOSE = new AllowableValue("Firehose Endpoint", "Firehose Endpoint", "The endpoint that provides access to all tweets");
static final AllowableValue ENDPOINT_FILTER = new AllowableValue("Filter Endpoint", "Filter Endpoint", "Endpoint that allows the stream to be filtered by specific terms or User IDs");
public static final PropertyDescriptor ENDPOINT = new PropertyDescriptor.Builder()
.name("Twitter Endpoint")
.description("Specifies which endpoint data should be pulled from")
.required(true)
.allowableValues(ENDPOINT_SAMPLE, ENDPOINT_FIREHOSE, ENDPOINT_FILTER)
.defaultValue(ENDPOINT_SAMPLE.getValue())
.build();
static final AllowableValue ENDPOINT_SAMPLE = new AllowableValue("Sample Endpoint", "Sample Endpoint", "The endpoint that provides public data, aka a 'garden hose'");
static final AllowableValue ENDPOINT_FIREHOSE = new AllowableValue("Firehose Endpoint", "Firehose Endpoint", "The endpoint that provides access to all tweets");
static final AllowableValue ENDPOINT_FILTER = new AllowableValue("Filter Endpoint", "Filter Endpoint", "Endpoint that allows the stream to be filtered by specific terms or User IDs");
public static final PropertyDescriptor ENDPOINT = new PropertyDescriptor.Builder()
.name("Twitter Endpoint")
.description("Specifies which endpoint data should be pulled from")
.required(true)
.allowableValues(ENDPOINT_SAMPLE, ENDPOINT_FIREHOSE, ENDPOINT_FILTER)
.defaultValue(ENDPOINT_SAMPLE.getValue())
.build();
public static final PropertyDescriptor CONSUMER_KEY = new PropertyDescriptor.Builder()
.name("Consumer Key")
.description("The Consumer Key provided by Twitter")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
.name("Consumer Key")
.description("The Consumer Key provided by Twitter")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor CONSUMER_SECRET = new PropertyDescriptor.Builder()
.name("Consumer Secret")
.description("The Consumer Secret provided by Twitter")
.required(true)
.sensitive(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
.name("Consumer Secret")
.description("The Consumer Secret provided by Twitter")
.required(true)
.sensitive(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor ACCESS_TOKEN = new PropertyDescriptor.Builder()
.name("Access Token")
.description("The Acces Token provided by Twitter")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
.name("Access Token")
.description("The Acces Token provided by Twitter")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor ACCESS_TOKEN_SECRET = new PropertyDescriptor.Builder()
.name("Access Token Secret")
.description("The Access Token Secret provided by Twitter")
.required(true)
.sensitive(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
.name("Access Token Secret")
.description("The Access Token Secret provided by Twitter")
.required(true)
.sensitive(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor LANGUAGES = new PropertyDescriptor.Builder()
.name("Languages")
.description("A comma-separated list of languages for which tweets should be fetched")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
.name("Languages")
.description("A comma-separated list of languages for which tweets should be fetched")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor FOLLOWING = new PropertyDescriptor.Builder()
.name("IDs to Follow")
.description("A comma-separated list of Twitter User ID's to follow. Ignored unless Endpoint is set to 'Filter Endpoint'.")
.required(false)
.addValidator(new FollowingValidator())
.build();
.name("IDs to Follow")
.description("A comma-separated list of Twitter User ID's to follow. Ignored unless Endpoint is set to 'Filter Endpoint'.")
.required(false)
.addValidator(new FollowingValidator())
.build();
public static final PropertyDescriptor TERMS = new PropertyDescriptor.Builder()
.name("Terms to Filter On")
.description("A comma-separated list of terms to filter on. Ignored unless Endpoint is set to 'Filter Endpoint'. The filter works such that if any term matches, the status update will be retrieved; multiple terms separated by a space function as an 'AND'. I.e., 'it was, hello' will retrieve status updates that have either 'hello' or both 'it' AND 'was'")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
.name("Terms to Filter On")
.description("A comma-separated list of terms to filter on. Ignored unless Endpoint is set to 'Filter Endpoint'."
+ " The filter works such that if any term matches, the status update will be retrieved; multiple terms"
+ " separated by a space function as an 'AND'. I.e., 'it was, hello' will retrieve status updates that"
+ " have either 'hello' or both 'it' AND 'was'")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("All status updates will be routed to this relationship")
.build();
.name("success")
.description("All status updates will be routed to this relationship")
.build();
private List<PropertyDescriptor> descriptors;
private Set<Relationship> relationships;
private final BlockingQueue<Event> eventQueue = new LinkedBlockingQueue<Event>(1000);
private final BlockingQueue<Event> eventQueue = new LinkedBlockingQueue<>(1000);
private volatile Client client;
private volatile BlockingQueue<String> messageQueue;
@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(ENDPOINT);
descriptors.add(CONSUMER_KEY);
descriptors.add(CONSUMER_SECRET);
@ -155,7 +157,7 @@ public class GetTwitter extends AbstractProcessor {
descriptors.add(FOLLOWING);
this.descriptors = Collections.unmodifiableList(descriptors);
final Set<Relationship> relationships = new HashSet<Relationship>();
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
this.relationships = Collections.unmodifiableSet(relationships);
}
@ -169,192 +171,194 @@ public class GetTwitter extends AbstractProcessor {
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    // init() stores this list wrapped in Collections.unmodifiableList, so it can be handed out directly.
    return this.descriptors;
}
/**
 * Builds the descriptor for a user-defined (dynamic) property.
 *
 * <p>Any property name is accepted; the resulting descriptor is optional, flagged as dynamic
 * and must have a non-empty value.
 *
 * <p>NOTE(review): the description promises an extra Twitter query parameter, but the
 * {@code onScheduled} method in this file does not read dynamic properties — confirm where
 * (or whether) they are applied.
 *
 * @param propertyDescriptorName the name the user gave the dynamic property
 * @return a descriptor for that property
 */
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    return new PropertyDescriptor.Builder()
            .name(propertyDescriptorName)
            .description("Adds a query parameter with name '" + propertyDescriptorName + "' to the Twitter query")
            .required(false)
            .dynamic(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();
}
/**
 * Cross-property validation: the filter endpoint needs something to filter on.
 *
 * @param validationContext gives access to the currently configured property values
 * @return an empty collection when the configuration is acceptable; otherwise a single
 *         invalid result reported against the "IDs to Follow" property
 */
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
    final List<ValidationResult> results = new ArrayList<>();

    final String endpointName = validationContext.getProperty(ENDPOINT).getValue();
    final boolean filterSelected = ENDPOINT_FILTER.getValue().equals(endpointName);

    // The other endpoints ignore TERMS/FOLLOWING, so only the filter endpoint is checked.
    if (filterSelected
            && !validationContext.getProperty(TERMS).isSet()
            && !validationContext.getProperty(FOLLOWING).isSet()) {
        results.add(new ValidationResult.Builder().input("").subject(FOLLOWING.getName())
                .valid(false).explanation("When using the 'Filter Endpoint', at least one of '" + TERMS.getName() + "' or '" + FOLLOWING.getName() + "' must be set").build());
    }

    return results;
}
/**
 * Discards buffered tweets whenever any property changes, because messages collected
 * under the previous configuration are no longer valid results.
 *
 * @param descriptor the property that changed (unused; every change is treated alike)
 * @param oldValue the previous value (unused)
 * @param newValue the new value (unused)
 */
@Override
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
    // messageQueue is only assigned in onScheduled(), so it is still null when a property
    // is edited before the processor has ever been scheduled. Read the volatile field once.
    final BlockingQueue<String> pending = messageQueue;
    if (pending != null) {
        pending.clear();
    }
}
/**
 * Creates a fresh message queue, builds the streaming client for the configured endpoint
 * and connects it.
 *
 * <p>Languages apply to every endpoint. Terms and user IDs are only read for the filter
 * endpoint. Blank entries in any comma-separated list (e.g. {@code "a,,b"}) are ignored.
 *
 * @param context supplies the configured property values
 * @throws MalformedURLException declared for callers; kept from the original signature
 * @throws AssertionError if the endpoint property holds a value that is not one of the
 *         three allowable values
 */
@OnScheduled
public void onScheduled(final ProcessContext context) throws MalformedURLException {
    messageQueue = new LinkedBlockingQueue<>(100000);

    final String endpointName = context.getProperty(ENDPOINT).getValue();
    final Authentication oauth = new OAuth1(context.getProperty(CONSUMER_KEY).getValue(),
            context.getProperty(CONSUMER_SECRET).getValue(),
            context.getProperty(ACCESS_TOKEN).getValue(),
            context.getProperty(ACCESS_TOKEN_SECRET).getValue());

    final ClientBuilder clientBuilder = new ClientBuilder();
    clientBuilder.name("GetTwitter[id=" + getIdentifier() + "]")
            .authentication(oauth)
            .eventMessageQueue(eventQueue)
            .processor(new StringDelimitedProcessor(messageQueue));

    final List<String> languages = splitCommaSeparated(context.getProperty(LANGUAGES).getValue());

    final String host;
    final StreamingEndpoint streamingEndpoint;
    if (ENDPOINT_SAMPLE.getValue().equals(endpointName)) {
        host = Constants.STREAM_HOST;
        final StatusesSampleEndpoint sampleEndpoint = new StatusesSampleEndpoint();
        if (!languages.isEmpty()) {
            sampleEndpoint.languages(languages);
        }
        streamingEndpoint = sampleEndpoint;
    } else if (ENDPOINT_FIREHOSE.getValue().equals(endpointName)) {
        host = Constants.STREAM_HOST;
        final StatusesFirehoseEndpoint firehoseEndpoint = new StatusesFirehoseEndpoint();
        if (!languages.isEmpty()) {
            firehoseEndpoint.languages(languages);
        }
        streamingEndpoint = firehoseEndpoint;
    } else if (ENDPOINT_FILTER.getValue().equals(endpointName)) {
        host = Constants.STREAM_HOST;
        final StatusesFilterEndpoint filterEndpoint = new StatusesFilterEndpoint();

        // FollowingValidator has already restricted each entry to digits.
        final List<Long> followingIds = new ArrayList<>();
        for (final String userId : splitCommaSeparated(context.getProperty(FOLLOWING).getValue())) {
            followingIds.add(Long.parseLong(userId));
        }

        final List<String> terms = splitCommaSeparated(context.getProperty(TERMS).getValue());

        if (!terms.isEmpty()) {
            filterEndpoint.trackTerms(terms);
        }
        if (!followingIds.isEmpty()) {
            filterEndpoint.followings(followingIds);
        }
        if (!languages.isEmpty()) {
            filterEndpoint.languages(languages);
        }
        streamingEndpoint = filterEndpoint;
    } else {
        throw new AssertionError("Endpoint was invalid value: " + endpointName);
    }

    clientBuilder.hosts(host).endpoint(streamingEndpoint);
    client = clientBuilder.build();
    client.connect();
}

/** Splits a comma-separated property value into trimmed, non-empty tokens; null yields an empty list. */
private static List<String> splitCommaSeparated(final String value) {
    if (value == null) {
        return Collections.emptyList();
    }

    final List<String> tokens = new ArrayList<>();
    for (final String raw : value.split(",")) {
        final String token = raw.trim();
        if (!token.isEmpty()) {
            tokens.add(token);
        }
    }
    return tokens;
}
/**
 * Stops the streaming client when the processor is stopped.
 * Does nothing if no client was ever built.
 */
@OnStopped
public void shutdownClient() {
    // Read the volatile field once so the null check and the call see the same instance.
    final Client current = client;
    if (current != null) {
        current.stop();
    }
}
/**
 * Handles at most one pending client event and turns at most one queued tweet into a FlowFile.
 *
 * <p>Connection and HTTP errors are logged and trigger a reconnect; a client that stopped
 * because of an error is logged only. If no tweet is waiting, the processor yields.
 * Otherwise the tweet is written as UTF-8 content, tagged with the JSON mime type and a
 * {@code .json} filename suffix, routed to {@link #REL_SUCCESS}, and reported as a
 * provenance receive event.
 *
 * @param context used to yield when there is no work
 * @param session used to create, write, attribute and transfer the FlowFile
 * @throws ProcessException declared by the overridden method
 */
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    final Event event = eventQueue.poll();
    if (event != null) {
        switch (event.getEventType()) {
            case STOPPED_BY_ERROR:
                getLogger().error("Received error {}: {} due to {}. Will not attempt to reconnect", new Object[]{event.getEventType(), event.getMessage(), event.getUnderlyingException()});
                break;
            case CONNECTION_ERROR:
            case HTTP_ERROR:
                getLogger().error("Received error {}: {}. Will attempt to reconnect", new Object[]{event.getEventType(), event.getMessage()});
                client.reconnect();
                break;
            default:
                break;
        }
    }

    final String tweet = messageQueue.poll();
    if (tweet == null) {
        // Nothing buffered yet; back off instead of spinning.
        context.yield();
        return;
    }

    FlowFile flowFile = session.create();
    flowFile = session.write(flowFile, new OutputStreamCallback() {
        @Override
        public void process(final OutputStream out) throws IOException {
            out.write(tweet.getBytes(StandardCharsets.UTF_8));
        }
    });

    final Map<String, String> attributes = new HashMap<>();
    attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
    attributes.put(CoreAttributes.FILENAME.key(), flowFile.getAttribute(CoreAttributes.FILENAME.key()) + ".json");
    flowFile = session.putAllAttributes(flowFile, attributes);

    session.transfer(flowFile, REL_SUCCESS);
    session.getProvenanceReporter().receive(flowFile, Constants.STREAM_HOST + client.getEndpoint().getURI().toString());
}
/**
 * Validates the "IDs to Follow" property: every comma-separated entry, after trimming,
 * must consist solely of digits.
 */
private static class FollowingValidator implements Validator {

    private static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+");

    /**
     * @param subject the name of the property being validated
     * @param input the raw comma-separated value
     * @param context the validation context (unused)
     * @return a valid result when every entry is numeric, otherwise an invalid result
     *         explaining the expected format
     */
    @Override
    public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
        for (final String entry : input.split(",")) {
            // An empty entry (e.g. from "1,,2") fails the digit pattern and is rejected too.
            if (!NUMBER_PATTERN.matcher(entry.trim()).matches()) {
                return new ValidationResult.Builder().input(input).subject(subject).valid(false).explanation("Must be comma-separated list of User ID's").build();
            }
        }

        return new ValidationResult.Builder().subject(subject).input(input).valid(true).build();
    }
}
}