NIFI-10776 Added NONE and PKI AuthorizationSchemes for ElasticSearchClientService

This closes #6662

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Chris Sampson 2022-11-13 20:52:22 +00:00 committed by exceptionfactory
parent a861bab34d
commit d23e50168f
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
7 changed files with 213 additions and 18 deletions

View File

@ -41,6 +41,27 @@ An example using a non-Docker version of Elasticsearch:
`mvn -Pintegration-tests --fail-at-end -Delasticsearch.testcontainers.enabled=false -Delasticsearch.elastic_user.password=s3cret1234 clean install`
## Bash Script Example
Execute the following script from the `nifi-elasticsearch-bundle` directory:
```bash
mvn --fail-at-end -Pcontrib-check clean install
es_versions=(elasticsearch6 elasticsearch7 elasticsearch8)
it_modules=(nifi-elasticsearch-client-service nifi-elasticsearch-restapi-processors)
for v in "${es_versions[@]}"; do
for m in "${it_modules[@]}"; do
pushd "${m}"
if ! mvn -P "integration-tests,${v}" --fail-at-end failsafe:integration-test failsafe:verify; then
echo; echo; echo "Integration Tests failed for ${v} in ${m}, see Maven logs for details"
exit 1
fi
popd
done
done
```
## Modules with Integration Tests (using Testcontainers)
- [Elasticsearch Client Service](nifi-elasticsearch-client-service)
@ -50,6 +71,8 @@ An example using a non-Docker version of Elasticsearch:
Integration Tests with Testcontainers currently only uses the `amd64` Docker Images.
`elasticsearch6` is known to **not** work with `arm64` machines (e.g. Mac M1/M2), but other Elasticsearch images (e.g. 7.x and 8.x) appear to work.
`elasticsearch6` is known to experience some problems with `arm64` machines (e.g. Mac M1/M2),
but other Elasticsearch images (e.g. 7.x and 8.x) appear to work. Settings have been altered for the Elasticsearch
containers in order to try and enable them on different architectures, but there may still be some inconsistencies.
Explicit `arm64` architecture support may be added in future where the Elasticsearch images exist.

View File

@ -19,6 +19,8 @@ package org.apache.nifi.elasticsearch;
import org.apache.nifi.components.DescribedValue;
public enum AuthorizationScheme implements DescribedValue {
NONE("None", "No authorization scheme."),
PKI("PKI", "Mutual TLS with PKI certificate authorization scheme."),
BASIC("Basic", "Basic authorization scheme."),
API_KEY("API Key", "API key authorization scheme.");

View File

@ -58,7 +58,7 @@ public interface ElasticSearchClientService extends ControllerService, Verifiabl
PropertyDescriptor AUTHORIZATION_SCHEME = new PropertyDescriptor.Builder()
.name("authorization-scheme")
.displayName("Authorization Scheme")
.description("Authorization Scheme used for authenticating to Elasticsearch using the HTTP Authorization header.")
.description("Authorization Scheme used for optional authentication to Elasticsearch.")
.allowableValues(AuthorizationScheme.class)
.defaultValue(AuthorizationScheme.BASIC.getValue())
.required(true)

View File

@ -112,33 +112,48 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>(1);
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
final AuthorizationScheme authorizationScheme = AuthorizationScheme.valueOf(validationContext.getProperty(AUTHORIZATION_SCHEME).getValue());
final boolean usernameSet = validationContext.getProperty(USERNAME).isSet();
final boolean passwordSet = validationContext.getProperty(PASSWORD).isSet();
if ((usernameSet && !passwordSet) || (!usernameSet && passwordSet)) {
results.add(new ValidationResult.Builder().subject(String.format("%s and %s", USERNAME.getDisplayName(), PASSWORD.getDisplayName()))
.valid(false).explanation(String.format("if '%s' or '%s' is set, both must be set.", USERNAME.getDisplayName(), PASSWORD.getDisplayName())).build());
}
final boolean apiKeyIdSet = validationContext.getProperty(API_KEY_ID).isSet();
final boolean apiKeySet = validationContext.getProperty(API_KEY).isSet();
if ((apiKeyIdSet && !apiKeySet) || (!apiKeyIdSet && apiKeySet)) {
results.add(new ValidationResult.Builder().subject(String.format("%s and %s", API_KEY.getDisplayName(), API_KEY_ID.getDisplayName()))
.valid(false).explanation(String.format("if '%s' or '%s' is set, both must be set.", API_KEY.getDisplayName(), API_KEY_ID.getDisplayName())).build());
final SSLContextService sslService = validationContext.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
if (authorizationScheme == AuthorizationScheme.PKI && (sslService == null || !sslService.isKeyStoreConfigured())) {
results.add(new ValidationResult.Builder().subject(PROP_SSL_CONTEXT_SERVICE.getName()).valid(false)
.explanation(String.format("if '%s' is '%s' then '%s' must be set and specify a Keystore for mutual TLS encryption.",
AUTHORIZATION_SCHEME.getDisplayName(), authorizationScheme.getDisplayName(), PROP_SSL_CONTEXT_SERVICE.getDisplayName())
).build()
);
}
if (usernameSet && apiKeyIdSet) {
results.add(new ValidationResult.Builder().subject(String.format("%s and %s", USERNAME.getDisplayName(), API_KEY_ID.getDisplayName()))
.valid(false).explanation(String.format("'%s' and '%s' cannot be used together.", USERNAME.getDisplayName(), API_KEY_ID.getDisplayName())).build());
if (usernameSet && !passwordSet) {
addAuthorizationPropertiesValidationIssue(results, USERNAME, PASSWORD);
} else if (passwordSet && !usernameSet) {
addAuthorizationPropertiesValidationIssue(results, PASSWORD, USERNAME);
}
if (apiKeyIdSet && !apiKeySet) {
addAuthorizationPropertiesValidationIssue(results, API_KEY_ID, API_KEY);
} else if (apiKeySet && !apiKeyIdSet) {
addAuthorizationPropertiesValidationIssue(results, API_KEY, API_KEY_ID);
}
return results;
}
private void addAuthorizationPropertiesValidationIssue(final List<ValidationResult> results, final PropertyDescriptor presentProperty, final PropertyDescriptor missingProperty) {
results.add(new ValidationResult.Builder().subject(missingProperty.getName()).valid(false)
.explanation(String.format("if '%s' is then '%s' must be set.", presentProperty.getDisplayName(), missingProperty.getDisplayName()))
.build()
);
}
@OnEnabled
public void onEnabled(final ConfigurationContext context) throws InitializationException {
try {

View File

@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.elasticsearch.unit;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.elasticsearch.AuthorizationScheme;
import org.apache.nifi.elasticsearch.ElasticSearchClientService;
import org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl;
import org.apache.nifi.elasticsearch.TestControllerServiceProcessor;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.opentest4j.AssertionFailedError;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.atMostOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
class ElasticSearchClientServiceImplTest {
private TestRunner runner;
private ElasticSearchClientServiceImpl service;
private static final String HOST = "http://localhost:9200";
@BeforeEach
void setUp() throws Exception {
runner = TestRunners.newTestRunner(TestControllerServiceProcessor.class);
service = new ElasticSearchClientServiceImpl();
runner.addControllerService("Client Service", service);
runner.setProperty(TestControllerServiceProcessor.CLIENT_SERVICE, "Client Service");
runner.setProperty(service, ElasticSearchClientService.HTTP_HOSTS, HOST);
}
@Test
void testTransitUrl() {
final String index = "test";
final String type = "no-type";
runner.setProperty(service, ElasticSearchClientService.AUTHORIZATION_SCHEME, AuthorizationScheme.NONE.getValue());
runner.assertValid(service);
runner.enableControllerService(service);
assertEquals(String.format("%s/%s/%s", HOST, index, type), service.getTransitUrl(index, type));
assertEquals(String.format("%s/%s", HOST, index), service.getTransitUrl(index, null));
}
@Test
void testValidateBasicAuth() {
runner.setProperty(service, ElasticSearchClientService.AUTHORIZATION_SCHEME, AuthorizationScheme.BASIC.getValue());
runner.setProperty(service, ElasticSearchClientService.USERNAME, "elastic");
runner.setProperty(service, ElasticSearchClientService.PASSWORD, "password");
runner.assertValid(service);
runner.removeProperty(service, ElasticSearchClientService.PASSWORD);
assertAuthorizationPropertyValidationErrorMessage(ElasticSearchClientService.USERNAME, ElasticSearchClientService.PASSWORD);
runner.removeProperty(service, ElasticSearchClientService.USERNAME);
runner.assertValid(service);
runner.setProperty(service, ElasticSearchClientService.PASSWORD, "password");
runner.removeProperty(service, ElasticSearchClientService.USERNAME);
assertAuthorizationPropertyValidationErrorMessage(ElasticSearchClientService.PASSWORD, ElasticSearchClientService.USERNAME);
}
@Test
void testValidateApiKeyAuth() {
runner.setProperty(service, ElasticSearchClientService.AUTHORIZATION_SCHEME, AuthorizationScheme.API_KEY.getValue());
runner.setProperty(service, ElasticSearchClientService.API_KEY_ID, "api-key-id");
runner.setProperty(service, ElasticSearchClientService.API_KEY, "api-key");
runner.assertValid(service);
runner.removeProperty(service, ElasticSearchClientService.API_KEY_ID);
assertAuthorizationPropertyValidationErrorMessage(ElasticSearchClientService.API_KEY, ElasticSearchClientService.API_KEY_ID);
runner.removeProperty(service, ElasticSearchClientService.API_KEY);
runner.assertValid(service);
runner.setProperty(service, ElasticSearchClientService.API_KEY_ID, "api-key-id");
runner.removeProperty(service, ElasticSearchClientService.API_KEY);
assertAuthorizationPropertyValidationErrorMessage(ElasticSearchClientService.API_KEY_ID, ElasticSearchClientService.API_KEY);
}
@Test
void testValidatePkiAuth() throws InitializationException {
runner.setProperty(service, ElasticSearchClientService.AUTHORIZATION_SCHEME, AuthorizationScheme.PKI.getValue());
final SSLContextService sslService = mock(SSLContextService.class);
when(sslService.getIdentifier()).thenReturn("ssl-context");
runner.addControllerService("ssl-context", sslService);
runner.setProperty(service, ElasticSearchClientService.PROP_SSL_CONTEXT_SERVICE, "ssl-context");
when(sslService.isKeyStoreConfigured()).thenReturn(true);
runner.assertValid(service);
verify(sslService, atMostOnce()).isKeyStoreConfigured();
reset(sslService);
when(sslService.isKeyStoreConfigured()).thenReturn(false);
assertPKIAuthorizationValidationErrorMessage();
verify(sslService, atMostOnce()).isKeyStoreConfigured();
reset(sslService);
runner.removeProperty(service, ElasticSearchClientService.PROP_SSL_CONTEXT_SERVICE);
assertPKIAuthorizationValidationErrorMessage();
verify(sslService, atMostOnce()).isKeyStoreConfigured();
reset(sslService);
}
private void assertAuthorizationPropertyValidationErrorMessage(final PropertyDescriptor presentProperty, final PropertyDescriptor missingProperty) {
final AssertionFailedError afe = assertThrows(AssertionFailedError.class, () -> runner.assertValid(service));
assertTrue(afe.getMessage().contains(String.format("if '%s' is then '%s' must be set.", presentProperty.getDisplayName(), missingProperty.getDisplayName())));
}
private void assertPKIAuthorizationValidationErrorMessage() {
final AssertionFailedError afe = assertThrows(AssertionFailedError.class, () -> runner.assertValid(service));
assertTrue(afe.getMessage().contains(String.format(
"if '%s' is '%s' then '%s' must be set and specify a Keystore for mutual TLS encryption.",
ElasticSearchClientService.AUTHORIZATION_SCHEME.getDisplayName(),
AuthorizationScheme.PKI.getDisplayName(),
ElasticSearchClientService.PROP_SSL_CONTEXT_SERVICE.getDisplayName()
)));
}
}

View File

@ -53,12 +53,16 @@ import java.util.Map;
import static org.apache.http.auth.AuthScope.ANY;
public abstract class AbstractElasticsearchITBase {
// default Elasticsearch version should (ideally) match that in the nifi-elasticsearch-bundle#pom.xml for the integration-tests profile
protected static final DockerImageName IMAGE = DockerImageName
.parse(System.getProperty("elasticsearch.docker.image", "docker.elastic.co/elasticsearch/elasticsearch:8.4.3"));
.parse(System.getProperty("elasticsearch.docker.image", "docker.elastic.co/elasticsearch/elasticsearch:8.5.0"));
protected static final String ELASTIC_USER_PASSWORD = System.getProperty("elasticsearch.elastic_user.password", RandomStringUtils.randomAlphanumeric(10, 20));
protected static final ElasticsearchContainer ELASTICSEARCH_CONTAINER = new ElasticsearchContainer(IMAGE)
.withPassword(ELASTIC_USER_PASSWORD)
.withEnv("xpack.security.enabled", "true");
.withEnv("xpack.security.enabled", "true")
// enable API Keys for integration-tests (6.x & 7.x don't enable SSL and therefore API Keys by default, so use a trial license and explicitly enable API Keys)
.withEnv("xpack.license.self_generated.type", "trial")
.withEnv("xpack.security.authc.api_key.enabled", "true");
protected static final String CLIENT_SERVICE_NAME = "Client Service";
protected static final String INDEX = "messages";
@ -68,7 +72,12 @@ public abstract class AbstractElasticsearchITBase {
protected static String elasticsearchHost;
protected static void startTestcontainer() {
if (ENABLE_TEST_CONTAINERS) {
ELASTICSEARCH_CONTAINER.start();
if (getElasticMajorVersion() == 6) {
// disable system call filter check to allow Elasticsearch 6 to run on aarch64 machines (e.g. Mac M1/2)
ELASTICSEARCH_CONTAINER.withEnv("bootstrap.system_call_filter", "false").start();
} else {
ELASTICSEARCH_CONTAINER.start();
}
elasticsearchHost = String.format("http://%s", ELASTICSEARCH_CONTAINER.getHttpHostAddress());
} else {
elasticsearchHost = System.getProperty("elasticsearch.endpoint", "http://localhost:9200");
@ -89,6 +98,7 @@ public abstract class AbstractElasticsearchITBase {
@BeforeAll
static void beforeAll() throws IOException {
startTestcontainer();
type = getElasticMajorVersion() == 6 ? "_doc" : "";
System.out.printf("%n%n%n%n%n%n%n%n%n%n%n%n%n%n%nTYPE: %s%nIMAGE: %s:%s%n%n%n%n%n%n%n%n%n%n%n%n%n%n%n%n",

View File

@ -86,6 +86,7 @@ language governing permissions and limitations under the License. -->
<activeByDefault>false</activeByDefault>
</activation>
<properties>
<!-- also update the default Elasticsearch version in nifi-elasticsearch-test-utils#src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java-->
<elasticsearch_docker_image>8.5.0</elasticsearch_docker_image>
<elasticsearch.elastic.password>s3cret</elasticsearch.elastic.password>
</properties>