From 6542505a5071f7bd1152daea5b1763b2af618a74 Mon Sep 17 00:00:00 2001
From: Chris Sampson
Date: Fri, 11 Nov 2022 15:12:05 +0000
Subject: [PATCH] NIFI-10797 add customisable Elasticsearch REST Client config
and Elasticsearch Cluster Sniffer
Signed-off-by: Joe Gresock
This closes #6658.
---
.../ElasticSearchClientService.java | 116 +++++-
.../nifi-elasticsearch-client-service/pom.xml | 4 +
.../ElasticSearchClientServiceImpl.java | 336 ++++++++++++++----
.../additionalDetails.html | 54 +++
.../elasticsearch/SearchResponseTest.java | 24 +-
.../TestElasticSearchClientService.java | 4 +-
.../elasticsearch/TestSchemaRegistry.java | 6 +-
.../integration/AbstractElasticsearch_IT.java | 11 +
.../ElasticSearchClientService_IT.java | 160 ++++++++-
.../AbstractElasticsearchITBase.java | 57 +--
.../nifi-elasticsearch-bundle/pom.xml | 36 +-
11 files changed, 657 insertions(+), 151 deletions(-)
create mode 100644 nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/resources/docs/org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl/additionalDetails.html
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java
index 4664ae1a64..7e80f345d9 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java
@@ -16,8 +16,6 @@
*/
package org.apache.nifi.elasticsearch;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
@@ -32,8 +30,6 @@ import org.apache.nifi.ssl.SSLContextService;
import java.util.List;
import java.util.Map;
-@Tags({"elasticsearch", "client"})
-@CapabilityDescription("A controller service for accessing an Elasticsearch client.")
public interface ElasticSearchClientService extends ControllerService, VerifiableControllerService {
PropertyDescriptor HTTP_HOSTS = new PropertyDescriptor.Builder()
.name("el-cs-http-hosts")
@@ -148,6 +144,118 @@ public interface ElasticSearchClientService extends ControllerService, Verifiabl
.required(true)
.build();
+ PropertyDescriptor COMPRESSION = new PropertyDescriptor.Builder()
+ .name("el-cs-enable-compression")
+ .displayName("Enable Compression")
+ .description("Whether the REST client should compress requests using gzip content encoding and add the " +
+ "\"Accept-Encoding: gzip\" header to receive compressed responses")
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .required(true)
+ .build();
+
+ PropertyDescriptor SEND_META_HEADER = new PropertyDescriptor.Builder()
+ .name("el-cs-send-meta-header")
+ .displayName("Send Meta Header")
+ .description("Whether to send a \"X-Elastic-Client-Meta\" header that describes the runtime environment. " +
+ "It contains information that is similar to what could be found in User-Agent. " +
+ "Using a separate header allows applications to use User-Agent for their own needs, " +
+ "e.g. to identify application version or other environment information")
+ .allowableValues("true", "false")
+ .defaultValue("true")
+ .required(true)
+ .build();
+
+ PropertyDescriptor STRICT_DEPRECATION = new PropertyDescriptor.Builder()
+ .name("el-cs-strict-deprecation")
+ .displayName("Strict Deprecation")
+ .description("Whether the REST client should return any response containing at least one warning header as a failure")
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .required(true)
+ .build();
+
+ AllowableValue NODE_SELECTOR_ANY = new AllowableValue("ANY", "Any",
+ "Select any Elasticsearch node to handle requests");
+ AllowableValue NODE_SELECTOR_SKIP_DEDICATED_MASTERS = new AllowableValue("SKIP_DEDICATED_MASTERS", "Skip Dedicated Masters",
+ "Skip dedicated Elasticsearch master nodes for handling request");
+
+ PropertyDescriptor NODE_SELECTOR = new PropertyDescriptor.Builder()
+ .name("el-cs-node-selector")
+ .displayName("Node Selector")
+ .description("Selects Elasticsearch nodes that can receive requests. Used to keep requests away from dedicated Elasticsearch master nodes")
+ .allowableValues(NODE_SELECTOR_ANY, NODE_SELECTOR_SKIP_DEDICATED_MASTERS)
+ .defaultValue(NODE_SELECTOR_ANY.getValue())
+ .required(true)
+ .build();
+
+ PropertyDescriptor PATH_PREFIX = new PropertyDescriptor.Builder()
+ .name("el-cs-path-prefix")
+ .displayName("Path Prefix")
+ .description("Sets the path's prefix for every request used by the http client. " +
+ "For example, if this is set to \"/my/path\", then any client request will become \"/my/path/\" + endpoint. " +
+ "In essence, every request's endpoint is prefixed by this pathPrefix. " +
+ "The path prefix is useful for when Elasticsearch is behind a proxy that provides a base path or a proxy that requires all paths to start with '/'; " +
+ "it is not intended for other purposes and it should not be supplied in other scenarios")
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .build();
+
+ PropertyDescriptor SNIFF_CLUSTER_NODES = new PropertyDescriptor.Builder()
+ .name("el-cs-sniff-cluster-nodes")
+ .displayName("Sniff Cluster Nodes")
+ .description("Periodically sniff for nodes within the Elasticsearch cluster via the Elasticsearch Node Info API. " +
+ "If Elasticsearch security features are enabled (default to \"true\" for 8.x+), the Elasticsearch user must " +
+ "have the \"monitor\" or \"manage\" cluster privilege to use this API." +
+ "Note that all " + HTTP_HOSTS.getDisplayName() + " (and those that may be discovered within the cluster " +
+ "using the Sniffer) must use the same protocol, e.g. http or https, and be contactable using the same client settings. " +
+ "Finally the Elasticsearch \"network.publish_host\" must match one of the \"network.bind_host\" list entries " +
+ "see https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-network.html for more information")
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .required(true)
+ .build();
+
+ PropertyDescriptor SNIFF_ON_FAILURE = new PropertyDescriptor.Builder()
+ .name("el-cs-sniff-failure")
+ .displayName("Sniff on Failure")
+ .description("Enable sniffing on failure, meaning that after each failure the Elasticsearch nodes list gets updated " +
+ "straightaway rather than at the following ordinary sniffing round")
+ .dependsOn(SNIFF_CLUSTER_NODES, "true")
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .required(true)
+ .build();
+
+ PropertyDescriptor SNIFFER_INTERVAL = new PropertyDescriptor.Builder()
+ .name("el-cs-sniffer-interval")
+ .displayName("Sniffer Interval")
+ .description("Interval between Cluster sniffer operations")
+ .dependsOn(SNIFF_CLUSTER_NODES, "true")
+ .defaultValue("5 mins")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .required(true)
+ .build();
+
+ PropertyDescriptor SNIFFER_REQUEST_TIMEOUT = new PropertyDescriptor.Builder()
+ .name("el-cs-sniffer-request-timeout")
+ .displayName("Sniffer Request Timeout")
+ .description("Cluster sniffer timeout for node info requests")
+ .dependsOn(SNIFF_CLUSTER_NODES, "true")
+ .defaultValue("1 sec")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .required(true)
+ .build();
+
+ PropertyDescriptor SNIFFER_FAILURE_DELAY = new PropertyDescriptor.Builder()
+ .name("el-cs-sniffer-failure-delay")
+ .displayName("Sniffer Failure Delay")
+ .description("Delay between an Elasticsearch request failure and updating available Cluster nodes using the Sniffer")
+ .dependsOn(SNIFF_ON_FAILURE, "true")
+ .defaultValue("1 min")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .required(true)
+ .build();
+
/**
* Index a document.
*
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
index 9da490b8af..3cc3f98bef 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
@@ -132,6 +132,10 @@
org.elasticsearch.clientelasticsearch-rest-client
+
+ org.elasticsearch.client
+ elasticsearch-rest-client-sniffer
+
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
index 9658c2ce86..3fceedc8ab 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
@@ -32,6 +32,9 @@ import org.apache.http.entity.ContentType;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.message.BasicHeader;
import org.apache.http.nio.entity.NStringEntity;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.ConfigVerificationResult;
@@ -40,19 +43,26 @@ import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.proxy.ProxyConfigurationService;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.StringUtils;
+import org.elasticsearch.client.Node;
+import org.elasticsearch.client.NodeSelector;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.sniff.ElasticsearchNodesSniffer;
+import org.elasticsearch.client.sniff.SniffOnFailureListener;
+import org.elasticsearch.client.sniff.Sniffer;
import javax.net.ssl.SSLContext;
import java.io.ByteArrayOutputStream;
@@ -73,12 +83,23 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
+@Tags({"elasticsearch", "elasticsearch6", "elasticsearch7", "elasticsearch8", "client"})
+@CapabilityDescription("A controller service for accessing an Elasticsearch client. " +
+ "Uses the Elasticsearch REST Client (7.13.4, the last version before client connections verify" +
+ "the server is Elastic provided, this should allow for connections to compatible alternatives, e.g. AWS OpenSearch)")
+@DynamicProperty(
+ name = "The name of a Request Header to add",
+ value = "The value of the Header",
+ expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY,
+ description = "Adds the specified property name/value as a Request Header in the Elasticsearch requests.")
public class ElasticSearchClientServiceImpl extends AbstractControllerService implements ElasticSearchClientService {
public static final String VERIFICATION_STEP_CONNECTION = "Elasticsearch Connection";
public static final String VERIFICATION_STEP_CLIENT_SETUP = "Elasticsearch Rest Client Setup";
public static final String VERIFICATION_STEP_WARNINGS = "Elasticsearch Warnings";
+ public static final String VERIFICATION_STEP_SNIFFER = "Elasticsearch Sniffer";
private ObjectMapper mapper;
@@ -86,6 +107,8 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
private RestClient client;
+ private Sniffer sniffer;
+
private String url;
private Charset responseCharset;
private ObjectWriter prettyPrintWriter;
@@ -93,6 +116,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
static {
final List props = new ArrayList<>();
props.add(HTTP_HOSTS);
+ props.add(PATH_PREFIX);
props.add(AUTHORIZATION_SCHEME);
props.add(USERNAME);
props.add(PASSWORD);
@@ -104,6 +128,15 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
props.add(SOCKET_TIMEOUT);
props.add(CHARSET);
props.add(SUPPRESS_NULLS);
+ props.add(COMPRESSION);
+ props.add(SEND_META_HEADER);
+ props.add(STRICT_DEPRECATION);
+ props.add(NODE_SELECTOR);
+ props.add(SNIFF_CLUSTER_NODES);
+ props.add(SNIFFER_INTERVAL);
+ props.add(SNIFFER_REQUEST_TIMEOUT);
+ props.add(SNIFF_ON_FAILURE);
+ props.add(SNIFFER_FAILURE_DELAY);
properties = Collections.unmodifiableList(props);
}
@@ -113,9 +146,20 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
return properties;
}
+ @Override
+ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+ return new PropertyDescriptor.Builder()
+ .name(propertyDescriptorName)
+ .required(false)
+ .addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .dynamic(true)
+ .build();
+ }
+
@Override
protected Collection customValidate(final ValidationContext validationContext) {
- final List results = new ArrayList<>(super.customValidate(validationContext));
+ final List results = new ArrayList<>(1);
final AuthorizationScheme authorizationScheme = AuthorizationScheme.valueOf(validationContext.getProperty(AUTHORIZATION_SCHEME).getValue());
@@ -126,6 +170,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
final boolean apiKeySet = validationContext.getProperty(API_KEY).isSet();
final SSLContextService sslService = validationContext.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+
if (authorizationScheme == AuthorizationScheme.PKI && (sslService == null || !sslService.isKeyStoreConfigured())) {
results.add(new ValidationResult.Builder().subject(PROP_SSL_CONTEXT_SERVICE.getName()).valid(false)
.explanation(String.format("if '%s' is '%s' then '%s' must be set and specify a Keystore for mutual TLS encryption.",
@@ -146,6 +191,13 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
addAuthorizationPropertiesValidationIssue(results, API_KEY, API_KEY_ID);
}
+ final boolean sniffClusterNodes = validationContext.getProperty(SNIFF_CLUSTER_NODES).asBoolean();
+ final boolean sniffOnFailure = validationContext.getProperty(SNIFF_ON_FAILURE).asBoolean();
+ if (sniffOnFailure && !sniffClusterNodes) {
+ results.add(new ValidationResult.Builder().subject(SNIFF_ON_FAILURE.getName()).valid(false)
+ .explanation(String.format("'%s' cannot be enabled if '%s' is disabled", SNIFF_ON_FAILURE.getDisplayName(), SNIFF_CLUSTER_NODES.getDisplayName())).build());
+ }
+
return results;
}
@@ -160,6 +212,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
public void onEnabled(final ConfigurationContext context) throws InitializationException {
try {
this.client = setupClient(context);
+ this.sniffer = setupSniffer(context, this.client);
responseCharset = Charset.forName(context.getProperty(CHARSET).getValue());
// re-create the ObjectMapper in case the SUPPRESS_NULLS property has changed - the JsonInclude settings aren't dynamic
@@ -178,6 +231,11 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
@OnDisabled
public void onDisabled() throws IOException {
+ if (this.sniffer != null) {
+ this.sniffer.close();
+ this.sniffer = null;
+ }
+
if (this.client != null) {
this.client.close();
this.client = null;
@@ -194,6 +252,8 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
.verificationStepName(VERIFICATION_STEP_CONNECTION);
final ConfigVerificationResult.Builder warningsResult = new ConfigVerificationResult.Builder()
.verificationStepName(VERIFICATION_STEP_WARNINGS);
+ final ConfigVerificationResult.Builder snifferResult = new ConfigVerificationResult.Builder()
+ .verificationStepName(VERIFICATION_STEP_SNIFFER);
// configure the Rest Client
try (final RestClient verifyClient = setupClient(context)) {
@@ -201,7 +261,10 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
// try to fetch the Elasticsearch root endpoint (system summary)
verifyRootConnection(verifyClient, connectionResult, warningsResult);
- }catch (final MalformedURLException mue) {
+
+ // try sniffing for cluster nodes
+ verifySniffer(context, verifyClient, snifferResult);
+ } catch (final MalformedURLException mue) {
clientSetupResult.outcome(ConfigVerificationResult.Outcome.FAILED)
.explanation("Incorrect/invalid " + ElasticSearchClientService.HTTP_HOSTS.getDisplayName());
} catch (final InitializationException ie) {
@@ -219,21 +282,78 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
.explanation("Elasticsearch Rest Client not configured");
warningsResult.outcome(ConfigVerificationResult.Outcome.SKIPPED)
.explanation("Elasticsearch Rest Client not configured");
+ snifferResult.outcome(ConfigVerificationResult.Outcome.SKIPPED)
+ .explanation("Elasticsearch Rest Client not configured");
}
results.add(clientSetup);
results.add(connectionResult.build());
results.add(warningsResult.build());
+ results.add(snifferResult.build());
}
return results;
}
+ private void verifySniffer(final ConfigurationContext context, final RestClient verifyClient, final ConfigVerificationResult.Builder snifferResult) {
+ try (final Sniffer verifySniffer = setupSniffer(context, verifyClient)) {
+ if (verifySniffer != null) {
+ final List originalNodes = verifyClient.getNodes();
+ // cannot access the NodesSniffer from the parent Sniffer, so set up a second instance here
+ final ElasticsearchNodesSniffer elasticsearchNodesSniffer = setupElasticsearchNodesSniffer(context, verifyClient);
+ final List nodes = elasticsearchNodesSniffer.sniff();
+
+ // attempt to connect to each Elasticsearch Node using the RestClient
+ final AtomicInteger successfulInstances = new AtomicInteger(0);
+ final AtomicInteger warningInstances = new AtomicInteger(0);
+ nodes.forEach(n -> {
+ try {
+ verifyClient.setNodes(Collections.singletonList(n));
+ final List warnings = getElasticsearchRoot(verifyClient);
+ successfulInstances.getAndIncrement();
+ if (!warnings.isEmpty()) {
+ warningInstances.getAndIncrement();
+ }
+ } catch (final Exception ex) {
+ getLogger().warn("Elasticsearch Node {} connection failed", n.getHost().toURI(), ex);
+ }
+ });
+ // reset Nodes list on RestClient to pre-Sniffer state (match user's Verify settings)
+ verifyClient.setNodes(originalNodes);
+
+ if (successfulInstances.get() < nodes.size()) {
+ snifferResult.outcome(ConfigVerificationResult.Outcome.FAILED).explanation(
+ String.format("Sniffing for Elasticsearch cluster nodes found %d nodes but %d could not be contacted (%d with warnings during connection tests)",
+ nodes.size(), nodes.size() - successfulInstances.get(), warningInstances.get())
+ );
+ } else {
+ snifferResult.outcome(ConfigVerificationResult.Outcome.SUCCESSFUL).explanation(
+ String.format("Sniffing for Elasticsearch cluster nodes found %d nodes (%d with warnings during connection tests)",
+ nodes.size(), warningInstances.get())
+ );
+ }
+ } else {
+ snifferResult.outcome(ConfigVerificationResult.Outcome.SKIPPED).explanation("Sniff on Connection not enabled");
+ }
+ } catch (final Exception ex) {
+ getLogger().warn("Unable to sniff for Elasticsearch cluster nodes", ex);
+
+ snifferResult.outcome(ConfigVerificationResult.Outcome.FAILED)
+ .explanation("Sniffing for Elasticsearch cluster nodes failed");
+ }
+ }
+
+ private List getElasticsearchRoot(final RestClient verifyClient) throws IOException {
+ final Response response = verifyClient.performRequest(new Request("GET", "/"));
+ final List warnings = parseResponseWarningHeaders(response);
+ parseResponse(response);
+
+ return warnings;
+ }
+
private void verifyRootConnection(final RestClient verifyClient, final ConfigVerificationResult.Builder connectionResult, final ConfigVerificationResult.Builder warningsResult) {
try {
- final Response response = verifyClient.performRequest(new Request("GET", "/"));
- final List warnings = parseResponseWarningHeaders(response);
- parseResponse(response);
+ final List warnings = getElasticsearchRoot(verifyClient);
connectionResult.outcome(ConfigVerificationResult.Outcome.SUCCESSFUL);
if (warnings.isEmpty()) {
@@ -253,80 +373,121 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
}
private RestClient setupClient(final ConfigurationContext context) throws MalformedURLException, InitializationException {
+ final Integer connectTimeout = context.getProperty(CONNECT_TIMEOUT).asInteger();
+ final Integer socketTimeout = context.getProperty(SOCKET_TIMEOUT).asInteger();
+
+ final NodeSelector nodeSelector = NODE_SELECTOR_ANY.getValue().equals(context.getProperty(NODE_SELECTOR).getValue())
+ ? NodeSelector.ANY
+ : NodeSelector.SKIP_DEDICATED_MASTERS;
+ final String pathPrefix = context.getProperty(PATH_PREFIX).getValue();
+
+ final boolean compress = context.getProperty(COMPRESSION).asBoolean();
+ final boolean sendMetaHeader = context.getProperty(SEND_META_HEADER).asBoolean();
+ final boolean strictDeprecation = context.getProperty(STRICT_DEPRECATION).asBoolean();
+ final boolean sniffOnFailure = context.getProperty(SNIFF_ON_FAILURE).asBoolean();
+
+ final RestClientBuilder builder = RestClient.builder(getHttpHosts(context));
+ addAuthAndProxy(context, builder)
+ .setRequestConfigCallback(requestConfigBuilder -> {
+ requestConfigBuilder.setConnectTimeout(connectTimeout);
+ requestConfigBuilder.setSocketTimeout(socketTimeout);
+ return requestConfigBuilder;
+ })
+ .setCompressionEnabled(compress)
+ .setMetaHeaderEnabled(sendMetaHeader)
+ .setStrictDeprecationMode(strictDeprecation)
+ .setNodeSelector(nodeSelector);
+
+ if (sniffOnFailure && sniffer != null) {
+ final SniffOnFailureListener sniffOnFailureListener = new SniffOnFailureListener();
+ sniffOnFailureListener.setSniffer(sniffer);
+ builder.setFailureListener(sniffOnFailureListener);
+ }
+
+ if (StringUtils.isNotBlank(pathPrefix)) {
+ builder.setPathPrefix(pathPrefix);
+ }
+
+ return builder.build();
+ }
+
+ private HttpHost[] getHttpHosts(final ConfigurationContext context) throws MalformedURLException {
+ final String hosts = context.getProperty(HTTP_HOSTS).evaluateAttributeExpressions().getValue();
+
+ final List hostsSplit = Arrays.stream(hosts.split(",\\s*")).map(String::trim).collect(Collectors.toList());
+ this.url = hostsSplit.get(0);
+ final List hh = new ArrayList<>(hostsSplit.size());
+ for (final String host : hostsSplit) {
+ final URL u = new URL(host);
+ hh.add(new HttpHost(u.getHost(), u.getPort(), u.getProtocol()));
+ }
+
+ return hh.toArray(new HttpHost[0]);
+ }
+
+ private RestClientBuilder addAuthAndProxy(final ConfigurationContext context, final RestClientBuilder builder) throws InitializationException {
final AuthorizationScheme authorizationScheme = AuthorizationScheme.valueOf(context.getProperty(AUTHORIZATION_SCHEME).getValue());
- final String hosts = context.getProperty(HTTP_HOSTS).evaluateAttributeExpressions().getValue();
- final String[] hostsSplit = hosts.split(",\\s*");
- this.url = hostsSplit[0];
- final SSLContextService sslService =
- context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
final String apiKeyId = context.getProperty(API_KEY_ID).getValue();
final String apiKey = context.getProperty(API_KEY).getValue();
- final Integer connectTimeout = context.getProperty(CONNECT_TIMEOUT).asInteger();
- final Integer socketTimeout = context.getProperty(SOCKET_TIMEOUT).asInteger();
-
+ final SSLContext sslContext = getSSLContext(context);
final ProxyConfigurationService proxyConfigurationService = context.getProperty(PROXY_CONFIGURATION_SERVICE).asControllerService(ProxyConfigurationService.class);
- final HttpHost[] hh = new HttpHost[hostsSplit.length];
- for (int x = 0; x < hh.length; x++) {
- final URL u = new URL(hostsSplit[x]);
- hh[x] = new HttpHost(u.getHost(), u.getPort(), u.getProtocol());
- }
+ return builder.setHttpClientConfigCallback(httpClientBuilder -> {
+ if (sslContext != null) {
+ httpClientBuilder.setSSLContext(sslContext);
+ }
+
+ CredentialsProvider credentialsProvider = null;
+ if (AuthorizationScheme.BASIC == authorizationScheme && username != null && password != null) {
+ credentialsProvider = addBasicAuthCredentials(null, AuthScope.ANY, username, password);
+ }
+
+ final List defaultHeaders = getDefaultHeadersFromDynamicProperties(context);
+ if (AuthorizationScheme.API_KEY == authorizationScheme && apiKeyId != null && apiKey != null) {
+ defaultHeaders.add(createApiKeyAuthorizationHeader(apiKeyId, apiKey));
+ }
+ if (!defaultHeaders.isEmpty()) {
+ builder.setDefaultHeaders(defaultHeaders.toArray(new Header[0]));
+ }
+
+ if (proxyConfigurationService != null) {
+ final ProxyConfiguration proxyConfiguration = proxyConfigurationService.getConfiguration();
+ if (Proxy.Type.HTTP == proxyConfiguration.getProxyType()) {
+ final HttpHost proxy = new HttpHost(proxyConfiguration.getProxyServerHost(), proxyConfiguration.getProxyServerPort(), "http");
+ httpClientBuilder.setProxy(proxy);
+
+ credentialsProvider = addBasicAuthCredentials(credentialsProvider, new AuthScope(proxy), proxyConfiguration.getProxyUserName(), proxyConfiguration.getProxyUserPassword());
+ }
+ }
+
+ if (credentialsProvider != null) {
+ httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
+ }
+
+ return httpClientBuilder;
+ });
+ }
+
+ private SSLContext getSSLContext(final ConfigurationContext context) throws InitializationException {
+ final SSLContextService sslService =
+ context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
- final SSLContext sslContext;
try {
- sslContext = (sslService != null && (sslService.isKeyStoreConfigured() || sslService.isTrustStoreConfigured()))
+ return (sslService != null && (sslService.isKeyStoreConfigured() || sslService.isTrustStoreConfigured()))
? sslService.createContext() : null;
} catch (final Exception e) {
getLogger().error("Error building up SSL Context from the supplied configuration.", e);
throw new InitializationException(e);
}
-
- final RestClientBuilder builder = RestClient.builder(hh)
- .setHttpClientConfigCallback(httpClientBuilder -> {
- if (sslContext != null) {
- httpClientBuilder.setSSLContext(sslContext);
- }
-
- CredentialsProvider credentialsProvider = null;
- if (AuthorizationScheme.BASIC == authorizationScheme && username != null && password != null) {
- credentialsProvider = addCredentials(null, AuthScope.ANY, username, password);
- }
-
- if (AuthorizationScheme.API_KEY == authorizationScheme && apiKeyId != null && apiKey != null) {
- httpClientBuilder.setDefaultHeaders(Collections.singletonList(createApiKeyAuthorizationHeader(apiKeyId, apiKey)));
- }
-
- if (proxyConfigurationService != null) {
- final ProxyConfiguration proxyConfiguration = proxyConfigurationService.getConfiguration();
- if (Proxy.Type.HTTP == proxyConfiguration.getProxyType()) {
- final HttpHost proxy = new HttpHost(proxyConfiguration.getProxyServerHost(), proxyConfiguration.getProxyServerPort(), "http");
- httpClientBuilder.setProxy(proxy);
-
- credentialsProvider = addCredentials(credentialsProvider, new AuthScope(proxy), proxyConfiguration.getProxyUserName(), proxyConfiguration.getProxyUserPassword());
- }
- }
-
- if (credentialsProvider != null) {
- httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
- }
-
- return httpClientBuilder;
- })
- .setRequestConfigCallback(requestConfigBuilder -> {
- requestConfigBuilder.setConnectTimeout(connectTimeout);
- requestConfigBuilder.setSocketTimeout(socketTimeout);
- return requestConfigBuilder;
- });
-
- return builder.build();
}
- private CredentialsProvider addCredentials(final CredentialsProvider credentialsProvider, final AuthScope authScope, final String username, final String password) {
+ private CredentialsProvider addBasicAuthCredentials(final CredentialsProvider credentialsProvider, final AuthScope authScope,
+ final String username, final String password) {
final CredentialsProvider cp = credentialsProvider != null ? credentialsProvider : new BasicCredentialsProvider();
if (StringUtils.isNotBlank(username) && StringUtils.isNotBlank(password)) {
@@ -339,12 +500,48 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
return cp;
}
- private BasicHeader createApiKeyAuthorizationHeader(String apiKeyId, String apiKey) {
+ private List getDefaultHeadersFromDynamicProperties(final ConfigurationContext context) {
+ return context.getProperties().entrySet().stream()
+ // filter non-null dynamic properties
+ .filter(e -> e.getKey().isDynamic() && StringUtils.isNotBlank(e.getValue())
+ && StringUtils.isNotBlank(context.getProperty(e.getKey()).evaluateAttributeExpressions().getValue())
+ )
+ // convert to Headers
+ .map(e -> new BasicHeader(e.getKey().getName(),
+ context.getProperty(e.getKey()).evaluateAttributeExpressions().getValue()))
+ .collect(Collectors.toList());
+ }
+
+ private BasicHeader createApiKeyAuthorizationHeader(final String apiKeyId, final String apiKey) {
final String apiKeyCredentials = String.format("%s:%s", apiKeyId, apiKey);
final String apiKeyAuth = Base64.getEncoder().encodeToString((apiKeyCredentials).getBytes(StandardCharsets.UTF_8));
return new BasicHeader("Authorization", "ApiKey " + apiKeyAuth);
}
+ private Sniffer setupSniffer(final ConfigurationContext context, final RestClient restClient) {
+ final boolean sniffClusterNodes = context.getProperty(SNIFF_CLUSTER_NODES).asBoolean();
+ final int snifferIntervalMillis = context.getProperty(SNIFFER_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+ final int snifferFailureDelayMillis = context.getProperty(SNIFFER_FAILURE_DELAY).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+
+ if (sniffClusterNodes) {
+ return Sniffer.builder(restClient)
+ .setSniffIntervalMillis(snifferIntervalMillis)
+ .setSniffAfterFailureDelayMillis(snifferFailureDelayMillis)
+ .setNodesSniffer(setupElasticsearchNodesSniffer(context, restClient))
+ .build();
+ }
+
+ return null;
+ }
+
+ private ElasticsearchNodesSniffer setupElasticsearchNodesSniffer(final ConfigurationContext context, final RestClient restClient) {
+ final Long snifferRequestTimeoutMillis = context.getProperty(SNIFFER_REQUEST_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
+ final ElasticsearchNodesSniffer.Scheme scheme = this.url.toLowerCase(Locale.getDefault()).startsWith("https://")
+ ? ElasticsearchNodesSniffer.Scheme.HTTPS : ElasticsearchNodesSniffer.Scheme.HTTP;
+
+ return new ElasticsearchNodesSniffer(restClient, snifferRequestTimeoutMillis, scheme);
+ }
+
private void appendIndex(final StringBuilder sb, final String index) {
if (StringUtils.isNotBlank(index) && !"/".equals(index)) {
if (!index.startsWith("/")) {
@@ -798,10 +995,6 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
}
if (getLogger().isDebugEnabled()) {
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- entity.writeTo(out);
- out.close();
-
StringBuilder builder = new StringBuilder(1000);
builder.append("Dumping Elasticsearch REST request...\n")
.append("HTTP Method: ")
@@ -812,11 +1005,18 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
.append("\n")
.append("Parameters: ")
.append(prettyPrintWriter.writeValueAsString(parameters))
- .append("\n")
- .append("Request body: ")
- .append(new String(out.toByteArray()))
.append("\n");
+ if (entity != null) {
+ final ByteArrayOutputStream out = new ByteArrayOutputStream();
+ entity.writeTo(out);
+ out.close();
+
+ builder.append("Request body: ")
+ .append(out)
+ .append("\n");
+ }
+
getLogger().debug(builder.toString());
}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/resources/docs/org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl/additionalDetails.html b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/resources/docs/org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl/additionalDetails.html
new file mode 100644
index 0000000000..f0934cafe6
--- /dev/null
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/resources/docs/org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl/additionalDetails.html
@@ -0,0 +1,54 @@
+
+
+
+
+
+ ElasticSearchClientServiceImpl
+
+
+
+
+
+
Sniffing
+
+ The Elasticsearch Sniffer can be used to locate Elasticsearch Nodes within a Cluster to which you are connecting.
+ This can be beneficial if your cluster dynamically changes over time, e.g. new Nodes are added to maintain performance during heavy load.
+
+
+ Sniffing can also be used to update the list of Hosts within the Cluster if a connection Failure is encountered during operation.
+ In order to "Sniff on Failure", you must also enable "Sniff Cluster Nodes".
+
+
+ Not all situations make sense to use Sniffing, for example if:
+
+
Elasticsearch is situated behind a load balancer, which dynamically routes connections from NiFi
+
Elasticsearch is on a different network to NiFi
+
+
+
+ There may also be need to set some of the
+ Elasticsearch Networking Advanced Settings, such as network.publish_host to ensure that
+ the HTTP Hosts found by the Sniffer are accessible by NiFi. For example, Elasticsearch may use a network internal
+ publish_host that is inaccessible to NiFi, but instead should use an address/IP that NiFi understands.
+ It may also be necessary to add this same address to Elasticsearch's network.bind_host list.
+