diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java index c4121b5611..d67ce6c540 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java @@ -56,6 +56,7 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic .description("Elasticsearch URL which will be connected to, including scheme (http, e.g.), host, and port. The default port for the REST API is 9200.") .required(true) .addValidator(StandardValidators.URL_VALIDATOR) + .expressionLanguageSupported(true) .build(); public static final PropertyDescriptor PROXY_HOST = new PropertyDescriptor.Builder() @@ -81,6 +82,7 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic .required(true) .defaultValue("5 secs") .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(true) .build(); public static final PropertyDescriptor RESPONSE_TIMEOUT = new PropertyDescriptor.Builder() @@ -90,6 +92,7 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic .required(true) .defaultValue("15 secs") .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(true) .build(); private final AtomicReference okHttpClientAtomicReference = new AtomicReference<>(); @@ -109,8 +112,8 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic } // Set timeouts - okHttpClient.connectTimeout((context.getProperty(CONNECT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()), TimeUnit.MILLISECONDS); - okHttpClient.readTimeout(context.getProperty(RESPONSE_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(), TimeUnit.MILLISECONDS); + okHttpClient.connectTimeout((context.getProperty(CONNECT_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue()), TimeUnit.MILLISECONDS); + okHttpClient.readTimeout(context.getProperty(RESPONSE_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue(), TimeUnit.MILLISECONDS); final SSLContextService sslService = context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); final SSLContext sslContext = sslService == null ? null : sslService.createSSLContext(SSLContextService.ClientAuth.NONE); diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchProcessor.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchProcessor.java index 76c7224412..c5e4cc3d79 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchProcessor.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchProcessor.java @@ -19,6 +19,7 @@ package org.apache.nifi.processors.elasticsearch; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.exception.ProcessException; @@ -28,7 +29,6 @@ import org.apache.nifi.util.StringUtils; import java.util.Collection; import java.util.HashSet; -import java.util.Map; import java.util.Set; /** @@ -36,6 +36,13 @@ import java.util.Set; */ public abstract class AbstractElasticsearchProcessor extends AbstractProcessor { + static final Validator NON_EMPTY_EL_VALIDATOR = (subject, value, context) -> { + if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) { + return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build(); + } + return StandardValidators.NON_EMPTY_VALIDATOR.validate(subject, value, context); + }; + public static final PropertyDescriptor PROP_SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() .name("SSL Context Service") .description("The SSL Context Service used to provide client certificate information for TLS/SSL " @@ -50,6 +57,7 @@ public abstract class AbstractElasticsearchProcessor extends AbstractProcessor { .required(true) .defaultValue("UTF-8") .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) + .expressionLanguageSupported(true) .build(); public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder() @@ -57,6 +65,7 @@ public abstract class AbstractElasticsearchProcessor extends AbstractProcessor { .description("Username to access the Elasticsearch cluster") .required(false) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) .build(); public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder() @@ -65,6 +74,7 @@ public abstract class AbstractElasticsearchProcessor extends AbstractProcessor { .required(false) .sensitive(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) .build(); protected abstract void createElasticsearchClient(ProcessContext context) throws ProcessException; @@ -74,8 +84,9 @@ public abstract class AbstractElasticsearchProcessor extends AbstractProcessor { Set results = new HashSet<>(); // Ensure that if username or password is set, then the other is too - Map propertyMap = validationContext.getProperties(); - if (StringUtils.isEmpty(propertyMap.get(USERNAME)) != StringUtils.isEmpty(propertyMap.get(PASSWORD))) { + String userName = validationContext.getProperty(USERNAME).evaluateAttributeExpressions().getValue(); + String password = validationContext.getProperty(PASSWORD).evaluateAttributeExpressions().getValue(); + if (StringUtils.isEmpty(userName) != StringUtils.isEmpty(password)) { results.add(new ValidationResult.Builder().valid(false).explanation( "If username or password is specified, then the other must be specified as well").build()); } diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchTransportClientProcessor.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchTransportClientProcessor.java index e1c9d4d7ea..a16a0dd6a5 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchTransportClientProcessor.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchTransportClientProcessor.java @@ -17,7 +17,6 @@ package org.apache.nifi.processors.elasticsearch; import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.Validator; import org.apache.nifi.logging.ComponentLog; @@ -50,21 +49,20 @@ public abstract class AbstractElasticsearchTransportClientProcessor extends Abst /** * This validator ensures the Elasticsearch hosts property is a valid list of hostname:port entries */ - private static final Validator HOSTNAME_PORT_VALIDATOR = new Validator() { - @Override - public ValidationResult validate(final String subject, final String input, final ValidationContext context) { - final List esList = Arrays.asList(input.split(",")); - for (String hostnamePort : esList) { - String[] addresses = hostnamePort.split(":"); - // Protect against invalid input like http://127.0.0.1:9300 (URL scheme should not be there) - if (addresses.length != 2) { - return new ValidationResult.Builder().subject(subject).input(input).explanation( - "Must be in hostname:port form (no scheme such as http://").valid(false).build(); - } - } - return new ValidationResult.Builder().subject(subject).input(input).explanation( - "Valid cluster definition").valid(true).build(); + private static final Validator HOSTNAME_PORT_VALIDATOR = (subject, input, context) -> { + if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) { + return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build(); } + final List esList = Arrays.asList(input.split(",")); + for (String hostnamePort : esList) { + String[] addresses = hostnamePort.split(":"); + // Protect against invalid input like http://127.0.0.1:9300 (URL scheme should not be there) + if (addresses.length != 2) { + return new ValidationResult.Builder().subject(subject).input(input).explanation( + "Must be in hostname:port form (no scheme such as http://").valid(false).build(); + } + } + return new ValidationResult.Builder().subject(subject).input(input).explanation("Valid cluster definition").valid(true).build(); }; protected static final PropertyDescriptor CLUSTER_NAME = new PropertyDescriptor.Builder() @@ -73,6 +71,7 @@ public abstract class AbstractElasticsearchTransportClientProcessor extends Abst .required(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .defaultValue("elasticsearch") + .expressionLanguageSupported(true) .build(); protected static final PropertyDescriptor HOSTS = new PropertyDescriptor.Builder() @@ -83,6 +82,7 @@ public abstract class AbstractElasticsearchTransportClientProcessor extends Abst .required(true) .expressionLanguageSupported(false) .addValidator(HOSTNAME_PORT_VALIDATOR) + .expressionLanguageSupported(true) .build(); public static final PropertyDescriptor PROP_SHIELD_LOCATION = new PropertyDescriptor.Builder() @@ -93,6 +93,7 @@ public abstract class AbstractElasticsearchTransportClientProcessor extends Abst + "lib/ directory, doing so will prevent the Shield plugin from being loaded.") .required(false) .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) + .expressionLanguageSupported(true) .build(); protected static final PropertyDescriptor PING_TIMEOUT = new PropertyDescriptor.Builder() @@ -101,7 +102,8 @@ public abstract class AbstractElasticsearchTransportClientProcessor extends Abst "For example, 5s (5 seconds). If non-local recommended is 30s") .required(true) .defaultValue("5s") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(true) .build(); protected static final PropertyDescriptor SAMPLER_INTERVAL = new PropertyDescriptor.Builder() @@ -110,7 +112,8 @@ public abstract class AbstractElasticsearchTransportClientProcessor extends Abst + "If non-local recommended is 30s.") .required(true) .defaultValue("5s") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(true) .build(); protected AtomicReference esClient = new AtomicReference<>(); @@ -135,11 +138,11 @@ public abstract class AbstractElasticsearchTransportClientProcessor extends Abst log.debug("Creating ElasticSearch Client"); try { - final String clusterName = context.getProperty(CLUSTER_NAME).getValue(); - final String pingTimeout = context.getProperty(PING_TIMEOUT).getValue(); - final String samplerInterval = context.getProperty(SAMPLER_INTERVAL).getValue(); - final String username = context.getProperty(USERNAME).getValue(); - final String password = context.getProperty(PASSWORD).getValue(); + final String clusterName = context.getProperty(CLUSTER_NAME).evaluateAttributeExpressions().getValue(); + final String pingTimeout = context.getProperty(PING_TIMEOUT).evaluateAttributeExpressions().getValue(); + final String samplerInterval = context.getProperty(SAMPLER_INTERVAL).evaluateAttributeExpressions().getValue(); + final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue(); + final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue(); final SSLContextService sslService = context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); @@ -149,7 +152,7 @@ public abstract class AbstractElasticsearchTransportClientProcessor extends Abst .put("client.transport.ping_timeout", pingTimeout) .put("client.transport.nodes_sampler_interval", samplerInterval); - String shieldUrl = context.getProperty(PROP_SHIELD_LOCATION).getValue(); + String shieldUrl = context.getProperty(PROP_SHIELD_LOCATION).evaluateAttributeExpressions().getValue(); if (sslService != null) { settingsBuilder.put("shield.transport.ssl", "true") .put("shield.ssl.keystore.path", sslService.getKeyStoreFile()) @@ -171,7 +174,7 @@ public abstract class AbstractElasticsearchTransportClientProcessor extends Abst TransportClient transportClient = getTransportClient(settingsBuilder, shieldUrl, username, password); - final String hosts = context.getProperty(HOSTS).getValue(); + final String hosts = context.getProperty(HOSTS).evaluateAttributeExpressions().getValue(); esHosts = getEsHosts(hosts); if (esHosts != null) { @@ -268,6 +271,9 @@ public abstract class AbstractElasticsearchTransportClientProcessor extends Abst for (String item : esList) { String[] addresses = item.split(":"); + if (addresses.length != 2) { + throw new ArrayIndexOutOfBoundsException("Not in host:port format"); + } final String hostName = addresses[0].trim(); final int port = Integer.parseInt(addresses[1].trim()); diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearch.java index 67aaae7877..643edbb6ef 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearch.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearch.java @@ -105,18 +105,17 @@ public class FetchElasticsearch extends AbstractElasticsearchTransportClientProc .build(); - @Override - public Set getRelationships() { - final Set relationships = new HashSet<>(); - relationships.add(REL_SUCCESS); - relationships.add(REL_FAILURE); - relationships.add(REL_RETRY); - relationships.add(REL_NOT_FOUND); - return Collections.unmodifiableSet(relationships); - } + private static final Set relationships; + private static final List propertyDescriptors; + + static { + final Set _rels = new HashSet<>(); + _rels.add(REL_SUCCESS); + _rels.add(REL_FAILURE); + _rels.add(REL_RETRY); + _rels.add(REL_NOT_FOUND); + relationships = Collections.unmodifiableSet(_rels); - @Override - public final List getSupportedPropertyDescriptors() { final List descriptors = new ArrayList<>(); descriptors.add(CLUSTER_NAME); descriptors.add(HOSTS); @@ -131,9 +130,18 @@ public class FetchElasticsearch extends AbstractElasticsearchTransportClientProc descriptors.add(TYPE); descriptors.add(CHARSET); - return Collections.unmodifiableList(descriptors); + propertyDescriptors = Collections.unmodifiableList(descriptors); } + @Override + public Set getRelationships() { + return relationships; + } + + @Override + public final List getSupportedPropertyDescriptors() { + return propertyDescriptors; + } @OnScheduled public void setup(ProcessContext context) { @@ -151,7 +159,7 @@ public class FetchElasticsearch extends AbstractElasticsearchTransportClientProc final String index = context.getProperty(INDEX).evaluateAttributeExpressions(flowFile).getValue(); final String docId = context.getProperty(DOC_ID).evaluateAttributeExpressions(flowFile).getValue(); final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(flowFile).getValue(); - final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue()); + final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue()); final ComponentLog logger = getLogger(); try { diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java index ac598bfcfd..8fd30dcc9e 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java @@ -131,19 +131,17 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor { .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); + private static final Set relationships; + private static final List propertyDescriptors; - @Override - public Set getRelationships() { - final Set relationships = new HashSet<>(); - relationships.add(REL_SUCCESS); - relationships.add(REL_FAILURE); - relationships.add(REL_RETRY); - relationships.add(REL_NOT_FOUND); - return Collections.unmodifiableSet(relationships); - } + static { + final Set _rels = new HashSet<>(); + _rels.add(REL_SUCCESS); + _rels.add(REL_FAILURE); + _rels.add(REL_RETRY); + _rels.add(REL_NOT_FOUND); + relationships = Collections.unmodifiableSet(_rels); - @Override - public final List getSupportedPropertyDescriptors() { final List descriptors = new ArrayList<>(); descriptors.add(ES_URL); descriptors.add(PROP_SSL_CONTEXT_SERVICE); @@ -156,9 +154,18 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor { descriptors.add(TYPE); descriptors.add(FIELDS); - return Collections.unmodifiableList(descriptors); + propertyDescriptors = Collections.unmodifiableList(descriptors); } + @Override + public Set getRelationships() { + return relationships; + } + + @Override + public final List getSupportedPropertyDescriptors() { + return propertyDescriptors; + } @OnScheduled public void setup(ProcessContext context) { @@ -194,21 +201,22 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor { : null; // Authentication - final String username = context.getProperty(USERNAME).getValue(); - final String password = context.getProperty(PASSWORD).getValue(); + final String username = context.getProperty(USERNAME).evaluateAttributeExpressions(flowFile).getValue(); + final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue(); final ComponentLog logger = getLogger(); + Response getResponse = null; try { logger.debug("Fetching {}/{}/{} from Elasticsearch", new Object[]{index, docType, docId}); // read the url property from the context - final String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL).getValue()); + final String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL).evaluateAttributeExpressions().getValue()); final URL url = buildRequestURL(urlstr, docId, index, docType, fields); final long startNanos = System.nanoTime(); - final Response getResponse = sendRequestToElasticsearch(okHttpClient, url, username, password, "GET", null); + getResponse = sendRequestToElasticsearch(okHttpClient, url, username, password, "GET", null); final int statusCode = getResponse.code(); if (isSuccess(statusCode)) { @@ -290,6 +298,10 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor { session.remove(flowFile); } context.yield(); + } finally { + if (getResponse != null) { + getResponse.close(); + } } } diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java index 216efd4a77..d208d40d34 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java @@ -96,8 +96,7 @@ public class PutElasticsearch extends AbstractElasticsearchTransportClientProces .description("The type of this document (used by Elasticsearch for indexing and searching)") .required(true) .expressionLanguageSupported(true) - .addValidator(StandardValidators.createAttributeExpressionLanguageValidator( - AttributeExpression.ResultType.STRING, true)) + .addValidator(NON_EMPTY_EL_VALIDATOR) .build(); public static final PropertyDescriptor INDEX_OP = new PropertyDescriptor.Builder() @@ -105,8 +104,7 @@ public class PutElasticsearch extends AbstractElasticsearchTransportClientProces .description("The type of the operation used to index (index, update, upsert)") .required(true) .expressionLanguageSupported(true) - .addValidator(StandardValidators.createAttributeExpressionLanguageValidator( - AttributeExpression.ResultType.STRING, true)) + .addValidator(NON_EMPTY_EL_VALIDATOR) .defaultValue("index") .build(); @@ -116,20 +114,19 @@ public class PutElasticsearch extends AbstractElasticsearchTransportClientProces .required(true) .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .defaultValue("100") + .expressionLanguageSupported(true) .build(); + private static final Set relationships; + private static final List propertyDescriptors; - @Override - public Set getRelationships() { - final Set relationships = new HashSet<>(); - relationships.add(REL_SUCCESS); - relationships.add(REL_FAILURE); - relationships.add(REL_RETRY); - return Collections.unmodifiableSet(relationships); - } + static { + final Set _rels = new HashSet<>(); + _rels.add(REL_SUCCESS); + _rels.add(REL_FAILURE); + _rels.add(REL_RETRY); + relationships = Collections.unmodifiableSet(_rels); - @Override - public final List getSupportedPropertyDescriptors() { final List descriptors = new ArrayList<>(); descriptors.add(CLUSTER_NAME); descriptors.add(HOSTS); @@ -146,7 +143,17 @@ public class PutElasticsearch extends AbstractElasticsearchTransportClientProces descriptors.add(BATCH_SIZE); descriptors.add(INDEX_OP); - return Collections.unmodifiableList(descriptors); + propertyDescriptors = Collections.unmodifiableList(descriptors); + } + + @Override + public Set getRelationships() { + return relationships; + } + + @Override + public final List getSupportedPropertyDescriptors() { + return propertyDescriptors; } @OnScheduled @@ -156,16 +163,16 @@ public class PutElasticsearch extends AbstractElasticsearchTransportClientProces @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - final int batchSize = context.getProperty(BATCH_SIZE).asInteger(); + + final ComponentLog logger = getLogger(); final String id_attribute = context.getProperty(ID_ATTRIBUTE).getValue(); - final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue()); + final int batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger(); final List flowFiles = session.get(batchSize); if (flowFiles.isEmpty()) { return; } - final ComponentLog logger = getLogger(); // Keep track of the list of flow files that need to be transferred. As they are transferred, remove them from the list. List flowFilesToTransfer = new LinkedList<>(flowFiles); try { @@ -178,6 +185,7 @@ public class PutElasticsearch extends AbstractElasticsearchTransportClientProces final String index = context.getProperty(INDEX).evaluateAttributeExpressions(file).getValue(); final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(file).getValue(); final String indexOp = context.getProperty(INDEX_OP).evaluateAttributeExpressions(file).getValue(); + final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(file).getValue()); final String id = file.getAttribute(id_attribute); if (id == null) { diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java index 3ba46bb362..2b39a86b95 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java @@ -104,8 +104,7 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor { .description("The type of this document (used by Elasticsearch for indexing and searching)") .required(true) .expressionLanguageSupported(true) - .addValidator(StandardValidators.createAttributeExpressionLanguageValidator( - AttributeExpression.ResultType.STRING, true)) + .addValidator(NON_EMPTY_EL_VALIDATOR) .build(); public static final PropertyDescriptor INDEX_OP = new PropertyDescriptor.Builder() @@ -114,8 +113,7 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor { .description("The type of the operation used to index (index, update, upsert, delete)") .required(true) .expressionLanguageSupported(true) - .addValidator(StandardValidators.createAttributeExpressionLanguageValidator( - AttributeExpression.ResultType.STRING, true)) + .addValidator(NON_EMPTY_EL_VALIDATOR) .defaultValue("index") .build(); @@ -128,19 +126,19 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor { .required(true) .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .defaultValue("100") + .expressionLanguageSupported(true) .build(); - @Override - public Set getRelationships() { - final Set relationships = new HashSet<>(); - relationships.add(REL_SUCCESS); - relationships.add(REL_FAILURE); - relationships.add(REL_RETRY); - return Collections.unmodifiableSet(relationships); - } + private static final Set relationships; + private static final List propertyDescriptors; + + static { + final Set _rels = new HashSet<>(); + _rels.add(REL_SUCCESS); + _rels.add(REL_FAILURE); + _rels.add(REL_RETRY); + relationships = Collections.unmodifiableSet(_rels); - @Override - public final List getSupportedPropertyDescriptors() { final List descriptors = new ArrayList<>(); descriptors.add(ES_URL); descriptors.add(PROP_SSL_CONTEXT_SERVICE); @@ -154,7 +152,18 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor { descriptors.add(CHARSET); descriptors.add(BATCH_SIZE); descriptors.add(INDEX_OP); - return Collections.unmodifiableList(descriptors); + + propertyDescriptors = Collections.unmodifiableList(descriptors); + } + + @Override + public Set getRelationships() { + return relationships; + } + + @Override + public final List getSupportedPropertyDescriptors() { + return propertyDescriptors; } @Override @@ -192,7 +201,7 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor { @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - final int batchSize = context.getProperty(BATCH_SIZE).asInteger(); + final int batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger(); final List flowFiles = session.get(batchSize); if (flowFiles.isEmpty()) { @@ -200,10 +209,10 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor { } final String id_attribute = context.getProperty(ID_ATTRIBUTE).getValue(); - final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue()); + // Authentication - final String username = context.getProperty(USERNAME).getValue(); - final String password = context.getProperty(PASSWORD).getValue(); + final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue(); + final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue(); OkHttpClient okHttpClient = getClient(); @@ -213,7 +222,7 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor { List flowFilesToTransfer = new LinkedList<>(flowFiles); final StringBuilder sb = new StringBuilder(); - final String baseUrl = trimToEmpty(context.getProperty(ES_URL).getValue()); + final String baseUrl = trimToEmpty(context.getProperty(ES_URL).evaluateAttributeExpressions().getValue()); final URL url; try { url = new URL((baseUrl.endsWith("/") ? baseUrl : baseUrl + "/") + "_bulk"); @@ -225,6 +234,7 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor { for (FlowFile file : flowFiles) { final String index = context.getProperty(INDEX).evaluateAttributeExpressions(file).getValue(); + final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(file).getValue()); if (StringUtils.isEmpty(index)) { logger.error("No value for index in for {}, transferring to failure", new Object[]{id_attribute, file}); flowFilesToTransfer.remove(file); @@ -368,6 +378,7 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor { logger.warn("Elasticsearch returned code {} with message {}, transferring flow file to failure", new Object[]{statusCode, getResponse.message()}); session.transfer(flowFilesToTransfer, REL_FAILURE); } + getResponse.close(); } } } diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java index f921323143..f65816e125 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java @@ -174,17 +174,16 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor { .allowableValues(TARGET_FLOW_FILE_CONTENT, TARGET_FLOW_FILE_ATTRIBUTES) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build(); - @Override - public Set getRelationships() { - final Set relationships = new HashSet<>(); - relationships.add(REL_SUCCESS); - relationships.add(REL_FAILURE); - relationships.add(REL_RETRY); - return Collections.unmodifiableSet(relationships); - } + private static final Set relationships; + private static final List propertyDescriptors; + + static { + final Set _rels = new HashSet<>(); + _rels.add(REL_SUCCESS); + _rels.add(REL_FAILURE); + _rels.add(REL_RETRY); + relationships = Collections.unmodifiableSet(_rels); - @Override - public final List getSupportedPropertyDescriptors() { final List descriptors = new ArrayList<>(); descriptors.add(ES_URL); descriptors.add(PROP_SSL_CONTEXT_SERVICE); @@ -201,7 +200,17 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor { descriptors.add(LIMIT); descriptors.add(TARGET); - return Collections.unmodifiableList(descriptors); + propertyDescriptors = Collections.unmodifiableList(descriptors); + } + + @Override + public Set getRelationships() { + return relationships; + } + + @Override + public final List getSupportedPropertyDescriptors() { + return propertyDescriptors; } @OnScheduled @@ -247,8 +256,8 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor { .equals(TARGET_FLOW_FILE_CONTENT); // Authentication - final String username = context.getProperty(USERNAME).getValue(); - final String password = context.getProperty(PASSWORD).getValue(); + final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue(); + final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue(); final ComponentLog logger = getLogger(); @@ -261,7 +270,7 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor { final long startNanos = System.nanoTime(); // read the url property from the context - final String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL).getValue()); + final String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL).evaluateAttributeExpressions().getValue()); boolean hitLimit = false; do { @@ -279,6 +288,7 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor { numResults = this.getPage(getResponse, queryUrl, context, session, flowFile, logger, startNanos, targetIsContent); fromIndex += pageSize; + getResponse.close(); } while (numResults > 0 && !hitLimit); if (flowFile != null) { diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java index 3d897cf975..0442bf7bf1 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java @@ -159,16 +159,15 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor .required(true).expressionLanguageSupported(true) .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).build(); - @Override - public Set getRelationships() { - final Set relationships = new HashSet<>(); - relationships.add(REL_SUCCESS); - relationships.add(REL_FAILURE); - return Collections.unmodifiableSet(relationships); - } + private static final Set relationships; + private static final List propertyDescriptors; + + static { + final Set _rels = new HashSet<>(); + _rels.add(REL_SUCCESS); + _rels.add(REL_FAILURE); + relationships = Collections.unmodifiableSet(_rels); - @Override - public final List getSupportedPropertyDescriptors() { final List descriptors = new ArrayList<>(); descriptors.add(ES_URL); descriptors.add(PROP_SSL_CONTEXT_SERVICE); @@ -184,7 +183,17 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor descriptors.add(FIELDS); descriptors.add(SORT); - return Collections.unmodifiableList(descriptors); + propertyDescriptors = Collections.unmodifiableList(descriptors); + } + + @Override + public Set getRelationships() { + return relationships; + } + + @Override + public final List getSupportedPropertyDescriptors() { + return propertyDescriptors; } @OnScheduled @@ -227,18 +236,18 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor .getProperty(SCROLL_DURATION).evaluateAttributeExpressions(flowFile).getValue() : null; // Authentication - final String username = context.getProperty(USERNAME).getValue(); - final String password = context.getProperty(PASSWORD).getValue(); + final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue(); + final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue(); final ComponentLog logger = getLogger(); try { String scrollId = loadScrollId(context.getStateManager()); + // read the url property from the context + final String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL).evaluateAttributeExpressions() + .getValue()); if (scrollId != null) { - // read the url property from the context - final String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL) - .getValue()); final URL scrollurl = buildRequestURL(urlstr, query, index, docType, fields, sort, scrollId, pageSize, scroll); final long startNanos = System.nanoTime(); @@ -246,13 +255,12 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor final Response getResponse = sendRequestToElasticsearch(okHttpClient, scrollurl, username, password, "GET", null); this.getPage(getResponse, scrollurl, context, session, flowFile, logger, startNanos); + getResponse.close(); } else { logger.debug("Querying {}/{} from Elasticsearch: {}", new Object[] { index, docType, query }); // read the url property from the context - final String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL) - .getValue()); final URL queryUrl = buildRequestURL(urlstr, query, index, docType, fields, sort, scrollId, pageSize, scroll); final long startNanos = System.nanoTime(); @@ -260,6 +268,7 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor final Response getResponse = sendRequestToElasticsearch(okHttpClient, queryUrl, username, password, "GET", null); this.getPage(getResponse, queryUrl, context, session, flowFile, logger, startNanos); + getResponse.close(); } } catch (IOException ioe) { diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearch.java index 9b68f2eeb1..ba22b65c92 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearch.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearch.java @@ -102,6 +102,37 @@ public class TestFetchElasticsearch { out.assertAttributeEquals("doc_id", "28039652140"); } + @Test + public void testFetchElasticsearchOnTriggerEL() throws IOException { + runner = TestRunners.newTestRunner(new FetchElasticsearchTestProcessor(true)); // all docs are found + runner.setValidateExpressionUsage(true); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, "${cluster.name}"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, "${hosts}"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, "${ping.timeout}"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL, "${sampler.interval}"); + + runner.setProperty(FetchElasticsearch.INDEX, "doc"); + runner.assertNotValid(); + runner.setProperty(FetchElasticsearch.TYPE, "status"); + runner.assertNotValid(); + runner.setProperty(FetchElasticsearch.DOC_ID, "${doc_id}"); + runner.assertValid(); + runner.setVariable("cluster.name", "elasticsearch"); + runner.setVariable("hosts", "127.0.0.1:9300"); + runner.setVariable("ping.timeout", "5s"); + runner.setVariable("sampler.interval", "5s"); + + runner.enqueue(docExample, new HashMap() {{ + put("doc_id", "28039652140"); + }}); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_SUCCESS, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(FetchElasticsearch.REL_SUCCESS).get(0); + assertNotNull(out); + out.assertAttributeEquals("doc_id", "28039652140"); + } + @Test public void testFetchElasticsearchOnTriggerWithFailures() throws IOException { runner = TestRunners.newTestRunner(new FetchElasticsearchTestProcessor(false)); // simulate doc not found diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java index 82fa3dae2a..28bc06091d 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java @@ -61,6 +61,35 @@ public class TestFetchElasticsearchHttp { runner = null; } + @Test + public void testFetchElasticsearchOnTriggerEL() throws IOException { + runner = TestRunners.newTestRunner(new FetchElasticsearchHttpTestProcessor(true)); // all docs are found + runner.setValidateExpressionUsage(true); + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "${es.url}"); + + runner.setProperty(FetchElasticsearchHttp.INDEX, "doc"); + runner.assertNotValid(); + runner.setProperty(FetchElasticsearchHttp.TYPE, "status"); + runner.assertNotValid(); + runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}"); + runner.assertValid(); + runner.setProperty(AbstractElasticsearchHttpProcessor.CONNECT_TIMEOUT, "${connect.timeout}"); + runner.assertValid(); + + runner.setVariable("es.url", "http://127.0.0.1:9200"); + runner.setVariable("connect.timeout", "5s"); + + runner.enqueue(docExample, new HashMap() {{ + put("doc_id", "28039652140"); + }}); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(FetchElasticsearchHttp.REL_SUCCESS, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(FetchElasticsearchHttp.REL_SUCCESS).get(0); + assertNotNull(out); + out.assertAttributeEquals("doc_id", "28039652140"); + } + @Test public void testFetchElasticsearchOnTrigger() throws IOException { runner = TestRunners.newTestRunner(new FetchElasticsearchHttpTestProcessor(true)); // all docs are found diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java index 4e6a820d62..6d6da5a01f 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearch.java @@ -102,6 +102,39 @@ public class TestPutElasticsearch { out.assertAttributeEquals("doc_id", "28039652140"); } + @Test + public void testPutElasticSearchOnTriggerEL() throws IOException { + runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(false)); // no failures + runner.setValidateExpressionUsage(true); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, "${cluster.name}"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, "${hosts}"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, "${ping.timeout}"); + runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL, "${sampler.interval}"); + + runner.setProperty(PutElasticsearch.INDEX, "doc"); + runner.assertNotValid(); + runner.setProperty(PutElasticsearch.TYPE, "status"); + runner.setProperty(PutElasticsearch.BATCH_SIZE, "1"); + runner.assertNotValid(); + runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "doc_id"); + runner.assertValid(); + runner.setVariable("cluster.name", "elasticsearch"); + runner.setVariable("hosts", "127.0.0.1:9300"); + runner.setVariable("ping.timeout", "5s"); + runner.setVariable("sampler.interval", "5s"); + + + runner.enqueue(docExample, new HashMap() {{ + put("doc_id", "28039652140"); + }}); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_SUCCESS, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearch.REL_SUCCESS).get(0); + assertNotNull(out); + out.assertAttributeEquals("doc_id", "28039652140"); + } + @Test public void testPutElasticSearchOnTriggerWithFailures() throws IOException { runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(true)); // simulate failures diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java index 9ce578f615..fae63eeb62 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java @@ -126,6 +126,33 @@ public class TestPutElasticsearchHttp { out.assertAttributeEquals("doc_id", "28039652140"); } + @Test + public void testPutElasticSearchOnTriggerEL() throws IOException { + runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(false)); // no failures + runner.setValidateExpressionUsage(true); + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "${es.url}"); + + runner.setProperty(PutElasticsearchHttp.INDEX, "doc"); + runner.setProperty(PutElasticsearchHttp.TYPE, "status"); + runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1"); + runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id"); + runner.setProperty(AbstractElasticsearchHttpProcessor.CONNECT_TIMEOUT, "${connect.timeout}"); + runner.assertValid(); + + runner.setVariable("es.url", "http://127.0.0.1:9200"); + runner.setVariable("connect.timeout", "5s"); + + runner.enqueue(docExample, new HashMap() {{ + put("doc_id", "28039652140"); + }}); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_SUCCESS, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttp.REL_SUCCESS).get(0); + assertNotNull(out); + out.assertAttributeEquals("doc_id", "28039652140"); + } + @Test public void testPutElasticSearchOnTriggerBadIndexOp() throws IOException { runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(false)); // no failures diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java index b9ec1f98af..ccd74faed3 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java @@ -76,6 +76,29 @@ public class TestQueryElasticsearchHttp { runAndVerifySuccess(true); } + @Test + public void testQueryElasticsearchOnTrigger_withInput_EL() throws IOException { + runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor()); + runner.setValidateExpressionUsage(true); + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "${es.url}"); + + runner.setProperty(QueryElasticsearchHttp.INDEX, "doc"); + runner.assertNotValid(); + runner.setProperty(QueryElasticsearchHttp.TYPE, "status"); + runner.assertNotValid(); + runner.setProperty(QueryElasticsearchHttp.QUERY, + "source:Twitter AND identifier:\"${identifier}\""); + runner.assertValid(); + runner.setProperty(QueryElasticsearchHttp.PAGE_SIZE, "2"); + runner.assertValid(); + runner.setProperty(AbstractElasticsearchHttpProcessor.CONNECT_TIMEOUT, "${connect.timeout}"); + runner.assertValid(); + + runner.setVariable("es.url", "http://127.0.0.1:9200"); + + runAndVerifySuccess(true); + } + @Test public void testQueryElasticsearchOnTrigger_withInput_attributeTarget() throws IOException { runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor()); diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestScrollElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestScrollElasticsearchHttp.java index 2616269257..a1a4e8df2e 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestScrollElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestScrollElasticsearchHttp.java @@ -78,6 +78,30 @@ public class TestScrollElasticsearchHttp { runAndVerifySuccess(); } + @Test + public void testScrollElasticsearchOnTrigger_withNoInput_EL() throws IOException { + runner = TestRunners.newTestRunner(new ScrollElasticsearchHttpTestProcessor()); + runner.setValidateExpressionUsage(true); + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "${es.url}"); + + runner.setProperty(ScrollElasticsearchHttp.INDEX, "doc"); + runner.assertNotValid(); + runner.setProperty(ScrollElasticsearchHttp.TYPE, "status"); + runner.assertNotValid(); + runner.setProperty(ScrollElasticsearchHttp.QUERY, + "source:WZ AND identifier:\"${identifier}\""); + runner.assertValid(); + runner.setProperty(ScrollElasticsearchHttp.PAGE_SIZE, "2"); + runner.assertValid(); + runner.setProperty(AbstractElasticsearchHttpProcessor.CONNECT_TIMEOUT, "${connect.timeout}"); + runner.assertValid(); + + runner.setVariable("es.url", "http://127.0.0.1:9200"); + + runner.setIncomingConnection(false); + runAndVerifySuccess(); + } + private void runAndVerifySuccess() { runner.enqueue("".getBytes(), new HashMap() { {