diff --git a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistry.java b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistry.java index ed1b1969d9..65d5b9dc56 100644 --- a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistry.java +++ b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistry.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.EnumSet; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.OptionalLong; import java.util.Set; @@ -32,6 +33,7 @@ import java.util.stream.Stream; import javax.net.ssl.SSLContext; import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnEnabled; @@ -39,6 +41,7 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; import org.apache.nifi.confluent.schemaregistry.client.AuthenticationType; import org.apache.nifi.confluent.schemaregistry.client.CachingSchemaRegistryClient; import org.apache.nifi.confluent.schemaregistry.client.RestSchemaRegistryClient; @@ -59,11 +62,15 @@ import org.apache.nifi.ssl.SSLContextService; @CapabilityDescription("Provides a Schema Registry that interacts with the Confluent Schema Registry so that those Schemas that are stored in the Confluent Schema " + "Registry can be used in NiFi. The Confluent Schema Registry has a notion of a \"subject\" for schemas, which is their terminology for a schema name. When a Schema " + "is looked up by name by this registry, it will find a Schema in the Confluent Schema Registry with that subject.") +@DynamicProperty(name = "request.header.*", value = "String literal, may not be empty", description = "Properties that begin with 'request.header.' " + + "are populated into a map and passed as http headers in REST requests to the Confluent Schema Registry") public class ConfluentSchemaRegistry extends AbstractControllerService implements SchemaRegistry { private static final Set schemaFields = EnumSet.of(SchemaField.SCHEMA_NAME, SchemaField.SCHEMA_TEXT, SchemaField.SCHEMA_TEXT_FORMAT, SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION); + private static final String REQUEST_HEADER_PREFIX = "request.header."; + static final PropertyDescriptor SCHEMA_REGISTRY_URLS = new PropertyDescriptor.Builder() .name("url") @@ -158,6 +165,28 @@ public class ConfluentSchemaRegistry extends AbstractControllerService implement return properties; } + private static final Validator REQUEST_HEADER_VALIDATOR = new Validator() { + @Override + public ValidationResult validate(final String subject, final String value, final ValidationContext context) { + return new ValidationResult.Builder() + .subject(subject) + .input(value) + .valid(subject.startsWith(REQUEST_HEADER_PREFIX) + && subject.length() > REQUEST_HEADER_PREFIX.length()) + .explanation("Dynamic property names must be of format 'request.header.*'") + .build(); + } + }; + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptionName) { + return new PropertyDescriptor.Builder() + .name(propertyDescriptionName) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .addValidator(REQUEST_HEADER_VALIDATOR) + .build(); + } + @OnEnabled public void onEnabled(final ConfigurationContext context) { final List baseUrls = getBaseURLs(context); @@ -173,7 +202,20 @@ public class ConfluentSchemaRegistry extends AbstractControllerService implement final String username = context.getProperty(USERNAME).getValue(); final String password = context.getProperty(PASSWORD).getValue(); - final SchemaRegistryClient restClient = new RestSchemaRegistryClient(baseUrls, timeoutMillis, sslContext, username, password, getLogger()); + + // generate a map of http headers where the key is the remainder of the property name after + // the request header prefix + final Map httpHeaders = + context.getProperties().entrySet() + .stream() + .filter(e -> e.getKey().getName().startsWith(REQUEST_HEADER_PREFIX)) + .collect(Collectors.toMap( + map -> map.getKey().getName().substring(REQUEST_HEADER_PREFIX.length()), + Map.Entry::getValue) + ); + + final SchemaRegistryClient restClient = new RestSchemaRegistryClient(baseUrls, timeoutMillis, + sslContext, username, password, getLogger(), httpHeaders); final int cacheSize = context.getProperty(CACHE_SIZE).asInteger(); final long cacheExpiration = context.getProperty(CACHE_EXPIRATION).asTimePeriod(TimeUnit.NANOSECONDS).longValue(); diff --git a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java index 6bf2c20d05..318d2d7473 100644 --- a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java +++ b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java @@ -36,6 +36,7 @@ import org.glassfish.jersey.client.authentication.HttpAuthenticationFeature; import javax.net.ssl.SSLContext; import javax.ws.rs.client.Client; import javax.ws.rs.client.Entity; +import javax.ws.rs.client.Invocation; import javax.ws.rs.client.WebTarget; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; @@ -44,6 +45,7 @@ import java.io.UnsupportedEncodingException; import java.net.URLEncoder; import java.util.ArrayList; import java.util.List; +import java.util.Map; /** @@ -61,6 +63,7 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient { private final List baseUrls; private final Client client; private final ComponentLog logger; + private final Map httpHeaders; private static final String SUBJECT_FIELD_NAME = "subject"; private static final String VERSION_FIELD_NAME = "version"; @@ -75,8 +78,10 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient { final SSLContext sslContext, final String username, final String password, - final ComponentLog logger) { + final ComponentLog logger, + final Map httpHeaders) { this.baseUrls = new ArrayList<>(baseUrls); + this.httpHeaders = httpHeaders; final ClientConfig clientConfig = new ClientConfig(); clientConfig.property(ClientProperties.CONNECT_TIMEOUT, timeoutMillis); @@ -191,8 +196,12 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient { final String path = getPath(pathSuffix); final String trimmedBase = getTrimmedBase(baseUrl); final String url = trimmedBase + path; - final WebTarget builder = client.target(url); - final Response response = builder.request().accept(MediaType.APPLICATION_JSON).header(CONTENT_TYPE_HEADER, SCHEMA_REGISTRY_CONTENT_TYPE).post(Entity.json(schema.toString())); + final WebTarget webTarget = client.target(url); + Invocation.Builder builder = webTarget.request().accept(MediaType.APPLICATION_JSON).header(CONTENT_TYPE_HEADER, SCHEMA_REGISTRY_CONTENT_TYPE); + for (Map.Entry header : httpHeaders.entrySet()) { + builder = builder.header(header.getKey(), header.getValue()); + } + final Response response = builder.post(Entity.json(schema.toString())); final int responseCode = response.getStatus(); if (responseCode == Response.Status.NOT_FOUND.getStatusCode()) { @@ -216,7 +225,11 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient { final String url = trimmedBase + path; final WebTarget webTarget = client.target(url); - final Response response = webTarget.request().accept(MediaType.APPLICATION_JSON).get(); + Invocation.Builder builder = webTarget.request().accept(MediaType.APPLICATION_JSON); + for (Map.Entry header : httpHeaders.entrySet()) { + builder = builder.header(header.getKey(), header.getValue()); + } + final Response response = builder.get(); final int responseCode = response.getStatus(); if (responseCode == Response.Status.OK.getStatusCode()) { diff --git a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/test/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistryTest.java b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/test/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistryTest.java index 58789a578c..cb7b8ba781 100644 --- a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/test/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistryTest.java +++ b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/test/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistryTest.java @@ -81,4 +81,24 @@ public class ConfluentSchemaRegistryTest { runner.assertValid(registry); runner.enableControllerService(registry); } + + @Test + public void testValidateAndEnableDynamicHttpHeaderProperties() { + runner.setProperty(registry, "request.header.User", "kafkaUser"); + runner.setProperty(registry, "request.header.Test", "testValue"); + runner.assertValid(registry); + runner.enableControllerService(registry); + } + + @Test + public void testValidateDynamicHttpHeaderPropertiesMissingTrailingValue() { + runner.setProperty(registry, "request.header.", "NotValid"); + runner.assertNotValid(registry); + } + + @Test + public void testValidateDynamicHttpHeaderPropertiesInvalidSubject() { + runner.setProperty(registry, "not.valid.subject", "NotValid"); + runner.assertNotValid(registry); + } }