mirror of
https://github.com/apache/nifi.git
synced 2025-02-11 12:35:20 +00:00
NIFI-5753 Add SSL support to HortonworksSchemaRegistry service
Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
parent
a273ff10f9
commit
5d65e6aba4
@ -53,6 +53,10 @@ limitations under the License.
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-schema-registry-service-api</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-ssl-context-service-api</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.avro</groupId>
|
||||
<artifactId>avro</artifactId>
|
||||
|
@ -16,6 +16,7 @@
|
||||
*/
|
||||
package org.apache.nifi.schemaregistry.hortonworks;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.hortonworks.registries.schemaregistry.SchemaMetadata;
|
||||
import com.hortonworks.registries.schemaregistry.SchemaMetadataInfo;
|
||||
import com.hortonworks.registries.schemaregistry.SchemaVersionInfo;
|
||||
@ -39,6 +40,7 @@ import org.apache.nifi.schema.access.SchemaField;
|
||||
import org.apache.nifi.schemaregistry.services.SchemaRegistry;
|
||||
import org.apache.nifi.serialization.record.RecordSchema;
|
||||
import org.apache.nifi.serialization.record.SchemaIdentifier;
|
||||
import org.apache.nifi.ssl.SSLContextService;
|
||||
import org.apache.nifi.util.Tuple;
|
||||
|
||||
import java.io.IOException;
|
||||
@ -61,6 +63,8 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
|
||||
private static final Set<SchemaField> schemaFields = EnumSet.of(SchemaField.SCHEMA_NAME, SchemaField.SCHEMA_BRANCH_NAME, SchemaField.SCHEMA_TEXT,
|
||||
SchemaField.SCHEMA_TEXT_FORMAT, SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION);
|
||||
|
||||
private static final String CLIENT_SSL_PROPERTY_PREFIX = "schema.registry.client.ssl";
|
||||
|
||||
private final ConcurrentMap<Tuple<SchemaIdentifier, String>, RecordSchema> schemaNameToSchemaMap = new ConcurrentHashMap<>();
|
||||
private final ConcurrentMap<Tuple<String,String>, Tuple<SchemaVersionInfo, Long>> schemaVersionByNameCache = new ConcurrentHashMap<>();
|
||||
private final ConcurrentMap<SchemaVersionKey, Tuple<SchemaVersionInfo, Long>> schemaVersionByKeyCache = new ConcurrentHashMap<>();
|
||||
@ -95,6 +99,13 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
|
||||
.defaultValue("1 hour")
|
||||
.required(true)
|
||||
.build();
|
||||
static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
|
||||
.name("ssl-context-service")
|
||||
.displayName("SSL Context Service")
|
||||
.description("Specifies the SSL Context Service to use for communicating with Schema Registry.")
|
||||
.required(false)
|
||||
.identifiesControllerService(SSLContextService.class)
|
||||
.build();
|
||||
|
||||
private volatile SchemaRegistryClient schemaRegistryClient;
|
||||
private volatile boolean initialized;
|
||||
@ -120,9 +131,31 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
|
||||
schemaRegistryConfig.put(SchemaRegistryClient.Configuration.CLASSLOADER_CACHE_EXPIRY_INTERVAL_SECS.name(), context.getProperty(CACHE_EXPIRATION).asTimePeriod(TimeUnit.SECONDS));
|
||||
schemaRegistryConfig.put(SchemaRegistryClient.Configuration.SCHEMA_VERSION_CACHE_SIZE.name(), context.getProperty(CACHE_SIZE).asInteger());
|
||||
schemaRegistryConfig.put(SchemaRegistryClient.Configuration.SCHEMA_VERSION_CACHE_EXPIRY_INTERVAL_SECS.name(), context.getProperty(CACHE_EXPIRATION).asTimePeriod(TimeUnit.SECONDS));
|
||||
Map<String, String> sslProperties = buildSslProperties(context);
|
||||
if (!sslProperties.isEmpty()) {
|
||||
schemaRegistryConfig.put(CLIENT_SSL_PROPERTY_PREFIX, sslProperties);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private Map<String, String> buildSslProperties(final ConfigurationContext context) {
|
||||
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
|
||||
ImmutableMap.Builder<String, String> propertiesBuilder = ImmutableMap.builder();
|
||||
if (sslContextService != null) {
|
||||
propertiesBuilder.put("protocol", sslContextService.getSslAlgorithm());
|
||||
propertiesBuilder.put("keyPassword", sslContextService.getKeyPassword());
|
||||
if (sslContextService.isKeyStoreConfigured()) {
|
||||
propertiesBuilder.put("keyStorePath", sslContextService.getKeyStoreFile());
|
||||
propertiesBuilder.put("keyStorePassword", sslContextService.getKeyStorePassword());
|
||||
propertiesBuilder.put("keyStoreType", sslContextService.getKeyStoreType());
|
||||
}
|
||||
if (sslContextService.isTrustStoreConfigured()) {
|
||||
propertiesBuilder.put("trustStorePath", sslContextService.getTrustStoreFile());
|
||||
propertiesBuilder.put("trustStorePassword", sslContextService.getTrustStorePassword());
|
||||
propertiesBuilder.put("trustStoreType", sslContextService.getTrustStoreType());
|
||||
}
|
||||
}
|
||||
return propertiesBuilder.build();
|
||||
}
|
||||
|
||||
@OnDisabled
|
||||
public void close() {
|
||||
@ -140,6 +173,7 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
|
||||
properties.add(URL);
|
||||
properties.add(CACHE_SIZE);
|
||||
properties.add(CACHE_EXPIRATION);
|
||||
properties.add(SSL_CONTEXT_SERVICE);
|
||||
return properties;
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user