NIFI-7523: Use SSL Context Service for Atlas HTTPS connection in Atlas reporting task (#4348)

* NIFI-7523: Use SSL Context Service for Atlas HTTPS connection in Atlas reporting task

Also fixing ControllerServiceDisabledExceptions thrown when validating the Kerberos config

* NIFI-7523: Fixed test failure on Windows

* NIFI-7523: Added license headers.

* NIFI-7523: Fixed another test failure on Windows

* NIFI-7523: Review changes
Peter Turcsanyi, 2020-06-23 20:22:50 +02:00, committed by GitHub
parent 005d05f20b
commit 327d73f5cd
3 changed files with 131 additions and 73 deletions

ReportLineageToAtlas.java

@@ -20,7 +20,11 @@ import com.sun.jersey.api.client.ClientResponse;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.hook.AtlasHook;
import org.apache.atlas.security.SecurityProperties;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.alias.CredentialProvider;
import org.apache.hadoop.security.alias.LocalJavaKeyStoreProvider;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.nifi.annotation.behavior.DynamicProperty;
@@ -74,7 +78,10 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
@@ -82,6 +89,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
@@ -182,6 +190,8 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
// Atlas generates ssl-client.xml in this directory and then loads it from classpath
.dynamicallyModifiesClasspath(true)
.build();
public static final PropertyDescriptor ATLAS_NIFI_URL = new PropertyDescriptor.Builder()
@@ -218,10 +228,10 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
.defaultValue("false")
.build();
static final PropertyDescriptor KAFKA_SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("ssl-context-service")
.displayName("Kafka SSL Context Service")
.description("Specifies the SSL Context Service to use for communicating with Kafka.")
.displayName("SSL Context Service")
.description("Specifies the SSL Context Service to use for communicating with Atlas and Kafka.")
.required(false)
.identifiesControllerService(SSLContextService.class)
.build();
@@ -321,10 +331,20 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
private static final String ATLAS_PROPERTY_METADATA_NAMESPACE = "atlas.metadata.namespace";
private static final String ATLAS_PROPERTY_CLUSTER_NAME = "atlas.cluster.name";
private static final String ATLAS_PROPERTY_REST_ADDRESS = "atlas.rest.address";
private static final String ATLAS_PROPERTY_ENABLE_TLS = "atlas.enableTLS";
private static final String ATLAS_PROPERTY_ENABLE_TLS = SecurityProperties.TLS_ENABLED;
private static final String ATLAS_PROPERTY_TRUSTSTORE_FILE = SecurityProperties.TRUSTSTORE_FILE_KEY;
private static final String ATLAS_PROPERTY_CRED_STORE_PATH = SecurityProperties.CERT_STORES_CREDENTIAL_PROVIDER_PATH;
private static final String ATLAS_KAFKA_PREFIX = "atlas.kafka.";
private static final String ATLAS_PROPERTY_KAFKA_BOOTSTRAP_SERVERS = ATLAS_KAFKA_PREFIX + "bootstrap.servers";
private static final String ATLAS_PROPERTY_KAFKA_CLIENT_ID = ATLAS_KAFKA_PREFIX + ProducerConfig.CLIENT_ID_CONFIG;
private static final String CRED_STORE_FILENAME = "atlas.jceks";
private static final String SSL_CLIENT_XML_FILENAME = SecurityProperties.SSL_CLIENT_PROPERTIES;
private static final String TRUSTSTORE_PASSWORD_ALIAS = "ssl.client.truststore.password";
private static final String KEYSTORE_TYPE_JKS = "JKS";
private final ServiceLoader<NamespaceResolver> namespaceResolverLoader = ServiceLoader.load(NamespaceResolver.class);
private volatile AtlasAuthN atlasAuthN;
private volatile Properties atlasProperties;
@@ -360,10 +380,10 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
properties.add(KERBEROS_CREDENTIALS_SERVICE);
properties.add(KERBEROS_PRINCIPAL);
properties.add(KERBEROS_KEYTAB);
properties.add(SSL_CONTEXT_SERVICE);
properties.add(KAFKA_BOOTSTRAP_SERVERS);
properties.add(KAFKA_SECURITY_PROTOCOL);
properties.add(KAFKA_KERBEROS_SERVICE_NAME);
properties.add(KAFKA_SSL_CONTEXT_SERVICE);
properties.add(ATLAS_CONNECT_TIMEOUT);
properties.add(ATLAS_READ_TIMEOUT);
@@ -385,90 +405,100 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
protected Collection<ValidationResult> customValidate(ValidationContext context) {
final Collection<ValidationResult> results = new ArrayList<>();
final boolean isSSLContextServiceSet = context.getProperty(KAFKA_SSL_CONTEXT_SERVICE).isSet();
final ValidationResult.Builder invalidSSLService = new ValidationResult.Builder()
.subject(KAFKA_SSL_CONTEXT_SERVICE.getDisplayName()).valid(false);
final Set<String> schemes = new HashSet<>();
String atlasUrls = context.getProperty(ATLAS_URLS).evaluateAttributeExpressions().getValue();
if (!StringUtils.isEmpty(atlasUrls)) {
Arrays.stream(atlasUrls.split(ATLAS_URL_DELIMITER))
.map(String::trim)
.forEach(input -> {
final ValidationResult.Builder builder = new ValidationResult.Builder().subject(ATLAS_URLS.getDisplayName()).input(input);
try {
new URL(input);
results.add(builder.explanation("Valid URI").valid(true).build());
final URL url = new URL(input);
schemes.add(url.toURI().getScheme());
} catch (Exception e) {
results.add(builder.explanation("Contains invalid URI: " + e).valid(false).build());
results.add(new ValidationResult.Builder().subject(ATLAS_URLS.getDisplayName()).input(input)
.explanation("contains invalid URI: " + e).valid(false).build());
}
});
}
if (schemes.size() > 1) {
results.add(new ValidationResult.Builder().subject(ATLAS_URLS.getDisplayName())
.explanation("URLs with multiple schemes have been specified").valid(false).build());
}
final String atlasAuthNMethod = context.getProperty(ATLAS_AUTHN_METHOD).getValue();
final AtlasAuthN atlasAuthN = getAtlasAuthN(atlasAuthNMethod);
results.addAll(atlasAuthN.validate(context));
namespaceResolverLoader.forEach(resolver -> results.addAll(resolver.validate(context)));
synchronized (namespaceResolverLoader) {
// ServiceLoader is not thread-safe and customValidate() may be executed on multiple threads in parallel,
// especially if the component has a property with dynamicallyModifiesClasspath(true)
// and the component gets reloaded due to this when the property has been modified
namespaceResolverLoader.forEach(resolver -> results.addAll(resolver.validate(context)));
}
if (context.getProperty(ATLAS_CONF_CREATE).asBoolean()) {
Stream.of(ATLAS_CONF_DIR, ATLAS_DEFAULT_CLUSTER_NAME, KAFKA_BOOTSTRAP_SERVERS)
Stream.of(ATLAS_URLS, ATLAS_CONF_DIR, ATLAS_DEFAULT_CLUSTER_NAME, KAFKA_BOOTSTRAP_SERVERS)
.filter(p -> !context.getProperty(p).isSet())
.forEach(p -> results.add(new ValidationResult.Builder()
.subject(p.getDisplayName())
.explanation("required to create Atlas configuration file.")
.valid(false).build()));
validateKafkaProperties(context, results, isSSLContextServiceSet, invalidSSLService);
validateKafkaProperties(context, results);
}
return results;
}
private void validateKafkaProperties(ValidationContext context, Collection<ValidationResult> results, boolean isSSLContextServiceSet, ValidationResult.Builder invalidSSLService) {
private void validateKafkaProperties(ValidationContext context, Collection<ValidationResult> results) {
final String kafkaSecurityProtocol = context.getProperty(KAFKA_SECURITY_PROTOCOL).getValue();
if ((SEC_SSL.equals(kafkaSecurityProtocol) || SEC_SASL_SSL.equals(kafkaSecurityProtocol))
&& !isSSLContextServiceSet) {
results.add(invalidSSLService.explanation("required by SSL Kafka connection").build());
}
final String explicitPrincipal = context.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
final String explicitKeytab = context.getProperty(KERBEROS_KEYTAB).evaluateAttributeExpressions().getValue();
final KerberosCredentialsService credentialsService = context.getProperty(ReportLineageToAtlas.KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
String principal;
String keytab;
if (credentialsService == null) {
principal = explicitPrincipal;
keytab = explicitKeytab;
} else {
principal = credentialsService.getPrincipal();
keytab = credentialsService.getKeytab();
&& !context.getProperty(SSL_CONTEXT_SERVICE).isSet()) {
results.add(new ValidationResult.Builder()
.subject(SSL_CONTEXT_SERVICE.getDisplayName())
.explanation("required by SSL Kafka connection")
.valid(false)
.build());
}
if (SEC_SASL_PLAINTEXT.equals(kafkaSecurityProtocol) || SEC_SASL_SSL.equals(kafkaSecurityProtocol)) {
if (!context.getProperty(KAFKA_KERBEROS_SERVICE_NAME).isSet()) {
results.add(new ValidationResult.Builder()
.subject(KAFKA_KERBEROS_SERVICE_NAME.getDisplayName())
.explanation("Required by Kafka SASL authentication.")
.valid(false)
.build());
final KerberosCredentialsService credentialsService = context.getProperty(ReportLineageToAtlas.KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
if (credentialsService == null || context.getControllerServiceLookup().isControllerServiceEnabled(credentialsService)) {
String principal;
String keytab;
if (credentialsService == null) {
principal = context.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
keytab = context.getProperty(KERBEROS_KEYTAB).evaluateAttributeExpressions().getValue();
} else {
principal = credentialsService.getPrincipal();
keytab = credentialsService.getKeytab();
}
if (keytab == null || principal == null) {
results.add(new ValidationResult.Builder()
.subject("Kerberos Authentication")
.explanation("Keytab and Principal are required for Kerberos authentication with Apache Kafka.")
.valid(false)
.build());
}
}
if (keytab == null || principal == null) {
if (!context.getProperty(KAFKA_KERBEROS_SERVICE_NAME).isSet()) {
results.add(new ValidationResult.Builder()
.subject("Kerberos Authentication")
.explanation("Keytab and Principal are required for Kerberos authentication with Apache Kafka.")
.valid(false)
.build());
.subject(KAFKA_KERBEROS_SERVICE_NAME.getDisplayName())
.explanation("Required by Kafka SASL authentication.")
.valid(false)
.build());
}
}
}
@OnScheduled
public void setup(ConfigurationContext context) throws IOException {
public void setup(ConfigurationContext context) throws Exception {
// initAtlasClient has to be done first as it loads AtlasProperty.
initAtlasProperties(context);
initLineageStrategy(context);
@@ -499,7 +529,7 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
}
private void initAtlasProperties(ConfigurationContext context) throws IOException {
private void initAtlasProperties(ConfigurationContext context) throws Exception {
final String atlasAuthNMethod = context.getProperty(ATLAS_AUTHN_METHOD).getValue();
final String confDirStr = context.getProperty(ATLAS_CONF_DIR).evaluateAttributeExpressions().getValue();
@@ -533,7 +563,6 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
}
List<String> urls = parseAtlasUrls(context.getProperty(ATLAS_URLS));
final boolean isAtlasApiSecure = urls.stream().anyMatch(url -> url.toLowerCase().startsWith("https"));
setValue(
value -> defaultMetadataNamespace = value,
@@ -561,7 +590,8 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
atlasProperties.put(ATLAS_PROPERTY_CLIENT_READ_TIMEOUT_MS, atlasReadTimeoutMs);
atlasProperties.put(ATLAS_PROPERTY_METADATA_NAMESPACE, defaultMetadataNamespace);
atlasProperties.put(ATLAS_PROPERTY_CLUSTER_NAME, defaultMetadataNamespace);
atlasProperties.put(ATLAS_PROPERTY_ENABLE_TLS, String.valueOf(isAtlasApiSecure));
setAtlasSSLConfig(atlasProperties, context, urls, confDir);
setKafkaConfig(atlasProperties, context);
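
For orientation, here is a rough sketch of the Atlas client properties this code ends up writing for an https:// URL. The paths are hypothetical, and the truststore.file / cert.stores.credential.provider.path key strings are my reading of the SecurityProperties constants referenced above, not values confirmed by this diff.

```java
import java.util.Properties;

// Sketch only: approximate end state of atlasProperties for a secure endpoint,
// assuming a JKS truststore is configured on the SSLContextService.
public class AtlasSslPropsSketch {
    public static void main(String[] args) {
        Properties atlas = new Properties();
        atlas.put("atlas.enableTLS", "true");                           // SecurityProperties.TLS_ENABLED
        atlas.put("truststore.file", "/opt/nifi/certs/truststore.jks"); // assumed TRUSTSTORE_FILE_KEY value
        atlas.put("cert.stores.credential.provider.path",
                "localjceks://file/opt/nifi/conf/atlas.jceks");         // assumed CERT_STORES_CREDENTIAL_PROVIDER_PATH value
        atlas.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```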
@@ -632,10 +662,37 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
}
}
private void checkAtlasUrls(List<String> urlStrings, ConfigurationContext context) {
if (urlStrings.isEmpty()) {
throw new ProcessException("No Atlas URL has been specified! Set either the '" + ATLAS_URLS.getDisplayName() + "' " +
"property on the processor or the 'atlas.rest.address' porperty in the atlas configuration file.");
private void setAtlasSSLConfig(Properties atlasProperties, ConfigurationContext context, List<String> urls, File confDir) throws Exception {
boolean isAtlasApiSecure = urls.stream().anyMatch(url -> url.toLowerCase().startsWith("https"));
atlasProperties.put(ATLAS_PROPERTY_ENABLE_TLS, String.valueOf(isAtlasApiSecure));
// ssl-client.xml must be deleted, Atlas will not regenerate it otherwise
Path credStorePath = new File(confDir, CRED_STORE_FILENAME).toPath();
Files.deleteIfExists(credStorePath);
Path sslClientXmlPath = new File(confDir, SSL_CLIENT_XML_FILENAME).toPath();
Files.deleteIfExists(sslClientXmlPath);
if (isAtlasApiSecure) {
SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
if (sslContextService == null) {
getLogger().warn("No SSLContextService configured, the system default truststore will be used.");
} else if (!sslContextService.isTrustStoreConfigured()) {
getLogger().warn("No truststore configured on SSLContextService, the system default truststore will be used.");
} else if (!KEYSTORE_TYPE_JKS.equalsIgnoreCase(sslContextService.getTrustStoreType())) {
getLogger().warn("The configured truststore type is not supported by Atlas (not JKS), the system default truststore will be used.");
} else {
atlasProperties.put(ATLAS_PROPERTY_TRUSTSTORE_FILE, sslContextService.getTrustStoreFile());
String password = sslContextService.getTrustStorePassword();
// Hadoop Credential Provider JCEKS URI format: localjceks://file/PATH/TO/JCEKS
String credStoreUri = credStorePath.toUri().toString().replaceFirst("^file://", "localjceks://file");
CredentialProvider credentialProvider = new LocalJavaKeyStoreProvider.Factory().createProvider(new URI(credStoreUri), new Configuration());
credentialProvider.createCredentialEntry(TRUSTSTORE_PASSWORD_ALIAS, password.toCharArray());
credentialProvider.flush();
atlasProperties.put(ATLAS_PROPERTY_CRED_STORE_PATH, credStoreUri);
}
}
}
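
A minimal, self-contained sketch of the two mechanisms setAtlasSSLConfig relies on, using a hypothetical /opt/nifi/conf path: deriving the localjceks:// URI from a file path, and resolving the stored alias back through Hadoop's credential provider API (the read side that Atlas depends on).

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.alias.CredentialProviderFactory;

import java.nio.file.Path;
import java.nio.file.Paths;

public class CredStoreSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical location; the reporting task uses the configured Atlas conf dir.
        Path credStorePath = Paths.get("/opt/nifi/conf/atlas.jceks");

        // Path.toUri() renders file:///opt/nifi/conf/atlas.jceks on POSIX systems,
        // so the replacement yields localjceks://file/opt/nifi/conf/atlas.jceks.
        String credStoreUri = credStorePath.toUri().toString()
                .replaceFirst("^file://", "localjceks://file");
        System.out.println(credStoreUri);

        // Read side: a Hadoop-based client resolves the alias via the provider chain.
        Configuration conf = new Configuration();
        conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH, credStoreUri);
        char[] password = conf.getPassword("ssl.client.truststore.password");
        System.out.println(password == null ? "alias not found" : "alias resolved");
    }
}
```

Storing the password in atlas.jceks rather than directly in the generated config keeps it out of plain text, which is presumably why the commit routes it through the credential provider instead of writing it as a property.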
@@ -817,7 +874,7 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
mapToPopulate.put(ATLAS_KAFKA_PREFIX + "security.protocol", kafkaSecurityProtocol);
// Translate SSLContext Service configuration into Kafka properties
final SSLContextService sslContextService = context.getProperty(KAFKA_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
if (sslContextService != null && sslContextService.isKeyStoreConfigured()) {
mapToPopulate.put(ATLAS_KAFKA_PREFIX + SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, sslContextService.getKeyStoreFile());
mapToPopulate.put(ATLAS_KAFKA_PREFIX + SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, sslContextService.getKeyStorePassword());
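
The hunk above is truncated after the keystore entries; assuming the truststore entries follow the same pattern, the resulting atlas.kafka.* map would look roughly like this sketch (paths and passwords hypothetical, key names from kafka-clients' SslConfigs).

```java
import org.apache.kafka.common.config.SslConfigs;

import java.util.Properties;

public class KafkaSslPropsSketch {
    public static void main(String[] args) {
        final String prefix = "atlas.kafka.";
        Properties kafka = new Properties();
        kafka.put(prefix + SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/opt/nifi/certs/keystore.jks");
        kafka.put(prefix + SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "keystorePassword");
        // Assumed continuation of the truncated hunk:
        kafka.put(prefix + SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/opt/nifi/certs/truststore.jks");
        kafka.put(prefix + SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "truststorePassword");
        kafka.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```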

Kerberos.java

@@ -49,22 +49,24 @@ public class Kerberos implements AtlasAuthN {
final KerberosCredentialsService credentialsService = context.getProperty(ReportLineageToAtlas.KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
final String resolvedPrincipal;
final String resolvedKeytab;
if (credentialsService == null) {
resolvedPrincipal = explicitPrincipal;
resolvedKeytab = explicitKeytab;
} else {
resolvedPrincipal = credentialsService.getPrincipal();
resolvedKeytab = credentialsService.getKeytab();
}
if (credentialsService == null || context.getControllerServiceLookup().isControllerServiceEnabled(credentialsService)) {
final String resolvedPrincipal;
final String resolvedKeytab;
if (credentialsService == null) {
resolvedPrincipal = explicitPrincipal;
resolvedKeytab = explicitKeytab;
} else {
resolvedPrincipal = credentialsService.getPrincipal();
resolvedKeytab = credentialsService.getKeytab();
}
if (resolvedPrincipal == null || resolvedKeytab == null) {
problems.add(new ValidationResult.Builder()
.subject("Kerberos Credentials")
.valid(false)
.explanation("Both the Principal and the Keytab must be specified when using Kerberos authentication, either via the explicit properties or the Kerberos Credentials Service.")
.build());
if (resolvedPrincipal == null || resolvedKeytab == null) {
problems.add(new ValidationResult.Builder()
.subject("Kerberos Credentials")
.valid(false)
.explanation("Both the Principal and the Keytab must be specified when using Kerberos authentication, either via the explicit properties or the Kerberos Credentials Service.")
.build());
}
}
if (credentialsService != null && (explicitPrincipal != null || explicitKeytab != null)) {

ITReportLineageToAtlas.java

@@ -37,7 +37,6 @@ import org.apache.nifi.provenance.lineage.EventNode;
import org.apache.nifi.provenance.lineage.LineageEdge;
import org.apache.nifi.provenance.lineage.LineageNode;
import org.apache.nifi.reporting.EventAccess;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.reporting.ReportingInitializationContext;
import org.apache.nifi.state.MockStateManager;
@@ -76,8 +75,8 @@ import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.ATLAS_NIFI_UR
import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.ATLAS_PASSWORD;
import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.ATLAS_URLS;
import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.ATLAS_USER;
import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.LINEAGE_STRATEGY_COMPLETE_PATH;
import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.LINEAGE_STRATEGY;
import static org.apache.nifi.atlas.reporting.ReportLineageToAtlas.LINEAGE_STRATEGY_COMPLETE_PATH;
import static org.apache.nifi.atlas.reporting.SimpleProvenanceRecord.pr;
import static org.apache.nifi.provenance.ProvenanceEventType.ATTRIBUTES_MODIFIED;
import static org.apache.nifi.provenance.ProvenanceEventType.CREATE;
@@ -423,7 +422,7 @@ public class ITReportLineageToAtlas {
}
}
private void test(TestConfiguration tc) throws InitializationException, IOException {
private void test(TestConfiguration tc) throws Exception {
final ReportLineageToAtlas reportingTask = new ReportLineageToAtlas();
final MockComponentLog logger = new MockComponentLog("reporting-task-id", reportingTask);