NIFI-1665 This closes #296. fixed GetKafka to reset consumer in case of timeout

NIFI-1665 polishing

Signed-off-by: joewitt <joewitt@apache.org>
This commit is contained in:
Oleg Zhurakousky 2016-03-22 11:19:39 -04:00 committed by joewitt
parent f120952ab7
commit a68f87f96e
1 changed files with 3 additions and 3 deletions

View File

@ -323,9 +323,6 @@ public class GetKafka extends AbstractProcessor {
@OnScheduled @OnScheduled
public void schedule(ProcessContext context) { public void schedule(ProcessContext context) {
this.deadlockTimeout = context.getProperty(KAFKA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS) * 2; this.deadlockTimeout = context.getProperty(KAFKA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS) * 2;
if (this.executor == null || this.executor.isShutdown()) {
this.executor = Executors.newCachedThreadPool();
}
} }
@Override @Override
@ -335,6 +332,9 @@ public class GetKafka extends AbstractProcessor {
* of onTrigger. Will be reset to 'false' in the event of exception * of onTrigger. Will be reset to 'false' in the event of exception
*/ */
synchronized (this.consumerStreamsReady) { synchronized (this.consumerStreamsReady) {
if (this.executor == null || this.executor.isShutdown()) {
this.executor = Executors.newCachedThreadPool();
}
if (!this.consumerStreamsReady.get()) { if (!this.consumerStreamsReady.get()) {
Future<Void> f = this.executor.submit(new Callable<Void>() { Future<Void> f = this.executor.submit(new Callable<Void>() {
@Override @Override