diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java index 7057dff939..e06befb504 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java @@ -323,9 +323,6 @@ public class GetKafka extends AbstractProcessor { @OnScheduled public void schedule(ProcessContext context) { this.deadlockTimeout = context.getProperty(KAFKA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS) * 2; - if (this.executor == null || this.executor.isShutdown()) { - this.executor = Executors.newCachedThreadPool(); - } } @Override @@ -335,6 +332,9 @@ public class GetKafka extends AbstractProcessor { * of onTrigger. Will be reset to 'false' in the event of exception */ synchronized (this.consumerStreamsReady) { + if (this.executor == null || this.executor.isShutdown()) { + this.executor = Executors.newCachedThreadPool(); + } if (!this.consumerStreamsReady.get()) { Future f = this.executor.submit(new Callable() { @Override