NIFI-11092 Fixed ConsumeTwitter handling on Primary Node changes

This closes #6901

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Emilio Setiadarma 2023-01-26 15:22:19 -08:00 committed by exceptionfactory
parent 28bfd1252f
commit 0b61a6226c
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
2 changed files with 43 additions and 10 deletions

View File

@ -26,6 +26,8 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
@ -49,6 +51,7 @@ import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@PrimaryNodeOnly
@ -255,6 +258,8 @@ public class ConsumeTwitter extends AbstractProcessor {
private volatile BlockingQueue<String> messageQueue;
private final AtomicBoolean streamStarted = new AtomicBoolean(false);
@Override
protected void init(ProcessorInitializationContext context) {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
@ -296,13 +301,13 @@ public class ConsumeTwitter extends AbstractProcessor {
@OnScheduled
public void onScheduled(final ProcessContext context) {
messageQueue = new LinkedBlockingQueue<>(context.getProperty(QUEUE_SIZE).asInteger());
tweetStreamService = new TweetStreamService(context, messageQueue, getLogger());
tweetStreamService.start();
streamStarted.set(false);
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
startTweetStreamService(context);
final String firstTweet = messageQueue.poll();
if (firstTweet == null) {
context.yield();
@ -338,15 +343,41 @@ public class ConsumeTwitter extends AbstractProcessor {
session.getProvenanceReporter().receive(flowFile, transitUri);
}
@OnPrimaryNodeStateChange
public void onPrimaryNodeStateChange(final PrimaryNodeState newState) {
if (newState == PrimaryNodeState.PRIMARY_NODE_REVOKED) {
stopTweetStreamService();
}
}
@OnStopped
public void onStopped() {
if (tweetStreamService != null) {
tweetStreamService.stop();
}
tweetStreamService = null;
stopTweetStreamService();
emptyQueue();
}
private void startTweetStreamService(final ProcessContext context) {
if (streamStarted.compareAndSet(false, true)) {
tweetStreamService = new TweetStreamService(context, messageQueue, getLogger());
tweetStreamService.start();
}
}
private void stopTweetStreamService() {
if (streamStarted.compareAndSet(true, false)) {
if (tweetStreamService != null) {
tweetStreamService.stop();
}
tweetStreamService = null;
if (!messageQueue.isEmpty()) {
getLogger().warn("Stopped consuming stream: unprocessed messages [{}]", messageQueue.size());
}
}
}
private void emptyQueue() {
while (!messageQueue.isEmpty()) {
messageQueue.poll();

View File

@ -44,7 +44,8 @@ public class TweetStreamService {
private final BlockingQueue<String> queue;
private final ComponentLog logger;
private final ScheduledExecutorService executorService;
private ScheduledExecutorService executorService;
private final ThreadFactory threadFactory;
private final Set<String> tweetFields;
private final Set<String> userFields;
@ -107,8 +108,7 @@ public class TweetStreamService {
final String basePath = context.getProperty(ConsumeTwitter.BASE_PATH).getValue();
api.getApiClient().setBasePath(basePath);
final ThreadFactory threadFactory = new BasicThreadFactory.Builder().namingPattern(ConsumeTwitter.class.getSimpleName()).build();
this.executorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
threadFactory = new BasicThreadFactory.Builder().namingPattern(ConsumeTwitter.class.getSimpleName()).build();
}
public String getTransitUri(final String endpoint) {
@ -128,6 +128,7 @@ public class TweetStreamService {
* to run until {@code stop} is called.
*/
public void start() {
this.executorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
executorService.execute(new TweetStreamStarter());
}
@ -145,6 +146,7 @@ public class TweetStreamService {
}
executorService.shutdownNow();
executorService = null;
}
private Long calculateBackoffDelay() {