mirror of https://github.com/apache/nifi.git
NIFI-1698 Improving customValidate in AbstractHadoopProcessor and HBaseClient service to not reload Configuration unless it changed. This closes #313
This commit is contained in:
parent
e02c79975e
commit
04c6830937
|
@ -79,7 +79,7 @@ public class SecurityUtil {
|
|||
* @return true if kerberos is enabled on the given configuration, false otherwise
|
||||
*
|
||||
*/
|
||||
public static synchronized boolean isSecurityEnabled(final Configuration config) {
|
||||
public static boolean isSecurityEnabled(final Configuration config) {
|
||||
Validate.notNull(config);
|
||||
return "kerberos".equalsIgnoreCase(config.get("hadoop.security.authentication"));
|
||||
}
|
||||
|
|
|
@ -116,6 +116,9 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
|
|||
// Hadoop Configuration, Filesystem, and UserGroupInformation (optional)
|
||||
private final AtomicReference<HdfsResources> hdfsResources = new AtomicReference<>();
|
||||
|
||||
// Holder of cached Configuration information so validation does not reload the same config over and over
|
||||
private final AtomicReference<ValidationResources> validationResourceHolder = new AtomicReference<>();
|
||||
|
||||
@Override
|
||||
protected void init(ProcessorInitializationContext context) {
|
||||
hdfsResources.set(new HdfsResources(null, null, null));
|
||||
|
@ -147,12 +150,21 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
|
|||
final List<ValidationResult> results = new ArrayList<>();
|
||||
|
||||
if (!StringUtils.isBlank(configResources)) {
|
||||
Configuration conf = null;
|
||||
try {
|
||||
conf = getConfigurationFromResources(configResources);
|
||||
ValidationResources resources = validationResourceHolder.get();
|
||||
|
||||
// if no resources in the holder, or if the holder has different resources loaded,
|
||||
// then load the Configuration and set the new resources in the holder
|
||||
if (resources == null || !configResources.equals(resources.getConfigResources())) {
|
||||
getLogger().debug("Reloading validation resources");
|
||||
resources = new ValidationResources(configResources, getConfigurationFromResources(configResources));
|
||||
validationResourceHolder.set(resources);
|
||||
}
|
||||
|
||||
final Configuration conf = resources.getConfiguration();
|
||||
results.addAll(KerberosProperties.validatePrincipalAndKeytab(
|
||||
this.getClass().getSimpleName(), conf, principal, keytab, getLogger()));
|
||||
|
||||
} catch (IOException e) {
|
||||
results.add(new ValidationResult.Builder()
|
||||
.valid(false)
|
||||
|
@ -452,4 +464,23 @@ public abstract class AbstractHadoopProcessor extends AbstractProcessor {
|
|||
return userGroupInformation;
|
||||
}
|
||||
}
|
||||
|
||||
static protected class ValidationResources {
|
||||
private final String configResources;
|
||||
private final Configuration configuration;
|
||||
|
||||
public ValidationResources(String configResources, Configuration configuration) {
|
||||
this.configResources = configResources;
|
||||
this.configuration = configuration;
|
||||
}
|
||||
|
||||
public String getConfigResources() {
|
||||
return configResources;
|
||||
}
|
||||
|
||||
public Configuration getConfiguration() {
|
||||
return configuration;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -65,6 +65,7 @@ import java.util.Collections;
|
|||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
@Tags({ "hbase", "client"})
|
||||
@CapabilityDescription("Implementation of HBaseClientService for HBase 1.1.2. This service can be configured by providing " +
|
||||
|
@ -90,6 +91,9 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
|
|||
private List<PropertyDescriptor> properties;
|
||||
private KerberosProperties kerberosProperties;
|
||||
|
||||
// Holder of cached Configuration information so validation does not reload the same config over and over
|
||||
private final AtomicReference<ValidationResources> validationResourceHolder = new AtomicReference<>();
|
||||
|
||||
@Override
|
||||
protected void init(ControllerServiceInitializationContext config) throws InitializationException {
|
||||
this.kerberosProperties = getKerberosProperties();
|
||||
|
@ -145,7 +149,17 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
|
|||
|
||||
if (confFileProvided) {
|
||||
final String configFiles = validationContext.getProperty(HADOOP_CONF_FILES).getValue();
|
||||
final Configuration hbaseConfig = getConfigurationFromFiles(configFiles);
|
||||
ValidationResources resources = validationResourceHolder.get();
|
||||
|
||||
// if no resources in the holder, or if the holder has different resources loaded,
|
||||
// then load the Configuration and set the new resources in the holder
|
||||
if (resources == null || !configFiles.equals(resources.getConfigResources())) {
|
||||
getLogger().debug("Reloading validation resources");
|
||||
resources = new ValidationResources(configFiles, getConfigurationFromFiles(configFiles));
|
||||
validationResourceHolder.set(resources);
|
||||
}
|
||||
|
||||
final Configuration hbaseConfig = resources.getConfiguration();
|
||||
final String principal = validationContext.getProperty(kerberosProperties.getKerberosPrincipal()).getValue();
|
||||
final String keytab = validationContext.getProperty(kerberosProperties.getKerberosKeytab()).getValue();
|
||||
|
||||
|
@ -372,4 +386,23 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
|
|||
|
||||
return table.getScanner(scan);
|
||||
}
|
||||
|
||||
static protected class ValidationResources {
|
||||
private final String configResources;
|
||||
private final Configuration configuration;
|
||||
|
||||
public ValidationResources(String configResources, Configuration configuration) {
|
||||
this.configResources = configResources;
|
||||
this.configuration = configuration;
|
||||
}
|
||||
|
||||
public String getConfigResources() {
|
||||
return configResources;
|
||||
}
|
||||
|
||||
public Configuration getConfiguration() {
|
||||
return configuration;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue