NIFI-10760 Add API Key authentication to ElasticSearchClientServiceImpl

This closes #6619

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Nandor Soma Abonyi 2022-11-04 12:50:04 +01:00 committed by exceptionfactory
parent cc35e3e427
commit d374c1f399
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
6 changed files with 236 additions and 14 deletions

View File

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.elasticsearch;
import org.apache.nifi.components.DescribedValue;
public enum AuthorizationScheme implements DescribedValue {
BASIC("Basic", "Basic authorization scheme."),
API_KEY("API Key", "API key authorization scheme.");
private final String displayName;
private final String description;
AuthorizationScheme(String displayName, String description) {
this.displayName = displayName;
this.description = description;
}
@Override
public String getValue() {
return name();
}
@Override
public String getDisplayName() {
return displayName;
}
@Override
public String getDescription() {
return description;
}
}

View File

@ -43,6 +43,7 @@ public interface ElasticSearchClientService extends ControllerService, Verifiabl
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
PropertyDescriptor PROP_SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("el-cs-ssl-context-service")
.displayName("SSL Context Service")
@ -53,23 +54,58 @@ public interface ElasticSearchClientService extends ControllerService, Verifiabl
.addValidator(Validator.VALID)
.build();
PropertyDescriptor PROXY_CONFIGURATION_SERVICE = ProxyConfiguration.createProxyConfigPropertyDescriptor(false, ProxySpec.HTTP);
PropertyDescriptor AUTHORIZATION_SCHEME = new PropertyDescriptor.Builder()
.name("authorization-scheme")
.displayName("Authorization Scheme")
.description("Authorization Scheme used for authenticating to Elasticsearch using the HTTP Authorization header.")
.allowableValues(AuthorizationScheme.class)
.defaultValue(AuthorizationScheme.BASIC.getValue())
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
.name("el-cs-username")
.displayName("Username")
.description("The username to use with XPack security.")
.dependsOn(AUTHORIZATION_SCHEME, AuthorizationScheme.BASIC.getValue())
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
.name("el-cs-password")
.displayName("Password")
.description("The password to use with XPack security.")
.dependsOn(AUTHORIZATION_SCHEME, AuthorizationScheme.BASIC.getValue())
.required(false)
.sensitive(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
PropertyDescriptor API_KEY_ID = new PropertyDescriptor.Builder()
.name("api-key-id")
.displayName("API Key ID")
.description("Unique identifier of the API key.")
.dependsOn(AUTHORIZATION_SCHEME, AuthorizationScheme.API_KEY.getValue())
.required(false)
.sensitive(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
PropertyDescriptor API_KEY = new PropertyDescriptor.Builder()
.name("api-key")
.displayName("API Key")
.description("Encoded API key.")
.dependsOn(AUTHORIZATION_SCHEME, AuthorizationScheme.API_KEY.getValue())
.required(false)
.sensitive(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
PropertyDescriptor CONNECT_TIMEOUT = new PropertyDescriptor.Builder()
.name("el-cs-connect-timeout")
.displayName("Connect timeout")
@ -78,6 +114,7 @@ public interface ElasticSearchClientService extends ControllerService, Verifiabl
.defaultValue("5000")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
PropertyDescriptor SOCKET_TIMEOUT = new PropertyDescriptor.Builder()
.name("el-cs-socket-timeout")
.displayName("Read timeout")
@ -99,6 +136,7 @@ public interface ElasticSearchClientService extends ControllerService, Verifiabl
.defaultValue("60000")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
.name("el-cs-charset")
.displayName("Charset")

View File

@ -29,11 +29,14 @@ import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.message.BasicHeader;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.logging.ComponentLog;
@ -60,6 +63,8 @@ import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@ -84,16 +89,19 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
static {
final List<PropertyDescriptor> props = new ArrayList<>();
props.add(ElasticSearchClientService.HTTP_HOSTS);
props.add(ElasticSearchClientService.USERNAME);
props.add(ElasticSearchClientService.PASSWORD);
props.add(ElasticSearchClientService.PROP_SSL_CONTEXT_SERVICE);
props.add(ElasticSearchClientService.PROXY_CONFIGURATION_SERVICE);
props.add(ElasticSearchClientService.CONNECT_TIMEOUT);
props.add(ElasticSearchClientService.SOCKET_TIMEOUT);
props.add(ElasticSearchClientService.RETRY_TIMEOUT);
props.add(ElasticSearchClientService.CHARSET);
props.add(ElasticSearchClientService.SUPPRESS_NULLS);
props.add(HTTP_HOSTS);
props.add(AUTHORIZATION_SCHEME);
props.add(USERNAME);
props.add(PASSWORD);
props.add(API_KEY_ID);
props.add(API_KEY);
props.add(PROP_SSL_CONTEXT_SERVICE);
props.add(PROXY_CONFIGURATION_SERVICE);
props.add(CONNECT_TIMEOUT);
props.add(SOCKET_TIMEOUT);
props.add(RETRY_TIMEOUT);
props.add(CHARSET);
props.add(SUPPRESS_NULLS);
properties = Collections.unmodifiableList(props);
}
@ -103,6 +111,34 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
return properties;
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>(1);
final boolean usernameSet = validationContext.getProperty(USERNAME).isSet();
final boolean passwordSet = validationContext.getProperty(PASSWORD).isSet();
if ((usernameSet && !passwordSet) || (!usernameSet && passwordSet)) {
results.add(new ValidationResult.Builder().subject(String.format("%s and %s", USERNAME.getDisplayName(), PASSWORD.getDisplayName()))
.valid(false).explanation(String.format("if '%s' or '%s' is set, both must be set.", USERNAME.getDisplayName(), PASSWORD.getDisplayName())).build());
}
final boolean apiKeyIdSet = validationContext.getProperty(API_KEY_ID).isSet();
final boolean apiKeySet = validationContext.getProperty(API_KEY).isSet();
if ((apiKeyIdSet && !apiKeySet) || (!apiKeyIdSet && apiKeySet)) {
results.add(new ValidationResult.Builder().subject(String.format("%s and %s", API_KEY.getDisplayName(), API_KEY_ID.getDisplayName()))
.valid(false).explanation(String.format("if '%s' or '%s' is set, both must be set.", API_KEY.getDisplayName(), API_KEY_ID.getDisplayName())).build());
}
if (usernameSet && apiKeyIdSet) {
results.add(new ValidationResult.Builder().subject(String.format("%s and %s", USERNAME.getDisplayName(), API_KEY_ID.getDisplayName()))
.valid(false).explanation(String.format("'%s' and '%s' cannot be used together.", USERNAME.getDisplayName(), API_KEY_ID.getDisplayName())).build());
}
return results;
}
@OnEnabled
public void onEnabled(final ConfigurationContext context) throws InitializationException {
try {
@ -206,6 +242,9 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
final String apiKeyId = context.getProperty(API_KEY_ID).getValue();
final String apiKey = context.getProperty(API_KEY).getValue();
final Integer connectTimeout = context.getProperty(CONNECT_TIMEOUT).asInteger();
final Integer readTimeout = context.getProperty(SOCKET_TIMEOUT).asInteger();
@ -237,6 +276,10 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
credentialsProvider = addCredentials(null, AuthScope.ANY, username, password);
}
if (apiKeyId != null && apiKey != null) {
httpClientBuilder.setDefaultHeaders(Collections.singletonList(createApiKeyAuthorizationHeader(apiKeyId, apiKey)));
}
if (proxyConfigurationService != null) {
final ProxyConfiguration proxyConfiguration = proxyConfigurationService.getConfiguration();
if (Proxy.Type.HTTP == proxyConfiguration.getProxyType()) {
@ -275,6 +318,12 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
return cp;
}
private BasicHeader createApiKeyAuthorizationHeader(String apiKeyId, String apiKey) {
final String apiKeyCredentials = String.format("%s:%s", apiKeyId, apiKey);
final String apiKeyAuth = Base64.getEncoder().encodeToString((apiKeyCredentials).getBytes(StandardCharsets.UTF_8));
return new BasicHeader("Authorization", "ApiKey " + apiKeyAuth);
}
private void appendIndex(final StringBuilder sb, final String index) {
if (StringUtils.isNotBlank(index) && !"/".equals(index)) {
if (!index.startsWith("/")) {

View File

@ -16,6 +16,7 @@
*/
package org.apache.nifi.elasticsearch.integration;
import org.apache.nifi.elasticsearch.AuthorizationScheme;
import org.apache.nifi.elasticsearch.ElasticSearchClientService;
import org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl;
import org.apache.nifi.elasticsearch.TestControllerServiceProcessor;
@ -41,6 +42,7 @@ abstract class AbstractElasticsearch_IT extends AbstractElasticsearchITBase {
runner.setProperty(service, ElasticSearchClientService.CONNECT_TIMEOUT, "10000");
runner.setProperty(service, ElasticSearchClientService.SOCKET_TIMEOUT, "60000");
runner.setProperty(service, ElasticSearchClientService.SUPPRESS_NULLS, ElasticSearchClientService.ALWAYS_SUPPRESS.getValue());
runner.setProperty(service, ElasticSearchClientService.AUTHORIZATION_SCHEME, AuthorizationScheme.BASIC.getValue());
runner.setProperty(service, ElasticSearchClientService.USERNAME, "elastic");
runner.setProperty(service, ElasticSearchClientService.PASSWORD, ELASTIC_USER_PASSWORD);

View File

@ -18,9 +18,11 @@
package org.apache.nifi.elasticsearch.integration;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.VerifiableControllerService;
import org.apache.nifi.elasticsearch.AuthorizationScheme;
import org.apache.nifi.elasticsearch.DeleteOperationResponse;
import org.apache.nifi.elasticsearch.ElasticSearchClientService;
import org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl;
@ -41,6 +43,7 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@ -66,10 +69,6 @@ class ElasticSearchClientService_IT extends AbstractElasticsearch_IT {
service.onDisabled();
}
private String prettyJson(final Object o) throws JsonProcessingException {
return MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(o);
}
private Map<PropertyDescriptor, String> getClientServiceProperties() {
return ((MockControllerServiceLookup) runner.getProcessContext().getControllerServiceLookup())
.getControllerServices().get(CLIENT_SERVICE_NAME).getProperties();
@ -86,6 +85,27 @@ class ElasticSearchClientService_IT extends AbstractElasticsearch_IT {
assertEquals(3, results.stream().filter(result -> result.getOutcome() == ConfigVerificationResult.Outcome.SUCCESSFUL).count(), results.toString());
}
@Test
void testVerifySuccessWithApiKeyAuth() throws IOException {
final Pair<String, String> apiKey = createApiKeyForIndex("*");
runner.disableControllerService(service);
runner.setProperty(service, ElasticSearchClientService.AUTHORIZATION_SCHEME, AuthorizationScheme.API_KEY.getValue());
runner.removeProperty(service, ElasticSearchClientService.USERNAME);
runner.removeProperty(service, ElasticSearchClientService.PASSWORD);
runner.setProperty(service, ElasticSearchClientService.API_KEY_ID, apiKey.getKey());
runner.setProperty(service, ElasticSearchClientService.API_KEY, apiKey.getValue());
runner.enableControllerService(service);
final List<ConfigVerificationResult> results = ((VerifiableControllerService) service).verify(
new MockConfigurationContext(service, getClientServiceProperties(), runner.getProcessContext().getControllerServiceLookup(), new MockVariableRegistry()),
runner.getLogger(),
Collections.emptyMap()
);
assertEquals(3, results.size());
assertEquals(3, results.stream().filter(result -> result.getOutcome() == ConfigVerificationResult.Outcome.SUCCESSFUL).count(), results.toString());
}
@Test
void testVerifyFailedURL() {
runner.disableControllerService(service);
@ -157,6 +177,32 @@ class ElasticSearchClientService_IT extends AbstractElasticsearch_IT {
);
}
@Test
void testVerifyFailedApiKeyAuth() {
runner.disableControllerService(service);
runner.setProperty(service, ElasticSearchClientService.AUTHORIZATION_SCHEME, AuthorizationScheme.API_KEY.getValue());
runner.removeProperty(service, ElasticSearchClientService.USERNAME);
runner.removeProperty(service, ElasticSearchClientService.PASSWORD);
runner.setProperty(service, ElasticSearchClientService.API_KEY_ID, "invalid");
runner.setProperty(service, ElasticSearchClientService.API_KEY, "not-real");
runner.enableControllerService(service);
final List<ConfigVerificationResult> results = ((VerifiableControllerService) service).verify(
new MockConfigurationContext(service, getClientServiceProperties(), runner.getProcessContext().getControllerServiceLookup(), new MockVariableRegistry()),
runner.getLogger(),
Collections.emptyMap()
);
assertEquals(3, results.size());
assertEquals(1, results.stream().filter(result -> result.getOutcome() == ConfigVerificationResult.Outcome.SUCCESSFUL).count(), results.toString());
assertEquals(1, results.stream().filter(result -> result.getOutcome() == ConfigVerificationResult.Outcome.SKIPPED).count(), results.toString());
assertEquals(1, results.stream().filter(
result -> Objects.equals(result.getVerificationStepName(), ElasticSearchClientServiceImpl.VERIFICATION_STEP_CONNECTION)
&& Objects.equals(result.getExplanation(), "Unable to retrieve system summary from Elasticsearch root endpoint")
&& result.getOutcome() == ConfigVerificationResult.Outcome.FAILED).count(),
results.toString()
);
}
@Test
void testBasicSearch() throws Exception {
final Map<String, Object> temp = new MapBuilder()
@ -755,4 +801,5 @@ class ElasticSearchClientService_IT extends AbstractElasticsearch_IT {
private static void waitForIndexRefresh() throws InterruptedException {
Thread.sleep(1000);
}
}

View File

@ -16,15 +16,20 @@
*/
package org.apache.nifi.elasticsearch.integration;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.compress.utils.IOUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.nifi.elasticsearch.MapBuilder;
import org.apache.nifi.util.TestRunner;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.junit.jupiter.api.BeforeAll;
@ -34,12 +39,16 @@ import org.testcontainers.utility.DockerImageName;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import static org.apache.http.auth.AuthScope.ANY;
@ -150,6 +159,36 @@ public abstract class AbstractElasticsearchITBase {
}
}
protected String prettyJson(final Object o) throws JsonProcessingException {
return MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(o);
}
protected Pair<String, String> createApiKeyForIndex(String index) throws IOException {
final String body = prettyJson(new MapBuilder()
.of("name", "test-api-key")
.of("role_descriptors", new MapBuilder()
.of("test-role", new MapBuilder()
.of("cluster", Collections.singletonList("all"))
.of("index", Collections.singletonList(new MapBuilder()
.of("names", Collections.singletonList(index))
.of("privileges", Collections.singletonList("all"))
.build()))
.build())
.build())
.build());
final String endpoint = String.format("%s/%s", elasticsearchHost, "_security/api_key");
final Request request = new Request("POST", endpoint);
final HttpEntity jsonBody = new NStringEntity(body, ContentType.APPLICATION_JSON);
request.setEntity(jsonBody);
final Response response = testDataManagementClient.performRequest(request);
final InputStream inputStream = response.getEntity().getContent();
final byte[] result = IOUtils.toByteArray(inputStream);
inputStream.close();
final Map<String, String> ret = MAPPER.readValue(new String(result, StandardCharsets.UTF_8), Map.class);
return Pair.of(ret.get("id"), ret.get("api_key"));
}
private static List<SetupAction> readSetupActions(final String scriptPath) throws IOException {
final List<SetupAction> actions = new ArrayList<>();
try (BufferedReader reader = new BufferedReader(new InputStreamReader(Files.newInputStream(Paths.get(scriptPath))))) {