NIFI-6576 Added Basic Authentication support to ConfluentSchemaRegistry

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #4743.
This closes #4508.
This closes #4224.
This commit is contained in:
exceptionfactory 2021-01-05 16:51:22 -05:00 committed by Pierre Villard
parent 584adc3b91
commit bb178f371b
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
5 changed files with 194 additions and 5 deletions

View File

@ -61,6 +61,12 @@
<artifactId>nifi-web-utils</artifactId> <artifactId>nifi-web-utils</artifactId>
<version>1.13.0-SNAPSHOT</version> <version>1.13.0-SNAPSHOT</version>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<version>1.13.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
<repositories> <repositories>

View File

@ -20,22 +20,26 @@ package org.apache.nifi.confluent.schemaregistry;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.OptionalLong; import java.util.OptionalLong;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled; import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.confluent.schemaregistry.client.AuthenticationType;
import org.apache.nifi.confluent.schemaregistry.client.CachingSchemaRegistryClient; import org.apache.nifi.confluent.schemaregistry.client.CachingSchemaRegistryClient;
import org.apache.nifi.confluent.schemaregistry.client.RestSchemaRegistryClient; import org.apache.nifi.confluent.schemaregistry.client.RestSchemaRegistryClient;
import org.apache.nifi.confluent.schemaregistry.client.SchemaRegistryClient; import org.apache.nifi.confluent.schemaregistry.client.SchemaRegistryClient;
@ -112,6 +116,34 @@ public class ConfluentSchemaRegistry extends AbstractControllerService implement
.required(true) .required(true)
.build(); .build();
static final PropertyDescriptor AUTHENTICATION_TYPE = new PropertyDescriptor.Builder()
.name("authentication-type")
.displayName("Authentication Type")
.description("HTTP Client Authentication Type for Confluent Schema Registry")
.required(false)
.allowableValues(AuthenticationType.values())
.defaultValue(AuthenticationType.NONE.toString())
.build();
static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
.name("username")
.displayName("Username")
.description("Username for authentication to Confluent Schema Registry")
.addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("^[\\x20-\\x39\\x3b-\\x7e\\x80-\\xff]+$")))
.required(false)
.dependsOn(AUTHENTICATION_TYPE, AuthenticationType.BASIC.toString())
.build();
static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
.name("password")
.displayName("Password")
.description("Password for authentication to Confluent Schema Registry")
.addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile("^[\\x20-\\x7e\\x80-\\xff]+$")))
.required(false)
.dependsOn(AUTHENTICATION_TYPE, AuthenticationType.BASIC.toString())
.sensitive(true)
.build();
private volatile SchemaRegistryClient client; private volatile SchemaRegistryClient client;
@ -123,6 +155,9 @@ public class ConfluentSchemaRegistry extends AbstractControllerService implement
properties.add(TIMEOUT); properties.add(TIMEOUT);
properties.add(CACHE_SIZE); properties.add(CACHE_SIZE);
properties.add(CACHE_EXPIRATION); properties.add(CACHE_EXPIRATION);
properties.add(AUTHENTICATION_TYPE);
properties.add(USERNAME);
properties.add(PASSWORD);
return properties; return properties;
} }
@ -139,7 +174,9 @@ public class ConfluentSchemaRegistry extends AbstractControllerService implement
sslContext = sslContextService.createSSLContext(ClientAuth.REQUIRED); sslContext = sslContextService.createSSLContext(ClientAuth.REQUIRED);
} }
final SchemaRegistryClient restClient = new RestSchemaRegistryClient(baseUrls, timeoutMillis, sslContext, getLogger()); final String username = context.getProperty(USERNAME).getValue();
final String password = context.getProperty(PASSWORD).getValue();
final SchemaRegistryClient restClient = new RestSchemaRegistryClient(baseUrls, timeoutMillis, sslContext, username, password, getLogger());
final int cacheSize = context.getProperty(CACHE_SIZE).asInteger(); final int cacheSize = context.getProperty(CACHE_SIZE).asInteger();
final long cacheExpiration = context.getProperty(CACHE_EXPIRATION).asTimePeriod(TimeUnit.NANOSECONDS).longValue(); final long cacheExpiration = context.getProperty(CACHE_EXPIRATION).asTimePeriod(TimeUnit.NANOSECONDS).longValue();
@ -149,6 +186,8 @@ public class ConfluentSchemaRegistry extends AbstractControllerService implement
@Override @Override
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) { protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>();
final boolean sslContextSet = validationContext.getProperty(SSL_CONTEXT).isSet(); final boolean sslContextSet = validationContext.getProperty(SSL_CONTEXT).isSet();
if (sslContextSet) { if (sslContextSet) {
final List<String> baseUrls = getBaseURLs(validationContext); final List<String> baseUrls = getBaseURLs(validationContext);
@ -157,7 +196,7 @@ public class ConfluentSchemaRegistry extends AbstractControllerService implement
.collect(Collectors.toList()); .collect(Collectors.toList());
if (!insecure.isEmpty()) { if (!insecure.isEmpty()) {
return Collections.singleton(new ValidationResult.Builder() results.add(new ValidationResult.Builder()
.subject(SCHEMA_REGISTRY_URLS.getDisplayName()) .subject(SCHEMA_REGISTRY_URLS.getDisplayName())
.input(insecure.get(0)) .input(insecure.get(0))
.valid(false) .valid(false)
@ -166,7 +205,30 @@ public class ConfluentSchemaRegistry extends AbstractControllerService implement
} }
} }
return Collections.emptyList(); final PropertyValue authenticationTypeProperty = validationContext.getProperty(AUTHENTICATION_TYPE);
if (authenticationTypeProperty.isSet()) {
final AuthenticationType authenticationType = AuthenticationType.valueOf(authenticationTypeProperty.getValue());
if (AuthenticationType.BASIC.equals(authenticationType)) {
final String username = validationContext.getProperty(USERNAME).getValue();
if (StringUtils.isBlank(username)) {
results.add(new ValidationResult.Builder()
.subject(USERNAME.getDisplayName())
.valid(false)
.explanation("Username is required for Basic Authentication")
.build());
}
final String password = validationContext.getProperty(PASSWORD).getValue();
if (StringUtils.isBlank(password)) {
results.add(new ValidationResult.Builder()
.subject(PASSWORD.getDisplayName())
.valid(false)
.explanation("Password is required for Basic Authentication")
.build());
}
}
}
return results;
} }
private List<String> getBaseURLs(final PropertyContext context) { private List<String> getBaseURLs(final PropertyContext context) {

View File

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.confluent.schemaregistry.client;
/**
* Client Authentication Type for Confluent Schema Registry
*/
public enum AuthenticationType {
BASIC,
NONE
}

View File

@ -19,6 +19,7 @@ package org.apache.nifi.confluent.schemaregistry.client;
import org.apache.avro.Schema; import org.apache.avro.Schema;
import org.apache.avro.SchemaParseException; import org.apache.avro.SchemaParseException;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.avro.AvroTypeUtil; import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.schema.access.SchemaNotFoundException;
@ -30,6 +31,7 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
import org.glassfish.jersey.client.ClientConfig; import org.glassfish.jersey.client.ClientConfig;
import org.glassfish.jersey.client.ClientProperties; import org.glassfish.jersey.client.ClientProperties;
import org.glassfish.jersey.client.authentication.HttpAuthenticationFeature;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
import javax.ws.rs.client.Client; import javax.ws.rs.client.Client;
@ -68,7 +70,12 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
private static final String SCHEMA_REGISTRY_CONTENT_TYPE = "application/vnd.schemaregistry.v1+json"; private static final String SCHEMA_REGISTRY_CONTENT_TYPE = "application/vnd.schemaregistry.v1+json";
public RestSchemaRegistryClient(final List<String> baseUrls, final int timeoutMillis, final SSLContext sslContext, final ComponentLog logger) { public RestSchemaRegistryClient(final List<String> baseUrls,
final int timeoutMillis,
final SSLContext sslContext,
final String username,
final String password,
final ComponentLog logger) {
this.baseUrls = new ArrayList<>(baseUrls); this.baseUrls = new ArrayList<>(baseUrls);
final ClientConfig clientConfig = new ClientConfig(); final ClientConfig clientConfig = new ClientConfig();
@ -76,6 +83,10 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
clientConfig.property(ClientProperties.READ_TIMEOUT, timeoutMillis); clientConfig.property(ClientProperties.READ_TIMEOUT, timeoutMillis);
client = WebUtils.createClient(clientConfig, sslContext); client = WebUtils.createClient(clientConfig, sslContext);
if (StringUtils.isNoneBlank(username, password)) {
client.register(HttpAuthenticationFeature.basic(username, password));
}
this.logger = logger; this.logger = logger;
} }

View File

@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.confluent.schemaregistry;
import org.apache.nifi.confluent.schemaregistry.client.AuthenticationType;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
public class ConfluentSchemaRegistryTest {
private static final String SERVICE_ID = ConfluentSchemaRegistry.class.getSimpleName();
private TestRunner runner;
private ConfluentSchemaRegistry registry;
@Before
public void setUp() throws InitializationException {
registry = new ConfluentSchemaRegistry();
final Processor processor = Mockito.mock(Processor.class);
runner = TestRunners.newTestRunner(processor);
runner.addControllerService(SERVICE_ID, registry);
}
@Test
public void testValidateAuthenticationTypeBasicMissingUsernameAndPassword() {
runner.setProperty(registry, ConfluentSchemaRegistry.AUTHENTICATION_TYPE, AuthenticationType.BASIC.toString());
runner.assertNotValid(registry);
}
@Test
public void testValidateAuthenticationTypeBasicMissingUsername() {
runner.setProperty(registry, ConfluentSchemaRegistry.AUTHENTICATION_TYPE, AuthenticationType.BASIC.toString());
runner.setProperty(registry, ConfluentSchemaRegistry.PASSWORD, String.class.getName());
runner.assertNotValid(registry);
}
@Test
public void testValidateAuthenticationTypeBasicMissingPassword() {
runner.setProperty(registry, ConfluentSchemaRegistry.AUTHENTICATION_TYPE, AuthenticationType.BASIC.toString());
runner.setProperty(registry, ConfluentSchemaRegistry.USERNAME, String.class.getSimpleName());
runner.assertNotValid(registry);
}
@Test
public void testValidateAuthenticationTypeNoneValid() {
runner.setProperty(registry, ConfluentSchemaRegistry.AUTHENTICATION_TYPE, AuthenticationType.NONE.toString());
runner.assertValid(registry);
}
@Test
public void testValidateAndEnableDefaultProperties() {
runner.assertValid(registry);
runner.enableControllerService(registry);
}
@Test
public void testValidateAndEnableAuthenticationTypeBasic() {
runner.setProperty(registry, ConfluentSchemaRegistry.AUTHENTICATION_TYPE, AuthenticationType.BASIC.toString());
runner.setProperty(registry, ConfluentSchemaRegistry.USERNAME, String.class.getSimpleName());
runner.setProperty(registry, ConfluentSchemaRegistry.PASSWORD, String.class.getName());
runner.assertValid(registry);
runner.enableControllerService(registry);
}
}