From a68f87f96efdfb3c13d2032410aae38857f51183 Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Tue, 22 Mar 2016 11:19:39 -0400 Subject: [PATCH] NIFI-1665 This closes #296. fixed GetKafka to reset consumer in case of timeout NIFI-1665 polishing Signed-off-by: joewitt --- .../java/org/apache/nifi/processors/kafka/GetKafka.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java index 7057dff939..e06befb504 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java @@ -323,9 +323,6 @@ public class GetKafka extends AbstractProcessor { @OnScheduled public void schedule(ProcessContext context) { this.deadlockTimeout = context.getProperty(KAFKA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS) * 2; - if (this.executor == null || this.executor.isShutdown()) { - this.executor = Executors.newCachedThreadPool(); - } } @Override @@ -335,6 +332,9 @@ public class GetKafka extends AbstractProcessor { * of onTrigger. Will be reset to 'false' in the event of exception */ synchronized (this.consumerStreamsReady) { + if (this.executor == null || this.executor.isShutdown()) { + this.executor = Executors.newCachedThreadPool(); + } if (!this.consumerStreamsReady.get()) { Future f = this.executor.submit(new Callable() { @Override