NIFI-7025: Wrap Hive 3 calls with UGI.doAs

Updated PutHive3Streaming to wrap calls to Hive in UGI.doAs methods
Fixed misleading logging message after the principal has been authenticated with the KDC
When connecting to unsecured Hive 3, a UGI with "simple" auth will be used

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #4108
This commit is contained in:
jstorck 2020-03-03 20:03:59 -05:00 committed by Matthew Burgess
parent bad254ec5b
commit 4b6de8d164
No known key found for this signature in database
GPG Key ID: 05D3DEB8126DAD24
2 changed files with 189 additions and 150 deletions

View File

@ -67,8 +67,10 @@ import org.apache.nifi.util.hive.HiveOptions;
import org.apache.nifi.util.hive.HiveUtils; import org.apache.nifi.util.hive.HiveUtils;
import org.apache.nifi.util.hive.ValidationResources; import org.apache.nifi.util.hive.ValidationResources;
import javax.security.auth.login.LoginException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.security.PrivilegedAction;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
@ -321,7 +323,7 @@ public class PutHive3Streaming extends AbstractProcessor {
} }
@OnScheduled @OnScheduled
public void setup(final ProcessContext context) { public void setup(final ProcessContext context) throws IOException {
ComponentLog log = getLogger(); ComponentLog log = getLogger();
rollbackOnFailure = context.getProperty(ROLLBACK_ON_FAILURE).asBoolean(); rollbackOnFailure = context.getProperty(ROLLBACK_ON_FAILURE).asBoolean();
@ -368,9 +370,9 @@ public class PutHive3Streaming extends AbstractProcessor {
throw new ProcessException(ae); throw new ProcessException(ae);
} }
log.info("Successfully logged in as principal {} with keytab {}", new Object[]{resolvedPrincipal, resolvedKeytab}); log.info("Successfully logged in as principal " + resolvedPrincipal);
} else { } else {
ugi = null; ugi = SecurityUtil.loginSimple(hiveConfig);
kerberosUserReference.set(null); kerberosUserReference.set(null);
} }
@ -381,172 +383,181 @@ public class PutHive3Streaming extends AbstractProcessor {
} }
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get(); getUgi().doAs((PrivilegedAction<Void>) () -> {
if (flowFile == null) { FlowFile flowFile = session.get();
return; if (flowFile == null) {
} return null;
final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final String dbName = context.getProperty(DB_NAME).evaluateAttributeExpressions(flowFile).getValue();
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
final ComponentLog log = getLogger();
String metastoreURIs = null;
if (context.getProperty(METASTORE_URI).isSet()) {
metastoreURIs = context.getProperty(METASTORE_URI).evaluateAttributeExpressions(flowFile).getValue();
if (StringUtils.isEmpty(metastoreURIs)) {
// Shouldn't be empty at this point, log an error, penalize the flow file, and return
log.error("The '" + METASTORE_URI.getDisplayName() + "' property evaluated to null or empty, penalizing flow file, routing to failure");
session.transfer(session.penalize(flowFile), REL_FAILURE);
} }
}
final String staticPartitionValuesString = context.getProperty(STATIC_PARTITION_VALUES).evaluateAttributeExpressions(flowFile).getValue(); final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final boolean disableStreamingOptimizations = context.getProperty(DISABLE_STREAMING_OPTIMIZATIONS).asBoolean(); final String dbName = context.getProperty(DB_NAME).evaluateAttributeExpressions(flowFile).getValue();
final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
// Override the Hive Metastore URIs in the config if set by the user final ComponentLog log = getLogger();
if (metastoreURIs != null) { String metastoreURIs = null;
hiveConfig.set(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName(), metastoreURIs); if (context.getProperty(METASTORE_URI).isSet()) {
} metastoreURIs = context.getProperty(METASTORE_URI).evaluateAttributeExpressions(flowFile).getValue();
if (StringUtils.isEmpty(metastoreURIs)) {
HiveOptions o = new HiveOptions(metastoreURIs, dbName, tableName) // Shouldn't be empty at this point, log an error, penalize the flow file, and return
.withHiveConf(hiveConfig) log.error("The '" + METASTORE_URI.getDisplayName() + "' property evaluated to null or empty, penalizing flow file, routing to failure");
.withCallTimeout(callTimeout) session.transfer(session.penalize(flowFile), REL_FAILURE);
.withStreamingOptimizations(!disableStreamingOptimizations);
if (!StringUtils.isEmpty(staticPartitionValuesString)) {
List<String> staticPartitionValues = Arrays.stream(staticPartitionValuesString.split(",")).filter(Objects::nonNull).map(String::trim).collect(Collectors.toList());
o = o.withStaticPartitionValues(staticPartitionValues);
}
if (SecurityUtil.isSecurityEnabled(hiveConfig)) {
final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
o = o.withKerberosPrincipal(credentialsService.getPrincipal()).withKerberosKeytab(credentialsService.getKeytab());
}
final HiveOptions options = o;
// Store the original class loader, then explicitly set it to this class's classloader (for use by the Hive Metastore)
ClassLoader originalClassloader = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
StreamingConnection hiveStreamingConnection = null;
try {
final RecordReader reader;
try(final InputStream in = session.read(flowFile)) {
// if we fail to create the RecordReader then we want to route to failure, so we need to
// handle this separately from the other IOExceptions which normally route to retry
try {
reader = recordReaderFactory.createRecordReader(flowFile, in, getLogger());
} catch (Exception e) {
throw new RecordReaderFactoryException("Unable to create RecordReader", e);
} }
}
hiveStreamingConnection = makeStreamingConnection(options, reader); final String staticPartitionValuesString = context.getProperty(STATIC_PARTITION_VALUES).evaluateAttributeExpressions(flowFile).getValue();
final boolean disableStreamingOptimizations = context.getProperty(DISABLE_STREAMING_OPTIMIZATIONS).asBoolean();
// Write records to Hive streaming, then commit and close // Override the Hive Metastore URIs in the config if set by the user
hiveStreamingConnection.beginTransaction(); if (metastoreURIs != null) {
hiveStreamingConnection.write(in); hiveConfig.set(MetastoreConf.ConfVars.THRIFT_URIS.getHiveName(), metastoreURIs);
hiveStreamingConnection.commitTransaction(); }
in.close();
Map<String, String> updateAttributes = new HashMap<>(); HiveOptions o = new HiveOptions(metastoreURIs, dbName, tableName)
updateAttributes.put(HIVE_STREAMING_RECORD_COUNT_ATTR, Long.toString(hiveStreamingConnection.getConnectionStats().getRecordsWritten())); .withHiveConf(hiveConfig)
updateAttributes.put(ATTR_OUTPUT_TABLES, options.getQualifiedTableName()); .withCallTimeout(callTimeout)
flowFile = session.putAllAttributes(flowFile, updateAttributes); .withStreamingOptimizations(!disableStreamingOptimizations);
session.getProvenanceReporter().send(flowFile, hiveStreamingConnection.getMetastoreUri());
} catch (TransactionError te) { if (!StringUtils.isEmpty(staticPartitionValuesString)) {
if (rollbackOnFailure) { List<String> staticPartitionValues = Arrays.stream(staticPartitionValuesString.split(",")).filter(Objects::nonNull).map(String::trim).collect(Collectors.toList());
throw new ProcessException(te.getLocalizedMessage(), te); o = o.withStaticPartitionValues(staticPartitionValues);
} else { }
throw new ShouldRetryException(te.getLocalizedMessage(), te);
if (SecurityUtil.isSecurityEnabled(hiveConfig)) {
final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
final String explicitPrincipal = context.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
final String resolvedPrincipal;
if (credentialsService != null) {
resolvedPrincipal = credentialsService.getPrincipal();
o = o.withKerberosKeytab(credentialsService.getKeytab());
} else resolvedPrincipal = explicitPrincipal;
o = o.withKerberosPrincipal(resolvedPrincipal);
}
final HiveOptions options = o;
// Store the original class loader, then explicitly set it to this class's classloader (for use by the Hive Metastore)
ClassLoader originalClassloader = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
StreamingConnection hiveStreamingConnection = null;
try {
final RecordReader reader;
try(final InputStream in = session.read(flowFile)) {
// if we fail to create the RecordReader then we want to route to failure, so we need to
// handle this separately from the other IOExceptions which normally route to retry
try {
reader = recordReaderFactory.createRecordReader(flowFile, in, getLogger());
} catch (Exception e) {
throw new RecordReaderFactoryException("Unable to create RecordReader", e);
}
hiveStreamingConnection = makeStreamingConnection(options, reader);
// Write records to Hive streaming, then commit and close
hiveStreamingConnection.beginTransaction();
hiveStreamingConnection.write(in);
hiveStreamingConnection.commitTransaction();
in.close();
Map<String, String> updateAttributes = new HashMap<>();
updateAttributes.put(HIVE_STREAMING_RECORD_COUNT_ATTR, Long.toString(hiveStreamingConnection.getConnectionStats().getRecordsWritten()));
updateAttributes.put(ATTR_OUTPUT_TABLES, options.getQualifiedTableName());
flowFile = session.putAllAttributes(flowFile, updateAttributes);
session.getProvenanceReporter().send(flowFile, hiveStreamingConnection.getMetastoreUri());
} catch (TransactionError te) {
if (rollbackOnFailure) {
throw new ProcessException(te.getLocalizedMessage(), te);
} else {
throw new ShouldRetryException(te.getLocalizedMessage(), te);
}
} catch (RecordReaderFactoryException rrfe) {
if (rollbackOnFailure) {
throw new ProcessException(rrfe);
} else {
log.error(
"Failed to create {} for {} - routing to failure",
new Object[]{RecordReader.class.getSimpleName(), flowFile},
rrfe
);
session.transfer(flowFile, REL_FAILURE);
return null;
}
} }
} catch (RecordReaderFactoryException rrfe) { session.transfer(flowFile, REL_SUCCESS);
} catch (InvalidTable | SerializationError | StreamingIOFailure | IOException e) {
if (rollbackOnFailure) { if (rollbackOnFailure) {
throw new ProcessException(rrfe); if (hiveStreamingConnection != null) {
abortConnection(hiveStreamingConnection);
}
throw new ProcessException(e.getLocalizedMessage(), e);
} else { } else {
Map<String, String> updateAttributes = new HashMap<>();
final String recordCountAttribute = (hiveStreamingConnection != null) ? Long.toString(hiveStreamingConnection.getConnectionStats().getRecordsWritten()) : "0";
updateAttributes.put(HIVE_STREAMING_RECORD_COUNT_ATTR, recordCountAttribute);
updateAttributes.put(ATTR_OUTPUT_TABLES, options.getQualifiedTableName());
flowFile = session.putAllAttributes(flowFile, updateAttributes);
log.error( log.error(
"Failed to create {} for {} - routing to failure", "Exception while processing {} - routing to failure",
new Object[]{RecordReader.class.getSimpleName(), flowFile}, new Object[]{flowFile},
rrfe e
); );
session.transfer(flowFile, REL_FAILURE); session.transfer(flowFile, REL_FAILURE);
return;
} }
} } catch (DiscontinuedException e) {
session.transfer(flowFile, REL_SUCCESS); // The input FlowFile processing is discontinued. Keep it in the input queue.
} catch (InvalidTable | SerializationError | StreamingIOFailure | IOException e) { getLogger().warn("Discontinued processing for {} due to {}", new Object[]{flowFile, e}, e);
if (rollbackOnFailure) { session.transfer(flowFile, Relationship.SELF);
} catch (ConnectionError ce) {
// If we can't connect to the metastore, yield the processor
context.yield();
throw new ProcessException("A connection to metastore cannot be established", ce);
} catch (ShouldRetryException e) {
// This exception is already a result of adjusting an error, so simply transfer the FlowFile to retry. Still need to abort the txn
getLogger().error(e.getLocalizedMessage(), e);
if (hiveStreamingConnection != null) { if (hiveStreamingConnection != null) {
abortConnection(hiveStreamingConnection); abortConnection(hiveStreamingConnection);
} }
throw new ProcessException(e.getLocalizedMessage(), e);
} else {
Map<String, String> updateAttributes = new HashMap<>();
final String recordCountAttribute = (hiveStreamingConnection != null) ? Long.toString(hiveStreamingConnection.getConnectionStats().getRecordsWritten()) : "0";
updateAttributes.put(HIVE_STREAMING_RECORD_COUNT_ATTR, recordCountAttribute);
updateAttributes.put(ATTR_OUTPUT_TABLES, options.getQualifiedTableName());
flowFile = session.putAllAttributes(flowFile, updateAttributes);
log.error(
"Exception while processing {} - routing to failure",
new Object[]{flowFile},
e
);
session.transfer(flowFile, REL_FAILURE);
}
} catch (DiscontinuedException e) {
// The input FlowFile processing is discontinued. Keep it in the input queue.
getLogger().warn("Discontinued processing for {} due to {}", new Object[]{flowFile, e}, e);
session.transfer(flowFile, Relationship.SELF);
} catch (ConnectionError ce) {
// If we can't connect to the metastore, yield the processor
context.yield();
throw new ProcessException("A connection to metastore cannot be established", ce);
} catch (ShouldRetryException e) {
// This exception is already a result of adjusting an error, so simply transfer the FlowFile to retry. Still need to abort the txn
getLogger().error(e.getLocalizedMessage(), e);
if (hiveStreamingConnection != null) {
abortConnection(hiveStreamingConnection);
}
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_RETRY);
} catch (StreamingException se) {
// Handle all other exceptions. These are often record-based exceptions (since Hive will throw a subclass of the exception caught above)
Throwable cause = se.getCause();
if (cause == null) cause = se;
// This is a failure on the incoming data, rollback on failure if specified; otherwise route to failure after penalizing (and abort txn in any case)
if (rollbackOnFailure) {
if (hiveStreamingConnection != null) {
abortConnection(hiveStreamingConnection);
}
throw new ProcessException(cause.getLocalizedMessage(), cause);
} else {
flowFile = session.penalize(flowFile); flowFile = session.penalize(flowFile);
Map<String, String> updateAttributes = new HashMap<>(); session.transfer(flowFile, REL_RETRY);
final String recordCountAttribute = (hiveStreamingConnection != null) ? Long.toString(hiveStreamingConnection.getConnectionStats().getRecordsWritten()) : "0"; } catch (StreamingException se) {
updateAttributes.put(HIVE_STREAMING_RECORD_COUNT_ATTR, recordCountAttribute); // Handle all other exceptions. These are often record-based exceptions (since Hive will throw a subclass of the exception caught above)
updateAttributes.put(ATTR_OUTPUT_TABLES, options.getQualifiedTableName()); Throwable cause = se.getCause();
flowFile = session.putAllAttributes(flowFile, updateAttributes); if (cause == null) cause = se;
log.error( // This is a failure on the incoming data, rollback on failure if specified; otherwise route to failure after penalizing (and abort txn in any case)
"Exception while trying to stream {} to hive - routing to failure", if (rollbackOnFailure) {
new Object[]{flowFile}, if (hiveStreamingConnection != null) {
se abortConnection(hiveStreamingConnection);
); }
session.transfer(flowFile, REL_FAILURE); throw new ProcessException(cause.getLocalizedMessage(), cause);
} } else {
flowFile = session.penalize(flowFile);
Map<String, String> updateAttributes = new HashMap<>();
final String recordCountAttribute = (hiveStreamingConnection != null) ? Long.toString(hiveStreamingConnection.getConnectionStats().getRecordsWritten()) : "0";
updateAttributes.put(HIVE_STREAMING_RECORD_COUNT_ATTR, recordCountAttribute);
updateAttributes.put(ATTR_OUTPUT_TABLES, options.getQualifiedTableName());
flowFile = session.putAllAttributes(flowFile, updateAttributes);
log.error(
"Exception while trying to stream {} to hive - routing to failure",
new Object[]{flowFile},
se
);
session.transfer(flowFile, REL_FAILURE);
}
} catch (Throwable t) { } catch (Throwable t) {
if (hiveStreamingConnection != null) { if (hiveStreamingConnection != null) {
abortConnection(hiveStreamingConnection); abortConnection(hiveStreamingConnection);
}
throw (t instanceof ProcessException) ? (ProcessException) t : new ProcessException(t);
} finally {
closeConnection(hiveStreamingConnection);
// Restore original class loader, might not be necessary but is good practice since the processor task changed it
Thread.currentThread().setContextClassLoader(originalClassloader);
} }
throw (t instanceof ProcessException) ? (ProcessException) t : new ProcessException(t); return null;
} finally { });
closeConnection(hiveStreamingConnection);
// Restore original class loader, might not be necessary but is good practice since the processor task changed it
Thread.currentThread().setContextClassLoader(originalClassloader);
}
} }
StreamingConnection makeStreamingConnection(HiveOptions options, RecordReader reader) throws StreamingException { StreamingConnection makeStreamingConnection(HiveOptions options, RecordReader reader) throws StreamingException {
@ -623,5 +634,23 @@ public class PutHive3Streaming extends AbstractProcessor {
super(message, cause); super(message, cause);
} }
} }
UserGroupInformation getUgi() {
getLogger().trace("getting UGI instance");
if (kerberosUserReference.get() != null) {
// if there's a KerberosUser associated with this UGI, check the TGT and relogin if it is close to expiring
KerberosUser kerberosUser = kerberosUserReference.get();
getLogger().debug("kerberosUser is " + kerberosUser);
try {
getLogger().debug("checking TGT on kerberosUser [{}]", new Object[] {kerberosUser});
kerberosUser.checkTGTAndRelogin();
} catch (LoginException e) {
throw new ProcessException("Unable to relogin with kerberos credentials for " + kerberosUser.getPrincipal(), e);
}
} else {
getLogger().debug("kerberosUser was null, will not refresh TGT with KerberosUser");
}
return ugi;
}
} }

