mirror of https://github.com/apache/nifi.git
NIFI-6729 - Created AbstractSingleAttributeBasedControllerServiceLookup and updated DBCPConnectionPoolLookup and AzureStorageCredentialsControllerServiceLookup to inherit from it.
This closes #3774. Signed-off-by: Peter Turcsanyi <turcsanyi@apache.org>
This commit is contained in:
parent
7f96fa1d0d
commit
84a05c8595
|
@ -33,6 +33,11 @@
|
||||||
<artifactId>nifi-utils</artifactId>
|
<artifactId>nifi-utils</artifactId>
|
||||||
<version>1.11.0-SNAPSHOT</version>
|
<version>1.11.0-SNAPSHOT</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.nifi</groupId>
|
||||||
|
<artifactId>nifi-service-utils</artifactId>
|
||||||
|
<version>1.11.0-SNAPSHOT</version>
|
||||||
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.apache.nifi</groupId>
|
<groupId>org.apache.nifi</groupId>
|
||||||
<artifactId>nifi-record-serialization-service-api</artifactId>
|
<artifactId>nifi-record-serialization-service-api</artifactId>
|
||||||
|
|
|
@ -16,26 +16,11 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.services.azure.storage;
|
package org.apache.nifi.services.azure.storage;
|
||||||
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
|
||||||
import org.apache.nifi.annotation.behavior.DynamicProperty;
|
import org.apache.nifi.annotation.behavior.DynamicProperty;
|
||||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||||
import org.apache.nifi.annotation.documentation.Tags;
|
import org.apache.nifi.annotation.documentation.Tags;
|
||||||
import org.apache.nifi.annotation.lifecycle.OnDisabled;
|
|
||||||
import org.apache.nifi.annotation.lifecycle.OnEnabled;
|
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
|
||||||
import org.apache.nifi.components.ValidationContext;
|
|
||||||
import org.apache.nifi.components.ValidationResult;
|
|
||||||
import org.apache.nifi.controller.AbstractControllerService;
|
|
||||||
import org.apache.nifi.controller.ConfigurationContext;
|
|
||||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||||
import org.apache.nifi.processor.exception.ProcessException;
|
import org.apache.nifi.service.lookup.AbstractSingleAttributeBasedControllerServiceLookup;
|
||||||
import org.apache.nifi.processor.util.StandardValidators;
|
|
||||||
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Collection;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
@Tags({ "azure", "microsoft", "cloud", "storage", "blob", "queue", "credentials" })
|
@Tags({ "azure", "microsoft", "cloud", "storage", "blob", "queue", "credentials" })
|
||||||
|
@ -45,97 +30,26 @@ import java.util.Map;
|
||||||
"This will allow multiple AzureStorageCredentialsServices to be defined and registered, and then selected dynamically at runtime by tagging flow files " +
|
"This will allow multiple AzureStorageCredentialsServices to be defined and registered, and then selected dynamically at runtime by tagging flow files " +
|
||||||
"with the appropriate 'azure.storage.credentials.name' attribute.")
|
"with the appropriate 'azure.storage.credentials.name' attribute.")
|
||||||
@DynamicProperty(name = "The name to register AzureStorageCredentialsService", value = "The AzureStorageCredentialsService",
|
@DynamicProperty(name = "The name to register AzureStorageCredentialsService", value = "The AzureStorageCredentialsService",
|
||||||
description = "If 'azure.storage.credentials.name' attribute contains the name of the dynamic property, then the AzureStorageCredentialsService (registered in the value) will be selected.",
|
description = "If '" + AzureStorageCredentialsControllerServiceLookup.AZURE_STORAGE_CREDENTIALS_NAME_ATTRIBUTE + "' attribute contains " +
|
||||||
|
"the name of the dynamic property, then the AzureStorageCredentialsService (registered in the value) will be selected.",
|
||||||
expressionLanguageScope = ExpressionLanguageScope.NONE)
|
expressionLanguageScope = ExpressionLanguageScope.NONE)
|
||||||
public class AzureStorageCredentialsControllerServiceLookup extends AbstractControllerService implements AzureStorageCredentialsService {
|
public class AzureStorageCredentialsControllerServiceLookup
|
||||||
|
extends AbstractSingleAttributeBasedControllerServiceLookup<AzureStorageCredentialsService> implements AzureStorageCredentialsService {
|
||||||
|
|
||||||
public static final String AZURE_STORAGE_CREDENTIALS_NAME_ATTRIBUTE = "azure.storage.credentials.name";
|
public static final String AZURE_STORAGE_CREDENTIALS_NAME_ATTRIBUTE = "azure.storage.credentials.name";
|
||||||
|
|
||||||
private volatile Map<String, AzureStorageCredentialsService> serviceMap;
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
|
protected String getLookupAttribute() {
|
||||||
return new PropertyDescriptor.Builder()
|
return AZURE_STORAGE_CREDENTIALS_NAME_ATTRIBUTE;
|
||||||
.name(propertyDescriptorName)
|
|
||||||
.description("The " + AzureStorageCredentialsService.class.getSimpleName() + " to return when " + AZURE_STORAGE_CREDENTIALS_NAME_ATTRIBUTE + " = '" + propertyDescriptorName + "'")
|
|
||||||
.identifiesControllerService(AzureStorageCredentialsService.class)
|
|
||||||
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
|
|
||||||
.build();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Collection<ValidationResult> customValidate(ValidationContext context) {
|
public Class<AzureStorageCredentialsService> getServiceType() {
|
||||||
final List<ValidationResult> results = new ArrayList<>();
|
return AzureStorageCredentialsService.class;
|
||||||
|
|
||||||
int numDefinedServices = 0;
|
|
||||||
for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
|
|
||||||
if (descriptor.isDynamic()) {
|
|
||||||
numDefinedServices++;
|
|
||||||
}
|
|
||||||
|
|
||||||
final String referencedId = context.getProperty(descriptor).getValue();
|
|
||||||
if (this.getIdentifier().equals(referencedId)) {
|
|
||||||
results.add(new ValidationResult.Builder()
|
|
||||||
.subject(descriptor.getDisplayName())
|
|
||||||
.explanation("the current service cannot be registered as an " + AzureStorageCredentialsService.class.getSimpleName() + " to lookup")
|
|
||||||
.valid(false)
|
|
||||||
.build());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (numDefinedServices == 0) {
|
|
||||||
results.add(new ValidationResult.Builder()
|
|
||||||
.subject(this.getClass().getSimpleName())
|
|
||||||
.explanation("at least one " + AzureStorageCredentialsService.class.getSimpleName() + " must be defined via dynamic properties")
|
|
||||||
.valid(false)
|
|
||||||
.build());
|
|
||||||
}
|
|
||||||
|
|
||||||
return results;
|
|
||||||
}
|
|
||||||
|
|
||||||
@OnEnabled
|
|
||||||
public void onEnabled(final ConfigurationContext context) {
|
|
||||||
final Map<String, AzureStorageCredentialsService> map = new HashMap<>();
|
|
||||||
|
|
||||||
for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
|
|
||||||
if (descriptor.isDynamic()) {
|
|
||||||
final AzureStorageCredentialsService service = context.getProperty(descriptor).asControllerService(AzureStorageCredentialsService.class);
|
|
||||||
map.put(descriptor.getName(), service);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
serviceMap = Collections.unmodifiableMap(map);
|
|
||||||
}
|
|
||||||
|
|
||||||
@OnDisabled
|
|
||||||
public void onDisabled() {
|
|
||||||
serviceMap = null;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public AzureStorageCredentialsDetails getStorageCredentialsDetails(Map<String, String> attributes) throws ProcessException {
|
public AzureStorageCredentialsDetails getStorageCredentialsDetails(Map<String, String> attributes) {
|
||||||
final AzureStorageCredentialsService service = lookupAzureStorageCredentialsService(attributes);
|
return lookupService(attributes).getStorageCredentialsDetails(attributes);
|
||||||
|
|
||||||
return service.getStorageCredentialsDetails(attributes);
|
|
||||||
}
|
|
||||||
|
|
||||||
private AzureStorageCredentialsService lookupAzureStorageCredentialsService(Map<String, String> attributes) {
|
|
||||||
if (!attributes.containsKey(AZURE_STORAGE_CREDENTIALS_NAME_ATTRIBUTE)) {
|
|
||||||
throw new ProcessException("Attributes must contain an attribute name '" + AZURE_STORAGE_CREDENTIALS_NAME_ATTRIBUTE + "'");
|
|
||||||
}
|
|
||||||
|
|
||||||
final String storageCredentialService = attributes.get(AZURE_STORAGE_CREDENTIALS_NAME_ATTRIBUTE);
|
|
||||||
if (StringUtils.isBlank(storageCredentialService)) {
|
|
||||||
throw new ProcessException(AZURE_STORAGE_CREDENTIALS_NAME_ATTRIBUTE + " cannot be null or blank");
|
|
||||||
}
|
|
||||||
|
|
||||||
final AzureStorageCredentialsService service = serviceMap.get(storageCredentialService);
|
|
||||||
if (service == null) {
|
|
||||||
throw new ProcessException("No " + AzureStorageCredentialsService.class.getSimpleName() + " was found for " +
|
|
||||||
AZURE_STORAGE_CREDENTIALS_NAME_ATTRIBUTE + " '" + storageCredentialService + "'");
|
|
||||||
}
|
|
||||||
|
|
||||||
return service;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,50 @@
|
||||||
|
<?xml version="1.0"?>
|
||||||
|
<!--
|
||||||
|
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
contributor license agreements. See the NOTICE file distributed with
|
||||||
|
this work for additional information regarding copyright ownership.
|
||||||
|
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
(the "License"); you may not use this file except in compliance with
|
||||||
|
the License. You may obtain a copy of the License at
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
-->
|
||||||
|
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||||
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
<parent>
|
||||||
|
<groupId>org.apache.nifi</groupId>
|
||||||
|
<artifactId>nifi-extension-utils</artifactId>
|
||||||
|
<version>1.11.0-SNAPSHOT</version>
|
||||||
|
</parent>
|
||||||
|
<artifactId>nifi-service-utils</artifactId>
|
||||||
|
<packaging>jar</packaging>
|
||||||
|
<description>
|
||||||
|
This nifi-service-utils module is designed to capture common patterns
|
||||||
|
and utilities that can be leveraged by other services or components to
|
||||||
|
help promote reuse. These patterns may become framework level features
|
||||||
|
or may simply be made available through this utility. It is ok for this
|
||||||
|
module to have dependencies but care should be taken when adding dependencies
|
||||||
|
as this increases the cost of utilizing this module in various nars.
|
||||||
|
</description>
|
||||||
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.nifi</groupId>
|
||||||
|
<artifactId>nifi-api</artifactId>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.nifi</groupId>
|
||||||
|
<artifactId>nifi-utils</artifactId>
|
||||||
|
<version>1.11.0-SNAPSHOT</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.nifi</groupId>
|
||||||
|
<artifactId>nifi-mock</artifactId>
|
||||||
|
<version>1.11.0-SNAPSHOT</version>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
</dependencies>
|
||||||
|
</project>
|
|
@ -0,0 +1,162 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions andf
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.nifi.service.lookup;
|
||||||
|
|
||||||
|
import org.apache.nifi.annotation.lifecycle.OnDisabled;
|
||||||
|
import org.apache.nifi.annotation.lifecycle.OnEnabled;
|
||||||
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
|
import org.apache.nifi.components.ValidationContext;
|
||||||
|
import org.apache.nifi.components.ValidationResult;
|
||||||
|
import org.apache.nifi.controller.AbstractControllerService;
|
||||||
|
import org.apache.nifi.controller.ConfigurationContext;
|
||||||
|
import org.apache.nifi.controller.ControllerService;
|
||||||
|
import org.apache.nifi.processor.exception.ProcessException;
|
||||||
|
import org.apache.nifi.processor.util.StandardValidators;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A lookup ControllerService that can choose one (from probably multiple) ControllerServices of the given type {@link S}.
|
||||||
|
* <p>
|
||||||
|
* Selection is based on a single {@linkplain String} lookup key.
|
||||||
|
* <p>
|
||||||
|
* Lookup key is provided as a value in an attribute map (usually coming form a flowfile)
|
||||||
|
* with a predefined key (see {@link #getLookupAttribute()}).
|
||||||
|
*
|
||||||
|
* @param <S> The type of service to be looked up
|
||||||
|
*/
|
||||||
|
public abstract class AbstractSingleAttributeBasedControllerServiceLookup<S extends ControllerService> extends AbstractControllerService {
|
||||||
|
protected volatile Map<String, S> serviceMap;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the Class that represents the type of service that will be returned by {@link #lookupService(Map)}
|
||||||
|
*/
|
||||||
|
public abstract Class<S> getServiceType();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the name of attribute (usually from a flowfile) the value of which serves as the lookup key
|
||||||
|
* for the desired service (of type {@link S})
|
||||||
|
*/
|
||||||
|
protected abstract String getLookupAttribute();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a ControllerService (of type {@link S}) based on the provided attributes map (usually by retrieving
|
||||||
|
* a lookup attribute from it via {@link #getLookupAttribute()} and use it to identify the appropriate service).
|
||||||
|
* @param attributes Map containing the lookup attribute based on which ControllerService is chosen
|
||||||
|
* @return the chosen ControllerService
|
||||||
|
*/
|
||||||
|
public S lookupService(Map<String, String> attributes) {
|
||||||
|
if (attributes == null) {
|
||||||
|
throw new ProcessException("Attributes map is null");
|
||||||
|
} else if (!attributes.containsKey(getLookupAttribute())) {
|
||||||
|
throw new ProcessException("Attributes must contain an attribute name '" + getLookupAttribute() + "'");
|
||||||
|
}
|
||||||
|
|
||||||
|
Object lookupKey = Optional.of(getLookupAttribute())
|
||||||
|
.map(attributes::get)
|
||||||
|
.orElseThrow(() -> new ProcessException(getLookupAttribute() + " cannot be null or blank"));
|
||||||
|
|
||||||
|
S service = serviceMap.get(lookupKey);
|
||||||
|
|
||||||
|
if (service == null) {
|
||||||
|
throw new ProcessException("No " + getServiceName() + " found for " + getLookupAttribute());
|
||||||
|
}
|
||||||
|
|
||||||
|
return service;
|
||||||
|
}
|
||||||
|
|
||||||
|
@OnEnabled
|
||||||
|
public void onEnabled(final ConfigurationContext context) {
|
||||||
|
Map<String, S> serviceMap = new HashMap<>();
|
||||||
|
|
||||||
|
context.getProperties().keySet().stream()
|
||||||
|
.filter(PropertyDescriptor::isDynamic)
|
||||||
|
.forEach(propertyDescriptor -> {
|
||||||
|
S service = context.getProperty(propertyDescriptor).asControllerService(getServiceType());
|
||||||
|
|
||||||
|
serviceMap.put(propertyDescriptor.getName(), service);
|
||||||
|
});
|
||||||
|
|
||||||
|
this.serviceMap = Collections.unmodifiableMap(serviceMap);
|
||||||
|
}
|
||||||
|
|
||||||
|
@OnDisabled
|
||||||
|
public void onDisabled() {
|
||||||
|
serviceMap = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
|
||||||
|
return lookupKeyPropertyDescriptor(propertyDescriptorName);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Collection<ValidationResult> customValidate(ValidationContext context) {
|
||||||
|
return validateForAtLeastOneService(context);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected PropertyDescriptor lookupKeyPropertyDescriptor(String propertyDescriptorName) {
|
||||||
|
return new PropertyDescriptor.Builder()
|
||||||
|
.name(propertyDescriptorName)
|
||||||
|
.description("The " + getServiceName() + " to return when " + getLookupAttribute() + " = '" + propertyDescriptorName + "'")
|
||||||
|
.identifiesControllerService(getServiceType())
|
||||||
|
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
private Collection<ValidationResult> validateForAtLeastOneService(ValidationContext context) {
|
||||||
|
final List<ValidationResult> results = new ArrayList<>();
|
||||||
|
|
||||||
|
int numDefinedServices = 0;
|
||||||
|
for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
|
||||||
|
if (descriptor.isDynamic()) {
|
||||||
|
numDefinedServices++;
|
||||||
|
}
|
||||||
|
|
||||||
|
final String referencedId = context.getProperty(descriptor).getValue();
|
||||||
|
if (this.getIdentifier().equals(referencedId)) {
|
||||||
|
numDefinedServices--;
|
||||||
|
|
||||||
|
results.add(new ValidationResult.Builder()
|
||||||
|
.subject(descriptor.getDisplayName())
|
||||||
|
.explanation("the current service cannot be registered as a " + getServiceName() + " to lookup")
|
||||||
|
.valid(false)
|
||||||
|
.build());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (numDefinedServices == 0) {
|
||||||
|
results.add(new ValidationResult.Builder()
|
||||||
|
.subject(this.getClass().getSimpleName())
|
||||||
|
.explanation("at least one " + getServiceName() + " must be defined via dynamic properties")
|
||||||
|
.valid(false)
|
||||||
|
.build());
|
||||||
|
}
|
||||||
|
|
||||||
|
return results;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected String getServiceName() {
|
||||||
|
return getServiceType().getSimpleName();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,276 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions andf
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.nifi.service.lookup;
|
||||||
|
|
||||||
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
|
import org.apache.nifi.components.ValidationContext;
|
||||||
|
import org.apache.nifi.components.ValidationResult;
|
||||||
|
import org.apache.nifi.controller.ControllerService;
|
||||||
|
import org.apache.nifi.processor.exception.ProcessException;
|
||||||
|
import org.apache.nifi.processor.util.StandardValidators;
|
||||||
|
import org.apache.nifi.util.MockConfigurationContext;
|
||||||
|
import org.apache.nifi.util.MockControllerServiceInitializationContext;
|
||||||
|
import org.apache.nifi.util.MockProcessContext;
|
||||||
|
import org.apache.nifi.util.MockValidationContext;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import static org.hamcrest.CoreMatchers.containsString;
|
||||||
|
import static org.hamcrest.CoreMatchers.hasItem;
|
||||||
|
import static org.hamcrest.CoreMatchers.hasItems;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertThat;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
public class TestAbstractSingleAttributeBasedControllerServiceLookup {
|
||||||
|
private static final String LOOKUP_ATTRIBUTE = "lookupAttribute";
|
||||||
|
private static final String TEST_SUBJECT_IDENTIFIER = "testSubjectIdentifier";
|
||||||
|
private static final Class<ControllerService> SERVICE_TYPE = ControllerService.class;
|
||||||
|
|
||||||
|
private final AbstractSingleAttributeBasedControllerServiceLookup<ControllerService> testSubject = Mockito.spy(AbstractSingleAttributeBasedControllerServiceLookup.class);
|
||||||
|
|
||||||
|
private Map<PropertyDescriptor, String> properties;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
when(testSubject.getLookupAttribute()).thenReturn(LOOKUP_ATTRIBUTE);
|
||||||
|
when(testSubject.getServiceType()).thenReturn(SERVICE_TYPE);
|
||||||
|
when(testSubject.getIdentifier()).thenReturn(TEST_SUBJECT_IDENTIFIER);
|
||||||
|
|
||||||
|
properties = new HashMap<>();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = Exception.class)
|
||||||
|
public void testLookupShouldThrowExceptionWhenQueriedServiceMappedInPropertiesButWasntCreated() {
|
||||||
|
// GIVEN
|
||||||
|
String mappedCreatedServiceID = "mappedCreatedServiceID";
|
||||||
|
String mappedNotCreatedServiceID = "mappedNotCreatedServiceID";
|
||||||
|
|
||||||
|
ControllerService mappedCreatedService = mock(SERVICE_TYPE);
|
||||||
|
|
||||||
|
MockControllerServiceInitializationContext serviceLookup = new MockControllerServiceInitializationContext(mappedCreatedService, mappedCreatedServiceID);
|
||||||
|
|
||||||
|
String dynamicProperty1 = "property1";
|
||||||
|
String dynamicProperty2 = "property2";
|
||||||
|
|
||||||
|
mapService(dynamicProperty1, mappedCreatedServiceID);
|
||||||
|
mapService(dynamicProperty2, mappedNotCreatedServiceID);
|
||||||
|
|
||||||
|
// WHEN
|
||||||
|
testSubject.onEnabled(new MockConfigurationContext(properties, serviceLookup));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLookupShouldThrowExceptionWhenAttributeMapIsNull() {
|
||||||
|
// GIVEN
|
||||||
|
String mappedCreatedServiceID = "mappedCreatedServiceID";
|
||||||
|
ControllerService mappedCreatedService = mock(SERVICE_TYPE);
|
||||||
|
MockControllerServiceInitializationContext serviceLookup = new MockControllerServiceInitializationContext(mappedCreatedService, mappedCreatedServiceID);
|
||||||
|
|
||||||
|
// WHEN
|
||||||
|
testSubject.onEnabled(new MockConfigurationContext(properties, serviceLookup));
|
||||||
|
try {
|
||||||
|
testSubject.lookupService(null);
|
||||||
|
fail();
|
||||||
|
} catch (ProcessException e) {
|
||||||
|
assertEquals("Attributes map is null", e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLookupShouldThrowExceptionWhenAttributeMapHasNoLookupAttribute() {
|
||||||
|
// GIVEN
|
||||||
|
String mappedCreatedServiceID = "mappedCreatedServiceID";
|
||||||
|
ControllerService mappedCreatedService = mock(SERVICE_TYPE);
|
||||||
|
MockControllerServiceInitializationContext serviceLookup = new MockControllerServiceInitializationContext(mappedCreatedService, mappedCreatedServiceID);
|
||||||
|
|
||||||
|
// WHEN
|
||||||
|
testSubject.onEnabled(new MockConfigurationContext(properties, serviceLookup));
|
||||||
|
try {
|
||||||
|
testSubject.lookupService(new HashMap<>());
|
||||||
|
fail();
|
||||||
|
} catch (ProcessException e) {
|
||||||
|
assertEquals("Attributes must contain an attribute name '" + LOOKUP_ATTRIBUTE + "'", e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLookupShouldThrowExceptionWhenQueriedServiceWasCreatedButWasntMappedInProperties() {
|
||||||
|
// GIVEN
|
||||||
|
String mappedCreatedServiceID = "mappedCreatedServiceID";
|
||||||
|
String notMappedCreatedServiceID = "notMappedCreatedServiceID";
|
||||||
|
|
||||||
|
ControllerService mappedCreatedService = mock(SERVICE_TYPE);
|
||||||
|
ControllerService notMappedCreatedService = mock(SERVICE_TYPE);
|
||||||
|
|
||||||
|
MockControllerServiceInitializationContext serviceLookup = new MockControllerServiceInitializationContext(mappedCreatedService, mappedCreatedServiceID);
|
||||||
|
serviceLookup.addControllerService(notMappedCreatedService, notMappedCreatedServiceID);
|
||||||
|
|
||||||
|
String dynamicProperty1 = "property1";
|
||||||
|
String dynamicProperty2 = "property2";
|
||||||
|
|
||||||
|
mapService(dynamicProperty1, mappedCreatedServiceID);
|
||||||
|
|
||||||
|
String lookupServiceKey = dynamicProperty2;
|
||||||
|
|
||||||
|
// WHEN
|
||||||
|
testSubject.onEnabled(new MockConfigurationContext(properties, serviceLookup));
|
||||||
|
try {
|
||||||
|
testSubject.lookupService(createAttributes(lookupServiceKey));
|
||||||
|
fail();
|
||||||
|
} catch (ProcessException e) {
|
||||||
|
assertEquals("No ControllerService found for lookupAttribute", e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLookupShouldReturnQueriedService() {
|
||||||
|
// GIVEN
|
||||||
|
String mappedCreatedServiceID1 = "mappedCreatedServiceID1";
|
||||||
|
String mappedCreatedServiceID2 = "mappedCreatedServiceID2";
|
||||||
|
|
||||||
|
ControllerService mappedCreatedService1 = mock(SERVICE_TYPE);
|
||||||
|
ControllerService mappedCreatedService2 = mock(SERVICE_TYPE);
|
||||||
|
|
||||||
|
MockControllerServiceInitializationContext serviceLookup = new MockControllerServiceInitializationContext(mappedCreatedService1, mappedCreatedServiceID1);
|
||||||
|
serviceLookup.addControllerService(mappedCreatedService2, mappedCreatedServiceID2);
|
||||||
|
|
||||||
|
String dynamicProperty1 = "property1";
|
||||||
|
String dynamicProperty2 = "property2";
|
||||||
|
|
||||||
|
mapService(dynamicProperty1, mappedCreatedServiceID1);
|
||||||
|
mapService(dynamicProperty2, mappedCreatedServiceID2);
|
||||||
|
|
||||||
|
String lookupServiceKey = dynamicProperty2;
|
||||||
|
ControllerService expected = mappedCreatedService2;
|
||||||
|
|
||||||
|
// WHEN
|
||||||
|
testSubject.onEnabled(new MockConfigurationContext(properties, serviceLookup));
|
||||||
|
ControllerService actual = testSubject.lookupService(createAttributes(lookupServiceKey));
|
||||||
|
|
||||||
|
// THEN
|
||||||
|
assertEquals(expected, actual);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCustomValidateShouldReturnErrorWhenNoServiceIsDefined() {
|
||||||
|
// GIVEN
|
||||||
|
ValidationContext context = new MockValidationContext(new MockProcessContext(testSubject));
|
||||||
|
|
||||||
|
// WHEN
|
||||||
|
Collection<ValidationResult> actual = testSubject.customValidate(context);
|
||||||
|
|
||||||
|
// THEN
|
||||||
|
assertThat(
|
||||||
|
actual.stream().map(ValidationResult::getExplanation).collect(Collectors.toList()),
|
||||||
|
hasItem(containsString("at least one " + SERVICE_TYPE.getSimpleName() + " must be defined via dynamic properties"))
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCustomValidateShouldReturnErrorWhenSelfAndOtherServiceIsMapped() {
|
||||||
|
MockProcessContext processContext = new MockProcessContext(testSubject);
|
||||||
|
processContext.setProperty("property1", "service1");
|
||||||
|
processContext.setProperty("property2", TEST_SUBJECT_IDENTIFIER);
|
||||||
|
|
||||||
|
ValidationContext context = new MockValidationContext(processContext);
|
||||||
|
|
||||||
|
// WHEN
|
||||||
|
Collection<ValidationResult> actual = testSubject.customValidate(context);
|
||||||
|
|
||||||
|
// THEN
|
||||||
|
assertThat(
|
||||||
|
actual.stream().map(ValidationResult::getExplanation).collect(Collectors.toList()),
|
||||||
|
hasItem(containsString("the current service cannot be registered as a " + SERVICE_TYPE.getSimpleName() + " to lookup"))
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCustomValidateShouldReturnErrorsWhenOnlySelfIsMapped() {
|
||||||
|
MockProcessContext processContext = new MockProcessContext(testSubject);
|
||||||
|
processContext.setProperty("property1", TEST_SUBJECT_IDENTIFIER);
|
||||||
|
|
||||||
|
ValidationContext context = new MockValidationContext(processContext);
|
||||||
|
|
||||||
|
// WHEN
|
||||||
|
Collection<ValidationResult> actual = testSubject.customValidate(context);
|
||||||
|
|
||||||
|
// THEN
|
||||||
|
assertThat(
|
||||||
|
actual.stream().map(ValidationResult::getExplanation).collect(Collectors.toList()),
|
||||||
|
hasItems(
|
||||||
|
containsString("the current service cannot be registered as a " + SERVICE_TYPE.getSimpleName() + " to lookup"),
|
||||||
|
containsString("at least one " + SERVICE_TYPE.getSimpleName() + " must be defined via dynamic properties")
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCustomValidateShouldReturnNoErrorWhenAServiceIsDefined() {
|
||||||
|
MockProcessContext processContext = new MockProcessContext(testSubject);
|
||||||
|
processContext.setProperty("property1", "service1");
|
||||||
|
|
||||||
|
ValidationContext context = new MockValidationContext(processContext);
|
||||||
|
|
||||||
|
// WHEN
|
||||||
|
Collection<ValidationResult> actual = testSubject.customValidate(context);
|
||||||
|
|
||||||
|
// THEN
|
||||||
|
assertEquals(Collections.emptyList(), new ArrayList<>(actual));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetServiceType() {
|
||||||
|
Class<ControllerService> actual = testSubject.getServiceType();
|
||||||
|
assertEquals(SERVICE_TYPE, actual);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLookupAttribute() {
|
||||||
|
String actual = testSubject.getLookupAttribute();
|
||||||
|
assertEquals(LOOKUP_ATTRIBUTE, actual);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void mapService(String dynamicProperty, String registeredService) {
|
||||||
|
properties.put(
|
||||||
|
new PropertyDescriptor.Builder()
|
||||||
|
.name(dynamicProperty)
|
||||||
|
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
|
||||||
|
.dynamic(true)
|
||||||
|
.build(),
|
||||||
|
registeredService
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Map<String, String> createAttributes(final String lookupValue) {
|
||||||
|
Map<String, String> attributes = new HashMap<String, String>() {{
|
||||||
|
put(LOOKUP_ATTRIBUTE, lookupValue);
|
||||||
|
}};
|
||||||
|
|
||||||
|
return attributes;
|
||||||
|
}
|
||||||
|
}
|
|
@ -34,6 +34,7 @@
|
||||||
<module>nifi-syslog-utils</module>
|
<module>nifi-syslog-utils</module>
|
||||||
<module>nifi-database-utils</module>
|
<module>nifi-database-utils</module>
|
||||||
<module>nifi-database-test-utils</module>
|
<module>nifi-database-test-utils</module>
|
||||||
|
<module>nifi-service-utils</module>
|
||||||
</modules>
|
</modules>
|
||||||
|
|
||||||
</project>
|
</project>
|
||||||
|
|
|
@ -37,6 +37,11 @@
|
||||||
<artifactId>nifi-utils</artifactId>
|
<artifactId>nifi-utils</artifactId>
|
||||||
<version>1.11.0-SNAPSHOT</version>
|
<version>1.11.0-SNAPSHOT</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.nifi</groupId>
|
||||||
|
<artifactId>nifi-service-utils</artifactId>
|
||||||
|
<version>1.11.0-SNAPSHOT</version>
|
||||||
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.apache.nifi</groupId>
|
<groupId>org.apache.nifi</groupId>
|
||||||
<artifactId>nifi-security-utils</artifactId>
|
<artifactId>nifi-security-utils</artifactId>
|
||||||
|
|
|
@ -16,27 +16,15 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.dbcp;
|
package org.apache.nifi.dbcp;
|
||||||
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
|
||||||
import org.apache.nifi.annotation.behavior.DynamicProperty;
|
import org.apache.nifi.annotation.behavior.DynamicProperty;
|
||||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||||
import org.apache.nifi.annotation.documentation.Tags;
|
import org.apache.nifi.annotation.documentation.Tags;
|
||||||
import org.apache.nifi.annotation.lifecycle.OnDisabled;
|
|
||||||
import org.apache.nifi.annotation.lifecycle.OnEnabled;
|
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
|
||||||
import org.apache.nifi.components.ValidationContext;
|
|
||||||
import org.apache.nifi.components.ValidationResult;
|
|
||||||
import org.apache.nifi.controller.AbstractControllerService;
|
|
||||||
import org.apache.nifi.controller.ConfigurationContext;
|
|
||||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||||
import org.apache.nifi.processor.exception.ProcessException;
|
import org.apache.nifi.processor.exception.ProcessException;
|
||||||
import org.apache.nifi.processor.util.StandardValidators;
|
import org.apache.nifi.service.lookup.AbstractSingleAttributeBasedControllerServiceLookup;
|
||||||
|
|
||||||
import java.sql.Connection;
|
import java.sql.Connection;
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Collection;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
@Tags({ "dbcp", "jdbc", "database", "connection", "pooling", "store" })
|
@Tags({ "dbcp", "jdbc", "database", "connection", "pooling", "store" })
|
||||||
|
@ -45,72 +33,23 @@ import java.util.Map;
|
||||||
"if the attribute is missing. The value of 'database.name' will be used to select the DBCPService that has been " +
|
"if the attribute is missing. The value of 'database.name' will be used to select the DBCPService that has been " +
|
||||||
"registered with that name. This will allow multiple DBCPServices to be defined and registered, and then selected " +
|
"registered with that name. This will allow multiple DBCPServices to be defined and registered, and then selected " +
|
||||||
"dynamically at runtime by tagging flow files with the appropriate 'database.name' attribute.")
|
"dynamically at runtime by tagging flow files with the appropriate 'database.name' attribute.")
|
||||||
@DynamicProperty(name = "The ", value = "JDBC property value", expressionLanguageScope = ExpressionLanguageScope.NONE,
|
@DynamicProperty(name = "The name to register DBCPService", value = "The DBCPService",
|
||||||
description = "")
|
description = "If '"+ DBCPConnectionPoolLookup.DATABASE_NAME_ATTRIBUTE +"' attribute contains " +
|
||||||
public class DBCPConnectionPoolLookup extends AbstractControllerService implements DBCPService {
|
"the name of the dynamic property, then the DBCPService (registered in the value) will be selected.",
|
||||||
|
expressionLanguageScope = ExpressionLanguageScope.NONE)
|
||||||
|
public class DBCPConnectionPoolLookup
|
||||||
|
extends AbstractSingleAttributeBasedControllerServiceLookup<DBCPService> implements DBCPService {
|
||||||
|
|
||||||
public static final String DATABASE_NAME_ATTRIBUTE = "database.name";
|
public static final String DATABASE_NAME_ATTRIBUTE = "database.name";
|
||||||
|
|
||||||
private volatile Map<String,DBCPService> dbcpServiceMap;
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
|
protected String getLookupAttribute() {
|
||||||
return new PropertyDescriptor.Builder()
|
return DATABASE_NAME_ATTRIBUTE;
|
||||||
.name(propertyDescriptorName)
|
|
||||||
.description("The DBCPService to return when database.name = '" + propertyDescriptorName + "'")
|
|
||||||
.identifiesControllerService(DBCPService.class)
|
|
||||||
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
|
|
||||||
.build();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Collection<ValidationResult> customValidate(ValidationContext context) {
|
public Class<DBCPService> getServiceType() {
|
||||||
final List<ValidationResult> results = new ArrayList<>();
|
return DBCPService.class;
|
||||||
|
|
||||||
int numDefinedServices = 0;
|
|
||||||
for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
|
|
||||||
if (descriptor.isDynamic()) {
|
|
||||||
numDefinedServices++;
|
|
||||||
}
|
|
||||||
|
|
||||||
final String referencedId = context.getProperty(descriptor).getValue();
|
|
||||||
if (this.getIdentifier().equals(referencedId)) {
|
|
||||||
results.add(new ValidationResult.Builder()
|
|
||||||
.subject(descriptor.getDisplayName())
|
|
||||||
.explanation("the current service cannot be registered as a DBCPService to lookup")
|
|
||||||
.valid(false)
|
|
||||||
.build());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (numDefinedServices == 0) {
|
|
||||||
results.add(new ValidationResult.Builder()
|
|
||||||
.subject(this.getClass().getSimpleName())
|
|
||||||
.explanation("at least one DBCPService must be defined via dynamic properties")
|
|
||||||
.valid(false)
|
|
||||||
.build());
|
|
||||||
}
|
|
||||||
|
|
||||||
return results;
|
|
||||||
}
|
|
||||||
|
|
||||||
@OnEnabled
|
|
||||||
public void onEnabled(final ConfigurationContext context) {
|
|
||||||
final Map<String,DBCPService> serviceMap = new HashMap<>();
|
|
||||||
|
|
||||||
for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
|
|
||||||
if (descriptor.isDynamic()) {
|
|
||||||
final DBCPService dbcpService = context.getProperty(descriptor).asControllerService(DBCPService.class);
|
|
||||||
serviceMap.put(descriptor.getName(), dbcpService);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
dbcpServiceMap = Collections.unmodifiableMap(serviceMap);
|
|
||||||
}
|
|
||||||
|
|
||||||
@OnDisabled
|
|
||||||
public void onDisabled() {
|
|
||||||
dbcpServiceMap = null;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -119,22 +58,7 @@ public class DBCPConnectionPoolLookup extends AbstractControllerService implemen
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Connection getConnection(Map<String, String> attributes) throws ProcessException {
|
public Connection getConnection(Map<String, String> attributes) {
|
||||||
if (!attributes.containsKey(DATABASE_NAME_ATTRIBUTE)) {
|
return lookupService(attributes).getConnection(attributes);
|
||||||
throw new ProcessException("Attributes must contain an attribute name '" + DATABASE_NAME_ATTRIBUTE + "'");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
final String databaseName = attributes.get(DATABASE_NAME_ATTRIBUTE);
|
|
||||||
if (StringUtils.isBlank(databaseName)) {
|
|
||||||
throw new ProcessException(DATABASE_NAME_ATTRIBUTE + " cannot be null or blank");
|
|
||||||
}
|
|
||||||
|
|
||||||
final DBCPService dbcpService = dbcpServiceMap.get(databaseName);
|
|
||||||
if (dbcpService == null) {
|
|
||||||
throw new ProcessException("No DBCPService was found for database.name '" + databaseName + "'");
|
|
||||||
}
|
|
||||||
|
|
||||||
return dbcpService.getConnection(attributes);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue