diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java index 794b26884f..74f3357fdb 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/PutHiveStreaming.java @@ -26,7 +26,7 @@ import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; -import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hive.hcatalog.streaming.ConnectionError; import org.apache.hive.hcatalog.streaming.HiveEndPoint; @@ -258,6 +258,7 @@ public class PutHiveStreaming extends AbstractProcessor { protected volatile HiveConfigurator hiveConfigurator = new HiveConfigurator(); protected volatile UserGroupInformation ugi; + protected volatile HiveConf hiveConfig; protected final AtomicBoolean isInitialized = new AtomicBoolean(false); @@ -318,7 +319,7 @@ public class PutHiveStreaming extends AbstractProcessor { final Integer heartbeatInterval = context.getProperty(HEARTBEAT_INTERVAL).asInteger(); final Integer txnsPerBatch = context.getProperty(TXNS_PER_BATCH).asInteger(); final String configFiles = context.getProperty(HIVE_CONFIGURATION_RESOURCES).getValue(); - final Configuration hiveConfig = hiveConfigurator.getConfigurationFromFiles(configFiles); + hiveConfig = hiveConfigurator.getConfigurationFromFiles(configFiles); // add any dynamic properties to the Hive configuration for (final Map.Entry entry : context.getProperties().entrySet()) { @@ -669,6 +670,7 @@ public class PutHiveStreaming extends AbstractProcessor { } callTimeoutPool = null; + hiveConfigurator.stopRenewer(); } private void setupHeartBeatTimer() { @@ -837,7 +839,7 @@ public class PutHiveStreaming extends AbstractProcessor { protected HiveWriter makeHiveWriter(HiveEndPoint endPoint, ExecutorService callTimeoutPool, UserGroupInformation ugi, HiveOptions options) throws HiveWriter.ConnectFailure, InterruptedException { - return HiveUtils.makeHiveWriter(endPoint, callTimeoutPool, ugi, options); + return HiveUtils.makeHiveWriter(endPoint, callTimeoutPool, ugi, options, hiveConfig); } protected KerberosProperties getKerberosProperties() { diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveConfigurator.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveConfigurator.java index ad2217768e..f6f65fca85 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveConfigurator.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveConfigurator.java @@ -62,8 +62,8 @@ public class HiveConfigurator { return problems; } - public Configuration getConfigurationFromFiles(final String configFiles) { - final Configuration hiveConfig = new HiveConf(); + public HiveConf getConfigurationFromFiles(final String configFiles) { + final HiveConf hiveConfig = new HiveConf(); if (StringUtils.isNotBlank(configFiles)) { for (final String configFile : configFiles.split(",")) { hiveConfig.addResource(new Path(configFile.trim())); diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveUtils.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveUtils.java index ca9ceeb3be..3e375f9121 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveUtils.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveUtils.java @@ -18,6 +18,7 @@ package org.apache.nifi.util.hive; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hive.hcatalog.streaming.ConnectionError; @@ -39,10 +40,10 @@ public class HiveUtils { return new HiveEndPoint(options.getMetaStoreURI(), options.getDatabaseName(), options.getTableName(), partitionVals); } - public static HiveWriter makeHiveWriter(HiveEndPoint endPoint, ExecutorService callTimeoutPool, UserGroupInformation ugi, HiveOptions options) + public static HiveWriter makeHiveWriter(HiveEndPoint endPoint, ExecutorService callTimeoutPool, UserGroupInformation ugi, HiveOptions options, HiveConf hiveConf) throws HiveWriter.ConnectFailure, InterruptedException { return new HiveWriter(endPoint, options.getTxnsPerBatch(), options.getAutoCreatePartitions(), - options.getCallTimeOut(), callTimeoutPool, ugi); + options.getCallTimeOut(), callTimeoutPool, ugi, hiveConf); } public static void logAllHiveEndPoints(Map allWriters) { diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveWriter.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveWriter.java index 387e53f442..f7bfe9155e 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveWriter.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/util/hive/HiveWriter.java @@ -19,6 +19,7 @@ package org.apache.nifi.util.hive; import java.io.IOException; +import java.security.PrivilegedExceptionAction; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -26,6 +27,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hive.hcatalog.streaming.HiveEndPoint; @@ -54,23 +56,17 @@ public class HiveWriter { private TransactionBatch txnBatch; private long lastUsed; // time of last flush on this writer protected boolean closed; // flag indicating HiveWriter was closed - private boolean autoCreatePartitions; - private UserGroupInformation ugi; private int totalRecords = 0; - public HiveWriter(HiveEndPoint endPoint, int txnsPerBatch, - boolean autoCreatePartitions, long callTimeout, - ExecutorService callTimeoutPool, UserGroupInformation ugi) + public HiveWriter(HiveEndPoint endPoint, int txnsPerBatch, boolean autoCreatePartitions, long callTimeout, ExecutorService callTimeoutPool, UserGroupInformation ugi, HiveConf hiveConf) throws InterruptedException, ConnectFailure { try { - this.autoCreatePartitions = autoCreatePartitions; this.callTimeout = callTimeout; this.callTimeoutPool = callTimeoutPool; this.endPoint = endPoint; - this.ugi = ugi; - this.connection = newConnection(ugi); + this.connection = newConnection(endPoint, autoCreatePartitions, hiveConf, ugi); this.txnsPerBatch = txnsPerBatch; - this.recordWriter = getRecordWriter(endPoint); + this.recordWriter = getRecordWriter(endPoint, ugi, hiveConf); this.txnBatch = nextTxnBatch(recordWriter); this.closed = false; this.lastUsed = System.currentTimeMillis(); @@ -81,15 +77,17 @@ public class HiveWriter { } } - protected RecordWriter getRecordWriter(HiveEndPoint endPoint) throws StreamingException { - return new StrictJsonWriter(endPoint); + protected RecordWriter getRecordWriter(HiveEndPoint endPoint, UserGroupInformation ugi, HiveConf hiveConf) throws StreamingException, IOException, InterruptedException { + if (ugi == null) { + return new StrictJsonWriter(endPoint, hiveConf); + } else { + return ugi.doAs((PrivilegedExceptionAction) () -> new StrictJsonWriter(endPoint, hiveConf)); + } } @Override public String toString() { - return "{ " - + "endPoint = " + endPoint.toString() - + ", TransactionBatch = " + txnBatch.toString() + " }"; + return "{ endPoint = " + endPoint + ", TransactionBatch = " + txnBatch + " }"; } /** @@ -230,11 +228,10 @@ public class HiveWriter { } } - protected StreamingConnection newConnection(final UserGroupInformation ugi) - throws InterruptedException, ConnectFailure { + protected StreamingConnection newConnection(HiveEndPoint endPoint, boolean autoCreatePartitions, HiveConf conf, UserGroupInformation ugi) throws InterruptedException, ConnectFailure { try { return callWithTimeout(() -> { - return endPoint.newConnection(autoCreatePartitions, null, ugi); // could block + return endPoint.newConnection(autoCreatePartitions, conf, ugi); // could block }); } catch (StreamingException | TimeoutException e) { throw new ConnectFailure(endPoint, e); diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java index 8cd1a74327..ae7d3590f0 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHiveStreaming.java @@ -24,6 +24,7 @@ import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumWriter; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hive.hcatalog.streaming.HiveEndPoint; import org.apache.hive.hcatalog.streaming.RecordWriter; @@ -36,6 +37,7 @@ import org.apache.nifi.stream.io.ByteArrayOutputStream; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; +import org.apache.nifi.util.hive.HiveConfigurator; import org.apache.nifi.util.hive.HiveOptions; import org.apache.nifi.util.hive.HiveWriter; import org.junit.Before; @@ -58,7 +60,9 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; /** * Unit tests for PutHiveStreaming processor. @@ -69,6 +73,8 @@ public class TestPutHiveStreaming { private MockPutHiveStreaming processor; private KerberosProperties kerberosPropsWithFile; + private HiveConfigurator hiveConfigurator; + private HiveConf hiveConf; @Before public void setUp() throws Exception { @@ -81,6 +87,10 @@ public class TestPutHiveStreaming { kerberosPropsWithFile = new KerberosProperties(new File("src/test/resources/krb5.conf")); processor = new MockPutHiveStreaming(); + hiveConfigurator = mock(HiveConfigurator.class); + hiveConf = mock(HiveConf.class); + when(hiveConfigurator.getConfigurationFromFiles(anyString())).thenReturn(hiveConf); + processor.hiveConfigurator = hiveConfigurator; processor.setKerberosProperties(kerberosPropsWithFile); runner = TestRunners.newTestRunner(processor); } @@ -545,7 +555,7 @@ public class TestPutHiveStreaming { if (generateInterruptedExceptionOnCreateWriter) { throw new InterruptedException(); } - MockHiveWriter hiveWriter = new MockHiveWriter(endPoint, options.getTxnsPerBatch(), options.getAutoCreatePartitions(), options.getCallTimeOut(), callTimeoutPool, ugi); + MockHiveWriter hiveWriter = new MockHiveWriter(endPoint, options.getTxnsPerBatch(), options.getAutoCreatePartitions(), options.getCallTimeOut(), callTimeoutPool, ugi, hiveConfig); hiveWriter.setGenerateWriteFailure(generateWriteFailure); hiveWriter.setGenerateSerializationError(generateSerializationError); hiveWriter.setGenerateCommitFailure(generateCommitFailure); @@ -595,9 +605,9 @@ public class TestPutHiveStreaming { private HiveEndPoint endPoint; public MockHiveWriter(HiveEndPoint endPoint, int txnsPerBatch, boolean autoCreatePartitions, - long callTimeout, ExecutorService callTimeoutPool, UserGroupInformation ugi) + long callTimeout, ExecutorService callTimeoutPool, UserGroupInformation ugi, HiveConf hiveConf) throws InterruptedException, ConnectFailure { - super(endPoint, txnsPerBatch, autoCreatePartitions, callTimeout, callTimeoutPool, ugi); + super(endPoint, txnsPerBatch, autoCreatePartitions, callTimeout, callTimeoutPool, ugi, hiveConf); this.endPoint = endPoint; } @@ -632,13 +642,15 @@ public class TestPutHiveStreaming { } @Override - protected RecordWriter getRecordWriter(HiveEndPoint endPoint) throws StreamingException { + protected RecordWriter getRecordWriter(HiveEndPoint endPoint, UserGroupInformation ugi, HiveConf conf) throws StreamingException { + assertEquals(hiveConf, conf); return mock(RecordWriter.class); } @Override - protected StreamingConnection newConnection(UserGroupInformation ugi) throws InterruptedException, ConnectFailure { + protected StreamingConnection newConnection(HiveEndPoint endPoint, boolean autoCreatePartitions, HiveConf conf, UserGroupInformation ugi) throws InterruptedException, ConnectFailure { StreamingConnection connection = mock(StreamingConnection.class); + assertEquals(hiveConf, conf); return connection; }