NIFI-3095: Add EL support to Elasticsearch 2.x (and HTTP) processors

This closes #1276

Signed-off-by: jpercivall <JPercivall@apache.org>
This commit is contained in:
Matt Burgess 2016-11-29 11:29:47 -05:00 committed by jpercivall
parent 8da38acf31
commit 69b23adf1b
15 changed files with 372 additions and 127 deletions

View File

@ -56,6 +56,7 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic
.description("Elasticsearch URL which will be connected to, including scheme (http, e.g.), host, and port. The default port for the REST API is 9200.")
.required(true)
.addValidator(StandardValidators.URL_VALIDATOR)
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor PROXY_HOST = new PropertyDescriptor.Builder()
@ -81,6 +82,7 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic
.required(true)
.defaultValue("5 secs")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor RESPONSE_TIMEOUT = new PropertyDescriptor.Builder()
@ -90,6 +92,7 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic
.required(true)
.defaultValue("15 secs")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(true)
.build();
private final AtomicReference<OkHttpClient> okHttpClientAtomicReference = new AtomicReference<>();
@ -109,8 +112,8 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic
}
// Set timeouts
okHttpClient.connectTimeout((context.getProperty(CONNECT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()), TimeUnit.MILLISECONDS);
okHttpClient.readTimeout(context.getProperty(RESPONSE_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(), TimeUnit.MILLISECONDS);
okHttpClient.connectTimeout((context.getProperty(CONNECT_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue()), TimeUnit.MILLISECONDS);
okHttpClient.readTimeout(context.getProperty(RESPONSE_TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS).intValue(), TimeUnit.MILLISECONDS);
final SSLContextService sslService = context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
final SSLContext sslContext = sslService == null ? null : sslService.createSSLContext(SSLContextService.ClientAuth.NONE);

View File

@ -19,6 +19,7 @@ package org.apache.nifi.processors.elasticsearch;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
@ -28,7 +29,6 @@ import org.apache.nifi.util.StringUtils;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
/**
@ -36,6 +36,13 @@ import java.util.Set;
*/
public abstract class AbstractElasticsearchProcessor extends AbstractProcessor {
static final Validator NON_EMPTY_EL_VALIDATOR = (subject, value, context) -> {
if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build();
}
return StandardValidators.NON_EMPTY_VALIDATOR.validate(subject, value, context);
};
public static final PropertyDescriptor PROP_SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("SSL Context Service")
.description("The SSL Context Service used to provide client certificate information for TLS/SSL "
@ -50,6 +57,7 @@ public abstract class AbstractElasticsearchProcessor extends AbstractProcessor {
.required(true)
.defaultValue("UTF-8")
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
@ -57,6 +65,7 @@ public abstract class AbstractElasticsearchProcessor extends AbstractProcessor {
.description("Username to access the Elasticsearch cluster")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
@ -65,6 +74,7 @@ public abstract class AbstractElasticsearchProcessor extends AbstractProcessor {
.required(false)
.sensitive(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.build();
protected abstract void createElasticsearchClient(ProcessContext context) throws ProcessException;
@ -74,8 +84,9 @@ public abstract class AbstractElasticsearchProcessor extends AbstractProcessor {
Set<ValidationResult> results = new HashSet<>();
// Ensure that if username or password is set, then the other is too
Map<PropertyDescriptor, String> propertyMap = validationContext.getProperties();
if (StringUtils.isEmpty(propertyMap.get(USERNAME)) != StringUtils.isEmpty(propertyMap.get(PASSWORD))) {
String userName = validationContext.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
String password = validationContext.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
if (StringUtils.isEmpty(userName) != StringUtils.isEmpty(password)) {
results.add(new ValidationResult.Builder().valid(false).explanation(
"If username or password is specified, then the other must be specified as well").build());
}

View File

@ -17,7 +17,6 @@
package org.apache.nifi.processors.elasticsearch;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.logging.ComponentLog;
@ -50,9 +49,10 @@ public abstract class AbstractElasticsearchTransportClientProcessor extends Abst
/**
* This validator ensures the Elasticsearch hosts property is a valid list of hostname:port entries
*/
private static final Validator HOSTNAME_PORT_VALIDATOR = new Validator() {
@Override
public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
private static final Validator HOSTNAME_PORT_VALIDATOR = (subject, input, context) -> {
if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build();
}
final List<String> esList = Arrays.asList(input.split(","));
for (String hostnamePort : esList) {
String[] addresses = hostnamePort.split(":");
@ -62,9 +62,7 @@ public abstract class AbstractElasticsearchTransportClientProcessor extends Abst
"Must be in hostname:port form (no scheme such as http://").valid(false).build();
}
}
return new ValidationResult.Builder().subject(subject).input(input).explanation(
"Valid cluster definition").valid(true).build();
}
return new ValidationResult.Builder().subject(subject).input(input).explanation("Valid cluster definition").valid(true).build();
};
protected static final PropertyDescriptor CLUSTER_NAME = new PropertyDescriptor.Builder()
@ -73,6 +71,7 @@ public abstract class AbstractElasticsearchTransportClientProcessor extends Abst
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("elasticsearch")
.expressionLanguageSupported(true)
.build();
protected static final PropertyDescriptor HOSTS = new PropertyDescriptor.Builder()
@ -83,6 +82,7 @@ public abstract class AbstractElasticsearchTransportClientProcessor extends Abst
.required(true)
.expressionLanguageSupported(false)
.addValidator(HOSTNAME_PORT_VALIDATOR)
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor PROP_SHIELD_LOCATION = new PropertyDescriptor.Builder()
@ -93,6 +93,7 @@ public abstract class AbstractElasticsearchTransportClientProcessor extends Abst
+ "lib/ directory, doing so will prevent the Shield plugin from being loaded.")
.required(false)
.addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
.expressionLanguageSupported(true)
.build();
protected static final PropertyDescriptor PING_TIMEOUT = new PropertyDescriptor.Builder()
@ -101,7 +102,8 @@ public abstract class AbstractElasticsearchTransportClientProcessor extends Abst
"For example, 5s (5 seconds). If non-local recommended is 30s")
.required(true)
.defaultValue("5s")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(true)
.build();
protected static final PropertyDescriptor SAMPLER_INTERVAL = new PropertyDescriptor.Builder()
@ -110,7 +112,8 @@ public abstract class AbstractElasticsearchTransportClientProcessor extends Abst
+ "If non-local recommended is 30s.")
.required(true)
.defaultValue("5s")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(true)
.build();
protected AtomicReference<Client> esClient = new AtomicReference<>();
@ -135,11 +138,11 @@ public abstract class AbstractElasticsearchTransportClientProcessor extends Abst
log.debug("Creating ElasticSearch Client");
try {
final String clusterName = context.getProperty(CLUSTER_NAME).getValue();
final String pingTimeout = context.getProperty(PING_TIMEOUT).getValue();
final String samplerInterval = context.getProperty(SAMPLER_INTERVAL).getValue();
final String username = context.getProperty(USERNAME).getValue();
final String password = context.getProperty(PASSWORD).getValue();
final String clusterName = context.getProperty(CLUSTER_NAME).evaluateAttributeExpressions().getValue();
final String pingTimeout = context.getProperty(PING_TIMEOUT).evaluateAttributeExpressions().getValue();
final String samplerInterval = context.getProperty(SAMPLER_INTERVAL).evaluateAttributeExpressions().getValue();
final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
final SSLContextService sslService =
context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
@ -149,7 +152,7 @@ public abstract class AbstractElasticsearchTransportClientProcessor extends Abst
.put("client.transport.ping_timeout", pingTimeout)
.put("client.transport.nodes_sampler_interval", samplerInterval);
String shieldUrl = context.getProperty(PROP_SHIELD_LOCATION).getValue();
String shieldUrl = context.getProperty(PROP_SHIELD_LOCATION).evaluateAttributeExpressions().getValue();
if (sslService != null) {
settingsBuilder.put("shield.transport.ssl", "true")
.put("shield.ssl.keystore.path", sslService.getKeyStoreFile())
@ -171,7 +174,7 @@ public abstract class AbstractElasticsearchTransportClientProcessor extends Abst
TransportClient transportClient = getTransportClient(settingsBuilder, shieldUrl, username, password);
final String hosts = context.getProperty(HOSTS).getValue();
final String hosts = context.getProperty(HOSTS).evaluateAttributeExpressions().getValue();
esHosts = getEsHosts(hosts);
if (esHosts != null) {
@ -268,6 +271,9 @@ public abstract class AbstractElasticsearchTransportClientProcessor extends Abst
for (String item : esList) {
String[] addresses = item.split(":");
if (addresses.length != 2) {
throw new ArrayIndexOutOfBoundsException("Not in host:port format");
}
final String hostName = addresses[0].trim();
final int port = Integer.parseInt(addresses[1].trim());

View File

@ -105,18 +105,17 @@ public class FetchElasticsearch extends AbstractElasticsearchTransportClientProc
.build();
@Override
public Set<Relationship> getRelationships() {
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
relationships.add(REL_FAILURE);
relationships.add(REL_RETRY);
relationships.add(REL_NOT_FOUND);
return Collections.unmodifiableSet(relationships);
}
private static final Set<Relationship> relationships;
private static final List<PropertyDescriptor> propertyDescriptors;
static {
final Set<Relationship> _rels = new HashSet<>();
_rels.add(REL_SUCCESS);
_rels.add(REL_FAILURE);
_rels.add(REL_RETRY);
_rels.add(REL_NOT_FOUND);
relationships = Collections.unmodifiableSet(_rels);
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(CLUSTER_NAME);
descriptors.add(HOSTS);
@ -131,9 +130,18 @@ public class FetchElasticsearch extends AbstractElasticsearchTransportClientProc
descriptors.add(TYPE);
descriptors.add(CHARSET);
return Collections.unmodifiableList(descriptors);
propertyDescriptors = Collections.unmodifiableList(descriptors);
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propertyDescriptors;
}
@OnScheduled
public void setup(ProcessContext context) {
@ -151,7 +159,7 @@ public class FetchElasticsearch extends AbstractElasticsearchTransportClientProc
final String index = context.getProperty(INDEX).evaluateAttributeExpressions(flowFile).getValue();
final String docId = context.getProperty(DOC_ID).evaluateAttributeExpressions(flowFile).getValue();
final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(flowFile).getValue();
final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue());
final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
final ComponentLog logger = getLogger();
try {

View File

@ -131,19 +131,17 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
private static final Set<Relationship> relationships;
private static final List<PropertyDescriptor> propertyDescriptors;
@Override
public Set<Relationship> getRelationships() {
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
relationships.add(REL_FAILURE);
relationships.add(REL_RETRY);
relationships.add(REL_NOT_FOUND);
return Collections.unmodifiableSet(relationships);
}
static {
final Set<Relationship> _rels = new HashSet<>();
_rels.add(REL_SUCCESS);
_rels.add(REL_FAILURE);
_rels.add(REL_RETRY);
_rels.add(REL_NOT_FOUND);
relationships = Collections.unmodifiableSet(_rels);
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(ES_URL);
descriptors.add(PROP_SSL_CONTEXT_SERVICE);
@ -156,9 +154,18 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
descriptors.add(TYPE);
descriptors.add(FIELDS);
return Collections.unmodifiableList(descriptors);
propertyDescriptors = Collections.unmodifiableList(descriptors);
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propertyDescriptors;
}
@OnScheduled
public void setup(ProcessContext context) {
@ -194,21 +201,22 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
: null;
// Authentication
final String username = context.getProperty(USERNAME).getValue();
final String password = context.getProperty(PASSWORD).getValue();
final String username = context.getProperty(USERNAME).evaluateAttributeExpressions(flowFile).getValue();
final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
final ComponentLog logger = getLogger();
Response getResponse = null;
try {
logger.debug("Fetching {}/{}/{} from Elasticsearch", new Object[]{index, docType, docId});
// read the url property from the context
final String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL).getValue());
final String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL).evaluateAttributeExpressions().getValue());
final URL url = buildRequestURL(urlstr, docId, index, docType, fields);
final long startNanos = System.nanoTime();
final Response getResponse = sendRequestToElasticsearch(okHttpClient, url, username, password, "GET", null);
getResponse = sendRequestToElasticsearch(okHttpClient, url, username, password, "GET", null);
final int statusCode = getResponse.code();
if (isSuccess(statusCode)) {
@ -290,6 +298,10 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
session.remove(flowFile);
}
context.yield();
} finally {
if (getResponse != null) {
getResponse.close();
}
}
}

View File

@ -96,8 +96,7 @@ public class PutElasticsearch extends AbstractElasticsearchTransportClientProces
.description("The type of this document (used by Elasticsearch for indexing and searching)")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(
AttributeExpression.ResultType.STRING, true))
.addValidator(NON_EMPTY_EL_VALIDATOR)
.build();
public static final PropertyDescriptor INDEX_OP = new PropertyDescriptor.Builder()
@ -105,8 +104,7 @@ public class PutElasticsearch extends AbstractElasticsearchTransportClientProces
.description("The type of the operation used to index (index, update, upsert)")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(
AttributeExpression.ResultType.STRING, true))
.addValidator(NON_EMPTY_EL_VALIDATOR)
.defaultValue("index")
.build();
@ -116,20 +114,19 @@ public class PutElasticsearch extends AbstractElasticsearchTransportClientProces
.required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("100")
.expressionLanguageSupported(true)
.build();
private static final Set<Relationship> relationships;
private static final List<PropertyDescriptor> propertyDescriptors;
@Override
public Set<Relationship> getRelationships() {
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
relationships.add(REL_FAILURE);
relationships.add(REL_RETRY);
return Collections.unmodifiableSet(relationships);
}
static {
final Set<Relationship> _rels = new HashSet<>();
_rels.add(REL_SUCCESS);
_rels.add(REL_FAILURE);
_rels.add(REL_RETRY);
relationships = Collections.unmodifiableSet(_rels);
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(CLUSTER_NAME);
descriptors.add(HOSTS);
@ -146,7 +143,17 @@ public class PutElasticsearch extends AbstractElasticsearchTransportClientProces
descriptors.add(BATCH_SIZE);
descriptors.add(INDEX_OP);
return Collections.unmodifiableList(descriptors);
propertyDescriptors = Collections.unmodifiableList(descriptors);
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propertyDescriptors;
}
@OnScheduled
@ -156,16 +163,16 @@ public class PutElasticsearch extends AbstractElasticsearchTransportClientProces
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
final ComponentLog logger = getLogger();
final String id_attribute = context.getProperty(ID_ATTRIBUTE).getValue();
final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue());
final int batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
final List<FlowFile> flowFiles = session.get(batchSize);
if (flowFiles.isEmpty()) {
return;
}
final ComponentLog logger = getLogger();
// Keep track of the list of flow files that need to be transferred. As they are transferred, remove them from the list.
List<FlowFile> flowFilesToTransfer = new LinkedList<>(flowFiles);
try {
@ -178,6 +185,7 @@ public class PutElasticsearch extends AbstractElasticsearchTransportClientProces
final String index = context.getProperty(INDEX).evaluateAttributeExpressions(file).getValue();
final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(file).getValue();
final String indexOp = context.getProperty(INDEX_OP).evaluateAttributeExpressions(file).getValue();
final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(file).getValue());
final String id = file.getAttribute(id_attribute);
if (id == null) {

View File

@ -104,8 +104,7 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
.description("The type of this document (used by Elasticsearch for indexing and searching)")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(
AttributeExpression.ResultType.STRING, true))
.addValidator(NON_EMPTY_EL_VALIDATOR)
.build();
public static final PropertyDescriptor INDEX_OP = new PropertyDescriptor.Builder()
@ -114,8 +113,7 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
.description("The type of the operation used to index (index, update, upsert, delete)")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(
AttributeExpression.ResultType.STRING, true))
.addValidator(NON_EMPTY_EL_VALIDATOR)
.defaultValue("index")
.build();
@ -128,19 +126,19 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
.required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("100")
.expressionLanguageSupported(true)
.build();
@Override
public Set<Relationship> getRelationships() {
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
relationships.add(REL_FAILURE);
relationships.add(REL_RETRY);
return Collections.unmodifiableSet(relationships);
}
private static final Set<Relationship> relationships;
private static final List<PropertyDescriptor> propertyDescriptors;
static {
final Set<Relationship> _rels = new HashSet<>();
_rels.add(REL_SUCCESS);
_rels.add(REL_FAILURE);
_rels.add(REL_RETRY);
relationships = Collections.unmodifiableSet(_rels);
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(ES_URL);
descriptors.add(PROP_SSL_CONTEXT_SERVICE);
@ -154,7 +152,18 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
descriptors.add(CHARSET);
descriptors.add(BATCH_SIZE);
descriptors.add(INDEX_OP);
return Collections.unmodifiableList(descriptors);
propertyDescriptors = Collections.unmodifiableList(descriptors);
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propertyDescriptors;
}
@Override
@ -192,7 +201,7 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
final int batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
final List<FlowFile> flowFiles = session.get(batchSize);
if (flowFiles.isEmpty()) {
@ -200,10 +209,10 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
}
final String id_attribute = context.getProperty(ID_ATTRIBUTE).getValue();
final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue());
// Authentication
final String username = context.getProperty(USERNAME).getValue();
final String password = context.getProperty(PASSWORD).getValue();
final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
OkHttpClient okHttpClient = getClient();
@ -213,7 +222,7 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
List<FlowFile> flowFilesToTransfer = new LinkedList<>(flowFiles);
final StringBuilder sb = new StringBuilder();
final String baseUrl = trimToEmpty(context.getProperty(ES_URL).getValue());
final String baseUrl = trimToEmpty(context.getProperty(ES_URL).evaluateAttributeExpressions().getValue());
final URL url;
try {
url = new URL((baseUrl.endsWith("/") ? baseUrl : baseUrl + "/") + "_bulk");
@ -225,6 +234,7 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
for (FlowFile file : flowFiles) {
final String index = context.getProperty(INDEX).evaluateAttributeExpressions(file).getValue();
final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(file).getValue());
if (StringUtils.isEmpty(index)) {
logger.error("No value for index in for {}, transferring to failure", new Object[]{id_attribute, file});
flowFilesToTransfer.remove(file);
@ -368,6 +378,7 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
logger.warn("Elasticsearch returned code {} with message {}, transferring flow file to failure", new Object[]{statusCode, getResponse.message()});
session.transfer(flowFilesToTransfer, REL_FAILURE);
}
getResponse.close();
}
}
}

