NIFI-3574 - PutHiveStreaming UGI fixes

Signed-off-by: Matt Burgess <mattyb149@apache.org>

This closes #1578
Bryan Rosander 2017-03-08 17:41:59 -05:00 committed by Matt Burgess
parent 443803e729
commit cd8eb775e6
5 changed files with 58 additions and 6 deletions

org/apache/nifi/hadoop/SecurityUtil.java

@@ -28,6 +28,8 @@ import java.io.IOException;
  * interfering with each other.
  */
 public class SecurityUtil {
+    public static final String HADOOP_SECURITY_AUTHENTICATION = "hadoop.security.authentication";
+    public static final String KERBEROS = "kerberos";

     /**
      * Initializes UserGroupInformation with the given Configuration and performs the login for the given principal
@@ -81,7 +83,7 @@ public class SecurityUtil {
      */
     public static boolean isSecurityEnabled(final Configuration config) {
         Validate.notNull(config);
-        return "kerberos".equalsIgnoreCase(config.get("hadoop.security.authentication"));
+        return KERBEROS.equalsIgnoreCase(config.get(HADOOP_SECURITY_AUTHENTICATION));
     }

     /**
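For reference, a minimal usage sketch of the extracted constants (the standalone class and Configuration setup below are illustrative only, not part of this commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.nifi.hadoop.SecurityUtil;

    public class SecurityUtilExample {
        public static void main(String[] args) {
            // isSecurityEnabled() now compares against the named constants, i.e.
            // "kerberos".equalsIgnoreCase(config.get("hadoop.security.authentication")).
            Configuration config = new Configuration();
            config.set(SecurityUtil.HADOOP_SECURITY_AUTHENTICATION, SecurityUtil.KERBEROS);
            System.out.println(SecurityUtil.isSecurityEnabled(config)); // prints true
        }
    }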

org/apache/nifi/processors/hive/PutHiveStreaming.java

@@ -17,7 +17,6 @@
 package org.apache.nifi.processors.hive;

 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import java.io.File;
 import org.apache.avro.Schema;
 import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileConstants;
@@ -32,6 +31,7 @@ import org.apache.hive.hcatalog.streaming.ConnectionError;
 import org.apache.hive.hcatalog.streaming.HiveEndPoint;
 import org.apache.hive.hcatalog.streaming.SerializationError;
 import org.apache.hive.hcatalog.streaming.StreamingException;
+import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
 import org.apache.nifi.annotation.behavior.TriggerSerially;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
@@ -61,6 +61,7 @@ import org.apache.nifi.util.hive.HiveWriter;
 import org.json.JSONException;
 import org.json.JSONObject;

+import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
@@ -94,6 +95,7 @@ import java.util.regex.Pattern;
         @WritesAttribute(attribute = "hivestreaming.record.count", description = "This attribute is written on the flow files routed to the 'success' "
                 + "and 'failure' relationships, and contains the number of records from the incoming flow file written successfully and unsuccessfully, respectively.")
 })
+@RequiresInstanceClassLoading
 public class PutHiveStreaming extends AbstractProcessor {

     // Attributes
@@ -349,6 +351,8 @@ public class PutHiveStreaming extends AbstractProcessor {
             }
             log.info("Successfully logged in as principal {} with keytab {}", new Object[]{principal, keyTab});
             options = options.withKerberosPrincipal(principal).withKerberosKeytab(keyTab);
+        } else {
+            ugi = null;
         }

         allWriters = new ConcurrentHashMap<>();
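The hunk above adds the else branch that clears the processor's UGI whenever Kerberos is not enabled, so a UGI left over from an earlier secured configuration cannot be handed to new writers. A hedged reconstruction of the surrounding onScheduled() branch (the property lookups and the ticketRenewalPeriod variable are assumptions; only the logging, options, and else lines are verbatim from the diff):

    if (SecurityUtil.isSecurityEnabled(hiveConfig)) {
        // Assumed lookups; the real property descriptors live in PutHiveStreaming/KerberosProperties.
        final String principal = context.getProperty(kerberosProperties.getKerberosPrincipal()).getValue();
        final String keyTab = context.getProperty(kerberosProperties.getKerberosKeytab()).getValue();
        ugi = hiveConfigurator.authenticate(hiveConfig, principal, keyTab, ticketRenewalPeriod, log);
        log.info("Successfully logged in as principal {} with keytab {}", new Object[]{principal, keyTab});
        options = options.withKerberosPrincipal(principal).withKerberosKeytab(keyTab);
    } else {
        ugi = null; // no Kerberos: drop any UGI from a previous schedule so writers run unsecured
    }
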
@@ -662,14 +666,14 @@
         callTimeoutPool.shutdown();
         try {
             while (!callTimeoutPool.isTerminated()) {
-                callTimeoutPool.awaitTermination(
-                        options.getCallTimeOut(), TimeUnit.MILLISECONDS);
+                callTimeoutPool.awaitTermination(options.getCallTimeOut(), TimeUnit.MILLISECONDS);
             }
         } catch (Throwable t) {
             log.warn("shutdown interrupted on " + callTimeoutPool, t);
         }

         callTimeoutPool = null;
-        ugi = null;

         hiveConfigurator.stopRenewer();
     }

org/apache/nifi/util/hive/HiveConfigurator.java

@@ -74,7 +74,8 @@ public class HiveConfigurator {
     public void preload(Configuration configuration) {
         try {
-            FileSystem.get(configuration);
+            FileSystem.get(configuration).close();
+            UserGroupInformation.setConfiguration(configuration);
         } catch (IOException ioe) {
             // Suppress exception as future uses of this configuration will fail
         }
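With comments added, the resulting preload() reads roughly as follows (the statements are taken from the hunk above; only the comments are new, and the rationale is inferred):

    public void preload(Configuration configuration) {
        try {
            // Instantiate, then immediately close, a FileSystem for this configuration so the
            // relevant Hadoop classes are loaded up front without holding the cached handle open.
            FileSystem.get(configuration).close();
            // Push the same configuration into UserGroupInformation before any login happens,
            // so later UGI operations honor the hadoop.security.authentication setting.
            UserGroupInformation.setConfiguration(configuration);
        } catch (IOException ioe) {
            // Suppress exception as future uses of this configuration will fail
        }
    }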

org/apache/nifi/util/hive/HiveWriter.java

@@ -53,6 +53,7 @@ public class HiveWriter {
     private final ExecutorService callTimeoutPool;
     private final long callTimeout;
     private final Object txnBatchLock = new Object();
+    private final UserGroupInformation ugi;
     private TransactionBatch txnBatch;
     private long lastUsed; // time of last flush on this writer
     protected boolean closed; // flag indicating HiveWriter was closed
@@ -61,6 +62,7 @@ public class HiveWriter {
     public HiveWriter(HiveEndPoint endPoint, int txnsPerBatch, boolean autoCreatePartitions, long callTimeout, ExecutorService callTimeoutPool, UserGroupInformation ugi, HiveConf hiveConf)
             throws InterruptedException, ConnectFailure {
         try {
+            this.ugi = ugi;
             this.callTimeout = callTimeout;
             this.callTimeoutPool = callTimeoutPool;
             this.endPoint = endPoint;
@@ -348,7 +350,12 @@ public class HiveWriter {
      */
     private <T> T callWithTimeout(final CallRunner<T> callRunner)
             throws TimeoutException, StreamingException, InterruptedException {
-        Future<T> future = callTimeoutPool.submit(callRunner::call);
+        Future<T> future = callTimeoutPool.submit(() -> {
+            if (ugi == null) {
+                return callRunner.call();
+            }
+            return ugi.doAs((PrivilegedExceptionAction<T>) () -> callRunner.call());
+        });
         try {
             if (callTimeout > 0) {
                 return future.get(callTimeout, TimeUnit.MILLISECONDS);
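This is the core of the fix: every Hive Streaming operation in HiveWriter already funnels through callWithTimeout(), so wrapping the submitted task in UserGroupInformation.doAs() makes the executor thread act as the authenticated user whenever a UGI was supplied, and behaves exactly as before when it is null. A self-contained sketch of the same pattern (CallRunner and the HiveWriter internals are replaced here by a plain Callable and made-up names; only the doAs wrapping mirrors the diff):

    import java.security.PrivilegedExceptionAction;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.security.UserGroupInformation;

    public class DoAsWithTimeoutExample {
        // Runs "work" on the pool, wrapped in doAs() when a UGI is available, with an optional timeout.
        static <T> T callAsUser(ExecutorService pool, UserGroupInformation ugi, Callable<T> work, long timeoutMs)
                throws Exception {
            Future<T> future = pool.submit(() -> {
                if (ugi == null) {
                    return work.call(); // unsecured: run the call directly on the pool thread
                }
                // Secured: the pool thread has no Kerberos credentials of its own, so attach the
                // authenticated user's Subject by running the call inside doAs().
                return ugi.doAs((PrivilegedExceptionAction<T>) work::call);
            });
            return timeoutMs > 0 ? future.get(timeoutMs, TimeUnit.MILLISECONDS) : future.get();
        }
    }

The UGI itself reaches each writer through the new constructor assignment (this.ugi = ugi above), so every writer uses the credentials that PutHiveStreaming obtained at schedule time.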

org/apache/nifi/processors/hive/TestPutHiveStreaming.java

@@ -33,10 +33,12 @@ import org.apache.hive.hcatalog.streaming.StreamingConnection;
 import org.apache.hive.hcatalog.streaming.StreamingException;
 import org.apache.hive.hcatalog.streaming.TransactionBatch;
 import org.apache.nifi.hadoop.KerberosProperties;
+import org.apache.nifi.hadoop.SecurityUtil;
 import org.apache.nifi.stream.io.ByteArrayOutputStream;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
+import org.apache.nifi.util.hive.AuthenticationFailedException;
 import org.apache.nifi.util.hive.HiveConfigurator;
 import org.apache.nifi.util.hive.HiveOptions;
 import org.apache.nifi.util.hive.HiveWriter;
@@ -60,7 +62,10 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -75,6 +80,7 @@ public class TestPutHiveStreaming {
     private KerberosProperties kerberosPropsWithFile;
     private HiveConfigurator hiveConfigurator;
     private HiveConf hiveConf;
+    private UserGroupInformation ugi;

     @Before
     public void setUp() throws Exception {
@@ -84,6 +90,7 @@ public class TestPutHiveStreaming {
         System.setProperty("java.security.krb5.realm", "nifi.com");
         System.setProperty("java.security.krb5.kdc", "nifi.kdc");

+        ugi = null;
         kerberosPropsWithFile = new KerberosProperties(new File("src/test/resources/krb5.conf"));

         processor = new MockPutHiveStreaming();
@@ -107,6 +114,36 @@ public class TestPutHiveStreaming {
         runner.run();
     }

+    @Test
+    public void testUgiGetsCleared() {
+        runner.setValidateExpressionUsage(false);
+        runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083");
+        runner.setProperty(PutHiveStreaming.DB_NAME, "default");
+        runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
+        processor.ugi = mock(UserGroupInformation.class);
+        runner.run();
+        assertNull(processor.ugi);
+    }
+
+    @Test
+    public void testUgiGetsSetIfSecure() throws AuthenticationFailedException, IOException {
+        when(hiveConf.get(SecurityUtil.HADOOP_SECURITY_AUTHENTICATION)).thenReturn(SecurityUtil.KERBEROS);
+        ugi = mock(UserGroupInformation.class);
+        when(hiveConfigurator.authenticate(eq(hiveConf), anyString(), anyString(), anyLong(), any())).thenReturn(ugi);
+        runner.setValidateExpressionUsage(false);
+        runner.setProperty(PutHiveStreaming.METASTORE_URI, "thrift://localhost:9083");
+        runner.setProperty(PutHiveStreaming.DB_NAME, "default");
+        runner.setProperty(PutHiveStreaming.TABLE_NAME, "users");
+        Map<String, Object> user1 = new HashMap<String, Object>() {
+            {
+                put("name", "Joe");
+                put("favorite_number", 146);
+            }
+        };
+        runner.enqueue(createAvroRecord(Collections.singletonList(user1)));
+        runner.run();
+    }
+
     @Test
     public void testSetupBadPartitionColumns() throws Exception {
         runner.setValidateExpressionUsage(false);
@@ -608,6 +645,7 @@ public class TestPutHiveStreaming {
                               long callTimeout, ExecutorService callTimeoutPool, UserGroupInformation ugi, HiveConf hiveConf)
                 throws InterruptedException, ConnectFailure {
             super(endPoint, txnsPerBatch, autoCreatePartitions, callTimeout, callTimeoutPool, ugi, hiveConf);
+            assertEquals(TestPutHiveStreaming.this.ugi, ugi);
             this.endPoint = endPoint;
         }
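Taken together, the new tests cover both directions of the change: testUgiGetsCleared runs the processor without Kerberos and asserts that a previously set processor.ugi ends up null, while testUgiGetsSetIfSecure stubs the HiveConf to report kerberos authentication, returns a mocked UGI from HiveConfigurator.authenticate(), and relies on the assertEquals added to MockHiveWriter's constructor to verify that this same UGI is the one passed to every HiveWriter.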