diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml index 4cf40fe642..68f35b9005 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/pom.xml @@ -55,6 +55,11 @@ language governing permissions and limitations under the License. --> elasticsearch ${es.version} + + com.squareup.okhttp3 + okhttp + 3.3.1 + org.apache.nifi nifi-ssl-context-service-api @@ -63,11 +68,34 @@ language governing permissions and limitations under the License. --> commons-io commons-io + + org.codehaus.jackson + jackson-mapper-asl + org.apache.nifi nifi-ssl-context-service test + + com.fasterxml.jackson.core + jackson-core + 2.5.4 + test + + + + + org.apache.rat + apache-rat-plugin + + + src/test/resources/DocumentExample.json + + + + + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java new file mode 100644 index 0000000000..d477d0eb9e --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.elasticsearch; + +import okhttp3.Credentials; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.RequestBody; +import okhttp3.Response; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.nifi.util.StringUtils; +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.map.ObjectMapper; + +import javax.net.ssl.SSLContext; +import java.io.IOException; +import java.io.InputStream; +import java.net.InetSocketAddress; +import java.net.Proxy; +import java.net.URL; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +/** + * A base class for Elasticsearch processors that use the HTTP API + */ +public abstract class AbstractElasticsearchHttpProcessor extends AbstractElasticsearchProcessor { + + public static final PropertyDescriptor ES_URL = new PropertyDescriptor.Builder() + .name("elasticsearch-http-url") + .displayName("Elasticsearch URL") + .description("Elasticsearch URL which will be connected to, including scheme, host, port, path. 
The default port for the REST API is 9200.") + .required(true) + .addValidator(StandardValidators.URL_VALIDATOR) + .build(); + + public static final PropertyDescriptor PROXY_HOST = new PropertyDescriptor.Builder() + .name("elasticsearch-http-proxy-host") + .displayName("Proxy Host") + .description("The fully qualified hostname or IP address of the proxy server") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor PROXY_PORT = new PropertyDescriptor.Builder() + .name("elasticsearch-http-proxy-port") + .displayName("Proxy Port") + .description("The port of the proxy server") + .required(false) + .addValidator(StandardValidators.PORT_VALIDATOR) + .build(); + + public static final PropertyDescriptor CONNECT_TIMEOUT = new PropertyDescriptor.Builder() + .name("elasticsearch-http-connect-timeout") + .displayName("Connection Timeout") + .description("Max wait time for the connection to the Elasticsearch REST API.") + .required(true) + .defaultValue("5 secs") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .build(); + + public static final PropertyDescriptor RESPONSE_TIMEOUT = new PropertyDescriptor.Builder() + .name("elasticsearch-http-response-timeout") + .displayName("Response Timeout") + .description("Max wait time for a response from the Elasticsearch REST API.") + .required(true) + .defaultValue("15 secs") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .build(); + + private final AtomicReference okHttpClientAtomicReference = new AtomicReference<>(); + + @Override + protected void createElasticsearchClient(ProcessContext context) throws ProcessException { + okHttpClientAtomicReference.set(null); + + OkHttpClient.Builder okHttpClient = new OkHttpClient.Builder(); + + // Add a proxy if set + final String proxyHost = context.getProperty(PROXY_HOST).getValue(); + final Integer proxyPort = context.getProperty(PROXY_PORT).asInteger(); + if (proxyHost != null && proxyPort != 
null) { + final Proxy proxy = new Proxy(Proxy.Type.HTTP, new InetSocketAddress(proxyHost, proxyPort)); + okHttpClient.proxy(proxy); + } + + // Set timeouts + okHttpClient.connectTimeout((context.getProperty(CONNECT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()), TimeUnit.MILLISECONDS); + okHttpClient.readTimeout(context.getProperty(RESPONSE_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(), TimeUnit.MILLISECONDS); + + final SSLContextService sslService = context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); + final SSLContext sslContext = sslService == null ? null : sslService.createSSLContext(SSLContextService.ClientAuth.NONE); + + // check if the ssl context is set and add the factory if so + if (sslContext != null) { + okHttpClient.sslSocketFactory(sslContext.getSocketFactory()); + } + + okHttpClientAtomicReference.set(okHttpClient.build()); + } + + @Override + protected Collection customValidate(ValidationContext validationContext) { + List results = new ArrayList<>(super.customValidate(validationContext)); + if(validationContext.getProperty(PROXY_HOST).isSet() != validationContext.getProperty(PROXY_PORT).isSet()) { + results.add(new ValidationResult.Builder() + .valid(false) + .explanation("Proxy Host and Proxy Port must be both set or empty") + .build()); + } + return results; + } + + protected OkHttpClient getClient() { + return okHttpClientAtomicReference.get(); + } + + protected boolean isSuccess(int statusCode) { + return statusCode / 100 == 2; + } + + protected Response sendRequestToElasticsearch(OkHttpClient client, URL url, String username, String password, String verb, RequestBody body) throws IOException { + + final ComponentLog log = getLogger(); + Request.Builder requestBuilder = new Request.Builder() + .url(url); + if ("get".equalsIgnoreCase(verb)) { + requestBuilder = requestBuilder.get(); + } else if ("put".equalsIgnoreCase(verb)) { + requestBuilder = requestBuilder.put(body); + } else { + 
throw new IllegalArgumentException("Elasticsearch REST API verb not supported by this processor: " + verb); + } + + if(!StringUtils.isEmpty(username) && !StringUtils.isEmpty(password)) { + String credential = Credentials.basic(username, password); + requestBuilder = requestBuilder.header("Authorization", credential); + } + Request httpRequest = requestBuilder.build(); + log.debug("Sending Elasticsearch request to {}", new Object[]{url}); + + Response responseHttp = client.newCall(httpRequest).execute(); + + // store the status code and message + int statusCode = responseHttp.code(); + + if (statusCode == 0) { + throw new IllegalStateException("Status code unknown, connection hasn't been attempted."); + } + + log.debug("Received response from Elasticsearch with status code {}", new Object[]{statusCode}); + + return responseHttp; + } + + protected JsonNode parseJsonResponse(InputStream in) throws IOException { + final ObjectMapper mapper = new ObjectMapper(); + return mapper.readTree(in); + } +} diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchProcessor.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchProcessor.java index f810ec3abd..76c7224412 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchProcessor.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchProcessor.java @@ -19,92 +19,37 @@ package org.apache.nifi.processors.elasticsearch; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; -import 
org.apache.nifi.components.Validator; -import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.util.StringUtils; -import org.elasticsearch.client.Client; -import org.elasticsearch.client.transport.TransportClient; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.transport.InetSocketTransportAddress; -import java.io.File; -import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.net.InetSocketAddress; -import java.net.MalformedURLException; -import java.net.URL; -import java.net.URLClassLoader; -import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.atomic.AtomicReference; - +/** + * A base class for all Elasticsearch processors + */ public abstract class AbstractElasticsearchProcessor extends AbstractProcessor { - /** - * This validator ensures the Elasticsearch hosts property is a valid list of hostname:port entries - */ - private static final Validator HOSTNAME_PORT_VALIDATOR = new Validator() { - @Override - public ValidationResult validate(final String subject, final String input, final ValidationContext context) { - final List esList = Arrays.asList(input.split(",")); - for (String hostnamePort : esList) { - String[] addresses = hostnamePort.split(":"); - // Protect against invalid input like http://127.0.0.1:9300 (URL scheme should not be there) - if (addresses.length != 2) { - return new ValidationResult.Builder().subject(subject).input(input).explanation( - "Must be in hostname:port form (no scheme such as 
http://").valid(false).build(); - } - } - return new ValidationResult.Builder().subject(subject).input(input).explanation( - "Valid cluster definition").valid(true).build(); - } - }; - - protected static final PropertyDescriptor CLUSTER_NAME = new PropertyDescriptor.Builder() - .name("Cluster Name") - .description("Name of the ES cluster (for example, elasticsearch_brew). Defaults to 'elasticsearch'") - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .defaultValue("elasticsearch") - .build(); - - protected static final PropertyDescriptor HOSTS = new PropertyDescriptor.Builder() - .name("ElasticSearch Hosts") - .description("ElasticSearch Hosts, which should be comma separated and colon for hostname/port " - + "host1:port,host2:port,.... For example testcluster:9300.") - .required(true) - .expressionLanguageSupported(false) - .addValidator(HOSTNAME_PORT_VALIDATOR) - .build(); - public static final PropertyDescriptor PROP_SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() .name("SSL Context Service") .description("The SSL Context Service used to provide client certificate information for TLS/SSL " - + "connections. This service only applies if the Shield plugin is available.") + + "connections. This service only applies if the Elasticsearch endpoint(s) have been secured with TLS/SSL.") .required(false) .identifiesControllerService(SSLContextService.class) .build(); - public static final PropertyDescriptor PROP_SHIELD_LOCATION = new PropertyDescriptor.Builder() - .name("Shield Plugin Filename") - .description("Specifies the path to the JAR for the Elasticsearch Shield plugin. " - + "If the Elasticsearch cluster has been secured with the Shield plugin, then the Shield plugin " - + "JAR must also be available to this processor. 
Note: Do NOT place the Shield JAR into NiFi's " - + "lib/ directory, doing so will prevent the Shield plugin from being loaded.") - .required(false) - .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) + protected static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder() + .name("Character Set") + .description("Specifies the character set of the document data.") + .required(true) + .defaultValue("UTF-8") + .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) .build(); public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder() @@ -122,36 +67,7 @@ public abstract class AbstractElasticsearchProcessor extends AbstractProcessor { .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); - protected static final PropertyDescriptor PING_TIMEOUT = new PropertyDescriptor.Builder() - .name("ElasticSearch Ping Timeout") - .description("The ping timeout used to determine when a node is unreachable. " + - "For example, 5s (5 seconds). If non-local recommended is 30s") - .required(true) - .defaultValue("5s") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - - protected static final PropertyDescriptor SAMPLER_INTERVAL = new PropertyDescriptor.Builder() - .name("Sampler Interval") - .description("How often to sample / ping the nodes listed and connected. For example, 5s (5 seconds). 
" - + "If non-local recommended is 30s.") - .required(true) - .defaultValue("5s") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - - protected static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder() - .name("Character Set") - .description("Specifies the character set of the document data.") - .required(true) - .defaultValue("UTF-8") - .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) - .build(); - - protected AtomicReference esClient = new AtomicReference<>(); - protected List esHosts; - protected String authToken; - + protected abstract void createElasticsearchClient(ProcessContext context) throws ProcessException; @Override protected Collection customValidate(ValidationContext validationContext) { @@ -172,163 +88,4 @@ public abstract class AbstractElasticsearchProcessor extends AbstractProcessor { createElasticsearchClient(context); } - /** - * Instantiate ElasticSearch Client. This chould be called by subclasses' @OnScheduled method to create a client - * if one does not yet exist. If called when scheduled, closeClient() should be called by the subclasses' @OnStopped - * method so the client will be destroyed when the processor is stopped. 
- * - * @param context The context for this processor - * @throws ProcessException if an error occurs while creating an Elasticsearch client - */ - protected void createElasticsearchClient(ProcessContext context) throws ProcessException { - - ComponentLog log = getLogger(); - if (esClient.get() != null) { - return; - } - - log.debug("Creating ElasticSearch Client"); - try { - final String clusterName = context.getProperty(CLUSTER_NAME).getValue(); - final String pingTimeout = context.getProperty(PING_TIMEOUT).getValue(); - final String samplerInterval = context.getProperty(SAMPLER_INTERVAL).getValue(); - final String username = context.getProperty(USERNAME).getValue(); - final String password = context.getProperty(PASSWORD).getValue(); - - final SSLContextService sslService = - context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); - - Settings.Builder settingsBuilder = Settings.settingsBuilder() - .put("cluster.name", clusterName) - .put("client.transport.ping_timeout", pingTimeout) - .put("client.transport.nodes_sampler_interval", samplerInterval); - - String shieldUrl = context.getProperty(PROP_SHIELD_LOCATION).getValue(); - if (sslService != null) { - settingsBuilder.put("shield.transport.ssl", "true") - .put("shield.ssl.keystore.path", sslService.getKeyStoreFile()) - .put("shield.ssl.keystore.password", sslService.getKeyStorePassword()) - .put("shield.ssl.truststore.path", sslService.getTrustStoreFile()) - .put("shield.ssl.truststore.password", sslService.getTrustStorePassword()); - } - - // Set username and password for Shield - if (!StringUtils.isEmpty(username)) { - StringBuffer shieldUser = new StringBuffer(username); - if (!StringUtils.isEmpty(password)) { - shieldUser.append(":"); - shieldUser.append(password); - } - settingsBuilder.put("shield.user", shieldUser); - - } - - TransportClient transportClient = getTransportClient(settingsBuilder, shieldUrl, username, password); - - final String hosts = 
context.getProperty(HOSTS).getValue(); - esHosts = getEsHosts(hosts); - - if (esHosts != null) { - for (final InetSocketAddress host : esHosts) { - try { - transportClient.addTransportAddress(new InetSocketTransportAddress(host)); - } catch (IllegalArgumentException iae) { - log.error("Could not add transport address {}", new Object[]{host}); - } - } - } - esClient.set(transportClient); - - } catch (Exception e) { - log.error("Failed to create Elasticsearch client due to {}", new Object[]{e}, e); - throw new ProcessException(e); - } - } - - protected TransportClient getTransportClient(Settings.Builder settingsBuilder, String shieldUrl, - String username, String password) - throws MalformedURLException { - - // Create new transport client using the Builder pattern - TransportClient.Builder builder = TransportClient.builder(); - - // See if the Elasticsearch Shield JAR location was specified, and add the plugin if so. Also create the - // authorization token if username and password are supplied. 
- final ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader(); - if (!StringUtils.isBlank(shieldUrl)) { - ClassLoader shieldClassLoader = - new URLClassLoader(new URL[]{new File(shieldUrl).toURI().toURL()}, this.getClass().getClassLoader()); - Thread.currentThread().setContextClassLoader(shieldClassLoader); - - try { - Class shieldPluginClass = Class.forName("org.elasticsearch.shield.ShieldPlugin", true, shieldClassLoader); - builder = builder.addPlugin(shieldPluginClass); - - if (!StringUtils.isEmpty(username) && !StringUtils.isEmpty(password)) { - - // Need a couple of classes from the Shield plugin to build the token - Class usernamePasswordTokenClass = - Class.forName("org.elasticsearch.shield.authc.support.UsernamePasswordToken", true, shieldClassLoader); - - Class securedStringClass = - Class.forName("org.elasticsearch.shield.authc.support.SecuredString", true, shieldClassLoader); - - Constructor securedStringCtor = securedStringClass.getConstructor(char[].class); - Object securePasswordString = securedStringCtor.newInstance(password.toCharArray()); - - Method basicAuthHeaderValue = usernamePasswordTokenClass.getMethod("basicAuthHeaderValue", String.class, securedStringClass); - authToken = (String) basicAuthHeaderValue.invoke(null, username, securePasswordString); - } - } catch (ClassNotFoundException - | NoSuchMethodException - | InstantiationException - | IllegalAccessException - | InvocationTargetException shieldLoadException) { - getLogger().debug("Did not detect Elasticsearch Shield plugin, secure connections and/or authorization will not be available"); - } - } else { - getLogger().debug("No Shield plugin location specified, secure connections and/or authorization will not be available"); - } - TransportClient transportClient = builder.settings(settingsBuilder.build()).build(); - Thread.currentThread().setContextClassLoader(originalClassLoader); - return transportClient; - } - - /** - * Dispose of ElasticSearch client - */ - 
public void closeClient() { - if (esClient.get() != null) { - getLogger().info("Closing ElasticSearch Client"); - esClient.get().close(); - esClient.set(null); - } - } - - /** - * Get the ElasticSearch hosts from a Nifi attribute, e.g. - * - * @param hosts A comma-separated list of ElasticSearch hosts (host:port,host2:port2, etc.) - * @return List of InetSocketAddresses for the ES hosts - */ - private List getEsHosts(String hosts) { - - if (hosts == null) { - return null; - } - final List esList = Arrays.asList(hosts.split(",")); - List esHosts = new ArrayList<>(); - - for (String item : esList) { - - String[] addresses = item.split(":"); - final String hostName = addresses[0].trim(); - final int port = Integer.parseInt(addresses[1].trim()); - - esHosts.add(new InetSocketAddress(hostName, port)); - } - return esHosts; - } - - } diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchTransportClientProcessor.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchTransportClientProcessor.java new file mode 100644 index 0000000000..d2989db63a --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchTransportClientProcessor.java @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.elasticsearch; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.nifi.util.StringUtils; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.InetSocketTransportAddress; + +import java.io.File; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.net.InetSocketAddress; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; + + +public abstract class AbstractElasticsearchTransportClientProcessor extends AbstractElasticsearchProcessor { + + /** + * This validator ensures the Elasticsearch hosts property is a valid list of hostname:port entries + */ + private static final Validator HOSTNAME_PORT_VALIDATOR = new Validator() { + @Override + public ValidationResult validate(final String 
subject, final String input, final ValidationContext context) { + final List esList = Arrays.asList(input.split(",")); + for (String hostnamePort : esList) { + String[] addresses = hostnamePort.split(":"); + // Protect against invalid input like http://127.0.0.1:9300 (URL scheme should not be there) + if (addresses.length != 2) { + return new ValidationResult.Builder().subject(subject).input(input).explanation( + "Must be in hostname:port form (no scheme such as http://").valid(false).build(); + } + } + return new ValidationResult.Builder().subject(subject).input(input).explanation( + "Valid cluster definition").valid(true).build(); + } + }; + + protected static final PropertyDescriptor CLUSTER_NAME = new PropertyDescriptor.Builder() + .name("Cluster Name") + .description("Name of the ES cluster (for example, elasticsearch_brew). Defaults to 'elasticsearch'") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .defaultValue("elasticsearch") + .build(); + + protected static final PropertyDescriptor HOSTS = new PropertyDescriptor.Builder() + .name("ElasticSearch Hosts") + .description("ElasticSearch Hosts, which should be comma separated and colon for hostname/port " + + "host1:port,host2:port,.... For example testcluster:9300. This processor uses the Transport Client to " + + "connect to hosts. The default transport client port is 9300.") + .required(true) + .expressionLanguageSupported(false) + .addValidator(HOSTNAME_PORT_VALIDATOR) + .build(); + + public static final PropertyDescriptor PROP_SHIELD_LOCATION = new PropertyDescriptor.Builder() + .name("Shield Plugin Filename") + .description("Specifies the path to the JAR for the Elasticsearch Shield plugin. " + + "If the Elasticsearch cluster has been secured with the Shield plugin, then the Shield plugin " + + "JAR must also be available to this processor. 
Note: Do NOT place the Shield JAR into NiFi's " + + "lib/ directory, doing so will prevent the Shield plugin from being loaded.") + .required(false) + .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) + .build(); + + protected static final PropertyDescriptor PING_TIMEOUT = new PropertyDescriptor.Builder() + .name("ElasticSearch Ping Timeout") + .description("The ping timeout used to determine when a node is unreachable. " + + "For example, 5s (5 seconds). If non-local recommended is 30s") + .required(true) + .defaultValue("5s") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + protected static final PropertyDescriptor SAMPLER_INTERVAL = new PropertyDescriptor.Builder() + .name("Sampler Interval") + .description("How often to sample / ping the nodes listed and connected. For example, 5s (5 seconds). " + + "If non-local recommended is 30s.") + .required(true) + .defaultValue("5s") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + protected AtomicReference esClient = new AtomicReference<>(); + protected List esHosts; + protected String authToken; + + /** + * Instantiate ElasticSearch Client. This chould be called by subclasses' @OnScheduled method to create a client + * if one does not yet exist. If called when scheduled, closeClient() should be called by the subclasses' @OnStopped + * method so the client will be destroyed when the processor is stopped. 
+ * + * @param context The context for this processor + * @throws ProcessException if an error occurs while creating an Elasticsearch client + */ + @Override + protected void createElasticsearchClient(ProcessContext context) throws ProcessException { + + ComponentLog log = getLogger(); + if (esClient.get() != null) { + return; + } + + log.debug("Creating ElasticSearch Client"); + try { + final String clusterName = context.getProperty(CLUSTER_NAME).getValue(); + final String pingTimeout = context.getProperty(PING_TIMEOUT).getValue(); + final String samplerInterval = context.getProperty(SAMPLER_INTERVAL).getValue(); + final String username = context.getProperty(USERNAME).getValue(); + final String password = context.getProperty(PASSWORD).getValue(); + + final SSLContextService sslService = + context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); + + Settings.Builder settingsBuilder = Settings.settingsBuilder() + .put("cluster.name", clusterName) + .put("client.transport.ping_timeout", pingTimeout) + .put("client.transport.nodes_sampler_interval", samplerInterval); + + String shieldUrl = context.getProperty(PROP_SHIELD_LOCATION).getValue(); + if (sslService != null) { + settingsBuilder.put("shield.transport.ssl", "true") + .put("shield.ssl.keystore.path", sslService.getKeyStoreFile()) + .put("shield.ssl.keystore.password", sslService.getKeyStorePassword()) + .put("shield.ssl.truststore.path", sslService.getTrustStoreFile()) + .put("shield.ssl.truststore.password", sslService.getTrustStorePassword()); + } + + // Set username and password for Shield + if (!StringUtils.isEmpty(username)) { + StringBuffer shieldUser = new StringBuffer(username); + if (!StringUtils.isEmpty(password)) { + shieldUser.append(":"); + shieldUser.append(password); + } + settingsBuilder.put("shield.user", shieldUser); + + } + + TransportClient transportClient = getTransportClient(settingsBuilder, shieldUrl, username, password); + + final String hosts = 
context.getProperty(HOSTS).getValue(); + esHosts = getEsHosts(hosts); + + if (esHosts != null) { + for (final InetSocketAddress host : esHosts) { + try { + transportClient.addTransportAddress(new InetSocketTransportAddress(host)); + } catch (IllegalArgumentException iae) { + log.error("Could not add transport address {}", new Object[]{host}); + } + } + } + esClient.set(transportClient); + + } catch (Exception e) { + log.error("Failed to create Elasticsearch client due to {}", new Object[]{e}, e); + throw new ProcessException(e); + } + } + + protected TransportClient getTransportClient(Settings.Builder settingsBuilder, String shieldUrl, + String username, String password) + throws MalformedURLException { + + // Create new transport client using the Builder pattern + TransportClient.Builder builder = TransportClient.builder(); + + // See if the Elasticsearch Shield JAR location was specified, and add the plugin if so. Also create the + // authorization token if username and password are supplied. 
+ final ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader(); + if (!StringUtils.isBlank(shieldUrl)) { + ClassLoader shieldClassLoader = + new URLClassLoader(new URL[]{new File(shieldUrl).toURI().toURL()}, this.getClass().getClassLoader()); + Thread.currentThread().setContextClassLoader(shieldClassLoader); + + try { + Class shieldPluginClass = Class.forName("org.elasticsearch.shield.ShieldPlugin", true, shieldClassLoader); + builder = builder.addPlugin(shieldPluginClass); + + if (!StringUtils.isEmpty(username) && !StringUtils.isEmpty(password)) { + + // Need a couple of classes from the Shield plugin to build the token + Class usernamePasswordTokenClass = + Class.forName("org.elasticsearch.shield.authc.support.UsernamePasswordToken", true, shieldClassLoader); + + Class securedStringClass = + Class.forName("org.elasticsearch.shield.authc.support.SecuredString", true, shieldClassLoader); + + Constructor securedStringCtor = securedStringClass.getConstructor(char[].class); + Object securePasswordString = securedStringCtor.newInstance(password.toCharArray()); + + Method basicAuthHeaderValue = usernamePasswordTokenClass.getMethod("basicAuthHeaderValue", String.class, securedStringClass); + authToken = (String) basicAuthHeaderValue.invoke(null, username, securePasswordString); + } + } catch (ClassNotFoundException + | NoSuchMethodException + | InstantiationException + | IllegalAccessException + | InvocationTargetException shieldLoadException) { + getLogger().debug("Did not detect Elasticsearch Shield plugin, secure connections and/or authorization will not be available"); + } + } else { + getLogger().debug("No Shield plugin location specified, secure connections and/or authorization will not be available"); + } + TransportClient transportClient = builder.settings(settingsBuilder.build()).build(); + Thread.currentThread().setContextClassLoader(originalClassLoader); + return transportClient; + } + + /** + * Dispose of ElasticSearch client + */ + 
public void closeClient() {
+        // Nothing to dispose of when no client was ever created (or it has already been closed).
+        if (esClient.get() != null) {
+            getLogger().info("Closing ElasticSearch Client");
+            esClient.get().close();
+            esClient.set(null);
+        }
+    }
+
+    /**
+     * Parses the value of the hosts property into socket addresses.
+     *
+     * @param hosts A comma-separated list of ElasticSearch hosts (host:port,host2:port2, etc.)
+     * @return List of InetSocketAddresses for the ES hosts, or null if {@code hosts} is null
+     * @throws IllegalArgumentException if an entry is not of the form host:port, or its port is not an integer
+     */
+    private List<InetSocketAddress> getEsHosts(String hosts) {
+
+        if (hosts == null) {
+            return null;
+        }
+        final List<String> esList = Arrays.asList(hosts.split(","));
+        List<InetSocketAddress> esHosts = new ArrayList<>();
+
+        for (String item : esList) {
+
+            final String entry = item.trim();
+            if (entry.isEmpty()) {
+                // Tolerate stray commas such as "host1:9300,,host2:9300" instead of failing on them
+                continue;
+            }
+
+            String[] addresses = entry.split(":");
+            if (addresses.length < 2) {
+                // Without this check a missing port surfaces as a bare ArrayIndexOutOfBoundsException
+                throw new IllegalArgumentException("Invalid Elasticsearch host '" + entry + "', expected host:port");
+            }
+            final String hostName = addresses[0].trim();
+            final int port = Integer.parseInt(addresses[1].trim());
+
+            esHosts.add(new InetSocketAddress(hostName, port));
+        }
+        return esHosts;
+    }
+
+
+}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearch.java
index 8d2e3781dd..67aaae7877 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearch.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearch.java
@@ -64,7 +64,7 @@ import java.util.Set;
         @WritesAttribute(attribute = "es.index", description = "The Elasticsearch index containing the document"),
         @WritesAttribute(attribute = "es.type", description = "The Elasticsearch document type")
 })
-public class FetchElasticsearch extends AbstractElasticsearchProcessor {
+public class FetchElasticsearch extends AbstractElasticsearchTransportClientProcessor {
 
     public static final Relationship REL_SUCCESS = new
Relationship.Builder().name("success") .description("All FlowFiles that are read from Elasticsearch are routed to this relationship").build(); diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java new file mode 100644 index 0000000000..9ce1510e2f --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.elasticsearch; + +import okhttp3.HttpUrl; +import okhttp3.OkHttpClient; +import okhttp3.Response; +import okhttp3.ResponseBody; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.ByteArrayInputStream; +import org.codehaus.jackson.JsonNode; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; + + +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED) +@EventDriven +@SupportsBatching +@Tags({"elasticsearch", "fetch", "read", "get", "http"}) +@CapabilityDescription("Retrieves a document from Elasticsearch using the specified connection properties and the " + + "identifier of the document to retrieve. 
Note that the full body of the document will be read into memory before being "
+        + "written to a Flow File for transfer.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "filename", description = "The filename attribute is set to the document identifier"),
+        @WritesAttribute(attribute = "es.index", description = "The Elasticsearch index containing the document"),
+        @WritesAttribute(attribute = "es.type", description = "The Elasticsearch document type")
+})
+public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
+
+    // Query parameter used to restrict which fields of the document source are returned
+    private static final String FIELD_INCLUDE_QUERY_PARAM = "_source_include";
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("All FlowFiles that are read from Elasticsearch are routed to this relationship.")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("All FlowFiles that cannot be read from Elasticsearch are routed to this relationship. Note that only incoming "
+                    + "flow files will be routed to failure.")
+            .build();
+
+    public static final Relationship REL_RETRY = new Relationship.Builder().name("retry")
+            .description("A FlowFile is routed to this relationship if the document cannot be fetched but attempting the operation again may "
+                    + "succeed. Note that if the processor has no incoming connections, flow files may still be sent to this relationship "
+                    + "based on the processor properties and the results of the fetch operation.")
+            .build();
+
+    public static final Relationship REL_NOT_FOUND = new Relationship.Builder().name("not found")
+            .description("A FlowFile is routed to this relationship if the specified document does not exist in the Elasticsearch cluster. "
+                    + "Note that if the processor has no incoming connections, flow files may still be sent to this relationship based "
+                    + "on the processor properties and the results of the fetch operation.")
+            .build();
+
+    public static final PropertyDescriptor DOC_ID = new PropertyDescriptor.Builder()
+            .name("fetch-es-doc-id")
+            .displayName("Document Identifier")
+            .description("The identifier of the document to be fetched")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
+            .name("fetch-es-index")
+            .displayName("Index")
+            .description("The name of the index to read from")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
+            .name("fetch-es-type")
+            .displayName("Type")
+            .description("The (optional) type of this document, used by Elasticsearch for indexing and searching. If the property is empty or set "
+                    + "to _all, the first document matching the identifier across all types will be retrieved.")
+            .required(false)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder()
+            .name("fetch-es-fields")
+            .displayName("Fields")
+            .description("A comma-separated list of fields to retrieve from the document. If the Fields property is left blank, "
+                    + "then the entire document's source will be retrieved.")
+            .required(false)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+
+    /**
+     * @return the success, failure, retry and "not found" relationships, as an unmodifiable set
+     */
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        relationships.add(REL_RETRY);
+        relationships.add(REL_NOT_FOUND);
+        return Collections.unmodifiableSet(relationships);
+    }
+
+    /**
+     * @return the connection properties followed by the fetch-specific properties, as an unmodifiable list
+     */
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(ES_URL);
+        descriptors.add(PROP_SSL_CONTEXT_SERVICE);
+        descriptors.add(USERNAME);
+        descriptors.add(PASSWORD);
+        descriptors.add(CONNECT_TIMEOUT);
+        descriptors.add(RESPONSE_TIMEOUT);
+        descriptors.add(DOC_ID);
+        descriptors.add(INDEX);
+        descriptors.add(TYPE);
+        descriptors.add(FIELDS);
+
+        return Collections.unmodifiableList(descriptors);
+    }
+
+
+    @OnScheduled
+    public void setup(ProcessContext context) {
+        super.setup(context);
+    }
+
+    /**
+     * Fetches a single document over HTTP and routes the flow file according to the outcome: success when the
+     * document is found, "not found" when it is not, retry on 5xx responses and I/O errors, failure otherwise.
+     * When the processor has no incoming connection a new flow file is created to carry the result.
+     *
+     * @param context the process context supplying the property values
+     * @param session the session used to obtain, update and transfer the flow file
+     * @throws ProcessException declared by the processor contract
+     */
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+
+        FlowFile flowFile = null;
+        if (context.hasIncomingConnection()) {
+            flowFile = session.get();
+
+            // If we have no FlowFile, and all incoming connections are self-loops then we can continue on.
+            // However, if we have no FlowFile and we have connections coming from other Processors, then
+            // we know that we should run only if we have a FlowFile.
+            if (flowFile == null && context.hasNonLoopConnection()) {
+                return;
+            }
+        }
+
+        OkHttpClient okHttpClient = getClient();
+
+        if (flowFile == null) {
+            flowFile = session.create();
+        }
+
+        final String index = context.getProperty(INDEX).evaluateAttributeExpressions(flowFile).getValue();
+        final String docId = context.getProperty(DOC_ID).evaluateAttributeExpressions(flowFile).getValue();
+        final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(flowFile).getValue();
+        final String fields = context.getProperty(FIELDS).isSet()
+                ? context.getProperty(FIELDS).evaluateAttributeExpressions(flowFile).getValue()
+                : null;
+
+        // Authentication
+        final String username = context.getProperty(USERNAME).getValue();
+        final String password = context.getProperty(PASSWORD).getValue();
+
+        final ComponentLog logger = getLogger();
+
+        // Declared outside the try so that the finally block can release the connection on every path
+        Response getResponse = null;
+
+        try {
+            logger.debug("Fetching {}/{}/{} from Elasticsearch", new Object[]{index, docType, docId});
+
+            // read the url property from the context
+            final String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL).getValue());
+            final URL url = buildRequestURL(urlstr, docId, index, docType, fields);
+            final long startNanos = System.nanoTime();
+
+            getResponse = sendRequestToElasticsearch(okHttpClient, url, username, password, "GET", null);
+            final int statusCode = getResponse.code();
+
+            if (isSuccess(statusCode)) {
+                ResponseBody body = getResponse.body();
+                final byte[] bodyBytes = body.bytes();
+                JsonNode responseJson = parseJsonResponse(new ByteArrayInputStream(bodyBytes));
+                // path() yields a "missing" node rather than null, so an absent field reads as "not found"
+                boolean found = responseJson.path("found").asBoolean(false);
+
+                if (found) {
+                    String retrievedIndex = responseJson.get("_index").asText();
+                    String retrievedType = responseJson.get("_type").asText();
+                    String retrievedId = responseJson.get("_id").asText();
+                    JsonNode source = responseJson.get("_source");
+                    flowFile = session.putAttribute(flowFile, "filename", retrievedId);
+                    flowFile = session.putAttribute(flowFile, "es.index", retrievedIndex);
+                    flowFile = session.putAttribute(flowFile, "es.type", retrievedType);
+                    if (source != null) {
+                        flowFile = session.write(flowFile, out -> {
+                            // Always emit UTF-8 rather than whatever the platform default charset happens to be
+                            out.write(source.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8));
+                        });
+                    }
+                    logger.debug("Elasticsearch document " + retrievedId + " fetched, routing to success");
+
+                    // emit provenance event
+                    final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+                    if (context.hasNonLoopConnection()) {
+                        session.getProvenanceReporter().fetch(flowFile, url.toExternalForm(), millis);
+                    } else {
+                        session.getProvenanceReporter().receive(flowFile, url.toExternalForm(), millis);
+                    }
+                    session.transfer(flowFile, REL_SUCCESS);
+                } else {
+                    logger.warn("Failed to read {}/{}/{} from Elasticsearch: Document not found",
+                            new Object[]{index, docType, docId});
+
+                    // We couldn't find the document, so send it to "not found"
+                    session.transfer(flowFile, REL_NOT_FOUND);
+                }
+            } else {
+                if (statusCode == 404) {
+                    logger.warn("Failed to read {}/{}/{} from Elasticsearch: Document not found",
+                            new Object[]{index, docType, docId});
+
+                    // We couldn't find the document, so send it to "not found"
+                    session.transfer(flowFile, REL_NOT_FOUND);
+                } else {
+                    // 5xx -> RETRY, but a server error might last a while, so yield
+                    if (statusCode / 100 == 5) {
+
+                        logger.warn("Elasticsearch returned code {} with message {}, transferring flow file to retry. This is likely a server problem, yielding...",
+                                new Object[]{statusCode, getResponse.message()});
+                        session.transfer(flowFile, REL_RETRY);
+                        context.yield();
+                    } else if (context.hasIncomingConnection()) {  // 1xx, 3xx, 4xx -> NO RETRY
+                        logger.warn("Elasticsearch returned code {} with message {}, transferring flow file to failure", new Object[]{statusCode, getResponse.message()});
+                        session.transfer(flowFile, REL_FAILURE);
+                    } else {
+                        logger.warn("Elasticsearch returned code {} with message {}", new Object[]{statusCode, getResponse.message()});
+                        session.remove(flowFile);
+                    }
+                }
+            }
+        } catch (IOException ioe) {
+            logger.error("Failed to read from Elasticsearch due to {}, this may indicate an error in configuration "
+                            + "(hosts, username/password, etc.). Routing to retry",
+                    new Object[]{ioe.getLocalizedMessage()}, ioe);
+            if (context.hasIncomingConnection()) {
+                session.transfer(flowFile, REL_RETRY);
+            } else {
+                session.remove(flowFile);
+            }
+            context.yield();
+
+        } catch (Exception e) {
+            logger.error("Failed to read {} from Elasticsearch due to {}", new Object[]{flowFile, e.getLocalizedMessage()}, e);
+            if (context.hasIncomingConnection()) {
+                session.transfer(flowFile, REL_FAILURE);
+            } else {
+                session.remove(flowFile);
+            }
+            context.yield();
+        } finally {
+            // OkHttp holds on to the connection until the body is closed. bytes() closes it on the success
+            // path, but every error path would otherwise leak it. Closing an already-closed body is harmless.
+            if (getResponse != null) {
+                final ResponseBody bodyToClose = getResponse.body();
+                if (bodyToClose != null) {
+                    bodyToClose.close();
+                }
+            }
+        }
+    }
+
+    /**
+     * Builds the document URL {@code <baseUrl>/<index>/<type>/<docId>}, optionally restricted to a set of fields.
+     *
+     * @param baseUrl the Elasticsearch base URL
+     * @param docId   the identifier of the document to fetch
+     * @param index   the index to read from
+     * @param type    the document type; {@code _all} is used when empty
+     * @param fields  optional comma-separated field names to include; each name is trimmed
+     * @return the URL to issue the GET request against
+     * @throws MalformedURLException if the base URL is empty or cannot be parsed as an HTTP(S) URL
+     */
+    private URL buildRequestURL(String baseUrl, String docId, String index, String type, String fields) throws MalformedURLException {
+        if (StringUtils.isEmpty(baseUrl)) {
+            throw new MalformedURLException("Base URL cannot be null");
+        }
+        // HttpUrl.parse signals bad input by returning null rather than throwing
+        final HttpUrl parsedBaseUrl = HttpUrl.parse(baseUrl);
+        if (parsedBaseUrl == null) {
+            throw new MalformedURLException("Base URL is not a valid HTTP(S) URL: " + baseUrl);
+        }
+        HttpUrl.Builder builder = parsedBaseUrl.newBuilder();
+        builder.addPathSegment(index);
+        builder.addPathSegment((StringUtils.isEmpty(type)) ? "_all" : type);
+        builder.addPathSegment(docId);
+        if (!StringUtils.isEmpty(fields)) {
+            String trimmedFields = Stream.of(fields.split(",")).map(String::trim).collect(Collectors.joining(","));
+            builder.addQueryParameter(FIELD_INCLUDE_QUERY_PARAM, trimmedFields);
+        }
+
+        return builder.build().url();
+    }
+}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java
index 507558608e..f64180b20c 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java
@@ -62,7 +62,7 @@ import java.util.Set;
         + "the index to insert into and the type of the document. If the cluster has been configured for authorization "
         + "and/or secure transport (SSL/TLS) and the Shield plugin is available, secure connections can be made. 
This processor " + "supports Elasticsearch 2.x clusters.") -public class PutElasticsearch extends AbstractElasticsearchProcessor { +public class PutElasticsearch extends AbstractElasticsearchTransportClientProcessor { static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") .description("All FlowFiles that are written to Elasticsearch are routed to this relationship").build(); diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java new file mode 100644 index 0000000000..92b1452600 --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java @@ -0,0 +1,367 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.elasticsearch; + +import okhttp3.MediaType; +import okhttp3.OkHttpClient; +import okhttp3.RequestBody; +import okhttp3.Response; +import okhttp3.ResponseBody; +import org.apache.commons.io.IOUtils; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.AttributeExpression; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.ByteArrayInputStream; +import org.apache.nifi.util.StringUtils; +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.node.ArrayNode; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; + +import static org.apache.commons.lang3.StringUtils.trimToEmpty; + + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@EventDriven +@SupportsBatching +@Tags({"elasticsearch", "insert", "update", "upsert", "delete", "write", "put", "http"}) +@CapabilityDescription("Writes the contents of a 
FlowFile to Elasticsearch, using the specified parameters such as "
+        + "the index to insert into and the type of the document.")
+public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
+            .description("All FlowFiles that are written to Elasticsearch are routed to this relationship").build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
+            .description("All FlowFiles that cannot be written to Elasticsearch are routed to this relationship").build();
+
+    public static final Relationship REL_RETRY = new Relationship.Builder().name("retry")
+            .description("A FlowFile is routed to this relationship if the database cannot be updated but attempting the operation again may succeed")
+            .build();
+
+    public static final PropertyDescriptor ID_ATTRIBUTE = new PropertyDescriptor.Builder()
+            .name("put-es-id-attr")
+            .displayName("Identifier Attribute")
+            .description("The name of the FlowFile attribute containing the identifier for the document. If the Index Operation is \"index\", "
+                    + "this property may be left empty or evaluate to an empty value, in which case the document's identifier will be "
+                    + "auto-generated by Elasticsearch. For all other Index Operations, the attribute must evaluate to a non-empty value.")
+            .required(false)
+            .expressionLanguageSupported(false)
+            .addValidator(StandardValidators.ATTRIBUTE_KEY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
+            .name("put-es-index")
+            .displayName("Index")
+            .description("The name of the index to insert into")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(
+                    AttributeExpression.ResultType.STRING, true))
+            .build();
+
+    public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
+            .name("put-es-type")
+            .displayName("Type")
+            .description("The type of this document (used by Elasticsearch for indexing and searching)")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(
+                    AttributeExpression.ResultType.STRING, true))
+            .build();
+
+    public static final PropertyDescriptor INDEX_OP = new PropertyDescriptor.Builder()
+            .name("put-es-index-op")
+            .displayName("Index Operation")
+            .description("The type of the operation used to index (index, update, upsert, delete)")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(
+                    AttributeExpression.ResultType.STRING, true))
+            .defaultValue("index")
+            .build();
+
+    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+            .name("put-es-batch-size")
+            .displayName("Batch Size")
+            .description("The preferred number of flow files to put to the database in a single transaction. Note that the contents of the "
+                    + "flow files will be stored in memory until the bulk operation is performed. Also the results should be returned in the "
+                    + "same order the flow files were received.")
+            .required(true)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("100")
+            .build();
+
+    /**
+     * @return the success, failure and retry relationships, as an unmodifiable set
+     */
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        relationships.add(REL_RETRY);
+        return Collections.unmodifiableSet(relationships);
+    }
+
+    /**
+     * @return the connection properties followed by the put-specific properties, as an unmodifiable list
+     */
+    @Override
+    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(ES_URL);
+        descriptors.add(PROP_SSL_CONTEXT_SERVICE);
+        descriptors.add(USERNAME);
+        descriptors.add(PASSWORD);
+        descriptors.add(CONNECT_TIMEOUT);
+        descriptors.add(RESPONSE_TIMEOUT);
+        descriptors.add(ID_ATTRIBUTE);
+        descriptors.add(INDEX);
+        descriptors.add(TYPE);
+        descriptors.add(CHARSET);
+        descriptors.add(BATCH_SIZE);
+        descriptors.add(INDEX_OP);
+        return Collections.unmodifiableList(descriptors);
+    }
+
+    /**
+     * Adds a validation problem when no Identifier Attribute is configured but the literal Index Operation is one
+     * that needs an identifier (update, upsert, delete) or is empty.
+     *
+     * @param validationContext the context providing the configured property values
+     * @return the problems reported by the superclass plus any found here
+     */
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
+        final List<ValidationResult> problems = new ArrayList<>(super.customValidate(validationContext));
+        // Since Expression Language is allowed for index operation, we can't guarantee that we can catch
+        // all invalid configurations, but we should catch them as soon as we can. For example, if the
+        // Identifier Attribute property is empty, the Index Operation must evaluate to "index".
+        String idAttribute = validationContext.getProperty(ID_ATTRIBUTE).getValue();
+        String indexOp = validationContext.getProperty(INDEX_OP).getValue();
+
+        if (StringUtils.isEmpty(idAttribute)) {
+            // Locale.ROOT keeps the comparison independent of the JVM default locale (e.g. Turkish dotless i)
+            switch (indexOp.toLowerCase(java.util.Locale.ROOT)) {
+                case "update":
+                case "upsert":
+                case "delete":
+                case "":
+                    problems.add(new ValidationResult.Builder()
+                            .valid(false)
+                            .subject(INDEX_OP.getDisplayName())
+                            .explanation("If Identifier Attribute is not set, Index Operation must evaluate to \"index\"")
+                            .build());
+                    break;
+                default:
+                    break;
+            }
+        }
+        return problems;
+    }
+
+    @OnScheduled
+    public void setup(ProcessContext context) {
+        super.setup(context);
+    }
+
+    /**
+     * Sends up to Batch Size flow files to the Elasticsearch {@code _bulk} endpoint in one request. Flow files
+     * with an invalid index, operation or identifier are routed to failure without being sent. The remaining
+     * ones are routed to success or failure according to the per-item status in the bulk response.
+     *
+     * @param context the process context supplying the property values
+     * @param session the session used to obtain, read and transfer the flow files
+     * @throws ProcessException if the request cannot be sent or Elasticsearch answers with a non-success
+     *                          status code for the bulk request as a whole
+     */
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+
+        final List<FlowFile> flowFiles = session.get(batchSize);
+        if (flowFiles.isEmpty()) {
+            return;
+        }
+
+        final String id_attribute = context.getProperty(ID_ATTRIBUTE).getValue();
+        final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue());
+        // Authentication
+        final String username = context.getProperty(USERNAME).getValue();
+        final String password = context.getProperty(PASSWORD).getValue();
+
+
+        OkHttpClient okHttpClient = getClient();
+        final ComponentLog logger = getLogger();
+
+        // Keep track of the list of flow files that need to be transferred. As they are transferred, remove them from the list.
+        List<FlowFile> flowFilesToTransfer = new LinkedList<>(flowFiles);
+
+        final StringBuilder sb = new StringBuilder();
+        final String baseUrl = trimToEmpty(context.getProperty(ES_URL).getValue());
+        final URL url;
+        try {
+            url = new URL((baseUrl.endsWith("/") ? baseUrl : baseUrl + "/") + "_bulk");
+        } catch (MalformedURLException mue) {
+            // Since we have a URL validator, something has gone very wrong, throw a ProcessException
+            context.yield();
+            throw new ProcessException(mue);
+        }
+
+        for (FlowFile file : flowFiles) {
+            final String index = context.getProperty(INDEX).evaluateAttributeExpressions(file).getValue();
+            if (StringUtils.isEmpty(index)) {
+                logger.error("No value for index for {}, transferring to failure", new Object[]{file});
+                flowFilesToTransfer.remove(file);
+                session.transfer(file, REL_FAILURE);
+                continue;
+            }
+            final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(file).getValue();
+            String indexOp = context.getProperty(INDEX_OP).evaluateAttributeExpressions(file).getValue();
+            if (StringUtils.isEmpty(indexOp)) {
+                logger.error("No Index operation specified for {}, transferring to failure.", new Object[]{file});
+                flowFilesToTransfer.remove(file);
+                session.transfer(file, REL_FAILURE);
+                continue;
+            }
+
+            switch (indexOp.toLowerCase(java.util.Locale.ROOT)) {
+                case "index":
+                case "update":
+                case "upsert":
+                case "delete":
+                    break;
+                default:
+                    logger.error("Index operation {} not supported for {}, transferring to failure.", new Object[]{indexOp, file});
+                    flowFilesToTransfer.remove(file);
+                    session.transfer(file, REL_FAILURE);
+                    continue;
+            }
+
+            final String id = (id_attribute != null) ? file.getAttribute(id_attribute) : null;
+
+            // The ID must be valid for all operations except "index". For that case,
+            // a missing ID indicates one is to be auto-generated by Elasticsearch.
+            // An empty ID is as unusable as a missing one, so both are rejected here.
+            if (StringUtils.isEmpty(id) && !indexOp.equalsIgnoreCase("index")) {
+                logger.error("Index operation {} requires a valid identifier value from a flow file attribute, transferring to failure.",
+                        new Object[]{indexOp, file});
+                flowFilesToTransfer.remove(file);
+                session.transfer(file, REL_FAILURE);
+                continue;
+            }
+
+            // The bulk format is newline-delimited, so the document must be collapsed onto a single line
+            final StringBuilder json = new StringBuilder();
+            session.read(file, in -> {
+                json.append(IOUtils.toString(in, charset).replace("\r\n", " ").replace('\n', ' ').replace('\r', ' '));
+            });
+            // Index, type and id come from flow file attributes / Expression Language, so they are escaped
+            // before being placed inside JSON string literals.
+            if (indexOp.equalsIgnoreCase("index")) {
+                sb.append("{\"index\": { \"_index\": \"");
+                sb.append(escapeJson(index));
+                sb.append("\", \"_type\": \"");
+                sb.append(escapeJson(docType));
+                sb.append("\"");
+                if (!StringUtils.isEmpty(id)) {
+                    sb.append(", \"_id\": \"");
+                    sb.append(escapeJson(id));
+                    sb.append("\"");
+                }
+                sb.append("}}\n");
+                sb.append(json);
+                sb.append("\n");
+            } else if (indexOp.equalsIgnoreCase("upsert") || indexOp.equalsIgnoreCase("update")) {
+                sb.append("{\"update\": { \"_index\": \"");
+                sb.append(escapeJson(index));
+                sb.append("\", \"_type\": \"");
+                sb.append(escapeJson(docType));
+                sb.append("\", \"_id\": \"");
+                sb.append(escapeJson(id));
+                // Two closing braces: one for the metadata object, one for the enclosing action object
+                sb.append("\" } }\n");
+                sb.append("{\"doc\": ");
+                sb.append(json);
+                sb.append(", \"doc_as_upsert\": ");
+                sb.append(indexOp.equalsIgnoreCase("upsert"));
+                sb.append(" }\n");
+            } else if (indexOp.equalsIgnoreCase("delete")) {
+                sb.append("{\"delete\": { \"_index\": \"");
+                sb.append(escapeJson(index));
+                sb.append("\", \"_type\": \"");
+                sb.append(escapeJson(docType));
+                sb.append("\", \"_id\": \"");
+                sb.append(escapeJson(id));
+                sb.append("\" } }\n");
+            }
+        }
+        if (!flowFilesToTransfer.isEmpty()) {
+            RequestBody requestBody = RequestBody.create(MediaType.parse("application/json"), sb.toString());
+            final Response getResponse;
+            try {
+                getResponse = sendRequestToElasticsearch(okHttpClient, url, username, password, "PUT", requestBody);
+            } catch (IllegalStateException | IOException ioe) {
+                throw new ProcessException(ioe);
+            }
+            try {
+                final int statusCode = getResponse.code();
+
+                if (isSuccess(statusCode)) {
+                    ResponseBody responseBody = getResponse.body();
+                    try {
+                        final byte[] bodyBytes = responseBody.bytes();
+
+                        JsonNode responseJson = parseJsonResponse(new ByteArrayInputStream(bodyBytes));
+                        boolean errors = responseJson.path("errors").asBoolean(false);
+                        if (errors) {
+                            JsonNode itemNodeArray = responseJson.get("items");
+                            if (itemNodeArray != null && itemNodeArray.isArray()) {
+                                // All items are returned whether they succeeded or failed, in request order, so
+                                // item i always belongs to the flow file at the head of the remaining list.
+                                // (Removing by the loop index would skip every other flow file.)
+                                for (int i = 0; i < itemNodeArray.size() && !flowFilesToTransfer.isEmpty(); i++) {
+                                    JsonNode itemNode = itemNodeArray.get(i);
+                                    FlowFile flowFile = flowFilesToTransfer.remove(0);
+                                    int status = itemNode.findPath("status").asInt();
+                                    if (!isSuccess(status)) {
+                                        // findPath matches a field name, not a path expression
+                                        String reason = itemNode.findPath("error").path("reason").asText();
+                                        logger.error("Failed to insert {} into Elasticsearch due to {}, transferring to failure",
+                                                new Object[]{flowFile, reason});
+                                        session.transfer(flowFile, REL_FAILURE);
+
+                                    } else {
+                                        session.transfer(flowFile, REL_SUCCESS);
+                                        // Record provenance event
+                                        session.getProvenanceReporter().send(flowFile, url.toString());
+                                    }
+                                }
+                            }
+                        }
+                        // Transfer any remaining flowfiles to success
+                        flowFilesToTransfer.forEach(file -> {
+                            session.transfer(file, REL_SUCCESS);
+                            // Record provenance event
+                            session.getProvenanceReporter().send(file, url.toString());
+                        });
+                    } catch (IOException ioe) {
+                        // Something went wrong when parsing the response, log the error and route to failure
+                        logger.error("Error parsing Bulk API response: {}", new Object[]{ioe.getMessage()}, ioe);
+                        session.transfer(flowFilesToTransfer, REL_FAILURE);
+                        context.yield();
+                    }
+                } else {
+                    // Something went wrong during the bulk update, throw a ProcessException to indicate rollback
+                    throw new ProcessException("Received error code " + statusCode + " from Elasticsearch API");
+                }
+            } finally {
+                // OkHttp holds on to the connection until the body is closed; without this the error path
+                // leaks it. Closing a body that bytes() has already consumed is harmless.
+                final ResponseBody bodyToClose = getResponse.body();
+                if (bodyToClose != null) {
+                    bodyToClose.close();
+                }
+            }
+        }
+    }
+
+    /**
+     * Escapes a value so that it can be placed between double quotes in a JSON document: backslashes and
+     * double quotes are backslash-escaped and control characters are written as {@code \\uXXXX}.
+     *
+     * @param value the raw value; {@code null} is rendered as the text "null", as StringBuilder.append would
+     * @return the escaped text, without surrounding quotes
+     */
+    private static String escapeJson(final String value) {
+        final String text = String.valueOf(value);
+        final StringBuilder escaped = new StringBuilder(text.length() + 8);
+        for (int i = 0; i < text.length(); i++) {
+            final char c = text.charAt(i);
+            switch (c) {
+                case '"':
+                    escaped.append("\\\"");
+                    break;
+                case '\\':
+                    escaped.append("\\\\");
+                    break;
+                default:
+                    if (c < 0x20) {
+                        escaped.append(String.format("\\u%04x", (int) c));
+                    } else {
+                        escaped.append(c);
+                    }
+                    break;
+            }
+        }
+        return escaped.toString();
+    }
+}
diff
--git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index e5046fc5b9..782f87e89a 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -14,3 +14,5 @@ # limitations under the License. org.apache.nifi.processors.elasticsearch.FetchElasticsearch org.apache.nifi.processors.elasticsearch.PutElasticsearch +org.apache.nifi.processors.elasticsearch.FetchElasticsearchHttp +org.apache.nifi.processors.elasticsearch.PutElasticsearchHttp diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearch.java index cb928fa799..9b68f2eeb1 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearch.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearch.java @@ -79,10 +79,10 @@ public class TestFetchElasticsearch { public void testFetchElasticsearchOnTrigger() throws IOException { runner = TestRunners.newTestRunner(new FetchElasticsearchTestProcessor(true)); // all docs are found runner.setValidateExpressionUsage(true); - runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch"); - 
runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300"); - runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s"); - runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, "elasticsearch"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, "127.0.0.1:9300"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, "5s"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL, "5s"); runner.setProperty(FetchElasticsearch.INDEX, "doc"); runner.assertNotValid(); @@ -105,10 +105,10 @@ public class TestFetchElasticsearch { @Test public void testFetchElasticsearchOnTriggerWithFailures() throws IOException { runner = TestRunners.newTestRunner(new FetchElasticsearchTestProcessor(false)); // simulate doc not found - runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch"); - runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300"); - runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s"); - runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, "elasticsearch"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, "127.0.0.1:9300"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, "5s"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL, "5s"); runner.setProperty(FetchElasticsearch.INDEX, "doc"); runner.setProperty(FetchElasticsearch.TYPE, "status"); runner.setValidateExpressionUsage(true); @@ -129,10 +129,10 @@ public class TestFetchElasticsearch { @Test public void testFetchElasticsearchWithBadHosts() throws IOException { runner = TestRunners.newTestRunner(new FetchElasticsearchTestProcessor(false)); // simulate doc not 
found - runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch"); - runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "http://127.0.0.1:9300,127.0.0.2:9300"); - runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s"); - runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, "elasticsearch"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, "http://127.0.0.1:9300,127.0.0.2:9300"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, "5s"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL, "5s"); runner.setProperty(FetchElasticsearch.INDEX, "doc"); runner.setProperty(FetchElasticsearch.TYPE, "status"); runner.setValidateExpressionUsage(true); @@ -145,10 +145,10 @@ public class TestFetchElasticsearch { public void testFetchElasticsearchOnTriggerWithExceptions() throws IOException { FetchElasticsearchTestProcessor processor = new FetchElasticsearchTestProcessor(true); runner = TestRunners.newTestRunner(processor); - runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch"); - runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300"); - runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s"); - runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, "elasticsearch"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, "127.0.0.1:9300"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, "5s"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL, "5s"); runner.setProperty(FetchElasticsearch.INDEX, "doc"); runner.setProperty(FetchElasticsearch.TYPE, "status"); runner.setValidateExpressionUsage(true); @@ 
-230,10 +230,10 @@ public class TestFetchElasticsearch { runner.addControllerService("ssl-context", sslService); runner.enableControllerService(sslService); runner.setProperty(FetchElasticsearch.PROP_SSL_CONTEXT_SERVICE, "ssl-context"); - runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch"); - runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300"); - runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s"); - runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, "elasticsearch"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, "127.0.0.1:9300"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, "5s"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL, "5s"); runner.setProperty(FetchElasticsearch.INDEX, "doc"); runner.setProperty(FetchElasticsearch.TYPE, "status"); runner.setValidateExpressionUsage(true); @@ -347,10 +347,10 @@ public class TestFetchElasticsearch { runner.setValidateExpressionUsage(true); //Local Cluster - Mac pulled from brew - runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch_brew"); - runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300"); - runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s"); - runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, "elasticsearch_brew"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, "127.0.0.1:9300"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, "5s"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL, "5s"); runner.setProperty(FetchElasticsearch.INDEX, "doc"); @@ -378,10 +378,10 @@ public class 
TestFetchElasticsearch { runner.setValidateExpressionUsage(true); //Local Cluster - Mac pulled from brew - runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch_brew"); - runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300"); - runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s"); - runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, "elasticsearch_brew"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, "127.0.0.1:9300"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, "5s"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL, "5s"); runner.setProperty(FetchElasticsearch.INDEX, "doc"); runner.setProperty(FetchElasticsearch.TYPE, "status"); diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java new file mode 100644 index 0000000000..82fa3dae2a --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java @@ -0,0 +1,354 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.elasticsearch; + +import okhttp3.Call; +import okhttp3.MediaType; +import okhttp3.OkHttpClient; +import okhttp3.Protocol; +import okhttp3.Request; +import okhttp3.Response; +import okhttp3.ResponseBody; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.io.IOException; +import java.io.InputStream; +import java.util.HashMap; + +import static org.junit.Assert.assertNotNull; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestFetchElasticsearchHttp { + + private InputStream docExample; + private TestRunner runner; + + @Before + public void setUp() throws IOException { + ClassLoader classloader = Thread.currentThread().getContextClassLoader(); + docExample = classloader.getResourceAsStream("DocumentExample.json"); + } + + @After + public void teardown() { + runner = null; + } + + @Test + public void testFetchElasticsearchOnTrigger() throws IOException { + runner = TestRunners.newTestRunner(new FetchElasticsearchHttpTestProcessor(true)); // all docs are found + 
runner.setValidateExpressionUsage(true); + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + + runner.setProperty(FetchElasticsearchHttp.INDEX, "doc"); + runner.assertNotValid(); + runner.setProperty(FetchElasticsearchHttp.TYPE, "status"); + runner.assertNotValid(); + runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}"); + runner.assertValid(); + + runner.enqueue(docExample, new HashMap() {{ + put("doc_id", "28039652140"); + }}); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(FetchElasticsearchHttp.REL_SUCCESS, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(FetchElasticsearchHttp.REL_SUCCESS).get(0); + assertNotNull(out); + out.assertAttributeEquals("doc_id", "28039652140"); + } + + @Test + public void testFetchElasticsearchOnTriggerWithFields() throws IOException { + runner = TestRunners.newTestRunner(new FetchElasticsearchHttpTestProcessor(true)); // all docs are found + runner.setValidateExpressionUsage(true); + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + + runner.setProperty(FetchElasticsearchHttp.INDEX, "doc"); + runner.assertNotValid(); + runner.setProperty(FetchElasticsearchHttp.TYPE, "status"); + runner.assertNotValid(); + runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}"); + runner.assertValid(); + runner.setProperty(FetchElasticsearchHttp.FIELDS, "id,, userinfo.location"); + runner.assertValid(); + + runner.enqueue(docExample, new HashMap() {{ + put("doc_id", "28039652140"); + }}); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(FetchElasticsearchHttp.REL_SUCCESS, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(FetchElasticsearchHttp.REL_SUCCESS).get(0); + assertNotNull(out); + out.assertAttributeEquals("doc_id", "28039652140"); + } + + @Test + public void testFetchElasticsearchOnTriggerWithDocNotFound() throws IOException { + runner = TestRunners.newTestRunner(new 
FetchElasticsearchHttpTestProcessor(false)); // simulate doc not found + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + runner.setProperty(FetchElasticsearchHttp.INDEX, "doc"); + runner.setValidateExpressionUsage(true); + runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}"); + + runner.setIncomingConnection(true); + runner.enqueue(docExample, new HashMap() {{ + put("doc_id", "28039652140"); + }}); + runner.run(1, true, true); + + // This test generates a "document not found" + runner.assertAllFlowFilesTransferred(FetchElasticsearchHttp.REL_NOT_FOUND, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(FetchElasticsearchHttp.REL_NOT_FOUND).get(0); + assertNotNull(out); + out.assertAttributeEquals("doc_id", "28039652140"); + } + + @Test + public void testFetchElasticsearchOnTriggerWithServerErrorRetry() throws IOException { + FetchElasticsearchHttpTestProcessor processor = new FetchElasticsearchHttpTestProcessor(false); + processor.setStatus(500, "Server error"); + runner = TestRunners.newTestRunner(processor); // simulate doc not found + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + runner.setProperty(FetchElasticsearchHttp.INDEX, "doc"); + runner.setProperty(FetchElasticsearchHttp.TYPE, "status"); + runner.setValidateExpressionUsage(true); + runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}"); + + runner.enqueue(docExample, new HashMap() {{ + put("doc_id", "28039652140"); + }}); + runner.run(1, true, true); + + // This test generates a HTTP 500 "Server error" + runner.assertAllFlowFilesTransferred(FetchElasticsearchHttp.REL_RETRY, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(FetchElasticsearchHttp.REL_RETRY).get(0); + assertNotNull(out); + out.assertAttributeEquals("doc_id", "28039652140"); + } + + @Test + public void testFetchElasticsearchOnTriggerWithServerFail() throws IOException { + FetchElasticsearchHttpTestProcessor 
processor = new FetchElasticsearchHttpTestProcessor(false); + processor.setStatus(100, "Should fail"); + runner = TestRunners.newTestRunner(processor); // simulate doc not found + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + runner.setProperty(FetchElasticsearchHttp.INDEX, "doc"); + runner.setProperty(FetchElasticsearchHttp.TYPE, "status"); + runner.setValidateExpressionUsage(true); + runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}"); + + runner.enqueue(docExample, new HashMap() {{ + put("doc_id", "28039652140"); + }}); + runner.run(1, true, true); + + // This test generates a HTTP 100 + runner.assertAllFlowFilesTransferred(FetchElasticsearchHttp.REL_FAILURE, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(FetchElasticsearchHttp.REL_FAILURE).get(0); + assertNotNull(out); + out.assertAttributeEquals("doc_id", "28039652140"); + } + + @Test + public void testFetchElasticsearchOnTriggerWithServerFailNoIncomingFlowFile() throws IOException { + FetchElasticsearchHttpTestProcessor processor = new FetchElasticsearchHttpTestProcessor(false); + processor.setStatus(100, "Should fail"); + runner = TestRunners.newTestRunner(processor); // simulate doc not found + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + runner.setProperty(FetchElasticsearchHttp.INDEX, "doc"); + runner.setProperty(FetchElasticsearchHttp.TYPE, "status"); + runner.setValidateExpressionUsage(true); + runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}"); + + runner.setIncomingConnection(false); + runner.run(1, true, true); + + // This test generates a HTTP 100 with no incoming flow file, so nothing should be transferred + processor.getRelationships().forEach(relationship -> runner.assertTransferCount(relationship, 0)); + runner.assertTransferCount(FetchElasticsearchHttp.REL_FAILURE, 0); + } + + @Test + public void testFetchElasticsearchWithBadHosts() throws IOException { + runner = 
TestRunners.newTestRunner(new FetchElasticsearchHttpTestProcessor(false)); // simulate doc not found + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "127.0.0.1:9200"); + runner.setProperty(FetchElasticsearchHttp.INDEX, "doc"); + runner.setProperty(FetchElasticsearchHttp.TYPE, "status"); + runner.setValidateExpressionUsage(true); + runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}"); + + runner.assertNotValid(); + } + + @Test + public void testSetupSecureClient() throws Exception { + FetchElasticsearchHttpTestProcessor processor = new FetchElasticsearchHttpTestProcessor(true); + runner = TestRunners.newTestRunner(processor); + SSLContextService sslService = mock(SSLContextService.class); + when(sslService.getIdentifier()).thenReturn("ssl-context"); + runner.addControllerService("ssl-context", sslService); + runner.enableControllerService(sslService); + runner.setProperty(FetchElasticsearchHttp.PROP_SSL_CONTEXT_SERVICE, "ssl-context"); + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + runner.setProperty(FetchElasticsearchHttp.INDEX, "doc"); + runner.setValidateExpressionUsage(true); + runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}"); + + // Allow time for the controller service to fully initialize + Thread.sleep(500); + + runner.enqueue(docExample, new HashMap() {{ + put("doc_id", "28039652140"); + }}); + runner.run(1, true, true); + + } + + /** + * A Test class that extends the processor in order to inject/mock behavior + */ + private static class FetchElasticsearchHttpTestProcessor extends FetchElasticsearchHttp { + boolean documentExists = true; + Exception exceptionToThrow = null; + OkHttpClient client; + int statusCode = 200; + String statusMessage = "OK"; + + FetchElasticsearchHttpTestProcessor(boolean documentExists) { + this.documentExists = documentExists; + } + + public void setExceptionToThrow(Exception exceptionToThrow) { + this.exceptionToThrow = exceptionToThrow; + } + + 
void setStatus(int code, String message) { + statusCode = code; + statusMessage = message; + } + + @Override + protected void createElasticsearchClient(ProcessContext context) throws ProcessException { + client = mock(OkHttpClient.class); + + when(client.newCall(any(Request.class))).thenAnswer(new Answer() { + + @Override + public Call answer(InvocationOnMock invocationOnMock) throws Throwable { + Request realRequest = (Request) invocationOnMock.getArguments()[0]; + StringBuilder sb = new StringBuilder("{\"_index\":\"randomuser.me\",\"_type\":\"user\",\"_id\":\"0\",\"_version\":2,"); + if (documentExists) { + sb.append("\"found\":true,\"_source\":{\"gender\":\"female\",\"name\":{\"title\":\"Ms\",\"first\":\"Joan\",\"last\":\"Smith\"}}"); + } else { + sb.append("\"found\": false"); + } + sb.append("}"); + Response mockResponse = new Response.Builder() + .request(realRequest) + .protocol(Protocol.HTTP_1_1) + .code(statusCode) + .message(statusMessage) + .body(ResponseBody.create(MediaType.parse("application/json"), sb.toString())) + .build(); + final Call call = mock(Call.class); + when(call.execute()).thenReturn(mockResponse); + return call; + } + }); + } + + protected OkHttpClient getClient() { + return client; + } + } + + ///////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Integration test section below + // + // The tests below are meant to run on real ES instances, and are thus @Ignored during normal test execution. + // However if you wish to execute them as part of a test phase, comment out the @Ignored line for each + // desired test. 
+ ///////////////////////////////////////////////////////////////////////////////////////////////////////////// + + /** + * Tests basic ES functionality against a local or test ES cluster + */ + @Test + @Ignore("Comment this out if you want to run against local or test ES") + public void testFetchElasticsearchBasic() { + System.out.println("Starting test " + new Object() { + }.getClass().getEnclosingMethod().getName()); + final TestRunner runner = TestRunners.newTestRunner(new FetchElasticsearchHttp()); + runner.setValidateExpressionUsage(true); + + //Local Cluster - Mac pulled from brew + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + runner.setProperty(FetchElasticsearchHttp.INDEX, "doc"); + runner.setProperty(FetchElasticsearchHttp.TYPE, "status"); + runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}"); + runner.assertValid(); + + runner.enqueue(docExample, new HashMap() {{ + put("doc_id", "28039652140"); + }}); + + runner.enqueue(docExample); + runner.run(1, true, true); + runner.assertAllFlowFilesTransferred(FetchElasticsearchHttp.REL_SUCCESS, 1); + } + + @Test + @Ignore("Comment this out if you want to run against local or test ES") + public void testFetchElasticsearchBatch() throws IOException { + System.out.println("Starting test " + new Object() { + }.getClass().getEnclosingMethod().getName()); + final TestRunner runner = TestRunners.newTestRunner(new FetchElasticsearchHttp()); + runner.setValidateExpressionUsage(true); + + //Local Cluster - Mac pulled from brew + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + runner.setProperty(FetchElasticsearchHttp.INDEX, "doc"); + runner.setProperty(FetchElasticsearchHttp.TYPE, "status"); + runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}"); + runner.assertValid(); + + for (int i = 0; i < 100; i++) { + long newId = 28039652140L + i; + final String newStrId = Long.toString(newId); + runner.enqueue(docExample, new 
HashMap() {{ + put("doc_id", newStrId); + }}); + } + runner.run(100); + runner.assertAllFlowFilesTransferred(FetchElasticsearchHttp.REL_SUCCESS, 100); + } +} diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java index ce25b8108a..d7fb43934d 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java @@ -78,10 +78,10 @@ public class TestPutElasticsearch { public void testPutElasticSearchOnTrigger() throws IOException { runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(false)); // no failures runner.setValidateExpressionUsage(true); - runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch"); - runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300"); - runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s"); - runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, "elasticsearch"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, "127.0.0.1:9300"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, "5s"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL, "5s"); runner.setProperty(PutElasticsearch.INDEX, "doc"); runner.assertNotValid(); @@ -106,10 +106,10 @@ public class TestPutElasticsearch { public void testPutElasticSearchOnTriggerWithFailures() throws IOException { runner = 
TestRunners.newTestRunner(new PutElasticsearchTestProcessor(true)); // simulate failures runner.setValidateExpressionUsage(false); - runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch"); - runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300"); - runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s"); - runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, "elasticsearch"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, "127.0.0.1:9300"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, "5s"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL, "5s"); runner.setProperty(PutElasticsearch.INDEX, "doc"); runner.setProperty(PutElasticsearch.TYPE, "status"); runner.setProperty(PutElasticsearch.BATCH_SIZE, "1"); @@ -130,10 +130,10 @@ public class TestPutElasticsearch { public void testPutElasticsearchOnTriggerWithExceptions() throws IOException { PutElasticsearchTestProcessor processor = new PutElasticsearchTestProcessor(false); runner = TestRunners.newTestRunner(processor); - runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch"); - runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300"); - runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s"); - runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, "elasticsearch"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, "127.0.0.1:9300"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, "5s"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL, "5s"); runner.setProperty(PutElasticsearch.INDEX, "doc"); 
runner.setProperty(PutElasticsearch.TYPE, "status"); runner.setValidateExpressionUsage(true); @@ -194,10 +194,10 @@ public class TestPutElasticsearch { public void testPutElasticsearchOnTriggerWithNoIdAttribute() throws IOException { runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(true)); // simulate failures runner.setValidateExpressionUsage(false); - runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch"); - runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300"); - runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s"); - runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, "elasticsearch"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, "127.0.0.1:9300"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, "5s"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL, "5s"); runner.setProperty(PutElasticsearch.INDEX, "doc"); runner.setProperty(PutElasticsearch.TYPE, "status"); runner.setProperty(PutElasticsearch.BATCH_SIZE, "1"); @@ -215,10 +215,10 @@ public class TestPutElasticsearch { public void testPutElasticsearchOnTriggerWithIndexFromAttribute() throws IOException { runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(false)); runner.setValidateExpressionUsage(false); - runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch"); - runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300"); - runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s"); - runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, "elasticsearch"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, "127.0.0.1:9300"); + 
runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, "5s"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL, "5s"); runner.setProperty(PutElasticsearch.INDEX, "${i}"); runner.setProperty(PutElasticsearch.TYPE, "${type}"); runner.setProperty(PutElasticsearch.BATCH_SIZE, "1"); @@ -252,10 +252,10 @@ public class TestPutElasticsearch { public void testPutElasticSearchOnTriggerWithInvalidIndexOp() throws IOException { runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(false)); // no failures runner.setValidateExpressionUsage(true); - runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch"); - runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300"); - runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s"); - runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, "elasticsearch"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, "127.0.0.1:9300"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, "5s"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL, "5s"); runner.setProperty(PutElasticsearch.INDEX, "doc"); runner.assertNotValid(); @@ -380,10 +380,10 @@ public class TestPutElasticsearch { runner.setValidateExpressionUsage(false); //Local Cluster - Mac pulled from brew - runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch_brew"); - runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300"); - runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s"); - runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, "elasticsearch_brew"); + 
runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, "127.0.0.1:9300"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, "5s"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL, "5s"); runner.setProperty(PutElasticsearch.INDEX, "doc"); runner.setProperty(PutElasticsearch.BATCH_SIZE, "1"); @@ -412,10 +412,10 @@ public class TestPutElasticsearch { runner.setValidateExpressionUsage(false); //Local Cluster - Mac pulled from brew - runner.setProperty(AbstractElasticsearchProcessor.CLUSTER_NAME, "elasticsearch_brew"); - runner.setProperty(AbstractElasticsearchProcessor.HOSTS, "127.0.0.1:9300"); - runner.setProperty(AbstractElasticsearchProcessor.PING_TIMEOUT, "5s"); - runner.setProperty(AbstractElasticsearchProcessor.SAMPLER_INTERVAL, "5s"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, "elasticsearch_brew"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, "127.0.0.1:9300"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, "5s"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL, "5s"); runner.setProperty(PutElasticsearch.INDEX, "doc"); runner.setProperty(PutElasticsearch.BATCH_SIZE, "100"); diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java new file mode 100644 index 0000000000..c3d5a3497c --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java @@ -0,0 +1,388 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license 
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.elasticsearch; + +import okhttp3.Call; +import okhttp3.MediaType; +import okhttp3.OkHttpClient; +import okhttp3.Protocol; +import okhttp3.Request; +import okhttp3.Response; +import okhttp3.ResponseBody; +import org.apache.commons.io.IOUtils; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.io.IOException; +import java.util.HashMap; + +import static org.junit.Assert.assertNotNull; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestPutElasticsearchHttp { // Tests PutElasticsearchHttp against a mocked OkHttpClient; the tests that need a real cluster are at the bottom and carry @Ignore + + private static byte[] docExample; // contents of DocumentExample.json, used as flow file content by the tests that enqueue data + private TestRunner runner; + + @Before // JUnit runs this before every test method, despite the name "once" + public void once() throws IOException { + ClassLoader classloader = Thread.currentThread().getContextClassLoader(); + docExample = IOUtils.toString(classloader.getResourceAsStream("DocumentExample.json")).getBytes(); // NOTE(review): no charset is passed to toString()/getBytes(), so this presumably uses the platform default - confirm + } + + 
@After + public void teardown() { + runner = null; // drop the reference to the per-test runner + } + + @Test + public void testPutElasticSearchOnTriggerIndex() throws IOException { // INDEX_OP is left unset in this test + runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(false)); // no failures + runner.setValidateExpressionUsage(true); + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + + runner.setProperty(PutElasticsearchHttp.INDEX, "doc"); + runner.setProperty(PutElasticsearchHttp.TYPE, "status"); + runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1"); + runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id"); + + runner.enqueue(docExample, new HashMap() {{ + put("doc_id", "28039652140"); + }}); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_SUCCESS, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttp.REL_SUCCESS).get(0); + assertNotNull(out); + out.assertAttributeEquals("doc_id", "28039652140"); + } + + @Test + public void testPutElasticSearchOnTriggerUpdate() throws IOException { + runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(false)); // no failures + runner.setValidateExpressionUsage(true); + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + + runner.setProperty(PutElasticsearchHttp.INDEX, "doc"); + runner.setProperty(PutElasticsearchHttp.TYPE, "status"); + runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1"); + runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id"); + runner.setProperty(PutElasticsearchHttp.INDEX_OP, "Update"); // mixed-case operation name; the test still expects success + runner.enqueue(docExample, new HashMap() {{ + put("doc_id", "28039652140"); + }}); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_SUCCESS, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttp.REL_SUCCESS).get(0); + assertNotNull(out); + out.assertAttributeEquals("doc_id", "28039652140"); + } + + @Test + 
public void testPutElasticSearchOnTriggerDelete() throws IOException { + runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(false)); // no failures + runner.setValidateExpressionUsage(true); + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + + runner.setProperty(PutElasticsearchHttp.INDEX, "doc"); + runner.setProperty(PutElasticsearchHttp.TYPE, "status"); + runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1"); + runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id"); + runner.setProperty(PutElasticsearchHttp.INDEX_OP, "DELETE"); // upper-case operation name; the test still expects success + runner.enqueue(docExample, new HashMap() {{ + put("doc_id", "28039652140"); + }}); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_SUCCESS, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttp.REL_SUCCESS).get(0); + assertNotNull(out); + out.assertAttributeEquals("doc_id", "28039652140"); + } + + @Test + public void testPutElasticSearchOnTriggerBadIndexOp() throws IOException { + runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(false)); // no failures + runner.setValidateExpressionUsage(true); + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + + runner.setProperty(PutElasticsearchHttp.INDEX, "doc"); + runner.setProperty(PutElasticsearchHttp.TYPE, "status"); + runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1"); + runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id"); + runner.setProperty(PutElasticsearchHttp.INDEX_OP, "${no.attr}"); // the flow file enqueued below has no "no.attr" attribute + runner.enqueue(docExample, new HashMap() {{ + put("doc_id", "28039652140"); + }}); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_FAILURE, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttp.REL_FAILURE).get(0); + assertNotNull(out); + out.assertAttributeEquals("doc_id", "28039652140"); + } + + @Test + 
public void testPutElasticSearchInvalidConfig() throws IOException { + runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(false)); // no failures + runner.setValidateExpressionUsage(true); + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + + runner.setProperty(PutElasticsearchHttp.INDEX, "doc"); + runner.assertNotValid(); // presumably invalid because TYPE is still unset at this point - confirm + runner.setProperty(PutElasticsearchHttp.TYPE, "status"); + runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1"); + runner.assertValid(); + runner.setProperty(PutElasticsearchHttp.INDEX_OP, ""); + runner.assertNotValid(); + runner.setProperty(PutElasticsearchHttp.INDEX_OP, "index"); + runner.assertValid(); + runner.setProperty(PutElasticsearchHttp.INDEX_OP, "upsert"); + runner.assertNotValid(); + } + + @Test(expected = AssertionError.class) + public void testPutElasticSearchOnTriggerWithFailures() throws IOException { + PutElasticsearchTestProcessor processor = new PutElasticsearchTestProcessor(true); + processor.setStatus(100, "Should fail"); // the mocked HTTP response will carry status code 100 + runner = TestRunners.newTestRunner(processor); // simulate failures + runner.setValidateExpressionUsage(false); + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + runner.setProperty(PutElasticsearchHttp.INDEX, "doc"); + runner.setProperty(PutElasticsearchHttp.TYPE, "status"); + runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1"); + runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id"); + + runner.enqueue(docExample, new HashMap() {{ + put("doc_id", "28039652140"); + }}); + runner.run(1, true, true); // no assertions follow: the expected AssertionError has to come from the run itself + } + + @Test + public void testPutElasticsearchOnTriggerWithNoIdAttribute() throws IOException { + runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(true)); // simulate failures + runner.setValidateExpressionUsage(false); + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + runner.setProperty(PutElasticsearchHttp.INDEX, "doc"); + 
runner.setProperty(PutElasticsearchHttp.TYPE, "status"); + runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1"); + runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id"); + + runner.enqueue(docExample); // no attributes at all, so the configured doc_id attribute is absent + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_FAILURE, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttp.REL_FAILURE).get(0); + assertNotNull(out); + } + + @Test + public void testPutElasticsearchOnTriggerWithIndexFromAttribute() throws IOException { + runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(false)); + runner.setValidateExpressionUsage(false); + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + runner.setProperty(PutElasticsearchHttp.INDEX, "${i}"); + runner.setProperty(PutElasticsearchHttp.TYPE, "${type}"); + runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1"); + runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id"); + + runner.enqueue(docExample, new HashMap() {{ + put("doc_id", "28039652144"); + put("i", "doc"); + put("type", "status"); + }}); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_SUCCESS, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttp.REL_SUCCESS).get(0); + assertNotNull(out); + runner.clearTransferState(); + + // Now leave out the "i" attribute, so the index expression ${i} has nothing to resolve; should fail + runner.enqueue(docExample, new HashMap() {{ + put("doc_id", "28039652144"); + put("type", "status"); + }}); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_FAILURE, 1); + final MockFlowFile out2 = runner.getFlowFilesForRelationship(PutElasticsearchHttp.REL_FAILURE).get(0); + assertNotNull(out2); + } + + @Test + public void testPutElasticSearchOnTriggerWithInvalidIndexOp() throws IOException { + runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(false)); // no 
failures + runner.setValidateExpressionUsage(true); + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + + runner.setProperty(PutElasticsearchHttp.INDEX, "doc"); + runner.assertNotValid(); + runner.setProperty(PutElasticsearchHttp.TYPE, "status"); + runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1"); + runner.assertValid(); + runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id"); + runner.assertValid(); + + runner.setProperty(PutElasticsearchHttp.INDEX_OP, "index_fail"); + runner.assertValid(); // this value passes validation; the test expects the failure only once the flow file is processed + + runner.enqueue(docExample, new HashMap() {{ + put("doc_id", "28039652140"); + }}); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_FAILURE, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttp.REL_FAILURE).get(0); + assertNotNull(out); + } + + /** + * A test subclass of the processor that injects mock behavior: the OkHttpClient is a Mockito mock that answers every call with a canned bulk response + */ + private static class PutElasticsearchTestProcessor extends PutElasticsearchHttp { + boolean responseHasFailures = false; // when true, the canned bulk response body contains an item with an error + OkHttpClient client; + int statusCode = 200; // HTTP status of the mocked response; also written as the item status in the no-failure body + String statusMessage = "OK"; + + PutElasticsearchTestProcessor(boolean responseHasFailures) { + this.responseHasFailures = responseHasFailures; + } + + void setStatus(int code, String message) { + statusCode = code; + statusMessage = message; + } + + @Override + protected void createElasticsearchClient(ProcessContext context) throws ProcessException { + client = mock(OkHttpClient.class); // every newCall(...) on this mock yields a mocked Call whose execute() returns the response built below + + when(client.newCall(any(Request.class))).thenAnswer(new Answer() { + + @Override + public Call answer(InvocationOnMock invocationOnMock) throws Throwable { + Request realRequest = (Request) invocationOnMock.getArguments()[0]; + StringBuilder sb = new StringBuilder("{\"took\": 1, \"errors\": \""); + sb.append(responseHasFailures); + sb.append("\", \"items\": ["); // "errors" is written as a quoted string ("true"/"false"), not as a JSON boolean + if (responseHasFailures) { + // This case is for a status code of 200 for the bulk response 
itself, but with an error (of 400) inside + sb.append("{\"index\":{\"_index\":\"doc\",\"_type\":\"status\",\"_id\":\"28039652140\",\"status\":\"400\","); + sb.append("\"error\":{\"type\":\"mapper_parsing_exception\",\"reason\":\"failed to parse [gender]\","); + sb.append("\"caused_by\":{\"type\":\"json_parse_exception\",\"reason\":\"Unexpected end-of-input in VALUE_STRING\\n at "); + sb.append("[Source: org.elasticsearch.common.io.stream.InputStreamStreamInput@1a2e3ac4; line: 1, column: 39]\"}}}}"); + } else { + sb.append("{\"index\":{\"_index\":\"doc\",\"_type\":\"status\",\"_id\":\"28039652140\",\"status\":"); + sb.append(statusCode); + sb.append(",\"_source\":{\"text\": \"This is a test document\"}}}"); + } + sb.append("]}"); + Response mockResponse = new Response.Builder() + .request(realRequest) + .protocol(Protocol.HTTP_1_1) + .code(statusCode) + .message(statusMessage) + .body(ResponseBody.create(MediaType.parse("application/json"), sb.toString())) + .build(); + final Call call = mock(Call.class); + when(call.execute()).thenReturn(mockResponse); + return call; + } + }); + } + + protected OkHttpClient getClient() { // hands back the mock built in createElasticsearchClient; NOTE(review): presumably overrides an accessor of the parent class - confirm + return client; + } + } + + ///////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Integration test section below + // + // The tests below are meant to run on real ES instances, and are thus annotated with @Ignore during normal test execution. + // However if you wish to execute them as part of a test phase, comment out the @Ignore line for each + // desired test. 
+ ///////////////////////////////////////////////////////////////////////////////////////////////////////////// + + /** + * Tests basic ES functionality against a local or test ES cluster + */ + @Test + @Ignore("Comment this out if you want to run against local or test ES") + public void testPutElasticSearchBasic() { + System.out.println("Starting test " + new Object() { + }.getClass().getEnclosingMethod().getName()); // prints this method's name, looked up through the anonymous class + final TestRunner runner = TestRunners.newTestRunner(new PutElasticsearchHttp()); // local runner around the real processor; the runner field is not used here + runner.setValidateExpressionUsage(false); + + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + runner.setProperty(PutElasticsearchHttp.INDEX, "doc"); + runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1"); + runner.setProperty(PutElasticsearchHttp.TYPE, "status"); + runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id"); + runner.assertValid(); + + runner.enqueue(docExample, new HashMap() {{ + put("doc_id", "28039652140"); + }}); + + runner.enqueue(docExample); // NOTE(review): this second flow file has no doc_id, yet exactly one transfer is asserted below - presumably because BATCH_SIZE is 1 and there is a single run; confirm + runner.run(1, true, true); + runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_SUCCESS, 1); + } + + @Test + @Ignore("Comment this out if you want to run against local or test ES") + public void testPutElasticSearchBatch() throws IOException { + System.out.println("Starting test " + new Object() { + }.getClass().getEnclosingMethod().getName()); + final TestRunner runner = TestRunners.newTestRunner(new PutElasticsearchHttp()); + runner.setValidateExpressionUsage(false); + + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + runner.setProperty(PutElasticsearchHttp.INDEX, "doc"); + runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "100"); + runner.setProperty(PutElasticsearchHttp.TYPE, "status"); + runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id"); + runner.assertValid(); + + for (int i = 0; i < 100; i++) { // enqueue 100 flow files with consecutive doc_id values + long newId = 28039652140L + i; + final String newStrId = Long.toString(newId); + 
runner.enqueue(docExample, new HashMap() {{ + put("doc_id", newStrId); + }}); + } + runner.run(); + runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_SUCCESS, 100); + } +} diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/DocumentExample.json b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/DocumentExample.json index 014a66c05a..66449cf1e1 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/DocumentExample.json +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/resources/DocumentExample.json @@ -1,19 +1,3 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ { "created_at": "Thu Jan 21 16:02:46 +0000 2016", "text": "This is a test document from a mock social media service",