NIFI-3744 - PutHiveStreaming cleanup null fixes

This closes #1698.

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>
This commit is contained in:
Bryan Rosander 2017-04-26 10:56:40 -04:00 committed by Pierre Villard
parent 5cacc52cfc
commit 97461657b1
2 changed files with 13 additions and 11 deletions

View File

@ -262,13 +262,12 @@ public class PutHiveStreaming extends AbstractProcessor {
protected volatile UserGroupInformation ugi;
protected volatile HiveConf hiveConfig;
protected final AtomicBoolean isInitialized = new AtomicBoolean(false);
protected final AtomicBoolean sendHeartBeat = new AtomicBoolean(false);
protected HiveOptions options;
protected ExecutorService callTimeoutPool;
protected transient Timer heartBeatTimer;
protected AtomicBoolean sendHeartBeat = new AtomicBoolean(false);
protected Map<HiveEndPoint, HiveWriter> allWriters;
protected Map<HiveEndPoint, HiveWriter> allWriters = Collections.emptyMap();
@Override
@ -662,17 +661,20 @@ public class PutHiveStreaming extends AbstractProcessor {
}
}
}
allWriters = Collections.emptyMap();
callTimeoutPool.shutdown();
try {
while (!callTimeoutPool.isTerminated()) {
callTimeoutPool.awaitTermination(options.getCallTimeOut(), TimeUnit.MILLISECONDS);
if (callTimeoutPool != null) {
callTimeoutPool.shutdown();
try {
while (!callTimeoutPool.isTerminated()) {
callTimeoutPool.awaitTermination(options.getCallTimeOut(), TimeUnit.MILLISECONDS);
}
} catch (Throwable t) {
log.warn("shutdown interrupted on " + callTimeoutPool, t);
}
} catch (Throwable t) {
log.warn("shutdown interrupted on " + callTimeoutPool, t);
callTimeoutPool = null;
}
callTimeoutPool = null;
ugi = null;
hiveConfigurator.stopRenewer();
}

View File

@ -521,7 +521,7 @@ public class TestPutHiveStreaming {
@Test
public void cleanup() throws Exception {
processor.cleanup();
}
@Test