View File

@ -155,7 +155,7 @@ public class TestPutHive3Streaming {
System.setProperty("java.security.krb5.kdc", "nifi.kdc"); System.setProperty("java.security.krb5.kdc", "nifi.kdc");
ugi = null; ugi = null;
processor = new MockPutHive3Streaming(); processor = new MockPutHive3Streaming(ugi);
hiveConfigurator = mock(HiveConfigurator.class); hiveConfigurator = mock(HiveConfigurator.class);
hiveConf = new HiveConf(); hiveConf = new HiveConf();
when(hiveConfigurator.getConfigurationFromFiles(anyString())).thenReturn(hiveConf); when(hiveConfigurator.getConfigurationFromFiles(anyString())).thenReturn(hiveConf);
@ -272,6 +272,7 @@ public class TestPutHive3Streaming {
runner.setProperty(PutHive3Streaming.DB_NAME, "default"); runner.setProperty(PutHive3Streaming.DB_NAME, "default");
runner.setProperty(PutHive3Streaming.TABLE_NAME, "users"); runner.setProperty(PutHive3Streaming.TABLE_NAME, "users");
processor.ugi = mock(UserGroupInformation.class); processor.ugi = mock(UserGroupInformation.class);
processor.kerberosUserReference.set(mock(KerberosUser.class));
runner.run(); runner.run();
assertNull(processor.ugi); assertNull(processor.ugi);
assertNull(processor.kerberosUserReference.get()); assertNull(processor.kerberosUserReference.get());
@ -1128,6 +1129,10 @@ public class TestPutHive3Streaming {
new FieldSchema("scale", serdeConstants.DOUBLE_TYPE_NAME, "") new FieldSchema("scale", serdeConstants.DOUBLE_TYPE_NAME, "")
); );
private MockPutHive3Streaming(UserGroupInformation ugi) {
this.ugi = ugi;
}
@Override @Override
StreamingConnection makeStreamingConnection(HiveOptions options, RecordReader reader) throws StreamingException { StreamingConnection makeStreamingConnection(HiveOptions options, RecordReader reader) throws StreamingException {
@ -1175,6 +1180,11 @@ public class TestPutHive3Streaming {
public void setGeneratePermissionsFailure(boolean generatePermissionsFailure) { public void setGeneratePermissionsFailure(boolean generatePermissionsFailure) {
this.generatePermissionsFailure = generatePermissionsFailure; this.generatePermissionsFailure = generatePermissionsFailure;
} }
@Override
UserGroupInformation getUgi() {
return ugi;
}
} }
private class MockHiveStreamingConnection implements StreamingConnection { private class MockHiveStreamingConnection implements StreamingConnection {