View File

@ -174,17 +174,16 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
.allowableValues(TARGET_FLOW_FILE_CONTENT, TARGET_FLOW_FILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
@Override
public Set<Relationship> getRelationships() {
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
relationships.add(REL_FAILURE);
relationships.add(REL_RETRY);
return Collections.unmodifiableSet(relationships);
}
private static final Set<Relationship> relationships;
private static final List<PropertyDescriptor> propertyDescriptors;
static {
final Set<Relationship> _rels = new HashSet<>();
_rels.add(REL_SUCCESS);
_rels.add(REL_FAILURE);
_rels.add(REL_RETRY);
relationships = Collections.unmodifiableSet(_rels);
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(ES_URL);
descriptors.add(PROP_SSL_CONTEXT_SERVICE);
@ -201,7 +200,17 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
descriptors.add(LIMIT);
descriptors.add(TARGET);
return Collections.unmodifiableList(descriptors);
propertyDescriptors = Collections.unmodifiableList(descriptors);
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propertyDescriptors;
}
@OnScheduled
@ -247,8 +256,8 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
.equals(TARGET_FLOW_FILE_CONTENT);
// Authentication
final String username = context.getProperty(USERNAME).getValue();
final String password = context.getProperty(PASSWORD).getValue();
final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
final ComponentLog logger = getLogger();
@ -261,7 +270,7 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
final long startNanos = System.nanoTime();
// read the url property from the context
final String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL).getValue());
final String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL).evaluateAttributeExpressions().getValue());
boolean hitLimit = false;
do {
@ -279,6 +288,7 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
numResults = this.getPage(getResponse, queryUrl, context, session, flowFile,
logger, startNanos, targetIsContent);
fromIndex += pageSize;
getResponse.close();
} while (numResults > 0 && !hitLimit);
if (flowFile != null) {

View File

@ -159,16 +159,15 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
.required(true).expressionLanguageSupported(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).build();
@Override
public Set<Relationship> getRelationships() {
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_SUCCESS);
relationships.add(REL_FAILURE);
return Collections.unmodifiableSet(relationships);
}
private static final Set<Relationship> relationships;
private static final List<PropertyDescriptor> propertyDescriptors;
static {
final Set<Relationship> _rels = new HashSet<>();
_rels.add(REL_SUCCESS);
_rels.add(REL_FAILURE);
relationships = Collections.unmodifiableSet(_rels);
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(ES_URL);
descriptors.add(PROP_SSL_CONTEXT_SERVICE);
@ -184,7 +183,17 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
descriptors.add(FIELDS);
descriptors.add(SORT);
return Collections.unmodifiableList(descriptors);
propertyDescriptors = Collections.unmodifiableList(descriptors);
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propertyDescriptors;
}
@OnScheduled
@ -227,18 +236,18 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
.getProperty(SCROLL_DURATION).evaluateAttributeExpressions(flowFile).getValue() : null;
// Authentication
final String username = context.getProperty(USERNAME).getValue();
final String password = context.getProperty(PASSWORD).getValue();
final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
final ComponentLog logger = getLogger();
try {
String scrollId = loadScrollId(context.getStateManager());
if (scrollId != null) {
// read the url property from the context
final String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL)
final String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL).evaluateAttributeExpressions()
.getValue());
if (scrollId != null) {
final URL scrollurl = buildRequestURL(urlstr, query, index, docType, fields, sort,
scrollId, pageSize, scroll);
final long startNanos = System.nanoTime();
@ -246,13 +255,12 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
final Response getResponse = sendRequestToElasticsearch(okHttpClient, scrollurl,
username, password, "GET", null);
this.getPage(getResponse, scrollurl, context, session, flowFile, logger, startNanos);
getResponse.close();
} else {
logger.debug("Querying {}/{} from Elasticsearch: {}", new Object[] { index,
docType, query });
// read the url property from the context
final String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL)
.getValue());
final URL queryUrl = buildRequestURL(urlstr, query, index, docType, fields, sort,
scrollId, pageSize, scroll);
final long startNanos = System.nanoTime();
@ -260,6 +268,7 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
final Response getResponse = sendRequestToElasticsearch(okHttpClient, queryUrl,
username, password, "GET", null);
this.getPage(getResponse, queryUrl, context, session, flowFile, logger, startNanos);
getResponse.close();
}
} catch (IOException ioe) {

View File

@ -102,6 +102,37 @@ public class TestFetchElasticsearch {
out.assertAttributeEquals("doc_id", "28039652140");
}
@Test
public void testFetchElasticsearchOnTriggerEL() throws IOException {
runner = TestRunners.newTestRunner(new FetchElasticsearchTestProcessor(true)); // all docs are found
runner.setValidateExpressionUsage(true);
runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, "${cluster.name}");
runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, "${hosts}");
runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, "${ping.timeout}");
runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL, "${sampler.interval}");
runner.setProperty(FetchElasticsearch.INDEX, "doc");
runner.assertNotValid();
runner.setProperty(FetchElasticsearch.TYPE, "status");
runner.assertNotValid();
runner.setProperty(FetchElasticsearch.DOC_ID, "${doc_id}");
runner.assertValid();
runner.setVariable("cluster.name", "elasticsearch");
runner.setVariable("hosts", "127.0.0.1:9300");
runner.setVariable("ping.timeout", "5s");
runner.setVariable("sampler.interval", "5s");
runner.enqueue(docExample, new HashMap<String, String>() {{
put("doc_id", "28039652140");
}});
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(FetchElasticsearch.REL_SUCCESS, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(FetchElasticsearch.REL_SUCCESS).get(0);
assertNotNull(out);
out.assertAttributeEquals("doc_id", "28039652140");
}
@Test
public void testFetchElasticsearchOnTriggerWithFailures() throws IOException {
runner = TestRunners.newTestRunner(new FetchElasticsearchTestProcessor(false)); // simulate doc not found

View File

@ -61,6 +61,35 @@ public class TestFetchElasticsearchHttp {
runner = null;
}
@Test
public void testFetchElasticsearchOnTriggerEL() throws IOException {
runner = TestRunners.newTestRunner(new FetchElasticsearchHttpTestProcessor(true)); // all docs are found
runner.setValidateExpressionUsage(true);
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "${es.url}");
runner.setProperty(FetchElasticsearchHttp.INDEX, "doc");
runner.assertNotValid();
runner.setProperty(FetchElasticsearchHttp.TYPE, "status");
runner.assertNotValid();
runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
runner.assertValid();
runner.setProperty(AbstractElasticsearchHttpProcessor.CONNECT_TIMEOUT, "${connect.timeout}");
runner.assertValid();
runner.setVariable("es.url", "http://127.0.0.1:9200");
runner.setVariable("connect.timeout", "5s");
runner.enqueue(docExample, new HashMap<String, String>() {{
put("doc_id", "28039652140");
}});
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(FetchElasticsearchHttp.REL_SUCCESS, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(FetchElasticsearchHttp.REL_SUCCESS).get(0);
assertNotNull(out);
out.assertAttributeEquals("doc_id", "28039652140");
}
@Test
public void testFetchElasticsearchOnTrigger() throws IOException {
runner = TestRunners.newTestRunner(new FetchElasticsearchHttpTestProcessor(true)); // all docs are found

View File

@ -102,6 +102,39 @@ public class TestPutElasticsearch {
out.assertAttributeEquals("doc_id", "28039652140");
}
@Test
public void testPutElasticSearchOnTriggerEL() throws IOException {
runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(false)); // no failures
runner.setValidateExpressionUsage(true);
runner.setProperty(AbstractElasticsearchTransportClientProcessor.CLUSTER_NAME, "${cluster.name}");
runner.setProperty(AbstractElasticsearchTransportClientProcessor.HOSTS, "${hosts}");
runner.setProperty(AbstractElasticsearchTransportClientProcessor.PING_TIMEOUT, "${ping.timeout}");
runner.setProperty(AbstractElasticsearchTransportClientProcessor.SAMPLER_INTERVAL, "${sampler.interval}");
runner.setProperty(PutElasticsearch.INDEX, "doc");
runner.assertNotValid();
runner.setProperty(PutElasticsearch.TYPE, "status");
runner.setProperty(PutElasticsearch.BATCH_SIZE, "1");
runner.assertNotValid();
runner.setProperty(PutElasticsearch.ID_ATTRIBUTE, "doc_id");
runner.assertValid();
runner.setVariable("cluster.name", "elasticsearch");
runner.setVariable("hosts", "127.0.0.1:9300");
runner.setVariable("ping.timeout", "5s");
runner.setVariable("sampler.interval", "5s");
runner.enqueue(docExample, new HashMap<String, String>() {{
put("doc_id", "28039652140");
}});
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(PutElasticsearch.REL_SUCCESS, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearch.REL_SUCCESS).get(0);
assertNotNull(out);
out.assertAttributeEquals("doc_id", "28039652140");
}
@Test
public void testPutElasticSearchOnTriggerWithFailures() throws IOException {
runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(true)); // simulate failures

View File

@ -126,6 +126,33 @@ public class TestPutElasticsearchHttp {
out.assertAttributeEquals("doc_id", "28039652140");
}
@Test
public void testPutElasticSearchOnTriggerEL() throws IOException {
runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(false)); // no failures
runner.setValidateExpressionUsage(true);
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "${es.url}");
runner.setProperty(PutElasticsearchHttp.INDEX, "doc");
runner.setProperty(PutElasticsearchHttp.TYPE, "status");
runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1");
runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id");
runner.setProperty(AbstractElasticsearchHttpProcessor.CONNECT_TIMEOUT, "${connect.timeout}");
runner.assertValid();
runner.setVariable("es.url", "http://127.0.0.1:9200");
runner.setVariable("connect.timeout", "5s");
runner.enqueue(docExample, new HashMap<String, String>() {{
put("doc_id", "28039652140");
}});
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(PutElasticsearchHttp.REL_SUCCESS, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(PutElasticsearchHttp.REL_SUCCESS).get(0);
assertNotNull(out);
out.assertAttributeEquals("doc_id", "28039652140");
}
@Test
public void testPutElasticSearchOnTriggerBadIndexOp() throws IOException {
runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(false)); // no failures

