NIFI-3530 - Ensuring configuration and ugi information is passed down to Hive in PutHiveStreaming

NIFI-3530 - closing renewer in PutHiveStreaming.cleanup(), making npe less likely in HiveWriter.toString()

This closes 
This commit is contained in:
Bryan Rosander 2017-02-24 16:09:42 -05:00 committed by Matt Burgess
parent 1978c986b5
commit cfe899d189
5 changed files with 41 additions and 29 deletions
nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src
main/java/org/apache/nifi
test/java/org/apache/nifi/processors/hive

View File

@ -26,7 +26,7 @@ import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.hcatalog.streaming.ConnectionError;
import org.apache.hive.hcatalog.streaming.HiveEndPoint;
@ -258,6 +258,7 @@ public class PutHiveStreaming extends AbstractProcessor {
protected volatile HiveConfigurator hiveConfigurator = new HiveConfigurator();
protected volatile UserGroupInformation ugi;
protected volatile HiveConf hiveConfig;
protected final AtomicBoolean isInitialized = new AtomicBoolean(false);
@ -318,7 +319,7 @@ public class PutHiveStreaming extends AbstractProcessor {
final Integer heartbeatInterval = context.getProperty(HEARTBEAT_INTERVAL).asInteger();
final Integer txnsPerBatch = context.getProperty(TXNS_PER_BATCH).asInteger();
final String configFiles = context.getProperty(HIVE_CONFIGURATION_RESOURCES).getValue();
final Configuration hiveConfig = hiveConfigurator.getConfigurationFromFiles(configFiles);
hiveConfig = hiveConfigurator.getConfigurationFromFiles(configFiles);
// add any dynamic properties to the Hive configuration
for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
@ -669,6 +670,7 @@ public class PutHiveStreaming extends AbstractProcessor {
}
callTimeoutPool = null;
hiveConfigurator.stopRenewer();
}
private void setupHeartBeatTimer() {
@ -837,7 +839,7 @@ public class PutHiveStreaming extends AbstractProcessor {
protected HiveWriter makeHiveWriter(HiveEndPoint endPoint, ExecutorService callTimeoutPool, UserGroupInformation ugi, HiveOptions options)
throws HiveWriter.ConnectFailure, InterruptedException {
return HiveUtils.makeHiveWriter(endPoint, callTimeoutPool, ugi, options);
return HiveUtils.makeHiveWriter(endPoint, callTimeoutPool, ugi, options, hiveConfig);
}
protected KerberosProperties getKerberosProperties() {

View File

@ -62,8 +62,8 @@ public class HiveConfigurator {
return problems;
}
public Configuration getConfigurationFromFiles(final String configFiles) {
final Configuration hiveConfig = new HiveConf();
public HiveConf getConfigurationFromFiles(final String configFiles) {
final HiveConf hiveConfig = new HiveConf();
if (StringUtils.isNotBlank(configFiles)) {
for (final String configFile : configFiles.split(",")) {
hiveConfig.addResource(new Path(configFile.trim()));

View File

@ -18,6 +18,7 @@
package org.apache.nifi.util.hive;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.hcatalog.streaming.ConnectionError;
@ -39,10 +40,10 @@ public class HiveUtils {
return new HiveEndPoint(options.getMetaStoreURI(), options.getDatabaseName(), options.getTableName(), partitionVals);
}
public static HiveWriter makeHiveWriter(HiveEndPoint endPoint, ExecutorService callTimeoutPool, UserGroupInformation ugi, HiveOptions options)
public static HiveWriter makeHiveWriter(HiveEndPoint endPoint, ExecutorService callTimeoutPool, UserGroupInformation ugi, HiveOptions options, HiveConf hiveConf)
throws HiveWriter.ConnectFailure, InterruptedException {
return new HiveWriter(endPoint, options.getTxnsPerBatch(), options.getAutoCreatePartitions(),
options.getCallTimeOut(), callTimeoutPool, ugi);
options.getCallTimeOut(), callTimeoutPool, ugi, hiveConf);
}
public static void logAllHiveEndPoints(Map<HiveEndPoint, HiveWriter> allWriters) {

View File

@ -19,6 +19,7 @@
package org.apache.nifi.util.hive;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@ -26,6 +27,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.hcatalog.streaming.HiveEndPoint;
@ -54,23 +56,17 @@ public class HiveWriter {
private TransactionBatch txnBatch;
private long lastUsed; // time of last flush on this writer
protected boolean closed; // flag indicating HiveWriter was closed
private boolean autoCreatePartitions;
private UserGroupInformation ugi;
private int totalRecords = 0;
public HiveWriter(HiveEndPoint endPoint, int txnsPerBatch,
boolean autoCreatePartitions, long callTimeout,
ExecutorService callTimeoutPool, UserGroupInformation ugi)
public HiveWriter(HiveEndPoint endPoint, int txnsPerBatch, boolean autoCreatePartitions, long callTimeout, ExecutorService callTimeoutPool, UserGroupInformation ugi, HiveConf hiveConf)
throws InterruptedException, ConnectFailure {
try {
this.autoCreatePartitions = autoCreatePartitions;
this.callTimeout = callTimeout;
this.callTimeoutPool = callTimeoutPool;
this.endPoint = endPoint;
this.ugi = ugi;
this.connection = newConnection(ugi);
this.connection = newConnection(endPoint, autoCreatePartitions, hiveConf, ugi);
this.txnsPerBatch = txnsPerBatch;
this.recordWriter = getRecordWriter(endPoint);
this.recordWriter = getRecordWriter(endPoint, ugi, hiveConf);
this.txnBatch = nextTxnBatch(recordWriter);
this.closed = false;
this.lastUsed = System.currentTimeMillis();
@ -81,15 +77,17 @@ public class HiveWriter {
}
}
protected RecordWriter getRecordWriter(HiveEndPoint endPoint) throws StreamingException {
return new StrictJsonWriter(endPoint);
protected RecordWriter getRecordWriter(HiveEndPoint endPoint, UserGroupInformation ugi, HiveConf hiveConf) throws StreamingException, IOException, InterruptedException {
if (ugi == null) {
return new StrictJsonWriter(endPoint, hiveConf);
} else {
return ugi.doAs((PrivilegedExceptionAction<StrictJsonWriter>) () -> new StrictJsonWriter(endPoint, hiveConf));
}
}
@Override
public String toString() {
return "{ "
+ "endPoint = " + endPoint.toString()
+ ", TransactionBatch = " + txnBatch.toString() + " }";
return "{ endPoint = " + endPoint + ", TransactionBatch = " + txnBatch + " }";
}
/**
@ -230,11 +228,10 @@ public class HiveWriter {
}
}
protected StreamingConnection newConnection(final UserGroupInformation ugi)
throws InterruptedException, ConnectFailure {
protected StreamingConnection newConnection(HiveEndPoint endPoint, boolean autoCreatePartitions, HiveConf conf, UserGroupInformation ugi) throws InterruptedException, ConnectFailure {
try {
return callWithTimeout(() -> {
return endPoint.newConnection(autoCreatePartitions, null, ugi); // could block
return endPoint.newConnection(autoCreatePartitions, conf, ugi); // could block
});
} catch (StreamingException | TimeoutException e) {
throw new ConnectFailure(endPoint, e);

View File

@ -24,6 +24,7 @@ import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.hcatalog.streaming.HiveEndPoint;
import org.apache.hive.hcatalog.streaming.RecordWriter;
@ -36,6 +37,7 @@ import org.apache.nifi.stream.io.ByteArrayOutputStream;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.nifi.util.hive.HiveConfigurator;
import org.apache.nifi.util.hive.HiveOptions;
import org.apache.nifi.util.hive.HiveWriter;
import org.junit.Before;
@ -58,7 +60,9 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* Unit tests for PutHiveStreaming processor.
@ -69,6 +73,8 @@ public class TestPutHiveStreaming {
private MockPutHiveStreaming processor;
private KerberosProperties kerberosPropsWithFile;
private HiveConfigurator hiveConfigurator;
private HiveConf hiveConf;
@Before
public void setUp() throws Exception {
@ -81,6 +87,10 @@ public class TestPutHiveStreaming {
kerberosPropsWithFile = new KerberosProperties(new File("src/test/resources/krb5.conf"));
processor = new MockPutHiveStreaming();
hiveConfigurator = mock(HiveConfigurator.class);
hiveConf = mock(HiveConf.class);
when(hiveConfigurator.getConfigurationFromFiles(anyString())).thenReturn(hiveConf);
processor.hiveConfigurator = hiveConfigurator;
processor.setKerberosProperties(kerberosPropsWithFile);
runner = TestRunners.newTestRunner(processor);
}
@ -545,7 +555,7 @@ public class TestPutHiveStreaming {
if (generateInterruptedExceptionOnCreateWriter) {
throw new InterruptedException();
}
MockHiveWriter hiveWriter = new MockHiveWriter(endPoint, options.getTxnsPerBatch(), options.getAutoCreatePartitions(), options.getCallTimeOut(), callTimeoutPool, ugi);
MockHiveWriter hiveWriter = new MockHiveWriter(endPoint, options.getTxnsPerBatch(), options.getAutoCreatePartitions(), options.getCallTimeOut(), callTimeoutPool, ugi, hiveConfig);
hiveWriter.setGenerateWriteFailure(generateWriteFailure);
hiveWriter.setGenerateSerializationError(generateSerializationError);
hiveWriter.setGenerateCommitFailure(generateCommitFailure);
@ -595,9 +605,9 @@ public class TestPutHiveStreaming {
private HiveEndPoint endPoint;
public MockHiveWriter(HiveEndPoint endPoint, int txnsPerBatch, boolean autoCreatePartitions,
long callTimeout, ExecutorService callTimeoutPool, UserGroupInformation ugi)
long callTimeout, ExecutorService callTimeoutPool, UserGroupInformation ugi, HiveConf hiveConf)
throws InterruptedException, ConnectFailure {
super(endPoint, txnsPerBatch, autoCreatePartitions, callTimeout, callTimeoutPool, ugi);
super(endPoint, txnsPerBatch, autoCreatePartitions, callTimeout, callTimeoutPool, ugi, hiveConf);
this.endPoint = endPoint;
}
@ -632,13 +642,15 @@ public class TestPutHiveStreaming {
}
@Override
protected RecordWriter getRecordWriter(HiveEndPoint endPoint) throws StreamingException {
protected RecordWriter getRecordWriter(HiveEndPoint endPoint, UserGroupInformation ugi, HiveConf conf) throws StreamingException {
assertEquals(hiveConf, conf);
return mock(RecordWriter.class);
}
@Override
protected StreamingConnection newConnection(UserGroupInformation ugi) throws InterruptedException, ConnectFailure {
protected StreamingConnection newConnection(HiveEndPoint endPoint, boolean autoCreatePartitions, HiveConf conf, UserGroupInformation ugi) throws InterruptedException, ConnectFailure {
StreamingConnection connection = mock(StreamingConnection.class);
assertEquals(hiveConf, conf);
return connection;
}