NIFI-3520 Updated nifi-hdfs-processors POM to depend directly on hadoop-client

- Removed NAR dependency on nifi-hadoop-libraries-nar from nifi-hadoop-nar so that hadoop-client dependencies will be included directly in nifi-hadoop-nar
- Added RequiresInstanceClassLoading annotation to AbstractHadoopProcessor and HiveConnectionPool
- UGI relogins are now performed using doAs (see the sketch below)
- Added debug-level logging for UGI relogins in KerberosTicketRenewer and AbstractHadoopProcessor
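
For reference, a minimal standalone sketch of the doAs relogin pattern the diffs below apply (the class and method names here are illustrative, not part of this commit):

    import java.io.IOException;
    import java.security.PrivilegedExceptionAction;

    import org.apache.hadoop.security.UserGroupInformation;

    // Illustrative helper: runs the keytab TGT check as the UGI's login
    // subject by wrapping it in doAs.
    public final class ReloginSketch {

        public static void relogin(final UserGroupInformation ugi)
                throws IOException, InterruptedException {
            ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
                // Hadoop performs the actual re-login only once ~80% of the
                // ticket lifetime has elapsed; otherwise this is a no-op.
                ugi.checkTGTAndReloginFromKeytab();
                return null;
            });
        }
    }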

This closes #1539.

Signed-off-by: Bryan Bende <bbende@apache.org>
Authored by Jeff Storck, 2017-02-23 20:03:40 -05:00; committed by Bryan Bende
parent 9bb31a70d9
commit a61f353051
6 changed files with 25 additions and 12 deletions

KerberosTicketRenewer.java

@@ -20,6 +20,7 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.nifi.logging.ComponentLog;
 
 import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
 
 /**
  * Periodically attempts to renew the Kerberos user's ticket for the given UGI.
@@ -58,11 +59,20 @@ public class KerberosTicketRenewer implements Runnable {
         try {
             logger.debug("Invoking renewal attempt for Kerberos ticket");
             // While we run this "frequently", the Hadoop implementation will only perform the login at 80% of ticket lifetime.
-            ugi.checkTGTAndReloginFromKeytab();
+            ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
+                ugi.checkTGTAndReloginFromKeytab();
+                return null;
+            });
         } catch (IOException e) {
             logger.error("Failed to renew Kerberos ticket", e);
+        } catch (InterruptedException e) {
+            logger.error("Interrupted while attempting to renew Kerberos ticket", e);
+            Thread.currentThread().interrupt();
+            return;
         }
 
+        logger.debug("current UGI {}", new Object[]{ugi});
+
         // Wait for a bit before checking again.
         try {
             Thread.sleep(renewalPeriod);

nifi-hadoop-nar/pom.xml

@@ -28,7 +28,7 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.nifi</groupId>
-            <artifactId>nifi-hadoop-libraries-nar</artifactId>
+            <artifactId>nifi-standard-services-api-nar</artifactId>
             <type>nar</type>
         </dependency>
         <dependency>

nifi-hdfs-processors/pom.xml

@@ -46,18 +46,12 @@
         </dependency>
         <dependency>
             <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-common</artifactId>
-            <scope>provided</scope>
+            <artifactId>hadoop-client</artifactId>
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-distributed-cache-client-service-api</artifactId>
         </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-hdfs</artifactId>
-            <scope>provided</scope>
-        </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-mock</artifactId>

AbstractHadoopProcessor.java

@@ -28,6 +28,7 @@ import org.apache.hadoop.io.compress.Lz4Codec;
 import org.apache.hadoop.io.compress.SnappyCodec;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.PropertyDescriptor;
@@ -64,6 +65,7 @@ import java.util.concurrent.atomic.AtomicReference;
 /**
  * This is a base class that is helpful when building processors interacting with HDFS.
  */
+@RequiresInstanceClassLoading
 public abstract class AbstractHadoopProcessor extends AbstractProcessor {
     /**
      * Compression Type Enum
@@ -299,6 +301,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
                 fs = getFileSystemAsUser(config, ugi);
             }
         }
+        getLogger().debug("resetHDFSResources UGI {}", new Object[]{ugi});
 
         final Path workingDir = fs.getWorkingDirectory();
         getLogger().info("Initialized a new HDFS File System with working dir: {} default block size: {} default replication: {} config: {}",
@@ -455,7 +458,10 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
                 getLogger().info("Kerberos ticket age exceeds threshold [{} seconds] " +
                         "attempting to renew ticket for user {}", new Object[]{
                         kerberosReloginThreshold, ugi.getUserName()});
-                ugi.checkTGTAndReloginFromKeytab();
+                ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
+                    ugi.checkTGTAndReloginFromKeytab();
+                    return null;
+                });
                 lastKerberosReloginTime = System.currentTimeMillis() / 1000;
                 getLogger().info("Kerberos relogin successful or ticket still valid");
             } catch (IOException e) {
@@ -463,6 +469,9 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
                 // meaning dfs operations would fail
                 getLogger().error("Kerberos relogin failed", e);
                 throw new ProcessException("Unable to renew kerberos ticket", e);
+            } catch (InterruptedException e) {
+                getLogger().error("Interrupted while attempting Kerberos relogin", e);
+                throw new ProcessException("Unable to renew kerberos ticket", e);
             }
         }

PutHDFS.java

@@ -27,7 +27,6 @@ import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.ReadsAttribute;
 import org.apache.nifi.annotation.behavior.Restricted;
-import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -68,7 +67,6 @@ import java.util.concurrent.TimeUnit;
 /**
  * This processor copies FlowFiles to HDFS.
  */
-@RequiresInstanceClassLoading
 @InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({"hadoop", "HDFS", "put", "copy", "filesystem", "restricted"})
 @CapabilityDescription("Write FlowFile data to Hadoop Distributed File System (HDFS)")

HiveConnectionPool.java

@@ -22,6 +22,7 @@ import org.apache.commons.dbcp.BasicDataSource;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hive.jdbc.HiveDriver;
+import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnDisabled;
@@ -59,6 +60,7 @@ import org.apache.nifi.controller.ControllerServiceInitializationContext;
  * Implementation for Database Connection Pooling Service used for Apache Hive
  * connections. Apache DBCP is used for connection pooling functionality.
  */
+@RequiresInstanceClassLoading
 @Tags({"hive", "dbcp", "jdbc", "database", "connection", "pooling", "store"})
 @CapabilityDescription("Provides Database Connection Pooling Service for Apache Hive. Connections can be asked from pool and returned after usage.")
 public class HiveConnectionPool extends AbstractControllerService implements HiveDBCPService {