View File

@ -76,6 +76,29 @@ public class TestQueryElasticsearchHttp {
runAndVerifySuccess(true);
}
@Test
public void testQueryElasticsearchOnTrigger_withInput_EL() throws IOException {
runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
runner.setValidateExpressionUsage(true);
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "${es.url}");
runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
runner.assertNotValid();
runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
runner.assertNotValid();
runner.setProperty(QueryElasticsearchHttp.QUERY,
"source:Twitter AND identifier:\"${identifier}\"");
runner.assertValid();
runner.setProperty(QueryElasticsearchHttp.PAGE_SIZE, "2");
runner.assertValid();
runner.setProperty(AbstractElasticsearchHttpProcessor.CONNECT_TIMEOUT, "${connect.timeout}");
runner.assertValid();
runner.setVariable("es.url", "http://127.0.0.1:9200");
runAndVerifySuccess(true);
}
@Test
public void testQueryElasticsearchOnTrigger_withInput_attributeTarget() throws IOException {
runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());

View File

@ -78,6 +78,30 @@ public class TestScrollElasticsearchHttp {
runAndVerifySuccess();
}
@Test
public void testScrollElasticsearchOnTrigger_withNoInput_EL() throws IOException {
runner = TestRunners.newTestRunner(new ScrollElasticsearchHttpTestProcessor());
runner.setValidateExpressionUsage(true);
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "${es.url}");
runner.setProperty(ScrollElasticsearchHttp.INDEX, "doc");
runner.assertNotValid();
runner.setProperty(ScrollElasticsearchHttp.TYPE, "status");
runner.assertNotValid();
runner.setProperty(ScrollElasticsearchHttp.QUERY,
"source:WZ AND identifier:\"${identifier}\"");
runner.assertValid();
runner.setProperty(ScrollElasticsearchHttp.PAGE_SIZE, "2");
runner.assertValid();
runner.setProperty(AbstractElasticsearchHttpProcessor.CONNECT_TIMEOUT, "${connect.timeout}");
runner.assertValid();
runner.setVariable("es.url", "http://127.0.0.1:9200");
runner.setIncomingConnection(false);
runAndVerifySuccess();
}
private void runAndVerifySuccess() {
runner.enqueue("".getBytes(), new HashMap<String, String>() {
{