NIFI-6403 and NIFI-6404: Elasticsearch 7 support

Addressed PR#4153 comments; removed ES Version property and made Type optional in all ES HTTP/Record processors, applying sensible default values where required; use _source queyr parameter instead of _source_include/s as it's compatible between ES versions

Fix unit test compilation to use JDK8-compatible library/method

Better optional type and id handling for PutElasticsearchRecord; update nifi-elasticsearch-client-service build dependencies to use latest versions of Elasticsearch in each supported major version (5/6/7); addressed several warnings in ElasticSearchClientServiceImpl

This closes #4667.

Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
Chris Sampson 2020-11-16 13:03:50 +00:00 committed by Koji Kawamura
parent a3d845a38f
commit 124cdbd3fe
No known key found for this signature in database
GPG Key ID: 36136B0EC89E4758
17 changed files with 225 additions and 321 deletions

View File

@ -26,7 +26,7 @@
<packaging>jar</packaging>
<properties>
<es.int.version>5.6.15</es.int.version>
<es.int.version>5.6.16</es.int.version>
<script.name>setup-5.script</script.name>
<type.name>faketype</type.name>
</properties>
@ -71,7 +71,7 @@
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
<version>2.8.0</version>
</dependency>
<dependency>
@ -82,7 +82,7 @@
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.9</version>
<version>3.11</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
@ -146,7 +146,7 @@
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.10</version>
<version>4.5.13</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
@ -176,7 +176,7 @@
<profile>
<id>integration-6</id>
<properties>
<es.int.version>6.7.1</es.int.version>
<es.int.version>6.8.13</es.int.version>
<type.name>_doc</type.name>
<script.name>setup-6.script</script.name>
</properties>
@ -184,7 +184,7 @@
<profile>
<id>integration-7</id>
<properties>
<es.int.version>7.0.0</es.int.version>
<es.int.version>7.10.0</es.int.version>
<script.name>setup-7.script</script.name>
<type.name>_doc</type.name>
</properties>
@ -196,7 +196,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<version>3.0.0-M3</version>
<version>3.0.0-M5</version>
<configuration>
<systemPropertyVariables>
<type_name>${type.name}</type_name>
@ -206,7 +206,7 @@
<plugin>
<groupId>com.github.alexcojocaru</groupId>
<artifactId>elasticsearch-maven-plugin</artifactId>
<version>6.13</version>
<version>6.19</version>
<configuration>
<clusterName>testCluster</clusterName>
<transportPort>9500</transportPort>

View File

