NIFI-8936 Added dynamic http header support to Confluent Schema Registry controller.

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #5237.
This commit is contained in:
Knowles Atchison Jr 2021-07-21 14:12:42 -04:00 committed by Pierre Villard
parent 0c96e573a4
commit f78a983bea
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
3 changed files with 80 additions and 5 deletions

View File

@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
@ -32,6 +33,7 @@ import java.util.stream.Stream;
import javax.net.ssl.SSLContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
@ -39,6 +41,7 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.confluent.schemaregistry.client.AuthenticationType;
import org.apache.nifi.confluent.schemaregistry.client.CachingSchemaRegistryClient;
import org.apache.nifi.confluent.schemaregistry.client.RestSchemaRegistryClient;
@ -59,11 +62,15 @@ import org.apache.nifi.ssl.SSLContextService;
@CapabilityDescription("Provides a Schema Registry that interacts with the Confluent Schema Registry so that those Schemas that are stored in the Confluent Schema "
+ "Registry can be used in NiFi. The Confluent Schema Registry has a notion of a \"subject\" for schemas, which is their terminology for a schema name. When a Schema "
+ "is looked up by name by this registry, it will find a Schema in the Confluent Schema Registry with that subject.")
@DynamicProperty(name = "request.header.*", value = "String literal, may not be empty", description = "Properties that begin with 'request.header.' " +
"are populated into a map and passed as http headers in REST requests to the Confluent Schema Registry")
public class ConfluentSchemaRegistry extends AbstractControllerService implements SchemaRegistry {
private static final Set<SchemaField> schemaFields = EnumSet.of(SchemaField.SCHEMA_NAME, SchemaField.SCHEMA_TEXT,
SchemaField.SCHEMA_TEXT_FORMAT, SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION);
private static final String REQUEST_HEADER_PREFIX = "request.header.";
static final PropertyDescriptor SCHEMA_REGISTRY_URLS = new PropertyDescriptor.Builder()
.name("url")
@ -158,6 +165,28 @@ public class ConfluentSchemaRegistry extends AbstractControllerService implement
return properties;
}
private static final Validator REQUEST_HEADER_VALIDATOR = new Validator() {
@Override
public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
return new ValidationResult.Builder()
.subject(subject)
.input(value)
.valid(subject.startsWith(REQUEST_HEADER_PREFIX)
&& subject.length() > REQUEST_HEADER_PREFIX.length())
.explanation("Dynamic property names must be of format 'request.header.*'")
.build();
}
};
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptionName) {
return new PropertyDescriptor.Builder()
.name(propertyDescriptionName)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.addValidator(REQUEST_HEADER_VALIDATOR)
.build();
}
@OnEnabled
public void onEnabled(final ConfigurationContext context) {
final List<String> baseUrls = getBaseURLs(context);
@ -173,7 +202,20 @@ public class ConfluentSchemaRegistry extends AbstractControllerService implement
final String username = context.getProperty(USERNAME).getValue();
final String password = context.getProperty(PASSWORD).getValue();
final SchemaRegistryClient restClient = new RestSchemaRegistryClient(baseUrls, timeoutMillis, sslContext, username, password, getLogger());
// generate a map of http headers where the key is the remainder of the property name after
// the request header prefix
final Map<String, String> httpHeaders =
context.getProperties().entrySet()
.stream()
.filter(e -> e.getKey().getName().startsWith(REQUEST_HEADER_PREFIX))
.collect(Collectors.toMap(
map -> map.getKey().getName().substring(REQUEST_HEADER_PREFIX.length()),
Map.Entry::getValue)
);
final SchemaRegistryClient restClient = new RestSchemaRegistryClient(baseUrls, timeoutMillis,
sslContext, username, password, getLogger(), httpHeaders);
final int cacheSize = context.getProperty(CACHE_SIZE).asInteger();
final long cacheExpiration = context.getProperty(CACHE_EXPIRATION).asTimePeriod(TimeUnit.NANOSECONDS).longValue();

View File

@ -36,6 +36,7 @@ import org.glassfish.jersey.client.authentication.HttpAuthenticationFeature;
import javax.net.ssl.SSLContext;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.Invocation;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
@ -44,6 +45,7 @@ import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
@ -61,6 +63,7 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
private final List<String> baseUrls;
private final Client client;
private final ComponentLog logger;
private final Map<String, String> httpHeaders;
private static final String SUBJECT_FIELD_NAME = "subject";
private static final String VERSION_FIELD_NAME = "version";
@ -75,8 +78,10 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
final SSLContext sslContext,
final String username,
final String password,
final ComponentLog logger) {
final ComponentLog logger,
final Map<String, String> httpHeaders) {
this.baseUrls = new ArrayList<>(baseUrls);
this.httpHeaders = httpHeaders;
final ClientConfig clientConfig = new ClientConfig();
clientConfig.property(ClientProperties.CONNECT_TIMEOUT, timeoutMillis);
@ -191,8 +196,12 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
final String path = getPath(pathSuffix);
final String trimmedBase = getTrimmedBase(baseUrl);
final String url = trimmedBase + path;
final WebTarget builder = client.target(url);
final Response response = builder.request().accept(MediaType.APPLICATION_JSON).header(CONTENT_TYPE_HEADER, SCHEMA_REGISTRY_CONTENT_TYPE).post(Entity.json(schema.toString()));
final WebTarget webTarget = client.target(url);
Invocation.Builder builder = webTarget.request().accept(MediaType.APPLICATION_JSON).header(CONTENT_TYPE_HEADER, SCHEMA_REGISTRY_CONTENT_TYPE);
for (Map.Entry<String, String> header : httpHeaders.entrySet()) {
builder = builder.header(header.getKey(), header.getValue());
}
final Response response = builder.post(Entity.json(schema.toString()));
final int responseCode = response.getStatus();
if (responseCode == Response.Status.NOT_FOUND.getStatusCode()) {
@ -216,7 +225,11 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
final String url = trimmedBase + path;
final WebTarget webTarget = client.target(url);
final Response response = webTarget.request().accept(MediaType.APPLICATION_JSON).get();
Invocation.Builder builder = webTarget.request().accept(MediaType.APPLICATION_JSON);
for (Map.Entry<String, String> header : httpHeaders.entrySet()) {
builder = builder.header(header.getKey(), header.getValue());
}
final Response response = builder.get();
final int responseCode = response.getStatus();
if (responseCode == Response.Status.OK.getStatusCode()) {

View File

@ -81,4 +81,24 @@ public class ConfluentSchemaRegistryTest {
runner.assertValid(registry);
runner.enableControllerService(registry);
}
@Test
public void testValidateAndEnableDynamicHttpHeaderProperties() {
runner.setProperty(registry, "request.header.User", "kafkaUser");
runner.setProperty(registry, "request.header.Test", "testValue");
runner.assertValid(registry);
runner.enableControllerService(registry);
}
@Test
public void testValidateDynamicHttpHeaderPropertiesMissingTrailingValue() {
runner.setProperty(registry, "request.header.", "NotValid");
runner.assertNotValid(registry);
}
@Test
public void testValidateDynamicHttpHeaderPropertiesInvalidSubject() {
runner.setProperty(registry, "not.valid.subject", "NotValid");
runner.assertNotValid(registry);
}
}