mirror of https://github.com/apache/nifi.git
NIFI-2574 This clsoes #887. Fixing NPE when using kerberos keytab location from contexts, and cleaning up hadoop/hbase/hive kerberos variables
This commit is contained in:
parent
0e90a0f76a
commit
e0e4b3407a
|
@ -127,7 +127,7 @@ public class StandardReportingInitializationContext implements ReportingInitiali
|
|||
|
||||
@Override
|
||||
public File getKerberosServiceKeytab() {
|
||||
return new File(nifiProperties.getKerberosServiceKeytabLocation());
|
||||
return nifiProperties.getKerberosServiceKeytabLocation() == null ? null : new File(nifiProperties.getKerberosServiceKeytabLocation());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -102,7 +102,7 @@ public class StandardControllerServiceInitializationContext implements Controlle
|
|||
|
||||
@Override
|
||||
public File getKerberosServiceKeytab() {
|
||||
return new File(nifiProperties.getKerberosServiceKeytabLocation());
|
||||
return nifiProperties.getKerberosServiceKeytabLocation() == null ? null : new File(nifiProperties.getKerberosServiceKeytabLocation());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -69,7 +69,7 @@ public class StandardProcessorInitializationContext implements ProcessorInitiali
|
|||
|
||||
@Override
|
||||
public File getKerberosServiceKeytab() {
|
||||
return new File(nifiProperties.getKerberosServiceKeytabLocation());
|
||||
return nifiProperties.getKerberosServiceKeytabLocation() == null ? null : new File(nifiProperties.getKerberosServiceKeytabLocation());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -127,9 +127,7 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
|
|||
private long lastKerberosReloginTime;
|
||||
protected KerberosProperties kerberosProperties;
|
||||
protected List<PropertyDescriptor> properties;
|
||||
private volatile String kerberosServicePrincipal = null;
|
||||
private volatile File kerberosConfigFile = null;
|
||||
private volatile File kerberosServiceKeytab = null;
|
||||
|
||||
// variables shared by all threads of this processor
|
||||
// Hadoop Configuration, Filesystem, and UserGroupInformation (optional)
|
||||
|
@ -141,7 +139,9 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
|
|||
@Override
|
||||
protected void init(ProcessorInitializationContext context) {
|
||||
hdfsResources.set(new HdfsResources(null, null, null));
|
||||
kerberosProperties = getKerberosProperties();
|
||||
|
||||
kerberosConfigFile = context.getKerberosConfigurationFile();
|
||||
kerberosProperties = getKerberosProperties(kerberosConfigFile);
|
||||
|
||||
List<PropertyDescriptor> props = new ArrayList<>();
|
||||
props.add(HADOOP_CONFIGURATION_RESOURCES);
|
||||
|
@ -149,12 +149,9 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
|
|||
props.add(kerberosProperties.getKerberosKeytab());
|
||||
props.add(KERBEROS_RELOGIN_PERIOD);
|
||||
properties = Collections.unmodifiableList(props);
|
||||
kerberosServicePrincipal = context.getKerberosServicePrincipal();
|
||||
kerberosConfigFile = context.getKerberosConfigurationFile();
|
||||
kerberosServiceKeytab = context.getKerberosServiceKeytab();
|
||||
}
|
||||
|
||||
protected KerberosProperties getKerberosProperties() {
|
||||
protected KerberosProperties getKerberosProperties(File kerberosConfigFile) {
|
||||
return new KerberosProperties(kerberosConfigFile);
|
||||
}
|
||||
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.junit.Before;
|
|||
import org.junit.Test;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
|
||||
|
@ -113,7 +114,7 @@ public class GetHDFSSequenceFileTest {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected KerberosProperties getKerberosProperties() {
|
||||
protected KerberosProperties getKerberosProperties(File kerberosConfigFile) {
|
||||
return kerberosProperties;
|
||||
}
|
||||
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.junit.Assert;
|
|||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.Collection;
|
||||
|
@ -246,7 +247,7 @@ public class GetHDFSTest {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected KerberosProperties getKerberosProperties() {
|
||||
protected KerberosProperties getKerberosProperties(File kerberosConfigFile) {
|
||||
return testKerberosProperties;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -273,7 +273,7 @@ public class PutHDFSTest {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected KerberosProperties getKerberosProperties() {
|
||||
protected KerberosProperties getKerberosProperties(File kerberosConfigFile) {
|
||||
return testKerberosProperties;
|
||||
}
|
||||
});
|
||||
|
@ -375,7 +375,7 @@ public class PutHDFSTest {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected KerberosProperties getKerberosProperties() {
|
||||
protected KerberosProperties getKerberosProperties(File kerberosConfigFile) {
|
||||
return testKerberosProperties;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,6 +21,8 @@ import org.apache.nifi.processor.ProcessContext;
|
|||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
|
||||
import java.io.File;
|
||||
|
||||
public class SimpleHadoopProcessor extends AbstractHadoopProcessor {
|
||||
|
||||
private KerberosProperties testKerberosProperties;
|
||||
|
@ -34,7 +36,7 @@ public class SimpleHadoopProcessor extends AbstractHadoopProcessor {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected KerberosProperties getKerberosProperties() {
|
||||
protected KerberosProperties getKerberosProperties(File kerberosConfigFile) {
|
||||
return testKerberosProperties;
|
||||
}
|
||||
|
||||
|
|
|
@ -213,7 +213,7 @@ public class TestCreateHadoopSequenceFile {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected KerberosProperties getKerberosProperties() {
|
||||
protected KerberosProperties getKerberosProperties(File kerberosConfigFile) {
|
||||
return testKerbersProperties;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.apache.nifi.util.TestRunners;
|
|||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
@ -111,7 +112,7 @@ public class TestFetchHDFS {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected KerberosProperties getKerberosProperties() {
|
||||
protected KerberosProperties getKerberosProperties(File kerberosConfigFile) {
|
||||
return testKerberosProps;
|
||||
}
|
||||
|
||||
|
|
|
@ -308,7 +308,7 @@ public class TestListHDFS {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected KerberosProperties getKerberosProperties() {
|
||||
protected KerberosProperties getKerberosProperties(File kerberosConfigFile) {
|
||||
return testKerberosProps;
|
||||
}
|
||||
|
||||
|
|
|
@ -34,6 +34,7 @@ import org.junit.Rule;
|
|||
import org.junit.Test;
|
||||
import org.junit.rules.ExpectedException;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -252,7 +253,7 @@ public class TestGetHDFSEvents {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected KerberosProperties getKerberosProperties() {
|
||||
protected KerberosProperties getKerberosProperties(File kerberosConfigFile) {
|
||||
return testKerberosProperties;
|
||||
}
|
||||
|
||||
|
|
|
@ -133,9 +133,7 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv
|
|||
|
||||
private volatile HiveConfigurator hiveConfigurator = new HiveConfigurator();
|
||||
private volatile UserGroupInformation ugi;
|
||||
private volatile String kerberosServicePrincipal = null;
|
||||
private volatile File kerberosConfigFile = null;
|
||||
private volatile File kerberosServiceKeytab = null;
|
||||
private volatile KerberosProperties kerberosProperties;
|
||||
|
||||
static {
|
||||
|
@ -151,9 +149,7 @@ public class HiveConnectionPool extends AbstractControllerService implements Hiv
|
|||
|
||||
@Override
|
||||
protected void init(final ControllerServiceInitializationContext context) {
|
||||
kerberosServicePrincipal = context.getKerberosServicePrincipal();
|
||||
kerberosConfigFile = context.getKerberosConfigurationFile();
|
||||
kerberosServiceKeytab = context.getKerberosServiceKeytab();
|
||||
kerberosProperties = new KerberosProperties(kerberosConfigFile);
|
||||
properties.add(kerberosProperties.getKerberosPrincipal());
|
||||
properties.add(kerberosProperties.getKerberosKeytab());
|
||||
|
|
|
@ -244,9 +244,7 @@ public class PutHiveStreaming extends AbstractProcessor {
|
|||
private static final long TICKET_RENEWAL_PERIOD = 60000;
|
||||
|
||||
protected KerberosProperties kerberosProperties;
|
||||
private volatile String kerberosServicePrincipal = null;
|
||||
private volatile File kerberosConfigFile = null;
|
||||
private volatile File kerberosServiceKeytab = null;
|
||||
|
||||
protected volatile HiveConfigurator hiveConfigurator = new HiveConfigurator();
|
||||
protected volatile UserGroupInformation ugi;
|
||||
|
@ -285,9 +283,7 @@ public class PutHiveStreaming extends AbstractProcessor {
|
|||
|
||||
@Override
|
||||
protected void init(ProcessorInitializationContext context) {
|
||||
kerberosServicePrincipal = context.getKerberosServicePrincipal();
|
||||
kerberosConfigFile = context.getKerberosConfigurationFile();
|
||||
kerberosServiceKeytab = context.getKerberosServiceKeytab();
|
||||
kerberosProperties = new KerberosProperties(kerberosConfigFile);
|
||||
propertyDescriptors.add(kerberosProperties.getKerberosPrincipal());
|
||||
propertyDescriptors.add(kerberosProperties.getKerberosKeytab());
|
||||
|
|
|
@ -91,16 +91,15 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
|
|||
|
||||
private List<PropertyDescriptor> properties;
|
||||
private KerberosProperties kerberosProperties;
|
||||
private volatile String kerberosServicePrincipal = null;
|
||||
private volatile File kerberosConfigFile = null;
|
||||
private volatile File kerberosServiceKeytab = null;
|
||||
|
||||
// Holder of cached Configuration information so validation does not reload the same config over and over
|
||||
private final AtomicReference<ValidationResources> validationResourceHolder = new AtomicReference<>();
|
||||
|
||||
@Override
|
||||
protected void init(ControllerServiceInitializationContext config) throws InitializationException {
|
||||
this.kerberosProperties = getKerberosProperties();
|
||||
kerberosConfigFile = config.getKerberosConfigurationFile();
|
||||
kerberosProperties = getKerberosProperties(kerberosConfigFile);
|
||||
|
||||
List<PropertyDescriptor> props = new ArrayList<>();
|
||||
props.add(HADOOP_CONF_FILES);
|
||||
|
@ -111,12 +110,9 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
|
|||
props.add(ZOOKEEPER_ZNODE_PARENT);
|
||||
props.add(HBASE_CLIENT_RETRIES);
|
||||
this.properties = Collections.unmodifiableList(props);
|
||||
kerberosServicePrincipal = config.getKerberosServicePrincipal();
|
||||
kerberosConfigFile = config.getKerberosConfigurationFile();
|
||||
kerberosServiceKeytab = config.getKerberosServiceKeytab();
|
||||
}
|
||||
|
||||
protected KerberosProperties getKerberosProperties() {
|
||||
protected KerberosProperties getKerberosProperties(File kerberosConfigFile) {
|
||||
return new KerberosProperties(kerberosConfigFile);
|
||||
}
|
||||
|
||||
|
|
|
@ -451,7 +451,7 @@ public class TestHBase_1_1_2_ClientService {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected KerberosProperties getKerberosProperties() {
|
||||
protected KerberosProperties getKerberosProperties(File kerberosConfigFile) {
|
||||
return kerberosProperties;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue