mirror of
https://github.com/apache/nifi.git
synced 2025-02-13 13:35:20 +00:00
NIFI-10831 enable JWT realm authentication with Elasticsearch
This commit is contained in:
parent
af5c88bd1d
commit
90cada2204
@ -36,9 +36,8 @@
|
||||
<artifactId>nifi-proxy-configuration-api</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.core</groupId>
|
||||
<artifactId>jackson-databind</artifactId>
|
||||
<scope>compile</scope>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-oauth2-provider-api</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
||||
|
@ -22,7 +22,8 @@ public enum AuthorizationScheme implements DescribedValue {
|
||||
NONE("None", "No authorization scheme."),
|
||||
PKI("PKI", "Mutual TLS with PKI certificate authorization scheme."),
|
||||
BASIC("Basic", "Basic authorization scheme."),
|
||||
API_KEY("API Key", "API key authorization scheme.");
|
||||
API_KEY("API Key", "API key authorization scheme."),
|
||||
JWT("JWT", "JWT realm scheme.");
|
||||
|
||||
private final String displayName;
|
||||
private final String description;
|
||||
|
@ -22,6 +22,7 @@ import org.apache.nifi.components.Validator;
|
||||
import org.apache.nifi.controller.ControllerService;
|
||||
import org.apache.nifi.controller.VerifiableControllerService;
|
||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||
import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.proxy.ProxyConfiguration;
|
||||
import org.apache.nifi.proxy.ProxySpec;
|
||||
@ -50,6 +51,7 @@ public interface ElasticSearchClientService extends ControllerService, Verifiabl
|
||||
.identifiesControllerService(SSLContextProvider.class)
|
||||
.addValidator(Validator.VALID)
|
||||
.build();
|
||||
|
||||
PropertyDescriptor PROXY_CONFIGURATION_SERVICE = ProxyConfiguration.createProxyConfigPropertyDescriptor(ProxySpec.HTTP);
|
||||
|
||||
PropertyDescriptor AUTHORIZATION_SCHEME = new PropertyDescriptor.Builder()
|
||||
@ -62,6 +64,25 @@ public interface ElasticSearchClientService extends ControllerService, Verifiabl
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
|
||||
PropertyDescriptor OAUTH2_ACCESS_TOKEN_PROVIDER = new PropertyDescriptor.Builder()
|
||||
.name("el-cs-oauth2-token-provider")
|
||||
.displayName("OAuth2 Access Token Provider")
|
||||
.description("The OAuth2 Access Token Provider used to provide JWTs for Bearer Token Authorization with Elasticsearch.")
|
||||
.dependsOn(AUTHORIZATION_SCHEME, AuthorizationScheme.JWT)
|
||||
.required(false)
|
||||
.identifiesControllerService(OAuth2AccessTokenProvider.class)
|
||||
.addValidator(Validator.VALID)
|
||||
.build();
|
||||
|
||||
PropertyDescriptor RUN_AS_USER = new PropertyDescriptor.Builder()
|
||||
.name("el-cs-run-as-user")
|
||||
.displayName("Run As User")
|
||||
.description("The username to impersonate within Elasticsearch.")
|
||||
.required(false)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
|
||||
PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
|
||||
.name("el-cs-username")
|
||||
.displayName("Username")
|
||||
@ -103,6 +124,16 @@ public interface ElasticSearchClientService extends ControllerService, Verifiabl
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
|
||||
PropertyDescriptor JWT_SHARED_SECRET = new PropertyDescriptor.Builder()
|
||||
.name("jwt-shared-secret")
|
||||
.displayName("JWT Shared Secret")
|
||||
.description("JWT realm Shared Secret.")
|
||||
.dependsOn(AUTHORIZATION_SCHEME, AuthorizationScheme.JWT)
|
||||
.required(false)
|
||||
.sensitive(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
|
||||
PropertyDescriptor CONNECT_TIMEOUT = new PropertyDescriptor.Builder()
|
||||
.name("el-cs-connect-timeout")
|
||||
.displayName("Connect timeout")
|
||||
@ -220,7 +251,7 @@ public interface ElasticSearchClientService extends ControllerService, Verifiabl
|
||||
.name("el-cs-sniff-failure")
|
||||
.displayName("Sniff on Failure")
|
||||
.description("Enable sniffing on failure, meaning that after each failure the Elasticsearch nodes list gets updated " +
|
||||
"straightaway rather than at the following ordinary sniffing round")
|
||||
"straight away rather than at the following ordinary sniffing round")
|
||||
.dependsOn(SNIFF_CLUSTER_NODES, "true")
|
||||
.allowableValues("true", "false")
|
||||
.defaultValue("false")
|
||||
@ -370,7 +401,7 @@ public interface ElasticSearchClientService extends ControllerService, Verifiabl
|
||||
/**
|
||||
* Perform a search using the JSON DSL.
|
||||
*
|
||||
* @param query A JSON string reprensenting the query.
|
||||
* @param query A JSON string representing the query.
|
||||
* @param index The index to target. Optional.
|
||||
* @param type The type to target. Optional. Will not be used in future versions of Elasticsearch.
|
||||
* @param requestParameters A collection of URL request parameters. Optional.
|
||||
|
@ -47,6 +47,10 @@
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-proxy-configuration-api</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-oauth2-provider-api</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-elasticsearch-client-service-api</artifactId>
|
||||
@ -83,23 +87,11 @@
|
||||
<artifactId>jackson-annotations</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>commons-io</groupId>
|
||||
<artifactId>commons-io</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-compress</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.github.stephenc.findbugs</groupId>
|
||||
<artifactId>findbugs-annotations</artifactId>
|
||||
<version>1.3.9-1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-lang3</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.opentest4j</groupId>
|
||||
<artifactId>opentest4j</artifactId>
|
||||
|
@ -45,6 +45,7 @@ import org.apache.nifi.controller.AbstractControllerService;
|
||||
import org.apache.nifi.controller.ConfigurationContext;
|
||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.proxy.ProxyConfiguration;
|
||||
@ -56,6 +57,7 @@ import org.apache.nifi.util.StringUtils;
|
||||
import org.elasticsearch.client.Node;
|
||||
import org.elasticsearch.client.NodeSelector;
|
||||
import org.elasticsearch.client.Request;
|
||||
import org.elasticsearch.client.RequestOptions;
|
||||
import org.elasticsearch.client.Response;
|
||||
import org.elasticsearch.client.ResponseException;
|
||||
import org.elasticsearch.client.RestClient;
|
||||
@ -110,6 +112,9 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
|
||||
PASSWORD,
|
||||
API_KEY_ID,
|
||||
API_KEY,
|
||||
JWT_SHARED_SECRET,
|
||||
OAUTH2_ACCESS_TOKEN_PROVIDER,
|
||||
RUN_AS_USER,
|
||||
PROP_SSL_CONTEXT_SERVICE,
|
||||
PROXY_CONFIGURATION_SERVICE,
|
||||
CONNECT_TIMEOUT,
|
||||
@ -127,6 +132,8 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
|
||||
SNIFFER_FAILURE_DELAY
|
||||
);
|
||||
|
||||
private OAuth2AccessTokenProvider oAuth2AccessTokenProvider;
|
||||
|
||||
private RestClient client;
|
||||
|
||||
private Sniffer sniffer;
|
||||
@ -165,6 +172,9 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
|
||||
|
||||
final SSLContextProvider sslContextProvider = validationContext.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextProvider.class);
|
||||
|
||||
final boolean jwtSharedSecretSet = validationContext.getProperty(JWT_SHARED_SECRET).isSet();
|
||||
final OAuth2AccessTokenProvider oAuth2Provider = validationContext.getProperty(OAUTH2_ACCESS_TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class);
|
||||
|
||||
if (authorizationScheme == AuthorizationScheme.PKI && (sslContextProvider == null)) {
|
||||
results.add(new ValidationResult.Builder().subject(PROP_SSL_CONTEXT_SERVICE.getName()).valid(false)
|
||||
.explanation(String.format("if '%s' is '%s' then '%s' must be set and specify a Keystore for mutual TLS encryption.",
|
||||
@ -173,6 +183,23 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
|
||||
);
|
||||
}
|
||||
|
||||
if (authorizationScheme == AuthorizationScheme.JWT) {
|
||||
if (oAuth2Provider == null) {
|
||||
results.add(new ValidationResult.Builder().subject(OAUTH2_ACCESS_TOKEN_PROVIDER.getName()).valid(false)
|
||||
.explanation(String.format("if '%s' is '%s' then '%s' must be set.",
|
||||
AUTHORIZATION_SCHEME.getDisplayName(), authorizationScheme.getDisplayName(), OAUTH2_ACCESS_TOKEN_PROVIDER.getDisplayName())
|
||||
).build()
|
||||
);
|
||||
}
|
||||
if (!jwtSharedSecretSet) {
|
||||
results.add(new ValidationResult.Builder().subject(JWT_SHARED_SECRET.getName()).valid(false)
|
||||
.explanation(String.format("if '%s' is '%s' then '%s' must be set.",
|
||||
AUTHORIZATION_SCHEME.getDisplayName(), authorizationScheme.getDisplayName(), JWT_SHARED_SECRET.getDisplayName())
|
||||
).build()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
if (usernameSet && !passwordSet) {
|
||||
addAuthorizationPropertiesValidationIssue(results, USERNAME, PASSWORD);
|
||||
} else if (passwordSet && !usernameSet) {
|
||||
@ -197,7 +224,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
|
||||
|
||||
private void addAuthorizationPropertiesValidationIssue(final List<ValidationResult> results, final PropertyDescriptor presentProperty, final PropertyDescriptor missingProperty) {
|
||||
results.add(new ValidationResult.Builder().subject(missingProperty.getName()).valid(false)
|
||||
.explanation(String.format("if '%s' is then '%s' must be set.", presentProperty.getDisplayName(), missingProperty.getDisplayName()))
|
||||
.explanation(String.format("if '%s' is set, then '%s' must be set.", presentProperty.getDisplayName(), missingProperty.getDisplayName()))
|
||||
.build()
|
||||
);
|
||||
}
|
||||
@ -207,11 +234,12 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
|
||||
try {
|
||||
this.client = setupClient(context);
|
||||
this.sniffer = setupSniffer(context, this.client);
|
||||
responseCharset = Charset.forName(context.getProperty(CHARSET).getValue());
|
||||
this.responseCharset = Charset.forName(context.getProperty(CHARSET).getValue());
|
||||
|
||||
this.oAuth2AccessTokenProvider = context.getProperty(OAUTH2_ACCESS_TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class);
|
||||
|
||||
// re-create the ObjectMapper in case the SUPPRESS_NULLS property has changed - the JsonInclude settings aren't dynamic
|
||||
createObjectMapper(context);
|
||||
|
||||
} catch (final Exception ex) {
|
||||
getLogger().error("Could not initialize ElasticSearch client.", ex);
|
||||
throw new InitializationException(ex);
|
||||
@ -264,10 +292,11 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
|
||||
clientSetupResult.outcome(ConfigVerificationResult.Outcome.SUCCESSFUL);
|
||||
|
||||
// try to fetch the Elasticsearch root endpoint (system summary)
|
||||
verifyRootConnection(verifyClient, connectionResult, warningsResult);
|
||||
final OAuth2AccessTokenProvider tokenProvider = context.getProperty(OAUTH2_ACCESS_TOKEN_PROVIDER).asControllerService(OAuth2AccessTokenProvider.class);
|
||||
verifyRootConnection(verifyClient, tokenProvider, connectionResult, warningsResult);
|
||||
|
||||
// try sniffing for cluster nodes
|
||||
verifySniffer(context, verifyClient, snifferResult);
|
||||
verifySniffer(context, verifyClient, tokenProvider, snifferResult);
|
||||
} catch (final MalformedURLException mue) {
|
||||
clientSetupResult.outcome(ConfigVerificationResult.Outcome.FAILED)
|
||||
.explanation("Incorrect/invalid " + ElasticSearchClientService.HTTP_HOSTS.getDisplayName());
|
||||
@ -303,7 +332,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
|
||||
return results;
|
||||
}
|
||||
|
||||
private void verifySniffer(final ConfigurationContext context, final RestClient verifyClient, final ConfigVerificationResult.Builder snifferResult) {
|
||||
private void verifySniffer(final ConfigurationContext context, final RestClient verifyClient, final OAuth2AccessTokenProvider tokenProvider, final ConfigVerificationResult.Builder snifferResult) {
|
||||
try (final Sniffer verifySniffer = setupSniffer(context, verifyClient)) {
|
||||
if (verifySniffer != null) {
|
||||
final List<Node> originalNodes = verifyClient.getNodes();
|
||||
@ -317,7 +346,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
|
||||
nodes.forEach(n -> {
|
||||
try {
|
||||
verifyClient.setNodes(Collections.singletonList(n));
|
||||
final List<String> warnings = getElasticsearchRoot(verifyClient);
|
||||
final List<String> warnings = getElasticsearchRoot(verifyClient, tokenProvider);
|
||||
successfulInstances.getAndIncrement();
|
||||
if (!warnings.isEmpty()) {
|
||||
warningInstances.getAndIncrement();
|
||||
@ -351,17 +380,20 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
|
||||
}
|
||||
}
|
||||
|
||||
private List<String> getElasticsearchRoot(final RestClient verifyClient) throws IOException {
|
||||
final Response response = verifyClient.performRequest(new Request("GET", "/"));
|
||||
private List<String> getElasticsearchRoot(final RestClient verifyClient, final OAuth2AccessTokenProvider tokenProvider) throws IOException {
|
||||
final Request request = addJWTAuthorizationHeader(new Request("GET", "/"), tokenProvider);
|
||||
final Response response = verifyClient.performRequest(request);
|
||||
final List<String> warnings = parseResponseWarningHeaders(response);
|
||||
// ensure the response can be parsed without exception
|
||||
parseResponse(response);
|
||||
|
||||
return warnings;
|
||||
}
|
||||
|
||||
private void verifyRootConnection(final RestClient verifyClient, final ConfigVerificationResult.Builder connectionResult, final ConfigVerificationResult.Builder warningsResult) {
|
||||
private void verifyRootConnection(final RestClient verifyClient, final OAuth2AccessTokenProvider tokenProvider,
|
||||
final ConfigVerificationResult.Builder connectionResult, final ConfigVerificationResult.Builder warningsResult) {
|
||||
try {
|
||||
final List<String> warnings = getElasticsearchRoot(verifyClient);
|
||||
final List<String> warnings = getElasticsearchRoot(verifyClient, tokenProvider);
|
||||
|
||||
connectionResult.outcome(ConfigVerificationResult.Outcome.SUCCESSFUL);
|
||||
if (warnings.isEmpty()) {
|
||||
@ -439,9 +471,13 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
|
||||
final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
|
||||
final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
|
||||
|
||||
final String runAsUser = context.getProperty(RUN_AS_USER).evaluateAttributeExpressions().getValue();
|
||||
|
||||
final String apiKeyId = context.getProperty(API_KEY_ID).getValue();
|
||||
final String apiKey = context.getProperty(API_KEY).getValue();
|
||||
|
||||
final String jwtSharedSecret = context.getProperty(JWT_SHARED_SECRET).getValue();
|
||||
|
||||
final SSLContext sslContext = getSSLContext(context);
|
||||
final ProxyConfigurationService proxyConfigurationService = context.getProperty(PROXY_CONFIGURATION_SERVICE).asControllerService(ProxyConfigurationService.class);
|
||||
|
||||
@ -459,6 +495,12 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
|
||||
if (AuthorizationScheme.API_KEY == authorizationScheme && apiKeyId != null && apiKey != null) {
|
||||
defaultHeaders.add(createApiKeyAuthorizationHeader(apiKeyId, apiKey));
|
||||
}
|
||||
if (AuthorizationScheme.JWT == authorizationScheme && jwtSharedSecret != null) {
|
||||
defaultHeaders.add(createSharedSecretHeader(jwtSharedSecret));
|
||||
}
|
||||
if (runAsUser != null) {
|
||||
defaultHeaders.add(createRunAsUserHeader(runAsUser));
|
||||
}
|
||||
if (!defaultHeaders.isEmpty()) {
|
||||
builder.setDefaultHeaders(defaultHeaders.toArray(new Header[0]));
|
||||
}
|
||||
@ -524,6 +566,23 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
|
||||
return new BasicHeader("Authorization", "ApiKey " + apiKeyAuth);
|
||||
}
|
||||
|
||||
private BasicHeader createSharedSecretHeader(final String jwtSharedSecret) {
|
||||
return new BasicHeader("ES-Client-Authentication", "sharedsecret " + jwtSharedSecret);
|
||||
}
|
||||
|
||||
private BasicHeader createRunAsUserHeader(final String runAsUser) {
|
||||
return new BasicHeader("es-security-runas-user", runAsUser);
|
||||
}
|
||||
|
||||
private Request addJWTAuthorizationHeader(final Request request, final OAuth2AccessTokenProvider tokenProvider) {
|
||||
if (tokenProvider != null) {
|
||||
final RequestOptions.Builder requestOptionsBuilder = RequestOptions.DEFAULT.toBuilder();
|
||||
requestOptionsBuilder.addHeader("Authorization", "Bearer " + tokenProvider.getAccessDetails().getAccessToken());
|
||||
request.setOptions(requestOptionsBuilder.build());
|
||||
}
|
||||
return request;
|
||||
}
|
||||
|
||||
private Sniffer setupSniffer(final ConfigurationContext context, final RestClient restClient) {
|
||||
final boolean sniffClusterNodes = context.getProperty(SNIFF_CLUSTER_NODES).asBoolean();
|
||||
final int snifferIntervalMillis = context.getProperty(SNIFFER_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
|
||||
@ -1016,7 +1075,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
|
||||
}
|
||||
|
||||
private Response performRequest(final String method, final String endpoint, final Map<String, String> parameters, final HttpEntity entity) throws IOException {
|
||||
final Request request = new Request(method, endpoint);
|
||||
final Request request = addJWTAuthorizationHeader(new Request(method, endpoint), oAuth2AccessTokenProvider);
|
||||
if (parameters != null && !parameters.isEmpty()) {
|
||||
request.addParameters(parameters);
|
||||
}
|
||||
|
@ -209,12 +209,12 @@ public class ElasticSearchLookupService extends JsonInferenceSchemaRegistryServi
|
||||
return null;
|
||||
}
|
||||
|
||||
final Map<String, Object> source = (Map<String, Object>) response.getHits().get(0).get("_source");
|
||||
final Map<String, Object> source = (Map<String, Object>) response.getHits().getFirst().get("_source");
|
||||
|
||||
final RecordSchema toUse = getSchema(context, source, null);
|
||||
|
||||
Record record = new MapRecord(toUse, source);
|
||||
if (recordPathMappings.size() > 0) {
|
||||
if (!recordPathMappings.isEmpty()) {
|
||||
record = applyMappings(record, source);
|
||||
}
|
||||
|
||||
@ -239,7 +239,7 @@ public class ElasticSearchLookupService extends JsonInferenceSchemaRegistryServi
|
||||
put(e.getKey(), e.getValue());
|
||||
}});
|
||||
}
|
||||
}}).collect(Collectors.toList())
|
||||
}}).toList()
|
||||
);
|
||||
}});
|
||||
}};
|
||||
@ -256,10 +256,10 @@ public class ElasticSearchLookupService extends JsonInferenceSchemaRegistryServi
|
||||
if (response.getNumberOfHits() == 0) {
|
||||
return null;
|
||||
} else {
|
||||
final Map<String, Object> source = (Map<String, Object>) response.getHits().get(0).get("_source");
|
||||
final Map<String, Object> source = (Map<String, Object>) response.getHits().getFirst().get("_source");
|
||||
final RecordSchema toUse = getSchema(context, source, null);
|
||||
Record record = new MapRecord(toUse, source);
|
||||
if (recordPathMappings.size() > 0) {
|
||||
if (!recordPathMappings.isEmpty()) {
|
||||
record = applyMappings(record, source);
|
||||
}
|
||||
|
||||
|
@ -263,7 +263,7 @@ class ElasticSearchClientService_IT extends AbstractElasticsearch_IT {
|
||||
"four", 4, "five", 5)
|
||||
.build();
|
||||
|
||||
buckets.forEach( (aggRes) -> {
|
||||
buckets.forEach(aggRes -> {
|
||||
final String key = (String) aggRes.get("key");
|
||||
final Integer docCount = (Integer) aggRes.get("doc_count");
|
||||
assertEquals(expected.get(key), docCount, String.format("%s did not match.", key));
|
||||
|
@ -49,6 +49,7 @@ class ElasticSearchLookupService_IT extends AbstractElasticsearch_IT {
|
||||
|
||||
private ElasticSearchLookupService lookupService;
|
||||
|
||||
@Override
|
||||
@BeforeEach
|
||||
void before() throws Exception {
|
||||
super.before();
|
||||
|
@ -21,6 +21,7 @@ import org.apache.nifi.elasticsearch.AuthorizationScheme;
|
||||
import org.apache.nifi.elasticsearch.ElasticSearchClientService;
|
||||
import org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl;
|
||||
import org.apache.nifi.elasticsearch.TestControllerServiceProcessor;
|
||||
import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
|
||||
import org.apache.nifi.reporting.InitializationException;
|
||||
import org.apache.nifi.ssl.SSLContextProvider;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
@ -114,9 +115,25 @@ class ElasticSearchClientServiceImplTest {
|
||||
assertPKIAuthorizationValidationErrorMessage();
|
||||
}
|
||||
|
||||
@Test
|
||||
void testValidateJwtAuth() throws InitializationException {
|
||||
runner.setProperty(service, ElasticSearchClientService.AUTHORIZATION_SCHEME, AuthorizationScheme.JWT);
|
||||
runner.setProperty(service, ElasticSearchClientService.JWT_SHARED_SECRET, "jwt-shared-secret");
|
||||
assertJWTAuthorizationValidationErrorMessage(ElasticSearchClientService.OAUTH2_ACCESS_TOKEN_PROVIDER);
|
||||
|
||||
final OAuth2AccessTokenProvider oAuth2AccessTokenProvider = mock(OAuth2AccessTokenProvider.class);
|
||||
when(oAuth2AccessTokenProvider.getIdentifier()).thenReturn("oauth2-access-token-provider");
|
||||
runner.addControllerService("oauth2-access-token-provider", oAuth2AccessTokenProvider);
|
||||
runner.setProperty(service, ElasticSearchClientService.OAUTH2_ACCESS_TOKEN_PROVIDER, "oauth2-access-token-provider");
|
||||
runner.assertValid(service);
|
||||
|
||||
runner.removeProperty(service, ElasticSearchClientService.JWT_SHARED_SECRET);
|
||||
assertJWTAuthorizationValidationErrorMessage(ElasticSearchClientService.JWT_SHARED_SECRET);
|
||||
}
|
||||
|
||||
private void assertAuthorizationPropertyValidationErrorMessage(final PropertyDescriptor presentProperty, final PropertyDescriptor missingProperty) {
|
||||
final AssertionFailedError afe = assertThrows(AssertionFailedError.class, () -> runner.assertValid(service));
|
||||
assertTrue(afe.getMessage().contains(String.format("if '%s' is then '%s' must be set.", presentProperty.getDisplayName(), missingProperty.getDisplayName())));
|
||||
assertTrue(afe.getMessage().contains(String.format("if '%s' is set, then '%s' must be set.", presentProperty.getDisplayName(), missingProperty.getDisplayName())));
|
||||
}
|
||||
|
||||
private void assertPKIAuthorizationValidationErrorMessage() {
|
||||
@ -128,4 +145,14 @@ class ElasticSearchClientServiceImplTest {
|
||||
ElasticSearchClientService.PROP_SSL_CONTEXT_SERVICE.getDisplayName()
|
||||
)));
|
||||
}
|
||||
|
||||
private void assertJWTAuthorizationValidationErrorMessage(final PropertyDescriptor expectedMissingProperty) {
|
||||
final AssertionFailedError afe = assertThrows(AssertionFailedError.class, () -> runner.assertValid(service));
|
||||
assertTrue(afe.getMessage().contains(String.format(
|
||||
"if '%s' is '%s' then '%s' must be set.",
|
||||
ElasticSearchClientService.AUTHORIZATION_SCHEME.getDisplayName(),
|
||||
AuthorizationScheme.JWT.getDisplayName(),
|
||||
expectedMissingProperty.getDisplayName()
|
||||
)));
|
||||
}
|
||||
}
|
||||
|
@ -35,7 +35,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
public class ElasticSearchStringLookupServiceTest {
|
||||
class ElasticSearchStringLookupServiceTest {
|
||||
private ElasticSearchClientService mockClientService;
|
||||
private ElasticSearchStringLookupService lookupService;
|
||||
|
||||
@ -56,8 +56,9 @@ public class ElasticSearchStringLookupServiceTest {
|
||||
runner.enableControllerService(lookupService);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test
|
||||
public void simpleLookupTest() throws Exception {
|
||||
void simpleLookupTest() throws Exception {
|
||||
Map<String, Object> coordinates = new HashMap<>();
|
||||
coordinates.put(ElasticSearchStringLookupService.ID, "12345");
|
||||
|
||||
|
@ -72,8 +72,9 @@ language governing permissions and limitations under the License. -->
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>commons-io</groupId>
|
||||
<artifactId>commons-io</artifactId>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-oauth2-provider-api</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.core</groupId>
|
||||
|
@ -21,16 +21,6 @@ language governing permissions and limitations under the License. -->
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.core</groupId>
|
||||
<artifactId>jackson-databind</artifactId>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.core</groupId>
|
||||
<artifactId>jackson-core</artifactId>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.github.docker-java</groupId>
|
||||
<artifactId>docker-java-api</artifactId>
|
||||
|
@ -52,7 +52,7 @@ import static org.apache.http.auth.AuthScope.ANY;
|
||||
public abstract class AbstractElasticsearchITBase {
|
||||
// default Elasticsearch version should (ideally) match that in the nifi-elasticsearch-bundle#pom.xml for the integration-tests profile
|
||||
protected static final DockerImageName IMAGE = DockerImageName
|
||||
.parse(System.getProperty("elasticsearch.docker.image", "docker.elastic.co/elasticsearch/elasticsearch:8.15.1"));
|
||||
.parse(System.getProperty("elasticsearch.docker.image", "docker.elastic.co/elasticsearch/elasticsearch:8.17.0"));
|
||||
protected static final String ELASTIC_USER_PASSWORD = System.getProperty("elasticsearch.elastic_user.password", RandomStringUtils.randomAlphanumeric(10, 20));
|
||||
private static final int PORT = 9200;
|
||||
protected static final ElasticsearchContainer ELASTICSEARCH_CONTAINER = new ElasticsearchContainer(IMAGE)
|
||||
|
@ -119,7 +119,7 @@ language governing permissions and limitations under the License. -->
|
||||
<profile>
|
||||
<id>elasticsearch7</id>
|
||||
<properties>
|
||||
<elasticsearch_docker_image>7.17.23</elasticsearch_docker_image>
|
||||
<elasticsearch_docker_image>7.17.26</elasticsearch_docker_image>
|
||||
</properties>
|
||||
</profile>
|
||||
</profiles>
|
||||
|
Loading…
x
Reference in New Issue
Block a user