From 97461657b1fe3193e028d2a085734415563a26d8 Mon Sep 17 00:00:00 2001 From: Bryan Rosander Date: Wed, 26 Apr 2017 10:56:40 -0400 Subject: [PATCH] NIFI-3744 - PutHiveStreaming cleanup null fixes This closes #1698. Signed-off-by: Pierre Villard --- .../processors/hive/PutHiveStreaming.java | 22 ++++++++++--------- .../processors/hive/TestPutHiveStreaming.java | 2 +- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java index d81108dbc9..1494595cc9 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java @@ -262,13 +262,12 @@ public class PutHiveStreaming extends AbstractProcessor { protected volatile UserGroupInformation ugi; protected volatile HiveConf hiveConfig; - protected final AtomicBoolean isInitialized = new AtomicBoolean(false); + protected final AtomicBoolean sendHeartBeat = new AtomicBoolean(false); protected HiveOptions options; protected ExecutorService callTimeoutPool; protected transient Timer heartBeatTimer; - protected AtomicBoolean sendHeartBeat = new AtomicBoolean(false); - protected Map allWriters; + protected Map allWriters = Collections.emptyMap(); @Override @@ -662,17 +661,20 @@ public class PutHiveStreaming extends AbstractProcessor { } } } + allWriters = Collections.emptyMap(); - callTimeoutPool.shutdown(); - try { - while (!callTimeoutPool.isTerminated()) { - callTimeoutPool.awaitTermination(options.getCallTimeOut(), TimeUnit.MILLISECONDS); + if (callTimeoutPool != null) { + callTimeoutPool.shutdown(); + try { + while (!callTimeoutPool.isTerminated()) { + callTimeoutPool.awaitTermination(options.getCallTimeOut(), TimeUnit.MILLISECONDS); + } + } catch (Throwable t) { + log.warn("shutdown interrupted on " + callTimeoutPool, t); } - } catch (Throwable t) { - log.warn("shutdown interrupted on " + callTimeoutPool, t); + callTimeoutPool = null; } - callTimeoutPool = null; ugi = null; hiveConfigurator.stopRenewer(); } diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java index 26e832da70..61b53046b1 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java @@ -521,7 +521,7 @@ public class TestPutHiveStreaming { @Test public void cleanup() throws Exception { - + processor.cleanup(); } @Test