mirror of https://github.com/apache/nifi.git
NIFI-7026 Add kerberos password property to NiFi HortonworksSchemaRegistry
This commit is contained in:
parent
eef04709b9
commit
8fa855c8c3
5
NOTICE
5
NOTICE
|
@ -112,3 +112,8 @@ This includes derived works from Dropwizard Metrics available under Apache Softw
|
|||
and can be found in
|
||||
nifi-commons/nifi-metrics/src/main/java/org/apache/nifi/metrics/jvm/JvmMetrics.java
|
||||
nifi-commons/nifi-metrics/src/main/java/org/apache/nifi/metrics/jvm/JmxJvmMetrics.java
|
||||
|
||||
This includes derived works from Cloudera Schema Registry available under Apache Software License V2 (https://github.com/hortonworks/registry)
|
||||
Cloudera Schema Registry
|
||||
Copyright 2016-2019 Cloudera, Inc.
|
||||
The code can be found in nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/com/hortonworks/registries/schemaregistry/client/SchemaRegistryClient.java
|
|
@ -19,17 +19,10 @@ package org.apache.nifi.security.krb;
|
|||
import org.apache.commons.lang3.Validate;
|
||||
|
||||
import javax.security.auth.Subject;
|
||||
import javax.security.auth.callback.Callback;
|
||||
import javax.security.auth.callback.CallbackHandler;
|
||||
import javax.security.auth.callback.NameCallback;
|
||||
import javax.security.auth.callback.PasswordCallback;
|
||||
import javax.security.auth.callback.UnsupportedCallbackException;
|
||||
import javax.security.auth.login.AppConfigurationEntry;
|
||||
import javax.security.auth.login.Configuration;
|
||||
import javax.security.auth.login.LoginContext;
|
||||
import javax.security.auth.login.LoginException;
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
|
||||
/**
|
||||
* KerberosUser that authenticates via username and password instead of keytab.
|
||||
|
@ -46,65 +39,9 @@ public class KerberosPasswordUser extends AbstractKerberosUser {
|
|||
|
||||
@Override
|
||||
protected LoginContext createLoginContext(final Subject subject) throws LoginException {
|
||||
final Configuration configuration = new PasswordConfig();
|
||||
final Configuration configuration = new PasswordConfiguration();
|
||||
final CallbackHandler callbackHandler = new UsernamePasswordCallbackHandler(principal, password);
|
||||
return new LoginContext("PasswordConf", subject, callbackHandler, configuration);
|
||||
}
|
||||
|
||||
/**
|
||||
* JAAS Configuration to use when logging in with username/password.
|
||||
*/
|
||||
private static class PasswordConfig extends Configuration {
|
||||
|
||||
@Override
|
||||
public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
|
||||
HashMap<String, String> options = new HashMap<String, String>();
|
||||
options.put("storeKey", "true");
|
||||
options.put("refreshKrb5Config", "true");
|
||||
|
||||
final String krbLoginModuleName = ConfigurationUtil.IS_IBM
|
||||
? ConfigurationUtil.IBM_KRB5_LOGIN_MODULE : ConfigurationUtil.SUN_KRB5_LOGIN_MODULE;
|
||||
|
||||
return new AppConfigurationEntry[] {
|
||||
new AppConfigurationEntry(
|
||||
krbLoginModuleName,
|
||||
AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
|
||||
options
|
||||
)
|
||||
};
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* CallbackHandler that provides the given username and password.
|
||||
*/
|
||||
private static class UsernamePasswordCallbackHandler implements CallbackHandler {
|
||||
|
||||
private final String username;
|
||||
private final String password;
|
||||
|
||||
public UsernamePasswordCallbackHandler(final String username, final String password) {
|
||||
this.username = username;
|
||||
this.password = password;
|
||||
Validate.notBlank(this.username);
|
||||
Validate.notBlank(this.password);
|
||||
}
|
||||
|
||||
public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
|
||||
for (final Callback callback : callbacks) {
|
||||
if (callback instanceof NameCallback) {
|
||||
final NameCallback nameCallback = (NameCallback) callback;
|
||||
nameCallback.setName(username);
|
||||
} else if (callback instanceof PasswordCallback) {
|
||||
final PasswordCallback passwordCallback = (PasswordCallback) callback;
|
||||
passwordCallback.setPassword(password.toCharArray());
|
||||
} else {
|
||||
throw new IllegalStateException("Unexpected callback type: " + callback.getClass().getCanonicalName());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,46 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.security.krb;
|
||||
|
||||
import javax.security.auth.login.AppConfigurationEntry;
|
||||
import javax.security.auth.login.Configuration;
|
||||
import java.util.HashMap;
|
||||
|
||||
/**
|
||||
* JAAS Configuration to use when logging in with username/password.
|
||||
*/
|
||||
public class PasswordConfiguration extends Configuration {
|
||||
|
||||
@Override
|
||||
public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
|
||||
HashMap<String, String> options = new HashMap<String, String>();
|
||||
options.put("storeKey", "true");
|
||||
options.put("refreshKrb5Config", "true");
|
||||
|
||||
final String krbLoginModuleName = ConfigurationUtil.IS_IBM
|
||||
? ConfigurationUtil.IBM_KRB5_LOGIN_MODULE : ConfigurationUtil.SUN_KRB5_LOGIN_MODULE;
|
||||
|
||||
return new AppConfigurationEntry[] {
|
||||
new AppConfigurationEntry(
|
||||
krbLoginModuleName,
|
||||
AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
|
||||
options
|
||||
)
|
||||
};
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,57 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.security.krb;
|
||||
|
||||
import org.apache.commons.lang3.Validate;
|
||||
|
||||
import javax.security.auth.callback.Callback;
|
||||
import javax.security.auth.callback.CallbackHandler;
|
||||
import javax.security.auth.callback.NameCallback;
|
||||
import javax.security.auth.callback.PasswordCallback;
|
||||
import javax.security.auth.callback.UnsupportedCallbackException;
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* CallbackHandler that provides the given username and password.
|
||||
*/
|
||||
public class UsernamePasswordCallbackHandler implements CallbackHandler {
|
||||
|
||||
private final String username;
|
||||
private final String password;
|
||||
|
||||
public UsernamePasswordCallbackHandler(final String username, final String password) {
|
||||
this.username = username;
|
||||
this.password = password;
|
||||
Validate.notBlank(this.username);
|
||||
Validate.notBlank(this.password);
|
||||
}
|
||||
|
||||
public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
|
||||
for (final Callback callback : callbacks) {
|
||||
if (callback instanceof NameCallback) {
|
||||
final NameCallback nameCallback = (NameCallback) callback;
|
||||
nameCallback.setName(username);
|
||||
} else if (callback instanceof PasswordCallback) {
|
||||
final PasswordCallback passwordCallback = (PasswordCallback) callback;
|
||||
passwordCallback.setPassword(password.toCharArray());
|
||||
} else {
|
||||
throw new IllegalStateException("Unexpected callback type: " + callback.getClass().getCanonicalName());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -67,120 +67,12 @@ limitations under the License.
|
|||
<version>1.8.1</version>
|
||||
</dependency>
|
||||
|
||||
<!-- Schema Registry Serdes Jar that also pulls in registry client jars -->
|
||||
<!-- Schema Registry Client-->
|
||||
<dependency>
|
||||
<groupId>com.hortonworks.registries</groupId>
|
||||
<artifactId>schema-registry-client</artifactId>
|
||||
<version>${hwx.registry.version}</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.apache.avro</groupId>
|
||||
<artifactId>avro</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka-clients</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.jboss.logging</groupId>
|
||||
<artifactId>jboss-logging</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.apache.directory.server</groupId>
|
||||
<artifactId>apacheds-i18n</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.apache.directory.server</groupId>
|
||||
<artifactId>apacheds-kerberos-codec</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.apache.zookeeper</groupId>
|
||||
<artifactId>zookeeper</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>javax.servlet.jsp</groupId>
|
||||
<artifactId>jsp-api</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.javassist</groupId>
|
||||
<artifactId>javassist</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.tukaani</groupId>
|
||||
<artifactId>xz</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>com.thoughtworks.paranamer</groupId>
|
||||
<artifactId>paranamer</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.hibernate</groupId>
|
||||
<artifactId>hibernate-validator</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-annotations</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.fusesource.leveldbjni</groupId>
|
||||
<artifactId>leveldbjni-all</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>io.netty</groupId>
|
||||
<artifactId>netty-all</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-auth</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-client</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-common</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.glassfish.hk2</groupId>
|
||||
<artifactId>osgi-resource-locator</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>com.github.fge</groupId>
|
||||
<artifactId>btf</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>javax.mail</groupId>
|
||||
<artifactId>mailapi</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>com.googlecode.libphonenumber</groupId>
|
||||
<artifactId>libphonenumber</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.yaml</groupId>
|
||||
<artifactId>snakeyaml</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>com.github.fge</groupId>
|
||||
<artifactId>msg-simple</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.xerial.snappy</groupId>
|
||||
<artifactId>snappy-java</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.glassfish.hk2.external</groupId>
|
||||
<artifactId>aopalliance-repackaged</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>joda-time</groupId>
|
||||
<artifactId>joda-time</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>com.fasterxml.jackson.dataformat</groupId>
|
||||
<artifactId>jackson-dataformat-yaml</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>com.fasterxml.jackson.core</groupId>
|
||||
<artifactId>jackson-databind</artifactId>
|
||||
|
@ -193,33 +85,26 @@ limitations under the License.
|
|||
<groupId>com.fasterxml.jackson.core</groupId>
|
||||
<artifactId>jackson-core</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.mozilla</groupId>
|
||||
<artifactId>rhino</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>com.github.fge</groupId>
|
||||
<artifactId>uri-template</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>com.github.fge</groupId>
|
||||
<artifactId>jackson-coreutils</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>com.github.fge</groupId>
|
||||
<artifactId>json-schema-core</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>com.github.fge</groupId>
|
||||
<artifactId>json-schema-validator</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-compress</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
|
||||
<!-- Upgrade jersey to 2.26 -->
|
||||
<dependency>
|
||||
<groupId>org.glassfish.jersey.core</groupId>
|
||||
<artifactId>jersey-client</artifactId>
|
||||
<version>2.26</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.glassfish.jersey.media</groupId>
|
||||
<artifactId>jersey-media-multipart</artifactId>
|
||||
<version>2.26</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.glassfish.jersey.inject</groupId>
|
||||
<artifactId>jersey-hk2</artifactId>
|
||||
<version>2.26</version>
|
||||
</dependency>
|
||||
|
||||
<!-- explicitly pulling in jackson and excluding above to not conflict with transitive test dependencies from nifi-mock -->
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.core</groupId>
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -31,6 +31,8 @@ import org.apache.nifi.annotation.lifecycle.OnDisabled;
|
|||
import org.apache.nifi.annotation.lifecycle.OnEnabled;
|
||||
import org.apache.nifi.avro.AvroTypeUtil;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.ValidationContext;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
import org.apache.nifi.controller.AbstractControllerService;
|
||||
import org.apache.nifi.controller.ConfigurationContext;
|
||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||
|
@ -46,6 +48,7 @@ import org.apache.nifi.util.Tuple;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
@ -117,10 +120,66 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
|
|||
.required(false)
|
||||
.build();
|
||||
|
||||
static final PropertyDescriptor KERBEROS_PRINCIPAL = new PropertyDescriptor.Builder()
|
||||
.name("kerberos-principal")
|
||||
.displayName("Kerberos Principal")
|
||||
.description("The kerberos principal to authenticate with when not using the kerberos credentials service")
|
||||
.defaultValue(null)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.build();
|
||||
|
||||
static final PropertyDescriptor KERBEROS_PASSWORD = new PropertyDescriptor.Builder()
|
||||
.name("kerberos-password")
|
||||
.displayName("Kerberos Password")
|
||||
.description("The password for the kerberos principal when not using the kerberos credentials service")
|
||||
.defaultValue(null)
|
||||
.required(false)
|
||||
.sensitive(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
|
||||
private volatile boolean usingKerberosWithPassword = false;
|
||||
private volatile SchemaRegistryClient schemaRegistryClient;
|
||||
private volatile boolean initialized;
|
||||
private volatile Map<String, Object> schemaRegistryConfig;
|
||||
|
||||
@Override
|
||||
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
|
||||
final List<ValidationResult> results = new ArrayList<>();
|
||||
|
||||
final String kerberosPrincipal = validationContext.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
|
||||
final String kerberosPassword = validationContext.getProperty(KERBEROS_PASSWORD).getValue();
|
||||
|
||||
final KerberosCredentialsService kerberosCredentialsService = validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE)
|
||||
.asControllerService(KerberosCredentialsService.class);
|
||||
|
||||
if (kerberosCredentialsService != null && !StringUtils.isBlank(kerberosPrincipal) && !StringUtils.isBlank(kerberosPassword)) {
|
||||
results.add(new ValidationResult.Builder()
|
||||
.subject(KERBEROS_CREDENTIALS_SERVICE.getDisplayName())
|
||||
.valid(false)
|
||||
.explanation("kerberos principal/password and kerberos credential service cannot be configured at the same time")
|
||||
.build());
|
||||
}
|
||||
|
||||
if (!StringUtils.isBlank(kerberosPrincipal) && StringUtils.isBlank(kerberosPassword)) {
|
||||
results.add(new ValidationResult.Builder()
|
||||
.subject(KERBEROS_PASSWORD.getDisplayName())
|
||||
.valid(false)
|
||||
.explanation("kerberos password is required when specifying a kerberos principal")
|
||||
.build());
|
||||
}
|
||||
|
||||
if (StringUtils.isBlank(kerberosPrincipal) && !StringUtils.isBlank(kerberosPassword)) {
|
||||
results.add(new ValidationResult.Builder()
|
||||
.subject(KERBEROS_PRINCIPAL.getDisplayName())
|
||||
.valid(false)
|
||||
.explanation("kerberos principal is required when specifying a kerberos password")
|
||||
.build());
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
@OnEnabled
|
||||
public void enable(final ConfigurationContext context) throws InitializationException {
|
||||
|
@ -146,17 +205,26 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
|
|||
schemaRegistryConfig.put(CLIENT_SSL_PROPERTY_PREFIX, sslProperties);
|
||||
}
|
||||
|
||||
final String kerberosPrincipal = context.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
|
||||
final String kerberosPassword = context.getProperty(KERBEROS_PASSWORD).getValue();
|
||||
|
||||
final KerberosCredentialsService kerberosCredentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE)
|
||||
.asControllerService(KerberosCredentialsService.class);
|
||||
|
||||
if (kerberosCredentialsService != null) {
|
||||
final String principal = kerberosCredentialsService.getPrincipal();
|
||||
final String keytab = kerberosCredentialsService.getKeytab();
|
||||
final String jaasConfigString = getJaasConfig(principal, keytab);
|
||||
final String jaasConfigString = getKeytabJaasConfig(principal, keytab);
|
||||
schemaRegistryConfig.put(SchemaRegistryClient.Configuration.SASL_JAAS_CONFIG.name(), jaasConfigString);
|
||||
} else if (!StringUtils.isBlank(kerberosPrincipal) && !StringUtils.isBlank(kerberosPassword)) {
|
||||
schemaRegistryConfig.put(SchemaRegistryClientWithKerberosPassword.SCHEMA_REGISTRY_CLIENT_KERBEROS_PRINCIPAL, kerberosPrincipal);
|
||||
schemaRegistryConfig.put(SchemaRegistryClientWithKerberosPassword.SCHEMA_REGISTRY_CLIENT_KERBEROS_PASSWORD, kerberosPassword);
|
||||
schemaRegistryConfig.put(SchemaRegistryClientWithKerberosPassword.SCHEMA_REGISTRY_CLIENT_NIFI_COMP_LOGGER, getLogger());
|
||||
usingKerberosWithPassword = true;
|
||||
}
|
||||
}
|
||||
|
||||
private String getJaasConfig(final String principal, final String keytab) {
|
||||
private String getKeytabJaasConfig(final String principal, final String keytab) {
|
||||
return "com.sun.security.auth.module.Krb5LoginModule required "
|
||||
+ "useTicketCache=false "
|
||||
+ "renewTicket=true "
|
||||
|
@ -205,20 +273,25 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
|
|||
properties.add(CACHE_EXPIRATION);
|
||||
properties.add(SSL_CONTEXT_SERVICE);
|
||||
properties.add(KERBEROS_CREDENTIALS_SERVICE);
|
||||
properties.add(KERBEROS_PRINCIPAL);
|
||||
properties.add(KERBEROS_PASSWORD);
|
||||
return properties;
|
||||
}
|
||||
|
||||
|
||||
protected synchronized SchemaRegistryClient getClient() {
|
||||
if (!initialized) {
|
||||
schemaRegistryClient = new SchemaRegistryClient(schemaRegistryConfig);
|
||||
if (usingKerberosWithPassword) {
|
||||
schemaRegistryClient = new SchemaRegistryClientWithKerberosPassword(schemaRegistryConfig);
|
||||
} else {
|
||||
schemaRegistryClient = new SchemaRegistryClient(schemaRegistryConfig);
|
||||
}
|
||||
initialized = true;
|
||||
}
|
||||
|
||||
return schemaRegistryClient;
|
||||
}
|
||||
|
||||
|
||||
private SchemaVersionInfo getLatestSchemaVersionInfo(final SchemaRegistryClient client, final String schemaName, final String branchName)
|
||||
throws org.apache.nifi.schema.access.SchemaNotFoundException {
|
||||
try {
|
||||
|
|
|
@ -0,0 +1,69 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.schemaregistry.hortonworks;
|
||||
|
||||
import com.hortonworks.registries.auth.Login;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.security.krb.KerberosAction;
|
||||
import org.apache.nifi.security.krb.KerberosUser;
|
||||
|
||||
import javax.security.auth.login.LoginContext;
|
||||
import javax.security.auth.login.LoginException;
|
||||
import java.security.PrivilegedAction;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Implementation of Schema Registry's Login interface that wraps NiFi's KerberosUser API.
|
||||
*/
|
||||
public class KerberosUserLogin implements Login {
|
||||
|
||||
private final KerberosUser kerberosUser;
|
||||
private final ComponentLog logger;
|
||||
|
||||
public KerberosUserLogin(final KerberosUser kerberosUser, final ComponentLog logger) {
|
||||
this.kerberosUser = kerberosUser;
|
||||
this.logger = logger;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configure(Map<String, ?> configs, String loginContextName) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public LoginContext login() throws LoginException {
|
||||
kerberosUser.login();
|
||||
|
||||
// the KerberosUser doesn't expose the LoginContext, but SchemaRegistryClient doesn't use
|
||||
// the returned context at all, so we just return null here
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> T doAction(PrivilegedAction<T> action) throws LoginException {
|
||||
final PrivilegedExceptionAction<T> wrappedAction = () -> action.run();
|
||||
final KerberosAction<T> kerberosAction = new KerberosAction<T>(kerberosUser, wrappedAction, logger);
|
||||
return kerberosAction.execute();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,90 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.schemaregistry.hortonworks;
|
||||
|
||||
import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.security.krb.KerberosPasswordUser;
|
||||
import org.apache.nifi.security.krb.KerberosUser;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import javax.security.auth.login.LoginException;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Extend the SchemaRegistryClient so we can override the initialization of the security context and use
|
||||
* the KerberosUserLogin implementation that lets us login with a principal/password.
|
||||
*/
|
||||
public class SchemaRegistryClientWithKerberosPassword extends SchemaRegistryClient {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(SchemaRegistryClientWithKerberosPassword.class);
|
||||
|
||||
public static final String SCHEMA_REGISTRY_CLIENT_KERBEROS_PRINCIPAL = "schema.registry.client.kerberos.principal";
|
||||
public static final String SCHEMA_REGISTRY_CLIENT_KERBEROS_PASSWORD = "schema.registry.client.kerberos.password";
|
||||
public static final String SCHEMA_REGISTRY_CLIENT_NIFI_COMP_LOGGER = "schema.registry.client.nifi.component.logger";
|
||||
|
||||
private KerberosUser kerberosUser;
|
||||
|
||||
public SchemaRegistryClientWithKerberosPassword(final Map<String, ?> conf) {
|
||||
super(conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void initializeSecurityContext() {
|
||||
final String principal = configuration.getValue(SCHEMA_REGISTRY_CLIENT_KERBEROS_PRINCIPAL);
|
||||
if (principal == null) {
|
||||
throw new IllegalArgumentException("Failed to login because principal is null");
|
||||
}
|
||||
|
||||
final String password = configuration.getValue(SCHEMA_REGISTRY_CLIENT_KERBEROS_PASSWORD);
|
||||
if (password == null) {
|
||||
throw new IllegalArgumentException("Failed to login because password is null");
|
||||
}
|
||||
|
||||
final Object loggerObject = configuration.getValue(SCHEMA_REGISTRY_CLIENT_NIFI_COMP_LOGGER);
|
||||
if (loggerObject == null) {
|
||||
throw new IllegalArgumentException("Failed to login because component logger is required");
|
||||
}
|
||||
|
||||
if (!(loggerObject instanceof ComponentLog)) {
|
||||
throw new IllegalArgumentException("Failed to login because logger object is not a ComponentLog");
|
||||
}
|
||||
|
||||
kerberosUser = new KerberosPasswordUser(principal, password);
|
||||
login = new KerberosUserLogin(kerberosUser, (ComponentLog) loggerObject);
|
||||
|
||||
try {
|
||||
login.login();
|
||||
} catch (LoginException e) {
|
||||
LOGGER.error("Failed to login as principal `{}`", new Object[]{principal}, e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
try {
|
||||
kerberosUser.logout();
|
||||
} catch (Throwable t) {
|
||||
LOGGER.error("Error performing logout of principal during close(): " + t.getMessage(), t);
|
||||
} finally {
|
||||
kerberosUser = null;
|
||||
}
|
||||
|
||||
super.close();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue