From 269e25e9934ca54277f016bdb5a1589bbe021a76 Mon Sep 17 00:00:00 2001 From: joewitt Date: Sat, 25 Apr 2015 08:52:47 -0400 Subject: [PATCH] NIFI-271 --- .../nifi-twitter-processors/pom.xml | 10 +- .../nifi/processors/twitter/GetTwitter.java | 444 +++++++++--------- 2 files changed, 229 insertions(+), 225 deletions(-) diff --git a/nifi/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/pom.xml b/nifi/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/pom.xml index 45af0cec5f..4768dbc2f1 100644 --- a/nifi/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/pom.xml +++ b/nifi/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/pom.xml @@ -35,11 +35,11 @@ nifi-processor-utils - - com.twitter - hbc-twitter4j - 2.2.0 - + + com.twitter + hbc-twitter4j + 2.2.0 + org.apache.nifi diff --git a/nifi/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java b/nifi/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java index 45b1ae1a13..a0568676c2 100644 --- a/nifi/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java +++ b/nifi/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/GetTwitter.java @@ -69,82 +69,84 @@ import com.twitter.hbc.httpclient.auth.OAuth1; @SupportsBatching @Tags({"twitter", "tweets", "social media", "status", "json"}) @CapabilityDescription("Pulls status changes from Twitter's streaming API") -@WritesAttribute(attribute="mime.type", description="Sets mime type to application/json") +@WritesAttribute(attribute = "mime.type", description = "Sets mime type to application/json") public class GetTwitter extends AbstractProcessor { - static final AllowableValue ENDPOINT_SAMPLE = new AllowableValue("Sample Endpoint", "Sample Endpoint", "The endpoint that provides public data, aka a 'garden hose'"); - static final AllowableValue ENDPOINT_FIREHOSE = new AllowableValue("Firehose Endpoint", "Firehose Endpoint", "The endpoint that provides access to all tweets"); - static final AllowableValue ENDPOINT_FILTER = new AllowableValue("Filter Endpoint", "Filter Endpoint", "Endpoint that allows the stream to be filtered by specific terms or User IDs"); - - public static final PropertyDescriptor ENDPOINT = new PropertyDescriptor.Builder() - .name("Twitter Endpoint") - .description("Specifies which endpoint data should be pulled from") - .required(true) - .allowableValues(ENDPOINT_SAMPLE, ENDPOINT_FIREHOSE, ENDPOINT_FILTER) - .defaultValue(ENDPOINT_SAMPLE.getValue()) - .build(); + static final AllowableValue ENDPOINT_SAMPLE = new AllowableValue("Sample Endpoint", "Sample Endpoint", "The endpoint that provides public data, aka a 'garden hose'"); + static final AllowableValue ENDPOINT_FIREHOSE = new AllowableValue("Firehose Endpoint", "Firehose Endpoint", "The endpoint that provides access to all tweets"); + static final AllowableValue ENDPOINT_FILTER = new AllowableValue("Filter Endpoint", "Filter Endpoint", "Endpoint that allows the stream to be filtered by specific terms or User IDs"); + + public static final PropertyDescriptor ENDPOINT = new PropertyDescriptor.Builder() + .name("Twitter Endpoint") + .description("Specifies which endpoint data should be pulled from") + .required(true) + .allowableValues(ENDPOINT_SAMPLE, ENDPOINT_FIREHOSE, ENDPOINT_FILTER) + .defaultValue(ENDPOINT_SAMPLE.getValue()) + .build(); public static final PropertyDescriptor CONSUMER_KEY = new PropertyDescriptor.Builder() - .name("Consumer Key") - .description("The Consumer Key provided by Twitter") - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); + .name("Consumer Key") + .description("The Consumer Key provided by Twitter") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); public static final PropertyDescriptor CONSUMER_SECRET = new PropertyDescriptor.Builder() - .name("Consumer Secret") - .description("The Consumer Secret provided by Twitter") - .required(true) - .sensitive(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); + .name("Consumer Secret") + .description("The Consumer Secret provided by Twitter") + .required(true) + .sensitive(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); public static final PropertyDescriptor ACCESS_TOKEN = new PropertyDescriptor.Builder() - .name("Access Token") - .description("The Acces Token provided by Twitter") - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); + .name("Access Token") + .description("The Acces Token provided by Twitter") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); public static final PropertyDescriptor ACCESS_TOKEN_SECRET = new PropertyDescriptor.Builder() - .name("Access Token Secret") - .description("The Access Token Secret provided by Twitter") - .required(true) - .sensitive(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); + .name("Access Token Secret") + .description("The Access Token Secret provided by Twitter") + .required(true) + .sensitive(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); public static final PropertyDescriptor LANGUAGES = new PropertyDescriptor.Builder() - .name("Languages") - .description("A comma-separated list of languages for which tweets should be fetched") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); + .name("Languages") + .description("A comma-separated list of languages for which tweets should be fetched") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); public static final PropertyDescriptor FOLLOWING = new PropertyDescriptor.Builder() - .name("IDs to Follow") - .description("A comma-separated list of Twitter User ID's to follow. Ignored unless Endpoint is set to 'Filter Endpoint'.") - .required(false) - .addValidator(new FollowingValidator()) - .build(); + .name("IDs to Follow") + .description("A comma-separated list of Twitter User ID's to follow. Ignored unless Endpoint is set to 'Filter Endpoint'.") + .required(false) + .addValidator(new FollowingValidator()) + .build(); public static final PropertyDescriptor TERMS = new PropertyDescriptor.Builder() - .name("Terms to Filter On") - .description("A comma-separated list of terms to filter on. Ignored unless Endpoint is set to 'Filter Endpoint'. The filter works such that if any term matches, the status update will be retrieved; multiple terms separated by a space function as an 'AND'. I.e., 'it was, hello' will retrieve status updates that have either 'hello' or both 'it' AND 'was'") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - - + .name("Terms to Filter On") + .description("A comma-separated list of terms to filter on. Ignored unless Endpoint is set to 'Filter Endpoint'." + + " The filter works such that if any term matches, the status update will be retrieved; multiple terms" + + " separated by a space function as an 'AND'. I.e., 'it was, hello' will retrieve status updates that" + + " have either 'hello' or both 'it' AND 'was'") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("All status updates will be routed to this relationship") - .build(); + .name("success") + .description("All status updates will be routed to this relationship") + .build(); private List descriptors; private Set relationships; - private final BlockingQueue eventQueue = new LinkedBlockingQueue(1000); - + private final BlockingQueue eventQueue = new LinkedBlockingQueue<>(1000); + private volatile Client client; private volatile BlockingQueue messageQueue; @Override protected void init(final ProcessorInitializationContext context) { - final List descriptors = new ArrayList(); + final List descriptors = new ArrayList<>(); descriptors.add(ENDPOINT); descriptors.add(CONSUMER_KEY); descriptors.add(CONSUMER_SECRET); @@ -155,7 +157,7 @@ public class GetTwitter extends AbstractProcessor { descriptors.add(FOLLOWING); this.descriptors = Collections.unmodifiableList(descriptors); - final Set relationships = new HashSet(); + final Set relationships = new HashSet<>(); relationships.add(REL_SUCCESS); this.relationships = Collections.unmodifiableSet(relationships); } @@ -169,192 +171,194 @@ public class GetTwitter extends AbstractProcessor { public final List getSupportedPropertyDescriptors() { return descriptors; } - + @Override protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { - return new PropertyDescriptor.Builder() - .name(propertyDescriptorName) - .description("Adds a query parameter with name '" + propertyDescriptorName + "' to the Twitter query") - .required(false) - .dynamic(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .description("Adds a query parameter with name '" + propertyDescriptorName + "' to the Twitter query") + .required(false) + .dynamic(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); } - + @Override protected Collection customValidate(final ValidationContext validationContext) { - final List results = new ArrayList<>(); - final String endpointName = validationContext.getProperty(ENDPOINT).getValue(); - - if ( ENDPOINT_FILTER.getValue().equals(endpointName) ) { - if ( !validationContext.getProperty(TERMS).isSet() && !validationContext.getProperty(FOLLOWING).isSet() ) { - results.add(new ValidationResult.Builder().input("").subject(FOLLOWING.getName()).valid(false).explanation("When using the 'Filter Endpoint', at least one of '" + TERMS.getName() + "' or '" + FOLLOWING.getName() + "' must be set").build()); - } - } - - return results; + final List results = new ArrayList<>(); + final String endpointName = validationContext.getProperty(ENDPOINT).getValue(); + + if (ENDPOINT_FILTER.getValue().equals(endpointName)) { + if (!validationContext.getProperty(TERMS).isSet() && !validationContext.getProperty(FOLLOWING).isSet()) { + results.add(new ValidationResult.Builder().input("").subject(FOLLOWING.getName()) + .valid(false).explanation("When using the 'Filter Endpoint', at least one of '" + TERMS.getName() + "' or '" + FOLLOWING.getName() + "' must be set").build()); + } + } + + return results; } - + @Override public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { - // if any property is modified, the results are no longer valid. Destroy all messages in teh queue. - messageQueue.clear(); + // if any property is modified, the results are no longer valid. Destroy all messages in teh queue. + messageQueue.clear(); } @OnScheduled public void onScheduled(final ProcessContext context) throws MalformedURLException { - messageQueue = new LinkedBlockingQueue<>(100000); - - final String endpointName = context.getProperty(ENDPOINT).getValue(); - final Authentication oauth = new OAuth1(context.getProperty(CONSUMER_KEY).getValue(), - context.getProperty(CONSUMER_SECRET).getValue(), - context.getProperty(ACCESS_TOKEN).getValue(), - context.getProperty(ACCESS_TOKEN_SECRET).getValue()); + messageQueue = new LinkedBlockingQueue<>(100000); - final ClientBuilder clientBuilder = new ClientBuilder(); - clientBuilder.name("GetTwitter[id=" + getIdentifier() + "]") - .authentication(oauth) - .eventMessageQueue(eventQueue) - .processor(new StringDelimitedProcessor(messageQueue)); + final String endpointName = context.getProperty(ENDPOINT).getValue(); + final Authentication oauth = new OAuth1(context.getProperty(CONSUMER_KEY).getValue(), + context.getProperty(CONSUMER_SECRET).getValue(), + context.getProperty(ACCESS_TOKEN).getValue(), + context.getProperty(ACCESS_TOKEN_SECRET).getValue()); - final String languageString = context.getProperty(LANGUAGES).getValue(); - final List languages; - if ( languageString == null ) { - languages = null; - } else { - languages = new ArrayList<>(); - for ( final String language : context.getProperty(LANGUAGES).getValue().split(",") ) { - languages.add(language.trim()); - } - } - - final String host; - final StreamingEndpoint streamingEndpoint; - if ( ENDPOINT_SAMPLE.getValue().equals(endpointName) ) { - host = Constants.STREAM_HOST; - final StatusesSampleEndpoint sse = new StatusesSampleEndpoint(); - streamingEndpoint = sse; - if ( languages != null ) { - sse.languages(languages); - } - } else if ( ENDPOINT_FIREHOSE.getValue().equals(endpointName) ) { - host = Constants.STREAM_HOST; - final StatusesFirehoseEndpoint firehoseEndpoint = new StatusesFirehoseEndpoint(); - streamingEndpoint = firehoseEndpoint; - if ( languages != null ) { - firehoseEndpoint.languages(languages); - } - } else if ( ENDPOINT_FILTER.getValue().equals(endpointName) ) { - host = Constants.STREAM_HOST; - final StatusesFilterEndpoint filterEndpoint = new StatusesFilterEndpoint(); - - final String followingString = context.getProperty(FOLLOWING).getValue(); - final List followingIds; - if ( followingString == null ) { - followingIds = Collections.emptyList(); - } else { - followingIds = new ArrayList<>(); - - for ( final String split : followingString.split(",") ) { - final Long id = Long.parseLong(split.trim()); - followingIds.add(id); - } - } - - final String termString = context.getProperty(TERMS).getValue(); - final List terms; - if ( termString == null ) { - terms = Collections.emptyList(); - } else { - terms = new ArrayList<>(); - for ( final String split : termString.split(",") ) { - terms.add(split.trim()); - } - } - - if ( !terms.isEmpty() ) { - filterEndpoint.trackTerms(terms); - } - - if ( !followingIds.isEmpty() ) { - filterEndpoint.followings(followingIds); - } - - if ( languages != null ) { - filterEndpoint.languages(languages); - } - streamingEndpoint = filterEndpoint; - } else { - throw new AssertionError("Endpoint was invalid value: " + endpointName); - } + final ClientBuilder clientBuilder = new ClientBuilder(); + clientBuilder.name("GetTwitter[id=" + getIdentifier() + "]") + .authentication(oauth) + .eventMessageQueue(eventQueue) + .processor(new StringDelimitedProcessor(messageQueue)); - clientBuilder.hosts(host).endpoint(streamingEndpoint); - client = clientBuilder.build(); - client.connect(); + final String languageString = context.getProperty(LANGUAGES).getValue(); + final List languages; + if (languageString == null) { + languages = null; + } else { + languages = new ArrayList<>(); + for (final String language : context.getProperty(LANGUAGES).getValue().split(",")) { + languages.add(language.trim()); + } + } + + final String host; + final StreamingEndpoint streamingEndpoint; + if (ENDPOINT_SAMPLE.getValue().equals(endpointName)) { + host = Constants.STREAM_HOST; + final StatusesSampleEndpoint sse = new StatusesSampleEndpoint(); + streamingEndpoint = sse; + if (languages != null) { + sse.languages(languages); + } + } else if (ENDPOINT_FIREHOSE.getValue().equals(endpointName)) { + host = Constants.STREAM_HOST; + final StatusesFirehoseEndpoint firehoseEndpoint = new StatusesFirehoseEndpoint(); + streamingEndpoint = firehoseEndpoint; + if (languages != null) { + firehoseEndpoint.languages(languages); + } + } else if (ENDPOINT_FILTER.getValue().equals(endpointName)) { + host = Constants.STREAM_HOST; + final StatusesFilterEndpoint filterEndpoint = new StatusesFilterEndpoint(); + + final String followingString = context.getProperty(FOLLOWING).getValue(); + final List followingIds; + if (followingString == null) { + followingIds = Collections.emptyList(); + } else { + followingIds = new ArrayList<>(); + + for (final String split : followingString.split(",")) { + final Long id = Long.parseLong(split.trim()); + followingIds.add(id); + } + } + + final String termString = context.getProperty(TERMS).getValue(); + final List terms; + if (termString == null) { + terms = Collections.emptyList(); + } else { + terms = new ArrayList<>(); + for (final String split : termString.split(",")) { + terms.add(split.trim()); + } + } + + if (!terms.isEmpty()) { + filterEndpoint.trackTerms(terms); + } + + if (!followingIds.isEmpty()) { + filterEndpoint.followings(followingIds); + } + + if (languages != null) { + filterEndpoint.languages(languages); + } + streamingEndpoint = filterEndpoint; + } else { + throw new AssertionError("Endpoint was invalid value: " + endpointName); + } + + clientBuilder.hosts(host).endpoint(streamingEndpoint); + client = clientBuilder.build(); + client.connect(); } @OnStopped public void shutdownClient() { - if ( client != null ) { - client.stop(); - } + if (client != null) { + client.stop(); + } } - + @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - final Event event = eventQueue.poll(); - if ( event != null ) { - switch (event.getEventType()) { - case STOPPED_BY_ERROR: - getLogger().error("Received error {}: {} due to {}. Will not attempt to reconnect", new Object[] {event.getEventType(), event.getMessage(), event.getUnderlyingException()}); - break; - case CONNECTION_ERROR: - case HTTP_ERROR: - getLogger().error("Received error {}: {}. Will attempt to reconnect", new Object[] {event.getEventType(), event.getMessage()}); - client.reconnect(); - break; - default: - break; - } - } - - final String tweet = messageQueue.poll(); - if ( tweet == null ) { - context.yield(); - return; - } - - FlowFile flowFile = session.create(); - flowFile = session.write(flowFile, new OutputStreamCallback() { - @Override - public void process(final OutputStream out) throws IOException { - out.write(tweet.getBytes(StandardCharsets.UTF_8)); - } - }); - - final Map attributes = new HashMap<>(); - attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json"); - attributes.put(CoreAttributes.FILENAME.key(), flowFile.getAttribute(CoreAttributes.FILENAME.key()) + ".json"); - flowFile = session.putAllAttributes(flowFile, attributes); - - session.transfer(flowFile, REL_SUCCESS); - session.getProvenanceReporter().receive(flowFile, Constants.STREAM_HOST + client.getEndpoint().getURI().toString()); + final Event event = eventQueue.poll(); + if (event != null) { + switch (event.getEventType()) { + case STOPPED_BY_ERROR: + getLogger().error("Received error {}: {} due to {}. Will not attempt to reconnect", new Object[]{event.getEventType(), event.getMessage(), event.getUnderlyingException()}); + break; + case CONNECTION_ERROR: + case HTTP_ERROR: + getLogger().error("Received error {}: {}. Will attempt to reconnect", new Object[]{event.getEventType(), event.getMessage()}); + client.reconnect(); + break; + default: + break; + } + } + + final String tweet = messageQueue.poll(); + if (tweet == null) { + context.yield(); + return; + } + + FlowFile flowFile = session.create(); + flowFile = session.write(flowFile, new OutputStreamCallback() { + @Override + public void process(final OutputStream out) throws IOException { + out.write(tweet.getBytes(StandardCharsets.UTF_8)); + } + }); + + final Map attributes = new HashMap<>(); + attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json"); + attributes.put(CoreAttributes.FILENAME.key(), flowFile.getAttribute(CoreAttributes.FILENAME.key()) + ".json"); + flowFile = session.putAllAttributes(flowFile, attributes); + + session.transfer(flowFile, REL_SUCCESS); + session.getProvenanceReporter().receive(flowFile, Constants.STREAM_HOST + client.getEndpoint().getURI().toString()); } private static class FollowingValidator implements Validator { - private static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+"); - - @Override - public ValidationResult validate(final String subject, final String input, final ValidationContext context) { - final String[] splits = input.split(","); - for ( final String split : splits ) { - if ( !NUMBER_PATTERN.matcher(split.trim()).matches() ) { - return new ValidationResult.Builder().input(input).subject(subject).valid(false).explanation("Must be comma-separted list of User ID's").build(); - } - } - - return new ValidationResult.Builder().subject(subject).input(input).valid(true).build(); - } - + + private static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+"); + + @Override + public ValidationResult validate(final String subject, final String input, final ValidationContext context) { + final String[] splits = input.split(","); + for (final String split : splits) { + if (!NUMBER_PATTERN.matcher(split.trim()).matches()) { + return new ValidationResult.Builder().input(input).subject(subject).valid(false).explanation("Must be comma-separted list of User ID's").build(); + } + } + + return new ValidationResult.Builder().subject(subject).input(input).valid(true).build(); + } + } }