@ -26,7 +26,6 @@ import java.net.URL;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@ -61,25 +60,25 @@ import org.elasticsearch.client.RestClientBuilder;
public class ElasticSearchClientServiceImpl extends AbstractControllerService implements ElasticSearchClientService {
private final ObjectMapper mapper = new ObjectMapper();
static final private List<PropertyDescriptor> properties;
private static final List<PropertyDescriptor> properties;
private RestClient client;
private String url;
private Charset charset;
private Charset responseCharset;
static {
List<PropertyDescriptor> _props = new ArrayList();
_props.add(ElasticSearchClientService.HTTP_HOSTS);
_props.add(ElasticSearchClientService.USERNAME);
_props.add(ElasticSearchClientService.PASSWORD);
_props.add(ElasticSearchClientService.PROP_SSL_CONTEXT_SERVICE);
_props.add(ElasticSearchClientService.CONNECT_TIMEOUT);
_props.add(ElasticSearchClientService.SOCKET_TIMEOUT);
_props.add(ElasticSearchClientService.RETRY_TIMEOUT);
_props.add(ElasticSearchClientService.CHARSET);
List<PropertyDescriptor> props = new ArrayList<>();
props.add(ElasticSearchClientService.HTTP_HOSTS);
props.add(ElasticSearchClientService.USERNAME);
props.add(ElasticSearchClientService.PASSWORD);
props.add(ElasticSearchClientService.PROP_SSL_CONTEXT_SERVICE);
props.add(ElasticSearchClientService.CONNECT_TIMEOUT);
props.add(ElasticSearchClientService.SOCKET_TIMEOUT);
props.add(ElasticSearchClientService.RETRY_TIMEOUT);
props.add(ElasticSearchClientService.CHARSET);
properties = Collections.unmodifiableList(_props);
properties = Collections.unmodifiableList(props);
}
@Override
@ -91,7 +90,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
public void onEnabled(final ConfigurationContext context) throws InitializationException {
try {
setupClient(context);
charset = Charset.forName(context.getProperty(CHARSET).getValue());
responseCharset = Charset.forName(context.getProperty(CHARSET).getValue());
} catch (Exception ex) {
getLogger().error("Could not initialize ElasticSearch client.", ex);
throw new InitializationException(ex);
@ -135,14 +134,14 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
RestClientBuilder builder = RestClient.builder(hh)
.setHttpClientConfigCallback(httpClientBuilder -> {
if (sslContext != null) {
httpClientBuilder = httpClientBuilder.setSSLContext(sslContext);
httpClientBuilder.setSSLContext(sslContext);
}
if (username != null && password != null) {
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials(username, password));
httpClientBuilder = httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}
return httpClientBuilder;
@ -159,11 +158,9 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
private Response runQuery(String endpoint, String query, String index, String type) {
StringBuilder sb = new StringBuilder()
.append("/")
.append(index);
.append("/").append(index);
if (type != null && !type.equals("")) {
sb.append("/")
.append(type);
sb.append("/").append(type);
}
sb.append(String.format("/%s", endpoint));
@ -181,11 +178,11 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
final int code = response.getStatusLine().getStatusCode();
try {
if (code >= 200 & code < 300) {
if (code >= 200 && code < 300) {
InputStream inputStream = response.getEntity().getContent();
byte[] result = IOUtils.toByteArray(inputStream);
inputStream.close();
return mapper.readValue(new String(result, charset), Map.class);
return (Map<String, Object>) mapper.readValue(new String(result, responseCharset), Map.class);
} else {
String errorMessage = String.format("ElasticSearch reported an error while trying to run the query: %s",
response.getStatusLine().getReasonPhrase());
@ -198,7 +195,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
@Override
public IndexOperationResponse add(IndexOperationRequest operation) {
return bulk(Arrays.asList(operation));
return bulk(Collections.singletonList(operation));
}
private String flatten(String str) {
@ -216,8 +213,12 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
Map<String, Object> header = new HashMap<String, Object>() {{
put(operation, new HashMap<String, Object>() {{
put("_index", index);
if (StringUtils.isNotBlank(id)) {
put("_id", id);
}
if (StringUtils.isNotBlank(type)) {
put("_type", type);
}
}});
}};
@ -256,8 +257,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
public IndexOperationResponse bulk(List<IndexOperationRequest> operations) {
try {
StringBuilder payload = new StringBuilder();
for (int index = 0; index < operations.size(); index++) {
IndexOperationRequest or = operations.get(index);
for (final IndexOperationRequest or : operations) {
buildRequest(or, payload);
}
@ -276,9 +276,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
getLogger().debug(String.format("Response was: %s", rawResponse));
}
IndexOperationResponse retVal = IndexOperationResponse.fromJsonResponse(rawResponse);
return retVal;
return IndexOperationResponse.fromJsonResponse(rawResponse);
} catch (Exception ex) {
throw new ElasticsearchError(ex);
}
@ -294,15 +292,15 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
@Override
public DeleteOperationResponse deleteById(String index, String type, String id) {
return deleteById(index, type, Arrays.asList(id));
return deleteById(index, type, Collections.singletonList(id));
}
@Override
public DeleteOperationResponse deleteById(String index, String type, List<String> ids) {
try {
StringBuilder sb = new StringBuilder();
for (int idx = 0; idx < ids.size(); idx++) {
String header = buildBulkHeader("delete", index, type, ids.get(idx));
for (final String id : ids) {
String header = buildBulkHeader("delete", index, type, id);
sb.append(header).append("\n");
}
HttpEntity entity = new NStringEntity(sb.toString(), ContentType.APPLICATION_JSON);
@ -316,9 +314,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8)));
}
DeleteOperationResponse dor = new DeleteOperationResponse(watch.getDuration(TimeUnit.MILLISECONDS));
return dor;
return new DeleteOperationResponse(watch.getDuration(TimeUnit.MILLISECONDS));
} catch (Exception ex) {
throw new RuntimeException(ex);
}
@ -329,7 +325,9 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
long start = System.currentTimeMillis();
Response response = runQuery("_delete_by_query", query, index, type);
long end = System.currentTimeMillis();
Map<String, Object> parsed = parseResponse(response);
// check for errors in response
parseResponse(response);
return new DeleteOperationResponse(end - start);
}
@ -359,9 +357,9 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
*/
private int handleSearchCount(Object raw) {
if (raw instanceof Number) {
return Integer.valueOf(raw.toString());
return Integer.parseInt(raw.toString());
} else if (raw instanceof Map) {
return (Integer)((Map)raw).get("value");
return (Integer)((Map<String, Object>)raw).get("value");
} else {
throw new ProcessException("Unknown type for hit count.");
}
@ -402,10 +400,10 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
public String getTransitUrl(String index, String type) {
return new StringBuilder()
.append(this.url)
.append(index != null && !index.equals("") ? "/" : "")
.append(index != null ? index : "")
.append(type != null && !type.equals("") ? "/" : "")
.append(type != null ? type : "")
.append(StringUtils.isNotBlank(index) ? "/" : "")
.append(StringUtils.isNotBlank(index) ? index : "")
.append(StringUtils.isNotBlank(type) ? "/" : "")
.append(StringUtils.isNotBlank(type) ? type : "")
.toString();
}
}

View File

@ -36,11 +36,9 @@ import okhttp3.RequestBody;
import okhttp3.Response;
import okhttp3.Route;
import org.apache.commons.text.StringEscapeUtils;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
@ -57,13 +55,7 @@ import org.apache.nifi.util.StringUtils;
* A base class for Elasticsearch processors that use the HTTP API
*/
public abstract class AbstractElasticsearchHttpProcessor extends AbstractElasticsearchProcessor {
enum ElasticsearchVersion {
ES_7,
ES_LESS_THAN_7
}
static final String FIELD_INCLUDE_QUERY_PARAM = "_source_include";
static final String FIELD_INCLUDE_QUERY_PARAM_ES7 = "_source_includes";
static final String SOURCE_QUERY_PARAM = "_source";
static final String QUERY_QUERY_PARAM = "q";
static final String SORT_QUERY_PARAM = "sort";
static final String SIZE_QUERY_PARAM = "size";
@ -134,18 +126,6 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor ES_VERSION = new PropertyDescriptor.Builder()
.name("elasticsearch-http-version")
.displayName("Elasticsearch Version")
.description("The major version of elasticsearch (this affects some HTTP query parameters and the way responses are parsed).")
.required(true)
.allowableValues(
new AllowableValue(ElasticsearchVersion.ES_LESS_THAN_7.name(), "< 7.0", "Any version of Elasticsearch less than 7.0"),
new AllowableValue(ElasticsearchVersion.ES_7.name(), "7.x", "Elasticsearch version 7.x"))
.defaultValue(ElasticsearchVersion.ES_LESS_THAN_7.name())
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
private final AtomicReference<OkHttpClient> okHttpClientAtomicReference = new AtomicReference<>();
@Override
@ -167,7 +147,6 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic
static {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(ES_URL);
properties.add(ES_VERSION);
properties.add(PROP_SSL_CONTEXT_SERVICE);
properties.add(CHARSET);
properties.add(USERNAME);
@ -308,12 +287,12 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic
sb.append("\": { \"_index\": \"");
sb.append(StringEscapeUtils.escapeJson(index));
sb.append("\"");
if (!(StringUtils.isEmpty(docType) | docType == null)){
if (StringUtils.isNotBlank(docType)) {
sb.append(", \"_type\": \"");
sb.append(StringEscapeUtils.escapeJson(docType));
sb.append("\"");
}
if (!StringUtils.isEmpty(id)) {
if (StringUtils.isNotBlank(id)) {
sb.append(", \"_id\": \"");
sb.append(StringEscapeUtils.escapeJson(id));
sb.append("\"");
@ -325,7 +304,7 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic
sb.append("{\"update\": { \"_index\": \"");
sb.append(StringEscapeUtils.escapeJson(index));
sb.append("\"");
if (!(StringUtils.isEmpty(docType) | docType == null)){
if (StringUtils.isNotBlank(docType)) {
sb.append(", \"_type\": \"");
sb.append(StringEscapeUtils.escapeJson(docType));
sb.append("\"");
@ -342,7 +321,7 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic
sb.append("{\"delete\": { \"_index\": \"");
sb.append(StringEscapeUtils.escapeJson(index));
sb.append("\"");
if (!(StringUtils.isEmpty(docType) | docType == null)){
if (StringUtils.isNotBlank(docType)) {
sb.append(", \"_type\": \"");
sb.append(StringEscapeUtils.escapeJson(docType));
sb.append("\"");
@ -352,37 +331,4 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic
sb.append("\" } }\n");
}
}
protected String getFieldIncludeParameter(ElasticsearchVersion esVersion) {
return esVersion.equals(ElasticsearchVersion.ES_LESS_THAN_7)
? FIELD_INCLUDE_QUERY_PARAM : FIELD_INCLUDE_QUERY_PARAM_ES7;
}
static class ElasticsearchTypeValidator implements Validator {
private final boolean pre7TypeRequired;
/**
* Creates a validator for an ES type
* @param pre7TypeRequired If true, 'type' will be required for ES
* before version 7.0.
*/
public ElasticsearchTypeValidator(boolean pre7TypeRequired) {
this.pre7TypeRequired = pre7TypeRequired;
}
@Override
public ValidationResult validate(String subject, String input, ValidationContext context) {
ElasticsearchVersion esVersion = ElasticsearchVersion.valueOf(context
.getProperty(ES_VERSION).getValue());
if (esVersion == ElasticsearchVersion.ES_7) {
return new ValidationResult.Builder().valid(org.apache.commons.lang3.StringUtils.isBlank(input) || "_doc".equals(input))
.explanation("Elasticsearch no longer supports 'type' as of version 7.0. Please use '_doc' or leave blank.")
.build();
} else {
return new ValidationResult.Builder().valid(!pre7TypeRequired || org.apache.commons.lang3.StringUtils.isNotBlank(input))
.explanation("Elasticsearch prior to version 7.0 requires a 'type' to be set.")
.build();
}
}
}
}

View File

@ -40,7 +40,6 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
import java.io.ByteArrayInputStream;
import java.io.IOException;
@ -121,11 +120,12 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
.name("fetch-es-type")
.displayName("Type")
.description("The type of this document (if empty, the first document matching the identifier across all types will be retrieved). "
+ "This must be empty (check 'Set empty string') or '_doc' for Elasticsearch 7.0+.")
.required(true)
.description("The type of document/fetch (if unset, the first document matching the "
+ "identifier across _all types will be retrieved). "
+ "This should be unset, '_doc' or '_source' for Elasticsearch 7.0+.")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(new ElasticsearchTypeValidator(false))
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.build();
public static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder()
@ -150,7 +150,6 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
relationships = Collections.unmodifiableSet(_rels);
final List<PropertyDescriptor> descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS);
descriptors.add(ES_VERSION);
descriptors.add(DOC_ID);
descriptors.add(INDEX);
descriptors.add(TYPE);
@ -201,8 +200,6 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
final String fields = context.getProperty(FIELDS).isSet()
? context.getProperty(FIELDS).evaluateAttributeExpressions(flowFile).getValue()
: null;
final ElasticsearchVersion esVersion = ElasticsearchVersion.valueOf(context.getProperty(ES_VERSION)
.getValue());
// Authentication
final String username = context.getProperty(USERNAME).evaluateAttributeExpressions(flowFile).getValue();
@ -218,7 +215,7 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
// read the url property from the context
final String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL).evaluateAttributeExpressions().getValue());
final URL url = buildRequestURL(urlstr, docId, index, docType, fields, context, esVersion);
final URL url = buildRequestURL(urlstr, docId, index, docType, fields, context);
final long startNanos = System.nanoTime();
getResponse = sendRequestToElasticsearch(okHttpClient, url, username, password, "GET", null);
@ -310,18 +307,17 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
}
}
private URL buildRequestURL(String baseUrl, String docId, String index, String type, String fields, ProcessContext context, ElasticsearchVersion esVersion) throws MalformedURLException {
private URL buildRequestURL(String baseUrl, String docId, String index, String type, String fields, ProcessContext context) throws MalformedURLException {
if (StringUtils.isEmpty(baseUrl)) {
throw new MalformedURLException("Base URL cannot be null");
}
HttpUrl.Builder builder = HttpUrl.parse(baseUrl).newBuilder();
builder.addPathSegment(index);
builder.addPathSegment((StringUtils.isEmpty(type)) ? "_all" : type);
builder.addPathSegment(StringUtils.isBlank(type) ? "_all" : type);
builder.addPathSegment(docId);
if (!StringUtils.isEmpty(fields)) {
String trimmedFields = Stream.of(fields.split(",")).map(String::trim).collect(Collectors.joining(","));
final String fieldIncludeParameter = getFieldIncludeParameter(esVersion);
builder.addQueryParameter(fieldIncludeParameter, trimmedFields);
builder.addQueryParameter(SOURCE_QUERY_PARAM, trimmedFields);
}
// Find the user-added properties and set them as query parameters on the URL

View File

@ -112,10 +112,10 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
.name("put-es-type")
.displayName("Type")
.description("The type of this document (required by Elasticsearch versions < 7.0 for indexing and searching). "
+ "This must be empty (check 'Set empty string') or '_doc' for Elasticsearch 7.0+.")
.required(true)
+ "This must be unset or '_doc' for Elasticsearch 7.0+.")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(new ElasticsearchTypeValidator(true))
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.build();
public static final PropertyDescriptor INDEX_OP = new PropertyDescriptor.Builder()
@ -153,7 +153,6 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
relationships = Collections.unmodifiableSet(_rels);
final List<PropertyDescriptor> descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS);
descriptors.add(ES_VERSION);
descriptors.add(ID_ATTRIBUTE);
descriptors.add(INDEX);
descriptors.add(TYPE);

View File

@ -174,10 +174,10 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
.name("put-es-record-type")
.displayName("Type")
.description("The type of this document (required by Elasticsearch versions < 7.0 for indexing and searching). "
+ "This must be empty (check 'Set empty string') or '_doc' for Elasticsearch 7.0+.")
.required(true)
+ "This must be unset or '_doc' for Elasticsearch 7.0+.")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(new ElasticsearchTypeValidator(true))
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.build();
static final PropertyDescriptor INDEX_OP = new PropertyDescriptor.Builder()
@ -261,7 +261,6 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
relationships = Collections.unmodifiableSet(_rels);
final List<PropertyDescriptor> descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS);
descriptors.add(ES_VERSION);
descriptors.add(RECORD_READER);
descriptors.add(RECORD_WRITER);
descriptors.add(LOG_ALL_ERRORS);

View File

@ -145,7 +145,7 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
.name("query-es-index")
.displayName("Index")
.description("The name of the index to read from. If the property is set "
.description("The name of the index to read from. If the property is unset or set "
+ "to _all, the query will match across all indexes.")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
@ -155,11 +155,11 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
.name("query-es-type")
.displayName("Type")
.description("The type of this document (if empty, searches across all types). "
+ "This must be empty (check 'Set empty string') or '_doc' for Elasticsearch 7.0+.")
.required(true)
.description("The type of document (if unset, the query will be against all types in the _index). "
+ "This should be unset or '_doc' for Elasticsearch 7.0+.")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(new ElasticsearchTypeValidator(false))
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.build();
public static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder()
@ -235,7 +235,6 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
static {
final List<PropertyDescriptor> descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS);
descriptors.add(QUERY);
descriptors.add(ES_VERSION);
descriptors.add(PAGE_SIZE);
descriptors.add(INDEX);
descriptors.add(TYPE);
@ -307,17 +306,16 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(flowFile)
.getValue();
final int pageSize = context.getProperty(PAGE_SIZE).evaluateAttributeExpressions(flowFile)
.asInteger().intValue();
.asInteger();
final Integer limit = context.getProperty(LIMIT).isSet() ? context.getProperty(LIMIT)
.evaluateAttributeExpressions(flowFile).asInteger().intValue() : null;
.evaluateAttributeExpressions(flowFile).asInteger() : null;
final String fields = context.getProperty(FIELDS).isSet() ? context.getProperty(FIELDS)
.evaluateAttributeExpressions(flowFile).getValue() : null;
final String sort = context.getProperty(SORT).isSet() ? context.getProperty(SORT)
.evaluateAttributeExpressions(flowFile).getValue() : null;
final boolean targetIsContent = context.getProperty(TARGET).getValue()
.equals(TARGET_FLOW_FILE_CONTENT);
final ElasticsearchVersion esVersion = ElasticsearchVersion.valueOf(context.getProperty(ES_VERSION)
.getValue());
// Authentication
final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
@ -345,7 +343,7 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
}
final URL queryUrl = buildRequestURL(urlstr, query, index, docType, fields, sort,
mPageSize, fromIndex, context, esVersion);
mPageSize, fromIndex, context);
final Response getResponse = sendRequestToElasticsearch(okHttpClient, queryUrl,
username, password, "GET", null);
@ -506,13 +504,13 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
}
private URL buildRequestURL(String baseUrl, String query, String index, String type, String fields,
String sort, int pageSize, int fromIndex, ProcessContext context, ElasticsearchVersion esVersion) throws MalformedURLException {
String sort, int pageSize, int fromIndex, ProcessContext context) throws MalformedURLException {
if (StringUtils.isEmpty(baseUrl)) {
throw new MalformedURLException("Base URL cannot be null");
}
HttpUrl.Builder builder = HttpUrl.parse(baseUrl).newBuilder();
builder.addPathSegment((StringUtils.isEmpty(index)) ? "_all" : index);
if (!StringUtils.isEmpty(type)) {
if (StringUtils.isNotBlank(type)) {
builder.addPathSegment(type);
}
builder.addPathSegment("_search");
@ -521,8 +519,7 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
builder.addQueryParameter(FROM_QUERY_PARAM, String.valueOf(fromIndex));
if (!StringUtils.isEmpty(fields)) {
String trimmedFields = Stream.of(fields.split(",")).map(String::trim).collect(Collectors.joining(","));
final String fieldIncludeParameter = getFieldIncludeParameter(esVersion);
builder.addQueryParameter(fieldIncludeParameter, trimmedFields);
builder.addQueryParameter(SOURCE_QUERY_PARAM, trimmedFields);
}
if (!StringUtils.isEmpty(sort)) {
String trimmedFields = Stream.of(sort.split(",")).map(String::trim).collect(Collectors.joining(","));

View File

@ -46,8 +46,6 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchTypeValidator;
import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
import java.io.ByteArrayInputStream;
import java.io.IOException;
@ -137,11 +135,11 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
.name("scroll-es-type")
.displayName("Type")
.description("The type of this document (if empty, searches across all types). "
+ "This must be empty (check 'Set empty string') or '_doc' for Elasticsearch 7.0+.")
.required(true)
.description("The type of document (if unset, the query will be against all types in the _index). "
+ "This should be unset or '_doc' for Elasticsearch 7.0+.")
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(new ElasticsearchTypeValidator(false))
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.build();
public static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder()
@ -186,7 +184,6 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
relationships = Collections.unmodifiableSet(_rels);
final List<PropertyDescriptor> descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS);
descriptors.add(ES_VERSION);
descriptors.add(QUERY);
descriptors.add(SCROLL_DURATION);
descriptors.add(PAGE_SIZE);
@ -239,15 +236,13 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(flowFile)
.getValue();
final int pageSize = context.getProperty(PAGE_SIZE).evaluateAttributeExpressions(flowFile)
.asInteger().intValue();
.asInteger();
final String fields = context.getProperty(FIELDS).isSet() ? context.getProperty(FIELDS)
.evaluateAttributeExpressions(flowFile).getValue() : null;
final String sort = context.getProperty(SORT).isSet() ? context.getProperty(SORT)
.evaluateAttributeExpressions(flowFile).getValue() : null;
final String scroll = context.getProperty(SCROLL_DURATION).isSet() ? context
.getProperty(SCROLL_DURATION).evaluateAttributeExpressions(flowFile).getValue() : null;
final ElasticsearchVersion esVersion = ElasticsearchVersion.valueOf(context.getProperty(ES_VERSION)
.getValue());
// Authentication
final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
@ -264,7 +259,7 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
.getValue());
if (scrollId != null) {
final URL scrollurl = buildRequestURL(urlstr, query, index, docType, fields, sort,
scrollId, pageSize, scroll, context, esVersion);
scrollId, pageSize, scroll, context);
final long startNanos = System.nanoTime();
final String scrollBody = String.format("{ \"scroll\": \"%s\", \"scroll_id\": \"%s\" }", scroll,
@ -282,7 +277,7 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
// read the url property from the context
final URL queryUrl = buildRequestURL(urlstr, query, index, docType, fields, sort,
scrollId, pageSize, scroll, context, esVersion);
scrollId, pageSize, scroll, context);
final long startNanos = System.nanoTime();
final Response getResponse = sendRequestToElasticsearch(okHttpClient, queryUrl,
@ -419,7 +414,7 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
}
private URL buildRequestURL(String baseUrl, String query, String index, String type, String fields,
String sort, String scrollId, int pageSize, String scroll, ProcessContext context, ElasticsearchVersion esVersion) throws MalformedURLException {
String sort, String scrollId, int pageSize, String scroll, ProcessContext context) throws MalformedURLException {
if (StringUtils.isEmpty(baseUrl)) {
throw new MalformedURLException("Base URL cannot be null");
}
@ -429,7 +424,7 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
builder.addPathSegment("scroll");
} else {
builder.addPathSegment((StringUtils.isEmpty(index)) ? "_all" : index);
if (!StringUtils.isEmpty(type)) {
if (StringUtils.isNotBlank(type)) {
builder.addPathSegment(type);
}
builder.addPathSegment("_search");
@ -437,8 +432,7 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
builder.addQueryParameter(SIZE_QUERY_PARAM, String.valueOf(pageSize));
if (!StringUtils.isEmpty(fields)) {
String trimmedFields = Stream.of(fields.split(",")).map(String::trim).collect(Collectors.joining(","));
final String fieldIncludeParameter = getFieldIncludeParameter(esVersion);
builder.addQueryParameter(fieldIncludeParameter, trimmedFields);
builder.addQueryParameter(SOURCE_QUERY_PARAM, trimmedFields);
}
if (!StringUtils.isEmpty(sort)) {
String trimmedFields = Stream.of(sort.split(",")).map(String::trim).collect(Collectors.joining(","));

View File

@ -22,7 +22,6 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@ -43,7 +42,6 @@ public class ITQueryElasticsearchHttp {
runner = TestRunners.newTestRunner(QueryElasticsearchHttp.class); // all docs are found
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL,
"http://localhost.internal:9200");
runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.setProperty(QueryElasticsearchHttp.INDEX, "prod-accounting");
runner.assertNotValid();
@ -70,7 +68,6 @@ public class ITQueryElasticsearchHttp {
runner = TestRunners.newTestRunner(QueryElasticsearchHttp.class); // all docs are found
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL,
"http://localhost.internal:9200");
runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.setProperty(QueryElasticsearchHttp.INDEX, "prod-accounting");
runner.assertNotValid();

View File

@ -20,7 +20,6 @@ import static org.junit.Assert.assertNotNull;
import java.io.IOException;
import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@ -41,7 +40,6 @@ public class ITScrollElasticsearchHttp {
runner = TestRunners.newTestRunner(ScrollElasticsearchHttp.class); // all docs are found
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL,
"http://ip-172-31-49-152.ec2.internal:9200");
runner.setProperty(ScrollElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.setProperty(ScrollElasticsearchHttp.INDEX, "prod-accounting");
runner.assertNotValid();

View File

@ -19,7 +19,6 @@ package org.apache.nifi.processors.elasticsearch;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.MockRecordParser;
@ -63,7 +62,6 @@ public class PutElasticsearchHttpRecordIT {
FETCH_RUNNER.setProperty(FetchElasticsearchHttp.INDEX, "people_test");
FETCH_RUNNER.setProperty(FetchElasticsearchHttp.TYPE, "person");
FETCH_RUNNER.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
FETCH_RUNNER.setProperty(FetchElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
FETCH_RUNNER.assertValid();
}
@ -77,7 +75,6 @@ public class PutElasticsearchHttpRecordIT {
runner = TestRunners.newTestRunner(PutElasticsearchHttpRecord.class);
runner.addControllerService("reader", recordReader);
runner.enableControllerService(recordReader);
runner.setProperty(PutElasticsearchHttpRecord.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.setProperty(PutElasticsearchHttpRecord.RECORD_READER, "reader");
runner.setProperty(PutElasticsearchHttpRecord.ES_URL, "http://localhost:9200");
runner.setProperty(PutElasticsearchHttpRecord.INDEX, "people_test");
@ -213,7 +210,6 @@ public class PutElasticsearchHttpRecordIT {
// Undo some stuff from setup()
runner.setProperty(PutElasticsearchHttpRecord.INDEX, "people\"test");
runner.setProperty(PutElasticsearchHttpRecord.TYPE, "person");
runner.setProperty(PutElasticsearchHttpRecord.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
recordReader.addRecord(1, new MapRecord(personSchema, new HashMap<String,Object>() {{
put("name", "John Doe");
put("age", 48);
@ -237,7 +233,6 @@ public class PutElasticsearchHttpRecordIT {
// Undo some stuff from setup()
runner.setProperty(PutElasticsearchHttpRecord.INDEX, "people}test");
runner.setProperty(PutElasticsearchHttpRecord.TYPE, "person");
runner.setProperty(PutElasticsearchHttpRecord.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
recordReader.addRecord(1, new MapRecord(personSchema, new HashMap<String,Object>() {{
put("name", "John Doe");
put("age", 48);
@ -261,7 +256,6 @@ public class PutElasticsearchHttpRecordIT {
// Undo some stuff from setup()
runner.setProperty(PutElasticsearchHttpRecord.INDEX, "people_test2");
runner.setProperty(PutElasticsearchHttpRecord.TYPE, "per\"son");
runner.setProperty(PutElasticsearchHttpRecord.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
recordReader.addRecord(1, new MapRecord(personSchema, new HashMap<String,Object>() {{
put("name", "John Doe");
put("age", 48);

View File

@ -26,11 +26,12 @@ import static org.mockito.Mockito.when;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
@ -75,17 +76,16 @@ public class TestFetchElasticsearchHttp {
runner.setProperty(FetchElasticsearchHttp.INDEX, "doc");
runner.assertNotValid();
runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
runner.setProperty(FetchElasticsearchHttp.TYPE, "");
runner.removeProperty(FetchElasticsearchHttp.TYPE);
runner.assertValid();
runner.setProperty(FetchElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.setProperty(FetchElasticsearchHttp.TYPE, "");
runner.assertValid(); // Valid because type is not required prior to 7.0
runner.assertNotValid();
runner.setProperty(FetchElasticsearchHttp.TYPE, "status");
runner.assertValid();
runner.setProperty(FetchElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_7.name());
runner.assertNotValid(); // Not valid because type must be _doc or empty for 7.0+
runner.setProperty(FetchElasticsearchHttp.TYPE, "${type}");
runner.assertValid();
runner.setProperty(FetchElasticsearchHttp.TYPE, "_doc");
runner.assertValid(); // Valid because type is not required prior to 7.0
runner.assertValid(); // Valid because type can be _doc for 7.0+
runner.setProperty(AbstractElasticsearchHttpProcessor.CONNECT_TIMEOUT, "${connect.timeout}");
runner.assertValid();
@ -135,7 +135,7 @@ public class TestFetchElasticsearchHttp {
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, ES_URL);
runner.setProperty(FetchElasticsearchHttp.INDEX, "doc");
runner.setProperty(FetchElasticsearchHttp.TYPE, "");
runner.removeProperty(FetchElasticsearchHttp.TYPE);
runner.assertNotValid();
runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
runner.assertValid();
@ -184,7 +184,7 @@ public class TestFetchElasticsearchHttp {
runner = TestRunners.newTestRunner(new FetchElasticsearchHttpTestProcessor(false)); // simulate doc not found
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(FetchElasticsearchHttp.INDEX, "doc");
runner.setProperty(FetchElasticsearchHttp.TYPE, "");
runner.removeProperty(FetchElasticsearchHttp.TYPE);
runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
runner.setIncomingConnection(true);
@ -284,7 +284,7 @@ public class TestFetchElasticsearchHttp {
runner.setProperty(FetchElasticsearchHttp.PROP_SSL_CONTEXT_SERVICE, "ssl-context");
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(FetchElasticsearchHttp.INDEX, "doc");
runner.setProperty(FetchElasticsearchHttp.TYPE, "");
runner.removeProperty(FetchElasticsearchHttp.TYPE);
runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
// Allow time for the controller service to fully initialize
@ -300,7 +300,7 @@ public class TestFetchElasticsearchHttp {
@Test
public void testFetchElasticsearchOnTriggerQueryParameter() throws IOException {
FetchElasticsearchHttpTestProcessor p = new FetchElasticsearchHttpTestProcessor(true); // all docs are found
p.setExpectedUrl("http://127.0.0.1:9200/doc/status/28039652140?_source_include=id&myparam=myvalue");
p.setExpectedUrl("http://127.0.0.1:9200/doc/status/28039652140?_source=id&myparam=myvalue");
runner = TestRunners.newTestRunner(p);
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
@ -323,6 +323,32 @@ public class TestFetchElasticsearchHttp {
out.assertAttributeEquals("doc_id", "28039652140");
}
@Test
public void testFetchElasticsearchOnTriggerQueryParameterNoType() throws IOException {
FetchElasticsearchHttpTestProcessor p = new FetchElasticsearchHttpTestProcessor(true); // all docs are found
p.setExpectedUrl("http://127.0.0.1:9200/doc/_all/28039652140?_source=id&myparam=myvalue");
runner = TestRunners.newTestRunner(p);
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(FetchElasticsearchHttp.INDEX, "doc");
runner.removeProperty(FetchElasticsearchHttp.TYPE);
runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
runner.setProperty(FetchElasticsearchHttp.FIELDS, "id");
// Set dynamic property, to be added to the URL as a query parameter
runner.setProperty("myparam", "myvalue");
runner.enqueue(docExample, new HashMap<String, String>() {{
put("doc_id", "28039652140");
}});
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(FetchElasticsearchHttp.REL_SUCCESS, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(FetchElasticsearchHttp.REL_SUCCESS).get(0);
assertNotNull(out);
out.assertAttributeEquals("doc_id", "28039652140");
}
/**
* A Test class that extends the processor in order to inject/mock behavior
*/
@ -412,15 +438,25 @@ public class TestFetchElasticsearchHttp {
*/
@Test
@Ignore("Comment this out if you want to run against local or test ES")
public void testFetchElasticsearchBasic() {
public void testFetchElasticsearchBasic() throws IOException {
System.out.println("Starting test " + new Object() {
}.getClass().getEnclosingMethod().getName());
final TestRunner runner = TestRunners.newTestRunner(new FetchElasticsearchHttp());
// add data to ES instance
new OkHttpClient.Builder().build().newCall(
new Request.Builder().url("http://127.0.0.1:9200/doc/_doc/28039652140")
.addHeader("Content-Type", "application/json")
.put(
RequestBody.create(MediaType.get("application/json"),
IOUtils.toString(docExample, StandardCharsets.UTF_8))
).build()
).execute();
//Local Cluster - Mac pulled from brew
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(FetchElasticsearchHttp.INDEX, "doc");
runner.setProperty(FetchElasticsearchHttp.TYPE, "status");
runner.removeProperty(FetchElasticsearchHttp.TYPE);
runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
runner.assertValid();
@ -433,31 +469,6 @@ public class TestFetchElasticsearchHttp {
runner.assertAllFlowFilesTransferred(FetchElasticsearchHttp.REL_SUCCESS, 1);
}
@Test
@Ignore("Comment this out if you want to run against local or test ES")
public void testFetchElasticsearchBatch() throws IOException {
System.out.println("Starting test " + new Object() {
}.getClass().getEnclosingMethod().getName());
final TestRunner runner = TestRunners.newTestRunner(new FetchElasticsearchHttp());
//Local Cluster - Mac pulled from brew
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(FetchElasticsearchHttp.INDEX, "doc");
runner.setProperty(FetchElasticsearchHttp.TYPE, "status");
runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
runner.assertValid();
for (int i = 0; i < 100; i++) {
long newId = 28039652140L + i;
final String newStrId = Long.toString(newId);
runner.enqueue(docExample, new HashMap<String, String>() {{
put("doc_id", newStrId);
}});
}
runner.run(100);
runner.assertAllFlowFilesTransferred(FetchElasticsearchHttp.REL_SUCCESS, 100);
}
@Test
@Ignore("Un-authenticated proxy : Comment this out if you want to run against local proxied ES.")
public void testFetchElasticsearchBasicBehindProxy() {

View File

@ -30,7 +30,6 @@ import java.util.HashMap;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@ -112,10 +111,9 @@ public class TestPutElasticsearchHttp {
public void testPutElasticSearchOnTriggerIndex_withoutType() throws IOException {
runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(false)); // no failures
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(PutElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_7.name());
runner.setProperty(PutElasticsearchHttp.INDEX, "doc");
runner.setProperty(PutElasticsearchHttp.TYPE, "");
runner.removeProperty(PutElasticsearchHttp.TYPE);
runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1");
runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id");
@ -340,17 +338,16 @@ public class TestPutElasticsearchHttp {
runner.assertNotValid();
runner.setProperty(PutElasticsearchHttp.INDEX, "doc");
runner.setProperty(PutElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.setProperty(PutElasticsearchHttp.TYPE, "");
runner.assertNotValid(); // Not valid because type is required prior to 7.0
runner.assertNotValid();
runner.setProperty(PutElasticsearchHttp.TYPE, " ");
runner.assertNotValid(); // Not valid because type is required prior to 7.0
runner.assertValid();
runner.removeProperty(PutElasticsearchHttp.TYPE);
runner.assertNotValid(); // Not valid because type is required prior to 7.0
runner.assertValid();
runner.setProperty(PutElasticsearchHttp.TYPE, "status");
runner.assertValid();
runner.setProperty(PutElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_7.name());
runner.assertNotValid(); // Not valid because type must be _doc or empty for 7.0+
runner.setProperty(PutElasticsearchHttp.TYPE, "${type}");
runner.assertValid();
runner.setProperty(PutElasticsearchHttp.TYPE, "_doc");
runner.assertValid();
runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1");

View File

@ -28,7 +28,6 @@ import okio.Buffer;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.reporting.InitializationException;
@ -197,10 +196,9 @@ public class TestPutElasticsearchHttpRecord {
runner = TestRunners.newTestRunner(processor); // no failures
generateTestData();
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_VERSION, ElasticsearchVersion.ES_7.name());
runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
runner.setProperty(PutElasticsearchHttpRecord.TYPE, "");
runner.removeProperty(PutElasticsearchHttpRecord.TYPE);
runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
runner.setProperty(PutElasticsearchHttpRecord.DATE_FORMAT, "d/M/yyyy");
runner.setProperty(PutElasticsearchHttpRecord.TIME_FORMAT, "h:m a");
@ -248,10 +246,9 @@ public class TestPutElasticsearchHttpRecord {
runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecordTestProcessor(false)); // no failures
generateTestData();
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_VERSION, ElasticsearchVersion.ES_7.name());
runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
runner.setProperty(PutElasticsearchHttpRecord.TYPE, "");
runner.removeProperty(PutElasticsearchHttpRecord.TYPE);
runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
runner.setProperty(PutElasticsearchHttpRecord.INDEX_OP, "Update");
runner.enqueue(new byte[0], new HashMap<String, String>() {{
@ -291,10 +288,9 @@ public class TestPutElasticsearchHttpRecord {
runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecordTestProcessor(false)); // no failures
generateTestData();
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_VERSION, ElasticsearchVersion.ES_7.name());
runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
runner.setProperty(PutElasticsearchHttpRecord.TYPE, "");
runner.removeProperty(PutElasticsearchHttpRecord.TYPE);
runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
runner.setProperty(PutElasticsearchHttpRecord.INDEX_OP, "DELETE");
runner.enqueue(new byte[0], new HashMap<String, String>() {{

View File

@ -32,7 +32,6 @@ import java.util.List;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
@ -65,7 +64,6 @@ public class TestQueryElasticsearchHttp {
public void testQueryElasticsearchOnTrigger_withInput() throws IOException {
runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
runner.assertNotValid();
@ -85,7 +83,6 @@ public class TestQueryElasticsearchHttp {
runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
runner.setValidateExpressionUsage(true);
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
runner.assertNotValid();
@ -109,20 +106,15 @@ public class TestQueryElasticsearchHttp {
runner.assertNotValid();
runner.setProperty(QueryElasticsearchHttp.QUERY,
"source:Twitter AND identifier:\"${identifier}\"");
runner.setProperty(QueryElasticsearchHttp.TYPE, "");
runner.removeProperty(QueryElasticsearchHttp.TYPE);
runner.assertValid();
runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.setProperty(QueryElasticsearchHttp.TYPE, "");
runner.assertValid(); // Valid because type is not required prior to 7.0
runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
runner.assertValid();
runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_7.name());
runner.assertNotValid(); // Not valid because type must be _doc or empty for 7.0+
runner.setProperty(QueryElasticsearchHttp.TYPE, "_doc");
runner.assertValid();
runner.removeProperty(QueryElasticsearchHttp.TYPE);
runner.assertNotValid();
runner.setProperty(QueryElasticsearchHttp.TYPE, "");
runner.assertNotValid();
runner.setProperty(QueryElasticsearchHttp.TYPE, "${type}");
runner.assertValid();
runner.setProperty(QueryElasticsearchHttp.TYPE, "_doc");
runner.assertValid();
runner.setProperty(QueryElasticsearchHttp.PAGE_SIZE, "2");
runner.assertValid();
@ -138,7 +130,6 @@ public class TestQueryElasticsearchHttp {
public void testQueryElasticsearchOnTrigger_withInput_attributeTarget() throws IOException {
runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
runner.assertNotValid();
@ -165,7 +156,6 @@ public class TestQueryElasticsearchHttp {
public void testQueryElasticsearchOnTrigger_withNoInput() throws IOException {
runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
runner.assertNotValid();
@ -211,7 +201,6 @@ public class TestQueryElasticsearchHttp {
public void testQueryElasticsearchOnTriggerWithFields() throws IOException {
runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
runner.assertNotValid();
@ -231,7 +220,6 @@ public class TestQueryElasticsearchHttp {
public void testQueryElasticsearchOnTriggerWithLimit() throws IOException {
runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
runner.assertNotValid();
@ -254,7 +242,6 @@ public class TestQueryElasticsearchHttp {
processor.setStatus(500, "Server error");
runner = TestRunners.newTestRunner(processor); // simulate doc not found
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}");
@ -280,7 +267,6 @@ public class TestQueryElasticsearchHttp {
processor.setStatus(100, "Should fail");
runner = TestRunners.newTestRunner(processor); // simulate doc not found
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}");
@ -306,7 +292,6 @@ public class TestQueryElasticsearchHttp {
processor.setExceptionToThrow(new IOException("Error reading from disk"));
runner = TestRunners.newTestRunner(processor); // simulate doc not found
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}");
@ -332,7 +317,6 @@ public class TestQueryElasticsearchHttp {
processor.setStatus(100, "Should fail", 2);
runner = TestRunners.newTestRunner(processor); // simulate doc not found
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}");
@ -359,7 +343,6 @@ public class TestQueryElasticsearchHttp {
processor.setStatus(100, "Should fail", 1);
runner = TestRunners.newTestRunner(processor); // simulate doc not found
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}");
@ -382,9 +365,8 @@ public class TestQueryElasticsearchHttp {
runner.enableControllerService(sslService);
runner.setProperty(QueryElasticsearchHttp.PROP_SSL_CONTEXT_SERVICE, "ssl-context");
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
runner.setProperty(QueryElasticsearchHttp.TYPE, "");
runner.removeProperty(QueryElasticsearchHttp.TYPE);
runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}");
// Allow time for the controller service to fully initialize
@ -421,7 +403,6 @@ public class TestQueryElasticsearchHttp {
runner.setProperty(QueryElasticsearchHttp.PROXY_HOST, "localhost");
runner.setProperty(QueryElasticsearchHttp.PROXY_PORT, "3228");
runner.setProperty(QueryElasticsearchHttp.ES_URL, "http://172.18.0.2:9200");
runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.enqueue("".getBytes(), new HashMap<String, String>() {{
put("doc_id", "28039652140");
@ -449,7 +430,6 @@ public class TestQueryElasticsearchHttp {
runner.setProperty(QueryElasticsearchHttp.PROXY_USERNAME, "squid");
runner.setProperty(QueryElasticsearchHttp.PROXY_PASSWORD, "changeme");
runner.setProperty(QueryElasticsearchHttp.ES_URL, "http://172.18.0.2:9200");
runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.enqueue("".getBytes(), new HashMap<String, String>() {{
put("doc_id", "28039652140");
@ -465,7 +445,6 @@ public class TestQueryElasticsearchHttp {
p.setExpectedParam("myparam=myvalue");
runner = TestRunners.newTestRunner(p);
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
@ -478,30 +457,15 @@ public class TestQueryElasticsearchHttp {
@Test
public void testQueryElasticsearchOnTrigger_sourceIncludes() throws IOException {
QueryElasticsearchHttpTestProcessor p = new QueryElasticsearchHttpTestProcessor();
p.setExpectedParam("_source_include=test"); // < ES 7.0 expects this param
p.setExpectedParam("_source=test");
runner = TestRunners.newTestRunner(p);
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
runner.setProperty(QueryElasticsearchHttp.QUERY, "source:Twitter");
runner.setProperty(QueryElasticsearchHttp.FIELDS, "test");
runAndVerifySuccess(true);
// Now test with ES 7.x
p = new QueryElasticsearchHttpTestProcessor();
p.setExpectedParam("_source_includes=test"); // >= ES 7.0 expects this param
runner = TestRunners.newTestRunner(p);
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_7.name());
runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
runner.setProperty(QueryElasticsearchHttp.TYPE, "");
runner.setProperty(QueryElasticsearchHttp.QUERY, "source:Twitter");
runner.setProperty(QueryElasticsearchHttp.FIELDS, "test");
runAndVerifySuccess(true);
}
/**

View File

@ -32,7 +32,6 @@ import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
@ -83,30 +82,15 @@ public class TestScrollElasticsearchHttp {
@Test
public void testScrollElasticsearchOnTrigger_sourceIncludes() throws IOException {
ScrollElasticsearchHttpTestProcessor p = new ScrollElasticsearchHttpTestProcessor();
p.setExpectedParam("_source_include=test"); // < ES 7.0 expects this param
p.setExpectedParam("_source=test");
runner = TestRunners.newTestRunner(p);
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(ScrollElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.setProperty(ScrollElasticsearchHttp.INDEX, "doc");
runner.setProperty(ScrollElasticsearchHttp.TYPE, "status");
runner.setProperty(ScrollElasticsearchHttp.QUERY, "source:Twitter");
runner.setProperty(ScrollElasticsearchHttp.FIELDS, "test");
runAndVerifySuccess();
// Now test with ES 7.x
p = new ScrollElasticsearchHttpTestProcessor();
p.setExpectedParam("_source_includes=test"); // >= ES 7.0 expects this param
runner = TestRunners.newTestRunner(p);
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(ScrollElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_7.name());
runner.setProperty(ScrollElasticsearchHttp.INDEX, "doc");
runner.setProperty(ScrollElasticsearchHttp.TYPE, "");
runner.setProperty(ScrollElasticsearchHttp.QUERY, "source:Twitter");
runner.setProperty(ScrollElasticsearchHttp.FIELDS, "test");
runAndVerifySuccess();
}
@Test
@ -163,20 +147,18 @@ public class TestScrollElasticsearchHttp {
runner.setProperty(ScrollElasticsearchHttp.INDEX, "doc");
runner.assertNotValid();
runner.setProperty(ScrollElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.setProperty(ScrollElasticsearchHttp.QUERY, "${doc_id}");
runner.assertNotValid();
runner.setProperty(ScrollElasticsearchHttp.TYPE, "");
runner.assertValid(); // Valid because type is not required prior to 7.0
runner.setProperty(ScrollElasticsearchHttp.TYPE, "status");
runner.assertValid();
runner.setProperty(ScrollElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_7.name());
runner.assertNotValid(); // Not valid because type must be _doc or empty for 7.0+
runner.setProperty(ScrollElasticsearchHttp.TYPE, "_doc");
runner.assertValid();
runner.removeProperty(ScrollElasticsearchHttp.TYPE);
runner.assertNotValid();
runner.assertValid();
runner.setProperty(ScrollElasticsearchHttp.TYPE, "status");
runner.assertValid();
runner.setProperty(ScrollElasticsearchHttp.TYPE, "${type}");
runner.assertValid();
runner.setProperty(ScrollElasticsearchHttp.TYPE, "");
runner.assertNotValid();
runner.setProperty(ScrollElasticsearchHttp.TYPE, "_doc");
runner.assertValid();
runner.setProperty(ScrollElasticsearchHttp.FIELDS, "id,, userinfo.location");
runner.assertValid();
runner.setProperty(ScrollElasticsearchHttp.SORT, "timestamp:asc,identifier:desc");
@ -286,9 +268,8 @@ public class TestScrollElasticsearchHttp {
runner.enableControllerService(sslService);
runner.setProperty(ScrollElasticsearchHttp.PROP_SSL_CONTEXT_SERVICE, "ssl-context");
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(ScrollElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_7.name());
runner.setProperty(ScrollElasticsearchHttp.INDEX, "doc");
runner.setProperty(ScrollElasticsearchHttp.TYPE, "");
runner.removeProperty(ScrollElasticsearchHttp.TYPE);
runner.setProperty(ScrollElasticsearchHttp.QUERY, "${doc_id}");
runner.setIncomingConnection(false);

View File

@ -28,6 +28,7 @@ import org.apache.nifi.schema.access.SchemaAccessUtils
import org.apache.nifi.serialization.RecordReaderFactory
import org.apache.nifi.serialization.record.MockRecordParser
import org.apache.nifi.serialization.record.MockSchemaRegistry
import org.apache.nifi.util.StringUtils
import org.apache.nifi.util.TestRunner
import org.apache.nifi.util.TestRunners
import org.junit.Assert
@ -206,6 +207,42 @@ class PutElasticsearchRecordTest {
"schema.name": "recordPathTest"
])
runner.run()
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1)
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
runner.clearTransferState()
flowFileContents = prettyPrint(toJson([
[ msg: "Hello" ],
[ id: null, type: null, msg: "Hello" ],
[ id: "rec-3", msg: "Hello" ],
[ id: "rec-4", msg: "Hello" ],
[ id: "rec-5", msg: "Hello" ],
[ id: "rec-6", type: "message", msg: "Hello" ]
]))
evalClosure = { List<IndexOperationRequest> items ->
def nullTypeCount = items.findAll { it.type == null }.size()
def messageTypeCount = items.findAll { it.type == "message" }.size()
def nullIdCount = items.findAll { it.id == null }.size()
def recIdCount = items.findAll { StringUtils.startsWith(it.id, "rec-") }.size()
Assert.assertEquals("null type", 5, nullTypeCount)
Assert.assertEquals("message type", 1, messageTypeCount)
Assert.assertEquals("null id", 2, nullIdCount)
Assert.assertEquals("rec- id", 4, recIdCount)
}
clientService.evalClosure = evalClosure
runner.removeProperty(PutElasticsearchRecord.TYPE)
runner.enqueue(flowFileContents, [
"schema.name": "recordPathTest"
])
runner.run()
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1)
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0)
runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
runner.clearTransferState()