NIFI-10797 add customisable Elasticsearch REST Client config and Elasticsearch Cluster Sniffer

Signed-off-by: Joe Gresock <jgresock@gmail.com>

This closes #6658.
This commit is contained in:
Chris Sampson 2022-11-11 15:12:05 +00:00 committed by Joe Gresock
parent 1156f4cbc5
commit 6542505a50
No known key found for this signature in database
GPG Key ID: 37F5B9B6E258C8B7
11 changed files with 657 additions and 151 deletions

View File

@ -16,8 +16,6 @@
*/
package org.apache.nifi.elasticsearch;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
@ -32,8 +30,6 @@ import org.apache.nifi.ssl.SSLContextService;
import java.util.List;
import java.util.Map;
@Tags({"elasticsearch", "client"})
@CapabilityDescription("A controller service for accessing an Elasticsearch client.")
public interface ElasticSearchClientService extends ControllerService, VerifiableControllerService {
PropertyDescriptor HTTP_HOSTS = new PropertyDescriptor.Builder()
.name("el-cs-http-hosts")
@ -148,6 +144,118 @@ public interface ElasticSearchClientService extends ControllerService, Verifiabl
.required(true)
.build();
PropertyDescriptor COMPRESSION = new PropertyDescriptor.Builder()
.name("el-cs-enable-compression")
.displayName("Enable Compression")
.description("Whether the REST client should compress requests using gzip content encoding and add the " +
"\"Accept-Encoding: gzip\" header to receive compressed responses")
.allowableValues("true", "false")
.defaultValue("false")
.required(true)
.build();
PropertyDescriptor SEND_META_HEADER = new PropertyDescriptor.Builder()
.name("el-cs-send-meta-header")
.displayName("Send Meta Header")
.description("Whether to send a \"X-Elastic-Client-Meta\" header that describes the runtime environment. " +
"It contains information that is similar to what could be found in User-Agent. " +
"Using a separate header allows applications to use User-Agent for their own needs, " +
"e.g. to identify application version or other environment information")
.allowableValues("true", "false")
.defaultValue("true")
.required(true)
.build();
PropertyDescriptor STRICT_DEPRECATION = new PropertyDescriptor.Builder()
.name("el-cs-strict-deprecation")
.displayName("Strict Deprecation")
.description("Whether the REST client should return any response containing at least one warning header as a failure")
.allowableValues("true", "false")
.defaultValue("false")
.required(true)
.build();
AllowableValue NODE_SELECTOR_ANY = new AllowableValue("ANY", "Any",
"Select any Elasticsearch node to handle requests");
AllowableValue NODE_SELECTOR_SKIP_DEDICATED_MASTERS = new AllowableValue("SKIP_DEDICATED_MASTERS", "Skip Dedicated Masters",
"Skip dedicated Elasticsearch master nodes for handling request");
PropertyDescriptor NODE_SELECTOR = new PropertyDescriptor.Builder()
.name("el-cs-node-selector")
.displayName("Node Selector")
.description("Selects Elasticsearch nodes that can receive requests. Used to keep requests away from dedicated Elasticsearch master nodes")
.allowableValues(NODE_SELECTOR_ANY, NODE_SELECTOR_SKIP_DEDICATED_MASTERS)
.defaultValue(NODE_SELECTOR_ANY.getValue())
.required(true)
.build();
PropertyDescriptor PATH_PREFIX = new PropertyDescriptor.Builder()
.name("el-cs-path-prefix")
.displayName("Path Prefix")
.description("Sets the path's prefix for every request used by the http client. " +
"For example, if this is set to \"/my/path\", then any client request will become \"/my/path/\" + endpoint. " +
"In essence, every request's endpoint is prefixed by this pathPrefix. " +
"The path prefix is useful for when Elasticsearch is behind a proxy that provides a base path or a proxy that requires all paths to start with '/'; " +
"it is not intended for other purposes and it should not be supplied in other scenarios")
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();
PropertyDescriptor SNIFF_CLUSTER_NODES = new PropertyDescriptor.Builder()
.name("el-cs-sniff-cluster-nodes")
.displayName("Sniff Cluster Nodes")
.description("Periodically sniff for nodes within the Elasticsearch cluster via the Elasticsearch Node Info API. " +
"If Elasticsearch security features are enabled (default to \"true\" for 8.x+), the Elasticsearch user must " +
"have the \"monitor\" or \"manage\" cluster privilege to use this API." +
"Note that all " + HTTP_HOSTS.getDisplayName() + " (and those that may be discovered within the cluster " +
"using the Sniffer) must use the same protocol, e.g. http or https, and be contactable using the same client settings. " +
"Finally the Elasticsearch \"network.publish_host\" must match one of the \"network.bind_host\" list entries " +
"see https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-network.html for more information")
.allowableValues("true", "false")
.defaultValue("false")
.required(true)
.build();
PropertyDescriptor SNIFF_ON_FAILURE = new PropertyDescriptor.Builder()
.name("el-cs-sniff-failure")
.displayName("Sniff on Failure")
.description("Enable sniffing on failure, meaning that after each failure the Elasticsearch nodes list gets updated " +
"straightaway rather than at the following ordinary sniffing round")
.dependsOn(SNIFF_CLUSTER_NODES, "true")
.allowableValues("true", "false")
.defaultValue("false")
.required(true)
.build();
PropertyDescriptor SNIFFER_INTERVAL = new PropertyDescriptor.Builder()
.name("el-cs-sniffer-interval")
.displayName("Sniffer Interval")
.description("Interval between Cluster sniffer operations")
.dependsOn(SNIFF_CLUSTER_NODES, "true")
.defaultValue("5 mins")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.required(true)
.build();
PropertyDescriptor SNIFFER_REQUEST_TIMEOUT = new PropertyDescriptor.Builder()
.name("el-cs-sniffer-request-timeout")
.displayName("Sniffer Request Timeout")
.description("Cluster sniffer timeout for node info requests")
.dependsOn(SNIFF_CLUSTER_NODES, "true")
.defaultValue("1 sec")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.required(true)
.build();
PropertyDescriptor SNIFFER_FAILURE_DELAY = new PropertyDescriptor.Builder()
.name("el-cs-sniffer-failure-delay")
.displayName("Sniffer Failure Delay")
.description("Delay between an Elasticsearch request failure and updating available Cluster nodes using the Sniffer")
.dependsOn(SNIFF_ON_FAILURE, "true")
.defaultValue("1 min")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.required(true)
.build();
/**
* Index a document.
*

View File

@ -132,6 +132,10 @@
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client-sniffer</artifactId>
</dependency>
<!-- test dependencies -->
<dependency>

View File

@ -32,6 +32,9 @@ import org.apache.http.entity.ContentType;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.message.BasicHeader;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.ConfigVerificationResult;
@ -40,19 +43,26 @@ import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.proxy.ProxyConfigurationService;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.StringUtils;
import org.elasticsearch.client.Node;
import org.elasticsearch.client.NodeSelector;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.sniff.ElasticsearchNodesSniffer;
import org.elasticsearch.client.sniff.SniffOnFailureListener;
import org.elasticsearch.client.sniff.Sniffer;
import javax.net.ssl.SSLContext;
import java.io.ByteArrayOutputStream;
@ -73,12 +83,23 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@Tags({"elasticsearch", "elasticsearch6", "elasticsearch7", "elasticsearch8", "client"})
@CapabilityDescription("A controller service for accessing an Elasticsearch client. " +
"Uses the Elasticsearch REST Client (7.13.4, the last version before client connections verify" +
"the server is Elastic provided, this should allow for connections to compatible alternatives, e.g. AWS OpenSearch)")
@DynamicProperty(
name = "The name of a Request Header to add",
value = "The value of the Header",
expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY,
description = "Adds the specified property name/value as a Request Header in the Elasticsearch requests.")
public class ElasticSearchClientServiceImpl extends AbstractControllerService implements ElasticSearchClientService {
public static final String VERIFICATION_STEP_CONNECTION = "Elasticsearch Connection";
public static final String VERIFICATION_STEP_CLIENT_SETUP = "Elasticsearch Rest Client Setup";
public static final String VERIFICATION_STEP_WARNINGS = "Elasticsearch Warnings";
public static final String VERIFICATION_STEP_SNIFFER = "Elasticsearch Sniffer";
private ObjectMapper mapper;
@ -86,6 +107,8 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
private RestClient client;
private Sniffer sniffer;
private String url;
private Charset responseCharset;
private ObjectWriter prettyPrintWriter;
@ -93,6 +116,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
static {
final List<PropertyDescriptor> props = new ArrayList<>();
props.add(HTTP_HOSTS);
props.add(PATH_PREFIX);
props.add(AUTHORIZATION_SCHEME);
props.add(USERNAME);
props.add(PASSWORD);
@ -104,6 +128,15 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
props.add(SOCKET_TIMEOUT);
props.add(CHARSET);
props.add(SUPPRESS_NULLS);
props.add(COMPRESSION);
props.add(SEND_META_HEADER);
props.add(STRICT_DEPRECATION);
props.add(NODE_SELECTOR);
props.add(SNIFF_CLUSTER_NODES);
props.add(SNIFFER_INTERVAL);
props.add(SNIFFER_REQUEST_TIMEOUT);
props.add(SNIFF_ON_FAILURE);
props.add(SNIFFER_FAILURE_DELAY);
properties = Collections.unmodifiableList(props);
}
@ -113,9 +146,20 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
return properties;
}
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.required(false)
.addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.dynamic(true)
.build();
}
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
final List<ValidationResult> results = new ArrayList<>(1);
final AuthorizationScheme authorizationScheme = AuthorizationScheme.valueOf(validationContext.getProperty(AUTHORIZATION_SCHEME).getValue());
@ -126,6 +170,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
final boolean apiKeySet = validationContext.getProperty(API_KEY).isSet();
final SSLContextService sslService = validationContext.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
if (authorizationScheme == AuthorizationScheme.PKI && (sslService == null || !sslService.isKeyStoreConfigured())) {
results.add(new ValidationResult.Builder().subject(PROP_SSL_CONTEXT_SERVICE.getName()).valid(false)
.explanation(String.format("if '%s' is '%s' then '%s' must be set and specify a Keystore for mutual TLS encryption.",
@ -146,6 +191,13 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
addAuthorizationPropertiesValidationIssue(results, API_KEY, API_KEY_ID);
}
final boolean sniffClusterNodes = validationContext.getProperty(SNIFF_CLUSTER_NODES).asBoolean();
final boolean sniffOnFailure = validationContext.getProperty(SNIFF_ON_FAILURE).asBoolean();
if (sniffOnFailure && !sniffClusterNodes) {
results.add(new ValidationResult.Builder().subject(SNIFF_ON_FAILURE.getName()).valid(false)
.explanation(String.format("'%s' cannot be enabled if '%s' is disabled", SNIFF_ON_FAILURE.getDisplayName(), SNIFF_CLUSTER_NODES.getDisplayName())).build());
}
return results;
}
@ -160,6 +212,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
public void onEnabled(final ConfigurationContext context) throws InitializationException {
try {
this.client = setupClient(context);
this.sniffer = setupSniffer(context, this.client);
responseCharset = Charset.forName(context.getProperty(CHARSET).getValue());
// re-create the ObjectMapper in case the SUPPRESS_NULLS property has changed - the JsonInclude settings aren't dynamic
@ -178,6 +231,11 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
@OnDisabled
public void onDisabled() throws IOException {
if (this.sniffer != null) {
this.sniffer.close();
this.sniffer = null;
}
if (this.client != null) {
this.client.close();
this.client = null;
@ -194,6 +252,8 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
.verificationStepName(VERIFICATION_STEP_CONNECTION);
final ConfigVerificationResult.Builder warningsResult = new ConfigVerificationResult.Builder()
.verificationStepName(VERIFICATION_STEP_WARNINGS);
final ConfigVerificationResult.Builder snifferResult = new ConfigVerificationResult.Builder()
.verificationStepName(VERIFICATION_STEP_SNIFFER);
// configure the Rest Client
try (final RestClient verifyClient = setupClient(context)) {
@ -201,7 +261,10 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
// try to fetch the Elasticsearch root endpoint (system summary)
verifyRootConnection(verifyClient, connectionResult, warningsResult);
}catch (final MalformedURLException mue) {
// try sniffing for cluster nodes
verifySniffer(context, verifyClient, snifferResult);
} catch (final MalformedURLException mue) {
clientSetupResult.outcome(ConfigVerificationResult.Outcome.FAILED)
.explanation("Incorrect/invalid " + ElasticSearchClientService.HTTP_HOSTS.getDisplayName());
} catch (final InitializationException ie) {
@ -219,21 +282,78 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
.explanation("Elasticsearch Rest Client not configured");
warningsResult.outcome(ConfigVerificationResult.Outcome.SKIPPED)
.explanation("Elasticsearch Rest Client not configured");
snifferResult.outcome(ConfigVerificationResult.Outcome.SKIPPED)
.explanation("Elasticsearch Rest Client not configured");
}
results.add(clientSetup);
results.add(connectionResult.build());
results.add(warningsResult.build());
results.add(snifferResult.build());
}
return results;
}
private void verifySniffer(final ConfigurationContext context, final RestClient verifyClient, final ConfigVerificationResult.Builder snifferResult) {
try (final Sniffer verifySniffer = setupSniffer(context, verifyClient)) {
if (verifySniffer != null) {
final List<Node> originalNodes = verifyClient.getNodes();
// cannot access the NodesSniffer from the parent Sniffer, so set up a second instance here
final ElasticsearchNodesSniffer elasticsearchNodesSniffer = setupElasticsearchNodesSniffer(context, verifyClient);
final List<Node> nodes = elasticsearchNodesSniffer.sniff();
// attempt to connect to each Elasticsearch Node using the RestClient
final AtomicInteger successfulInstances = new AtomicInteger(0);
final AtomicInteger warningInstances = new AtomicInteger(0);
nodes.forEach(n -> {
try {
verifyClient.setNodes(Collections.singletonList(n));
final List<String> warnings = getElasticsearchRoot(verifyClient);
successfulInstances.getAndIncrement();
if (!warnings.isEmpty()) {
warningInstances.getAndIncrement();
}
} catch (final Exception ex) {
getLogger().warn("Elasticsearch Node {} connection failed", n.getHost().toURI(), ex);
}
});
// reset Nodes list on RestClient to pre-Sniffer state (match user's Verify settings)
verifyClient.setNodes(originalNodes);
if (successfulInstances.get() < nodes.size()) {
snifferResult.outcome(ConfigVerificationResult.Outcome.FAILED).explanation(
String.format("Sniffing for Elasticsearch cluster nodes found %d nodes but %d could not be contacted (%d with warnings during connection tests)",
nodes.size(), nodes.size() - successfulInstances.get(), warningInstances.get())
);
} else {
snifferResult.outcome(ConfigVerificationResult.Outcome.SUCCESSFUL).explanation(
String.format("Sniffing for Elasticsearch cluster nodes found %d nodes (%d with warnings during connection tests)",
nodes.size(), warningInstances.get())
);
}
} else {
snifferResult.outcome(ConfigVerificationResult.Outcome.SKIPPED).explanation("Sniff on Connection not enabled");
}
} catch (final Exception ex) {
getLogger().warn("Unable to sniff for Elasticsearch cluster nodes", ex);
snifferResult.outcome(ConfigVerificationResult.Outcome.FAILED)
.explanation("Sniffing for Elasticsearch cluster nodes failed");
}
}
private List<String> getElasticsearchRoot(final RestClient verifyClient) throws IOException {
final Response response = verifyClient.performRequest(new Request("GET", "/"));
final List<String> warnings = parseResponseWarningHeaders(response);
parseResponse(response);
return warnings;
}
private void verifyRootConnection(final RestClient verifyClient, final ConfigVerificationResult.Builder connectionResult, final ConfigVerificationResult.Builder warningsResult) {
try {
final Response response = verifyClient.performRequest(new Request("GET", "/"));
final List<String> warnings = parseResponseWarningHeaders(response);
parseResponse(response);
final List<String> warnings = getElasticsearchRoot(verifyClient);
connectionResult.outcome(ConfigVerificationResult.Outcome.SUCCESSFUL);
if (warnings.isEmpty()) {
@ -253,80 +373,121 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
}
private RestClient setupClient(final ConfigurationContext context) throws MalformedURLException, InitializationException {
final Integer connectTimeout = context.getProperty(CONNECT_TIMEOUT).asInteger();
final Integer socketTimeout = context.getProperty(SOCKET_TIMEOUT).asInteger();
final NodeSelector nodeSelector = NODE_SELECTOR_ANY.getValue().equals(context.getProperty(NODE_SELECTOR).getValue())
? NodeSelector.ANY
: NodeSelector.SKIP_DEDICATED_MASTERS;
final String pathPrefix = context.getProperty(PATH_PREFIX).getValue();
final boolean compress = context.getProperty(COMPRESSION).asBoolean();
final boolean sendMetaHeader = context.getProperty(SEND_META_HEADER).asBoolean();
final boolean strictDeprecation = context.getProperty(STRICT_DEPRECATION).asBoolean();
final boolean sniffOnFailure = context.getProperty(SNIFF_ON_FAILURE).asBoolean();
final RestClientBuilder builder = RestClient.builder(getHttpHosts(context));
addAuthAndProxy(context, builder)
.setRequestConfigCallback(requestConfigBuilder -> {
requestConfigBuilder.setConnectTimeout(connectTimeout);
requestConfigBuilder.setSocketTimeout(socketTimeout);
return requestConfigBuilder;
})
.setCompressionEnabled(compress)
.setMetaHeaderEnabled(sendMetaHeader)
.setStrictDeprecationMode(strictDeprecation)
.setNodeSelector(nodeSelector);
if (sniffOnFailure && sniffer != null) {
final SniffOnFailureListener sniffOnFailureListener = new SniffOnFailureListener();
sniffOnFailureListener.setSniffer(sniffer);
builder.setFailureListener(sniffOnFailureListener);
}
if (StringUtils.isNotBlank(pathPrefix)) {
builder.setPathPrefix(pathPrefix);
}
return builder.build();
}
private HttpHost[] getHttpHosts(final ConfigurationContext context) throws MalformedURLException {
final String hosts = context.getProperty(HTTP_HOSTS).evaluateAttributeExpressions().getValue();
final List<String> hostsSplit = Arrays.stream(hosts.split(",\\s*")).map(String::trim).collect(Collectors.toList());
this.url = hostsSplit.get(0);
final List<HttpHost> hh = new ArrayList<>(hostsSplit.size());
for (final String host : hostsSplit) {
final URL u = new URL(host);
hh.add(new HttpHost(u.getHost(), u.getPort(), u.getProtocol()));
}
return hh.toArray(new HttpHost[0]);
}
private RestClientBuilder addAuthAndProxy(final ConfigurationContext context, final RestClientBuilder builder) throws InitializationException {
final AuthorizationScheme authorizationScheme = AuthorizationScheme.valueOf(context.getProperty(AUTHORIZATION_SCHEME).getValue());
final String hosts = context.getProperty(HTTP_HOSTS).evaluateAttributeExpressions().getValue();
final String[] hostsSplit = hosts.split(",\\s*");
this.url = hostsSplit[0];
final SSLContextService sslService =
context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
final String apiKeyId = context.getProperty(API_KEY_ID).getValue();
final String apiKey = context.getProperty(API_KEY).getValue();
final Integer connectTimeout = context.getProperty(CONNECT_TIMEOUT).asInteger();
final Integer socketTimeout = context.getProperty(SOCKET_TIMEOUT).asInteger();
final SSLContext sslContext = getSSLContext(context);
final ProxyConfigurationService proxyConfigurationService = context.getProperty(PROXY_CONFIGURATION_SERVICE).asControllerService(ProxyConfigurationService.class);
final HttpHost[] hh = new HttpHost[hostsSplit.length];
for (int x = 0; x < hh.length; x++) {
final URL u = new URL(hostsSplit[x]);
hh[x] = new HttpHost(u.getHost(), u.getPort(), u.getProtocol());
}
return builder.setHttpClientConfigCallback(httpClientBuilder -> {
if (sslContext != null) {
httpClientBuilder.setSSLContext(sslContext);
}
CredentialsProvider credentialsProvider = null;
if (AuthorizationScheme.BASIC == authorizationScheme && username != null && password != null) {
credentialsProvider = addBasicAuthCredentials(null, AuthScope.ANY, username, password);
}
final List<Header> defaultHeaders = getDefaultHeadersFromDynamicProperties(context);
if (AuthorizationScheme.API_KEY == authorizationScheme && apiKeyId != null && apiKey != null) {
defaultHeaders.add(createApiKeyAuthorizationHeader(apiKeyId, apiKey));
}
if (!defaultHeaders.isEmpty()) {
builder.setDefaultHeaders(defaultHeaders.toArray(new Header[0]));
}
if (proxyConfigurationService != null) {
final ProxyConfiguration proxyConfiguration = proxyConfigurationService.getConfiguration();
if (Proxy.Type.HTTP == proxyConfiguration.getProxyType()) {
final HttpHost proxy = new HttpHost(proxyConfiguration.getProxyServerHost(), proxyConfiguration.getProxyServerPort(), "http");
httpClientBuilder.setProxy(proxy);
credentialsProvider = addBasicAuthCredentials(credentialsProvider, new AuthScope(proxy), proxyConfiguration.getProxyUserName(), proxyConfiguration.getProxyUserPassword());
}
}
if (credentialsProvider != null) {
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}
return httpClientBuilder;
});
}
private SSLContext getSSLContext(final ConfigurationContext context) throws InitializationException {
final SSLContextService sslService =
context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
final SSLContext sslContext;
try {
sslContext = (sslService != null && (sslService.isKeyStoreConfigured() || sslService.isTrustStoreConfigured()))
return (sslService != null && (sslService.isKeyStoreConfigured() || sslService.isTrustStoreConfigured()))
? sslService.createContext() : null;
} catch (final Exception e) {
getLogger().error("Error building up SSL Context from the supplied configuration.", e);
throw new InitializationException(e);
}
final RestClientBuilder builder = RestClient.builder(hh)
.setHttpClientConfigCallback(httpClientBuilder -> {
if (sslContext != null) {
httpClientBuilder.setSSLContext(sslContext);
}
CredentialsProvider credentialsProvider = null;
if (AuthorizationScheme.BASIC == authorizationScheme && username != null && password != null) {
credentialsProvider = addCredentials(null, AuthScope.ANY, username, password);
}
if (AuthorizationScheme.API_KEY == authorizationScheme && apiKeyId != null && apiKey != null) {
httpClientBuilder.setDefaultHeaders(Collections.singletonList(createApiKeyAuthorizationHeader(apiKeyId, apiKey)));
}
if (proxyConfigurationService != null) {
final ProxyConfiguration proxyConfiguration = proxyConfigurationService.getConfiguration();
if (Proxy.Type.HTTP == proxyConfiguration.getProxyType()) {
final HttpHost proxy = new HttpHost(proxyConfiguration.getProxyServerHost(), proxyConfiguration.getProxyServerPort(), "http");
httpClientBuilder.setProxy(proxy);
credentialsProvider = addCredentials(credentialsProvider, new AuthScope(proxy), proxyConfiguration.getProxyUserName(), proxyConfiguration.getProxyUserPassword());
}
}
if (credentialsProvider != null) {
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}
return httpClientBuilder;
})
.setRequestConfigCallback(requestConfigBuilder -> {
requestConfigBuilder.setConnectTimeout(connectTimeout);
requestConfigBuilder.setSocketTimeout(socketTimeout);
return requestConfigBuilder;
});
return builder.build();
}
private CredentialsProvider addCredentials(final CredentialsProvider credentialsProvider, final AuthScope authScope, final String username, final String password) {
private CredentialsProvider addBasicAuthCredentials(final CredentialsProvider credentialsProvider, final AuthScope authScope,
final String username, final String password) {
final CredentialsProvider cp = credentialsProvider != null ? credentialsProvider : new BasicCredentialsProvider();
if (StringUtils.isNotBlank(username) && StringUtils.isNotBlank(password)) {
@ -339,12 +500,48 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
return cp;
}
private BasicHeader createApiKeyAuthorizationHeader(String apiKeyId, String apiKey) {
private List<Header> getDefaultHeadersFromDynamicProperties(final ConfigurationContext context) {
return context.getProperties().entrySet().stream()
// filter non-null dynamic properties
.filter(e -> e.getKey().isDynamic() && StringUtils.isNotBlank(e.getValue())
&& StringUtils.isNotBlank(context.getProperty(e.getKey()).evaluateAttributeExpressions().getValue())
)
// convert to Headers
.map(e -> new BasicHeader(e.getKey().getName(),
context.getProperty(e.getKey()).evaluateAttributeExpressions().getValue()))
.collect(Collectors.toList());
}
private BasicHeader createApiKeyAuthorizationHeader(final String apiKeyId, final String apiKey) {
final String apiKeyCredentials = String.format("%s:%s", apiKeyId, apiKey);
final String apiKeyAuth = Base64.getEncoder().encodeToString((apiKeyCredentials).getBytes(StandardCharsets.UTF_8));
return new BasicHeader("Authorization", "ApiKey " + apiKeyAuth);
}
private Sniffer setupSniffer(final ConfigurationContext context, final RestClient restClient) {
final boolean sniffClusterNodes = context.getProperty(SNIFF_CLUSTER_NODES).asBoolean();
final int snifferIntervalMillis = context.getProperty(SNIFFER_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
final int snifferFailureDelayMillis = context.getProperty(SNIFFER_FAILURE_DELAY).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
if (sniffClusterNodes) {
return Sniffer.builder(restClient)
.setSniffIntervalMillis(snifferIntervalMillis)
.setSniffAfterFailureDelayMillis(snifferFailureDelayMillis)
.setNodesSniffer(setupElasticsearchNodesSniffer(context, restClient))
.build();
}
return null;
}
private ElasticsearchNodesSniffer setupElasticsearchNodesSniffer(final ConfigurationContext context, final RestClient restClient) {
final Long snifferRequestTimeoutMillis = context.getProperty(SNIFFER_REQUEST_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
final ElasticsearchNodesSniffer.Scheme scheme = this.url.toLowerCase(Locale.getDefault()).startsWith("https://")
? ElasticsearchNodesSniffer.Scheme.HTTPS : ElasticsearchNodesSniffer.Scheme.HTTP;
return new ElasticsearchNodesSniffer(restClient, snifferRequestTimeoutMillis, scheme);
}
private void appendIndex(final StringBuilder sb, final String index) {
if (StringUtils.isNotBlank(index) && !"/".equals(index)) {
if (!index.startsWith("/")) {
@ -798,10 +995,6 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
}
if (getLogger().isDebugEnabled()) {
ByteArrayOutputStream out = new ByteArrayOutputStream();
entity.writeTo(out);
out.close();
StringBuilder builder = new StringBuilder(1000);
builder.append("Dumping Elasticsearch REST request...\n")
.append("HTTP Method: ")
@ -812,11 +1005,18 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
.append("\n")
.append("Parameters: ")
.append(prettyPrintWriter.writeValueAsString(parameters))
.append("\n")
.append("Request body: ")
.append(new String(out.toByteArray()))
.append("\n");
if (entity != null) {
final ByteArrayOutputStream out = new ByteArrayOutputStream();
entity.writeTo(out);
out.close();
builder.append("Request body: ")
.append(out)
.append("\n");
}
getLogger().debug(builder.toString());
}

View File

@ -0,0 +1,54 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8" />
<title>ElasticSearchClientServiceImpl</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
</head>
<body>
<h2>Sniffing</h2>
<p>
The Elasticsearch Sniffer can be used to locate Elasticsearch Nodes within a Cluster to which you are connecting.
This can be beneficial if your cluster dynamically changes over time, e.g. new Nodes are added to maintain performance during heavy load.
</p>
<p>
Sniffing can also be used to update the list of Hosts within the Cluster if a connection Failure is encountered during operation.
In order to "Sniff on Failure", you <b>must</b> also enable "Sniff Cluster Nodes".
</p>
<p>
Not all situations make sense to use Sniffing, for example if:
<ul>
<li>Elasticsearch is situated behind a load balancer, which dynamically routes connections from NiFi</li>
<li>Elasticsearch is on a different network to NiFi</li>
</ul>
</p>
<p>
There may also be need to set some of the <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-network.html">
Elasticsearch Networking Advanced Settings</a>, such as <code>network.publish_host</code> to ensure that
the HTTP Hosts found by the Sniffer are accessible by NiFi. For example, Elasticsearch may use a network internal
<code>publish_host</code> that is inaccessible to NiFi, but instead should use an address/IP that NiFi understands.
It may also be necessary to add this same address to Elasticsearch's <code>network.bind_host</code> list.
</p>
<p>
See <a href="https://www.elastic.co/blog/elasticsearch-sniffing-best-practices-what-when-why-how">
Elasticsearch sniffing best practices: What, when, why, how</a> for more details of the best practices.
</p>
</body>
</html>

View File

@ -20,7 +20,7 @@ package org.apache.nifi.elasticsearch;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -31,17 +31,17 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class SearchResponseTest {
@Test
void test() {
List<Map<String, Object>> results = new ArrayList<>();
Map<String, Object> aggs = new HashMap<>();
String pitId = "pitId";
String scrollId = "scrollId";
String searchAfter = "searchAfter";
int num = 10;
int took = 100;
boolean timeout = false;
List<String> warnings = Arrays.asList("auth");
SearchResponse response = new SearchResponse(results, aggs, pitId, scrollId, searchAfter, num, took, timeout, warnings);
String str = response.toString();
final List<Map<String, Object>> results = new ArrayList<>();
final Map<String, Object> aggs = new HashMap<>();
final String pitId = "pitId";
final String scrollId = "scrollId";
final String searchAfter = "searchAfter";
final int num = 10;
final int took = 100;
final boolean timeout = false;
final List<String> warnings = Collections.singletonList("auth");
final SearchResponse response = new SearchResponse(results, aggs, pitId, scrollId, searchAfter, num, took, timeout, warnings);
final String str = response.toString();
assertEquals(results, response.getHits());
assertEquals(aggs, response.getAggregations());

View File

@ -28,10 +28,10 @@ import java.util.List;
import java.util.Map;
public class TestElasticSearchClientService extends AbstractControllerService implements ElasticSearchClientService {
private Map<String, Object> data;
private final Map<String, Object> data;
public TestElasticSearchClientService() {
data = new HashMap<>();
data = new HashMap<>(4, 1);
data.put("username", "john.smith");
data.put("password", "testing1234");
data.put("email", "john.smith@test.com");

View File

@ -26,7 +26,7 @@ import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@ -36,12 +36,12 @@ import static org.apache.nifi.schema.access.SchemaField.SCHEMA_NAME;
public class TestSchemaRegistry extends AbstractControllerService implements SchemaRegistry {
@Override
public RecordSchema retrieveSchema(SchemaIdentifier schemaIdentifier) {
List<RecordField> fields = Arrays.asList(new RecordField("msg", RecordFieldType.STRING.getDataType()));
List<RecordField> fields = Collections.singletonList(new RecordField("msg", RecordFieldType.STRING.getDataType()));
return new SimpleRecordSchema(fields);
}
@Override
public Set<SchemaField> getSuppliedSchemaFields() {
return new HashSet<>(Arrays.asList(SCHEMA_NAME));
return new HashSet<>(Collections.singletonList(SCHEMA_NAME));
}
}

View File

@ -38,6 +38,8 @@ abstract class AbstractElasticsearch_IT extends AbstractElasticsearchITBase {
runner = TestRunners.newTestRunner(TestControllerServiceProcessor.class);
service = new ElasticSearchClientServiceImpl();
runner.addControllerService(CLIENT_SERVICE_NAME, service);
runner.setProperty(TestControllerServiceProcessor.CLIENT_SERVICE, CLIENT_SERVICE_NAME);
runner.setProperty(service, ElasticSearchClientService.HTTP_HOSTS, elasticsearchHost);
runner.setProperty(service, ElasticSearchClientService.CONNECT_TIMEOUT, "10000");
runner.setProperty(service, ElasticSearchClientService.SOCKET_TIMEOUT, "60000");
@ -45,6 +47,15 @@ abstract class AbstractElasticsearch_IT extends AbstractElasticsearchITBase {
runner.setProperty(service, ElasticSearchClientService.AUTHORIZATION_SCHEME, AuthorizationScheme.BASIC.getValue());
runner.setProperty(service, ElasticSearchClientService.USERNAME, "elastic");
runner.setProperty(service, ElasticSearchClientService.PASSWORD, ELASTIC_USER_PASSWORD);
runner.removeProperty(service, ElasticSearchClientService.API_KEY);
runner.removeProperty(service, ElasticSearchClientService.API_KEY_ID);
runner.setProperty(service, ElasticSearchClientService.COMPRESSION, "false");
runner.setProperty(service, ElasticSearchClientService.SEND_META_HEADER, "true");
runner.setProperty(service, ElasticSearchClientService.STRICT_DEPRECATION, "false");
runner.setProperty(service, ElasticSearchClientService.SNIFF_CLUSTER_NODES, "false");
runner.setProperty(service, ElasticSearchClientService.SNIFF_ON_FAILURE, "false");
runner.removeProperty(service, ElasticSearchClientService.PATH_PREFIX);
runner.setProperty(service, ElasticSearchClientService.NODE_SELECTOR, ElasticSearchClientService.NODE_SELECTOR_ANY.getValue());
runner.enableControllerService(service);

View File

@ -18,7 +18,12 @@
package org.apache.nifi.elasticsearch.integration;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.commons.compress.utils.IOUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.http.HttpEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.VerifiableControllerService;
@ -31,7 +36,6 @@ import org.apache.nifi.elasticsearch.IndexOperationRequest;
import org.apache.nifi.elasticsearch.IndexOperationResponse;
import org.apache.nifi.elasticsearch.MapBuilder;
import org.apache.nifi.elasticsearch.SearchResponse;
import org.apache.nifi.elasticsearch.TestControllerServiceProcessor;
import org.apache.nifi.elasticsearch.UpdateOperationResponse;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.ssl.StandardRestrictedSSLContextService;
@ -39,11 +43,15 @@ import org.apache.nifi.ssl.StandardSSLContextService;
import org.apache.nifi.util.MockConfigurationContext;
import org.apache.nifi.util.MockControllerServiceLookup;
import org.apache.nifi.util.MockVariableRegistry;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@ -82,13 +90,38 @@ class ElasticSearchClientService_IT extends AbstractElasticsearch_IT {
runner.getLogger(),
Collections.emptyMap()
);
assertEquals(3, results.size());
assertEquals(4, results.size());
assertEquals(3, results.stream().filter(result -> result.getOutcome() == ConfigVerificationResult.Outcome.SUCCESSFUL).count(), results.toString());
assertVerifySnifferSkipped(results);
}
@Test
void testVerifySniffer() {
runner.disableControllerService(service);
runner.setProperty(service, ElasticSearchClientService.SNIFF_CLUSTER_NODES, "true");
runner.setProperty(service, ElasticSearchClientService.SNIFF_ON_FAILURE, "false");
runner.enableControllerService(service);
assertVerifySniffer();
runner.disableControllerService(service);
runner.setProperty(service, ElasticSearchClientService.SNIFF_ON_FAILURE, "true");
runner.enableControllerService(service);
assertVerifySniffer();
}
private void assertVerifySniffer() {
final List<ConfigVerificationResult> results = ((VerifiableControllerService) service).verify(
new MockConfigurationContext(service, getClientServiceProperties(), runner.getProcessContext().getControllerServiceLookup(), new MockVariableRegistry()),
runner.getLogger(),
Collections.emptyMap()
);
assertEquals(4, results.size());
assertEquals(4, results.stream().filter(result -> result.getOutcome() == ConfigVerificationResult.Outcome.SUCCESSFUL).count(), results.toString());
}
@Test
void testVerifySuccessWithApiKeyAuth() throws IOException {
final Pair<String, String> apiKey = createApiKeyForIndex("*");
final Pair<String, String> apiKey = createApiKeyForIndex();
runner.disableControllerService(service);
runner.setProperty(service, ElasticSearchClientService.AUTHORIZATION_SCHEME, AuthorizationScheme.API_KEY.getValue());
@ -103,8 +136,9 @@ class ElasticSearchClientService_IT extends AbstractElasticsearch_IT {
runner.getLogger(),
Collections.emptyMap()
);
assertEquals(3, results.size());
assertEquals(4, results.size());
assertEquals(3, results.stream().filter(result -> result.getOutcome() == ConfigVerificationResult.Outcome.SUCCESSFUL).count(), results.toString());
assertVerifySnifferSkipped(results);
}
@Test
@ -117,8 +151,8 @@ class ElasticSearchClientService_IT extends AbstractElasticsearch_IT {
runner.getLogger(),
Collections.emptyMap()
);
assertEquals(3, results.size());
assertEquals(2, results.stream().filter(result -> result.getOutcome() == ConfigVerificationResult.Outcome.SKIPPED).count(), results.toString());
assertEquals(4, results.size());
assertEquals(3, results.stream().filter(result -> result.getOutcome() == ConfigVerificationResult.Outcome.SKIPPED).count(), results.toString());
assertEquals(1, results.stream().filter(
result -> Objects.equals(result.getVerificationStepName(), ElasticSearchClientServiceImpl.VERIFICATION_STEP_CLIENT_SETUP)
&& Objects.equals(result.getExplanation(), "Incorrect/invalid " + ElasticSearchClientService.HTTP_HOSTS.getDisplayName())
@ -146,8 +180,8 @@ class ElasticSearchClientService_IT extends AbstractElasticsearch_IT {
runner.getLogger(),
Collections.emptyMap()
);
assertEquals(3, results.size());
assertEquals(2, results.stream().filter(result -> result.getOutcome() == ConfigVerificationResult.Outcome.SKIPPED).count(), results.toString());
assertEquals(4, results.size());
assertEquals(3, results.stream().filter(result -> result.getOutcome() == ConfigVerificationResult.Outcome.SKIPPED).count(), results.toString());
assertEquals(1, results.stream().filter(
result -> Objects.equals(result.getVerificationStepName(), ElasticSearchClientServiceImpl.VERIFICATION_STEP_CLIENT_SETUP)
&& Objects.equals(result.getExplanation(), "Incorrect/invalid " + ElasticSearchClientService.PROP_SSL_CONTEXT_SERVICE.getDisplayName())
@ -167,9 +201,9 @@ class ElasticSearchClientService_IT extends AbstractElasticsearch_IT {
runner.getLogger(),
Collections.emptyMap()
);
assertEquals(3, results.size());
assertEquals(4, results.size());
assertEquals(1, results.stream().filter(result -> result.getOutcome() == ConfigVerificationResult.Outcome.SUCCESSFUL).count(), results.toString());
assertEquals(1, results.stream().filter(result -> result.getOutcome() == ConfigVerificationResult.Outcome.SKIPPED).count(), results.toString());
assertEquals(2, results.stream().filter(result -> result.getOutcome() == ConfigVerificationResult.Outcome.SKIPPED).count(), results.toString());
assertEquals(1, results.stream().filter(
result -> Objects.equals(result.getVerificationStepName(), ElasticSearchClientServiceImpl.VERIFICATION_STEP_CONNECTION)
&& Objects.equals(result.getExplanation(), "Unable to retrieve system summary from Elasticsearch root endpoint")
@ -193,9 +227,9 @@ class ElasticSearchClientService_IT extends AbstractElasticsearch_IT {
runner.getLogger(),
Collections.emptyMap()
);
assertEquals(3, results.size());
assertEquals(4, results.size());
assertEquals(1, results.stream().filter(result -> result.getOutcome() == ConfigVerificationResult.Outcome.SUCCESSFUL).count(), results.toString());
assertEquals(1, results.stream().filter(result -> result.getOutcome() == ConfigVerificationResult.Outcome.SKIPPED).count(), results.toString());
assertEquals(2, results.stream().filter(result -> result.getOutcome() == ConfigVerificationResult.Outcome.SKIPPED).count(), results.toString());
assertEquals(1, results.stream().filter(
result -> Objects.equals(result.getVerificationStepName(), ElasticSearchClientServiceImpl.VERIFICATION_STEP_CONNECTION)
&& Objects.equals(result.getExplanation(), "Unable to retrieve system summary from Elasticsearch root endpoint")
@ -623,6 +657,71 @@ class ElasticSearchClientService_IT extends AbstractElasticsearch_IT {
assertFalse(service.exists("index-does-not-exist", null), "index exists");
}
@Test
void testCompression() {
runner.disableControllerService(service);
runner.setProperty(service, ElasticSearchClientService.COMPRESSION, "true");
runner.enableControllerService(service);
runner.assertValid(service);
assertTrue(service.exists(INDEX, null), "index does not exist");
}
@Test
void testNoMetaHeader() {
runner.disableControllerService(service);
runner.setProperty(service, ElasticSearchClientService.SEND_META_HEADER, "false");
runner.enableControllerService(service);
runner.assertValid(service);
assertTrue(service.exists(INDEX, null), "index does not exist");
}
@Test
void testStrictDeprecation() {
runner.disableControllerService(service);
runner.setProperty(service, ElasticSearchClientService.STRICT_DEPRECATION, "true");
runner.enableControllerService(service);
runner.assertValid(service);
assertTrue(service.exists(INDEX, null), "index does not exist");
}
@Test
void testNodeSelector() {
runner.disableControllerService(service);
runner.setProperty(service, ElasticSearchClientService.NODE_SELECTOR, ElasticSearchClientService.NODE_SELECTOR_SKIP_DEDICATED_MASTERS.getValue());
runner.enableControllerService(service);
runner.assertValid(service);
assertTrue(service.exists(INDEX, null), "index does not exist");
}
@Test
void testRestClientRequestHeaders() {
runner.disableControllerService(service);
runner.setProperty(service, "User-Agent", "NiFi Integration Tests");
runner.setProperty(service, "X-Extra_header", "Request should still work");
runner.enableControllerService(service);
runner.assertValid(service);
assertTrue(service.exists(INDEX, null), "index does not exist");
}
@Test
void testSniffer() {
runner.disableControllerService(service);
runner.setProperty(service, ElasticSearchClientService.SNIFF_CLUSTER_NODES, "false");
runner.setProperty(service, ElasticSearchClientService.SNIFF_ON_FAILURE, "true");
runner.assertNotValid(service);
runner.setProperty(service, ElasticSearchClientService.SNIFF_CLUSTER_NODES, "true");
runner.enableControllerService(service);
runner.assertValid(service);
assertTrue(service.exists(INDEX, null), "index does not exist");
}
@Test
void testNullSuppression() throws InterruptedException {
final Map<String, Object> doc = new HashMap<>();
@ -659,13 +758,12 @@ class ElasticSearchClientService_IT extends AbstractElasticsearch_IT {
}
private void suppressNulls(final boolean suppressNulls) {
runner.setProperty(TestControllerServiceProcessor.CLIENT_SERVICE, "Client Service");
runner.disableControllerService(service);
runner.setProperty(service, ElasticSearchClientService.SUPPRESS_NULLS, suppressNulls
? ElasticSearchClientService.ALWAYS_SUPPRESS.getValue()
: ElasticSearchClientService.NEVER_SUPPRESS.getValue());
runner.enableControllerService(service);
runner.assertValid();
runner.assertValid(service);
}
@Test
@ -823,4 +921,38 @@ class ElasticSearchClientService_IT extends AbstractElasticsearch_IT {
Thread.sleep(1000);
}
private void assertVerifySnifferSkipped(final List<ConfigVerificationResult> results) {
assertEquals(1, results.stream().filter(
result -> Objects.equals(result.getVerificationStepName(), ElasticSearchClientServiceImpl.VERIFICATION_STEP_SNIFFER)
&& Objects.equals(result.getExplanation(), "Sniff on Connection not enabled")
&& result.getOutcome() == ConfigVerificationResult.Outcome.SKIPPED).count(),
results.toString()
);
}
protected Pair<String, String> createApiKeyForIndex() throws IOException {
final String body = prettyJson(new MapBuilder()
.of("name", "test-api-key")
.of("role_descriptors", new MapBuilder()
.of("test-role", new MapBuilder()
.of("cluster", Collections.singletonList("all"))
.of("index", Collections.singletonList(new MapBuilder()
.of("names", Collections.singletonList("*"))
.of("privileges", Collections.singletonList("all"))
.build()))
.build())
.build())
.build());
final String endpoint = String.format("%s/%s", elasticsearchHost, "_security/api_key");
final Request request = new Request("POST", endpoint);
final HttpEntity jsonBody = new NStringEntity(body, ContentType.APPLICATION_JSON);
request.setEntity(jsonBody);
final Response response = testDataManagementClient.performRequest(request);
final InputStream inputStream = response.getEntity().getContent();
final byte[] result = IOUtils.toByteArray(inputStream);
inputStream.close();
final Map<String, String> ret = MAPPER.readValue(new String(result, StandardCharsets.UTF_8), new TypeReference<Map<String, String>>() {});
return Pair.of(ret.get("id"), ret.get("api_key"));
}
}

View File

@ -18,18 +18,18 @@ package org.apache.nifi.elasticsearch.integration;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.compress.utils.IOUtils;
import org.apache.commons.lang3.tuple.Pair;
import com.github.dockerjava.api.model.ExposedPort;
import com.github.dockerjava.api.model.HostConfig;
import com.github.dockerjava.api.model.PortBinding;
import com.github.dockerjava.api.model.Ports;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.nifi.elasticsearch.MapBuilder;
import org.apache.nifi.util.TestRunner;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.junit.jupiter.api.BeforeAll;
@ -39,30 +39,38 @@ import org.testcontainers.utility.DockerImageName;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import static org.apache.http.auth.AuthScope.ANY;
public abstract class AbstractElasticsearchITBase {
// default Elasticsearch version should (ideally) match that in the nifi-elasticsearch-bundle#pom.xml for the integration-tests profile
protected static final DockerImageName IMAGE = DockerImageName
.parse(System.getProperty("elasticsearch.docker.image", "docker.elastic.co/elasticsearch/elasticsearch:8.5.0"));
.parse(System.getProperty("elasticsearch.docker.image", "docker.elastic.co/elasticsearch/elasticsearch:8.6.1"));
protected static final String ELASTIC_USER_PASSWORD = System.getProperty("elasticsearch.elastic_user.password", RandomStringUtils.randomAlphanumeric(10, 20));
private static final int PORT = 9200;
protected static final ElasticsearchContainer ELASTICSEARCH_CONTAINER = new ElasticsearchContainer(IMAGE)
.withPassword(ELASTIC_USER_PASSWORD)
.withEnv("xpack.security.enabled", "true")
// enable API Keys for integration-tests (6.x & 7.x don't enable SSL and therefore API Keys by default, so use a trial license and explicitly enable API Keys)
.withEnv("xpack.license.self_generated.type", "trial")
.withEnv("xpack.security.authc.api_key.enabled", "true");
.withEnv("xpack.security.authc.api_key.enabled", "true")
// use a "special address" to ensure the publish_host is in the bind_host list, otherwise the Sniffer won't work
// https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-network.html#network-interface-values
// TestContainers makes Elasticsearch available via localhost/127.0.0.1; Elasticsearch uses the IP Address in publish_host
.withEnv("network.bind_host", "_local_,_site_")
.withEnv("network.publish_host", "127.0.0.1")
// pin the Elasticsearch port (typically 9200 but not guaranteed), also bind that to 9200 on the host so the network.publish_host is accessible
.withEnv("http.port", String.valueOf(PORT))
.withExposedPorts(PORT)
.withCreateContainerCmdModifier(cmd -> cmd.withHostConfig(
new HostConfig().withPortBindings(new PortBinding(Ports.Binding.bindPort(PORT), new ExposedPort(PORT)))
));
protected static final String CLIENT_SERVICE_NAME = "Client Service";
protected static final String INDEX = "messages";
@ -88,7 +96,7 @@ public abstract class AbstractElasticsearchITBase {
protected static String type;
private static RestClient testDataManagementClient;
static RestClient testDataManagementClient;
protected static void stopTestcontainer() {
if (ENABLE_TEST_CONTAINERS) {
@ -98,7 +106,6 @@ public abstract class AbstractElasticsearchITBase {
@BeforeAll
static void beforeAll() throws IOException {
startTestcontainer();
type = getElasticMajorVersion() == 6 ? "_doc" : "";
System.out.printf("%n%n%n%n%n%n%n%n%n%n%n%n%n%n%nTYPE: %s%nIMAGE: %s:%s%n%n%n%n%n%n%n%n%n%n%n%n%n%n%n%n",
@ -173,32 +180,6 @@ public abstract class AbstractElasticsearchITBase {
return MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(o);
}
protected Pair<String, String> createApiKeyForIndex(String index) throws IOException {
final String body = prettyJson(new MapBuilder()
.of("name", "test-api-key")
.of("role_descriptors", new MapBuilder()
.of("test-role", new MapBuilder()
.of("cluster", Collections.singletonList("all"))
.of("index", Collections.singletonList(new MapBuilder()
.of("names", Collections.singletonList(index))
.of("privileges", Collections.singletonList("all"))
.build()))
.build())
.build())
.build());
final String endpoint = String.format("%s/%s", elasticsearchHost, "_security/api_key");
final Request request = new Request("POST", endpoint);
final HttpEntity jsonBody = new NStringEntity(body, ContentType.APPLICATION_JSON);
request.setEntity(jsonBody);
final Response response = testDataManagementClient.performRequest(request);
final InputStream inputStream = response.getEntity().getContent();
final byte[] result = IOUtils.toByteArray(inputStream);
inputStream.close();
final Map<String, String> ret = MAPPER.readValue(new String(result, StandardCharsets.UTF_8), Map.class);
return Pair.of(ret.get("id"), ret.get("api_key"));
}
private static List<SetupAction> readSetupActions(final String scriptPath) throws IOException {
final List<SetupAction> actions = new ArrayList<>();
try (BufferedReader reader = new BufferedReader(new InputStreamReader(Files.newInputStream(Paths.get(scriptPath))))) {

View File

@ -31,6 +31,17 @@ language governing permissions and limitations under the License. -->
<module>nifi-elasticsearch-restapi-processors</module>
</modules>
<properties>
<!-- pinned at 7.13.4 as it is the last version prior to Elastic forcing the client to check it is connecting
to an Elastic-provided Elasticsearch instead of an instance provided by someone else (e.g. AWS OpenSearch)
see: https://opensearch.org/blog/community/2021/08/community-clients/ for more info.
Note: the low-level elasticsearch-rest-client remains licensed with Apache 2.0
(https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_license.html) even after the move
of the main Elasticsearch product and elasticsearch-rest-high-level-client to Elastic 2.0/SSPL 1.0 in v7.11.0+ -->
<elasticsearch.client.version>7.13.4</elasticsearch.client.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
@ -53,14 +64,7 @@ language governing permissions and limitations under the License. -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
<!-- pinned at 7.13.4 as it is the last version prior to Elastic forcing the client to check it is connecting
to an Elastic-provided Elasticsearch instead of an instance provided by someone else (e.g. AWS OpenSearch)
see: https://opensearch.org/blog/community/2021/08/community-clients/ for more info.
Note: the low-level elasticsearch-rest-client remains licensed with Apache 2.0
(https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_license.html) even after the move
of the main Elasticsearch product and elasticsearch-rest-high-level-client to Elastic 2.0/SSPL 1.0 in v7.11.0+ -->
<version>7.13.4</version>
<version>${elasticsearch.client.version}</version>
<scope>compile</scope>
<exclusions>
<exclusion>
@ -73,6 +77,18 @@ language governing permissions and limitations under the License. -->
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client-sniffer</artifactId>
<version>${elasticsearch.client.version}</version>
<scope>compile</scope>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</dependencyManagement>
@ -85,7 +101,7 @@ language governing permissions and limitations under the License. -->
</activation>
<properties>
<!-- also update the default Elasticsearch version in nifi-elasticsearch-test-utils#src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java-->
<elasticsearch_docker_image>8.5.3</elasticsearch_docker_image>
<elasticsearch_docker_image>8.6.1</elasticsearch_docker_image>
<elasticsearch.elastic.password>s3cret</elasticsearch.elastic.password>
</properties>
<build>
@ -116,7 +132,7 @@ language governing permissions and limitations under the License. -->
<profile>
<id>elasticsearch7</id>
<properties>
<elasticsearch_docker_image>7.17.8</elasticsearch_docker_image>
<elasticsearch_docker_image>7.17.9</elasticsearch_docker_image>
</properties>
</profile>
</profiles>