diff --git a/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/ConsumeTwitter.java b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/ConsumeTwitter.java index 041fd80dba..2517456d74 100644 --- a/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/ConsumeTwitter.java +++ b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/ConsumeTwitter.java @@ -26,6 +26,8 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange; +import org.apache.nifi.annotation.notification.PrimaryNodeState; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; @@ -49,6 +51,7 @@ import java.util.HashMap; import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @PrimaryNodeOnly @@ -255,6 +258,8 @@ public class ConsumeTwitter extends AbstractProcessor { private volatile BlockingQueue messageQueue; + private final AtomicBoolean streamStarted = new AtomicBoolean(false); + @Override protected void init(ProcessorInitializationContext context) { final List descriptors = new ArrayList<>(); @@ -296,13 +301,13 @@ public class ConsumeTwitter extends AbstractProcessor { @OnScheduled public void onScheduled(final ProcessContext context) { messageQueue = new LinkedBlockingQueue<>(context.getProperty(QUEUE_SIZE).asInteger()); - - tweetStreamService = new TweetStreamService(context, messageQueue, getLogger()); - tweetStreamService.start(); + streamStarted.set(false); } @Override public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + startTweetStreamService(context); + final String firstTweet = messageQueue.poll(); if (firstTweet == null) { context.yield(); @@ -338,15 +343,41 @@ public class ConsumeTwitter extends AbstractProcessor { session.getProvenanceReporter().receive(flowFile, transitUri); } + @OnPrimaryNodeStateChange + public void onPrimaryNodeStateChange(final PrimaryNodeState newState) { + if (newState == PrimaryNodeState.PRIMARY_NODE_REVOKED) { + stopTweetStreamService(); + } + } + @OnStopped public void onStopped() { - if (tweetStreamService != null) { - tweetStreamService.stop(); - } - tweetStreamService = null; + stopTweetStreamService(); emptyQueue(); } + private void startTweetStreamService(final ProcessContext context) { + if (streamStarted.compareAndSet(false, true)) { + tweetStreamService = new TweetStreamService(context, messageQueue, getLogger()); + tweetStreamService.start(); + } + + } + + private void stopTweetStreamService() { + if (streamStarted.compareAndSet(true, false)) { + if (tweetStreamService != null) { + tweetStreamService.stop(); + } + tweetStreamService = null; + + if (!messageQueue.isEmpty()) { + getLogger().warn("Stopped consuming stream: unprocessed messages [{}]", messageQueue.size()); + } + } + } + + private void emptyQueue() { while (!messageQueue.isEmpty()) { messageQueue.poll(); diff --git a/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/TweetStreamService.java b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/TweetStreamService.java index 027839b5c7..f9cd8f5f1a 100644 --- a/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/TweetStreamService.java +++ b/nifi-nar-bundles/nifi-social-media-bundle/nifi-twitter-processors/src/main/java/org/apache/nifi/processors/twitter/TweetStreamService.java @@ -44,7 +44,8 @@ public class TweetStreamService { private final BlockingQueue queue; private final ComponentLog logger; - private final ScheduledExecutorService executorService; + private ScheduledExecutorService executorService; + private final ThreadFactory threadFactory; private final Set tweetFields; private final Set userFields; @@ -107,8 +108,7 @@ public class TweetStreamService { final String basePath = context.getProperty(ConsumeTwitter.BASE_PATH).getValue(); api.getApiClient().setBasePath(basePath); - final ThreadFactory threadFactory = new BasicThreadFactory.Builder().namingPattern(ConsumeTwitter.class.getSimpleName()).build(); - this.executorService = Executors.newSingleThreadScheduledExecutor(threadFactory); + threadFactory = new BasicThreadFactory.Builder().namingPattern(ConsumeTwitter.class.getSimpleName()).build(); } public String getTransitUri(final String endpoint) { @@ -128,6 +128,7 @@ public class TweetStreamService { * to run until {@code stop} is called. */ public void start() { + this.executorService = Executors.newSingleThreadScheduledExecutor(threadFactory); executorService.execute(new TweetStreamStarter()); } @@ -145,6 +146,7 @@ public class TweetStreamService { } executorService.shutdownNow(); + executorService = null; } private Long calculateBackoffDelay() {