diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
index 7d056091da..baf8f7a0ab 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml
@@ -26,7 +26,7 @@
jar
- 5.6.15
+ 5.6.16
setup-5.script
faketype
@@ -71,7 +71,7 @@
commons-io
commons-io
- 2.6
+ 2.8.0
@@ -82,7 +82,7 @@
org.apache.commons
commons-lang3
- 3.9
+ 3.11
org.slf4j
@@ -146,7 +146,7 @@
org.apache.httpcomponents
httpclient
- 4.5.10
+ 4.5.13
org.apache.nifi
@@ -176,7 +176,7 @@
integration-6
- 6.7.1
+ 6.8.13
_doc
setup-6.script
@@ -184,7 +184,7 @@
integration-7
- 7.0.0
+ 7.10.0
setup-7.script
_doc
@@ -196,7 +196,7 @@
org.apache.maven.plugins
maven-failsafe-plugin
- 3.0.0-M3
+ 3.0.0-M5
${type.name}
@@ -206,7 +206,7 @@
com.github.alexcojocaru
elasticsearch-maven-plugin
- 6.13
+ 6.19
testCluster
9500
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
index 3a686af239..bad70bafe2 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java
@@ -26,7 +26,6 @@ import java.net.URL;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -61,25 +60,25 @@ import org.elasticsearch.client.RestClientBuilder;
public class ElasticSearchClientServiceImpl extends AbstractControllerService implements ElasticSearchClientService {
private final ObjectMapper mapper = new ObjectMapper();
- static final private List properties;
+ private static final List properties;
private RestClient client;
private String url;
- private Charset charset;
+ private Charset responseCharset;
static {
- List _props = new ArrayList();
- _props.add(ElasticSearchClientService.HTTP_HOSTS);
- _props.add(ElasticSearchClientService.USERNAME);
- _props.add(ElasticSearchClientService.PASSWORD);
- _props.add(ElasticSearchClientService.PROP_SSL_CONTEXT_SERVICE);
- _props.add(ElasticSearchClientService.CONNECT_TIMEOUT);
- _props.add(ElasticSearchClientService.SOCKET_TIMEOUT);
- _props.add(ElasticSearchClientService.RETRY_TIMEOUT);
- _props.add(ElasticSearchClientService.CHARSET);
+ List props = new ArrayList<>();
+ props.add(ElasticSearchClientService.HTTP_HOSTS);
+ props.add(ElasticSearchClientService.USERNAME);
+ props.add(ElasticSearchClientService.PASSWORD);
+ props.add(ElasticSearchClientService.PROP_SSL_CONTEXT_SERVICE);
+ props.add(ElasticSearchClientService.CONNECT_TIMEOUT);
+ props.add(ElasticSearchClientService.SOCKET_TIMEOUT);
+ props.add(ElasticSearchClientService.RETRY_TIMEOUT);
+ props.add(ElasticSearchClientService.CHARSET);
- properties = Collections.unmodifiableList(_props);
+ properties = Collections.unmodifiableList(props);
}
@Override
@@ -91,7 +90,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
public void onEnabled(final ConfigurationContext context) throws InitializationException {
try {
setupClient(context);
- charset = Charset.forName(context.getProperty(CHARSET).getValue());
+ responseCharset = Charset.forName(context.getProperty(CHARSET).getValue());
} catch (Exception ex) {
getLogger().error("Could not initialize ElasticSearch client.", ex);
throw new InitializationException(ex);
@@ -126,44 +125,42 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
final SSLContext sslContext;
try {
sslContext = (sslService != null && (sslService.isKeyStoreConfigured() || sslService.isTrustStoreConfigured()))
- ? sslService.createSSLContext(ClientAuth.NONE) : null;
+ ? sslService.createSSLContext(ClientAuth.NONE) : null;
} catch (Exception e) {
getLogger().error("Error building up SSL Context from the supplied configuration.", e);
throw new InitializationException(e);
}
RestClientBuilder builder = RestClient.builder(hh)
- .setHttpClientConfigCallback(httpClientBuilder -> {
- if (sslContext != null) {
- httpClientBuilder = httpClientBuilder.setSSLContext(sslContext);
- }
+ .setHttpClientConfigCallback(httpClientBuilder -> {
+ if (sslContext != null) {
+ httpClientBuilder.setSSLContext(sslContext);
+ }
- if (username != null && password != null) {
- final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
- credentialsProvider.setCredentials(AuthScope.ANY,
- new UsernamePasswordCredentials(username, password));
- httpClientBuilder = httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
- }
+ if (username != null && password != null) {
+ final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+ credentialsProvider.setCredentials(AuthScope.ANY,
+ new UsernamePasswordCredentials(username, password));
+ httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
+ }
- return httpClientBuilder;
- })
- .setRequestConfigCallback(requestConfigBuilder -> {
- requestConfigBuilder.setConnectTimeout(connectTimeout);
- requestConfigBuilder.setSocketTimeout(readTimeout);
- return requestConfigBuilder;
- })
- .setMaxRetryTimeoutMillis(retryTimeout);
+ return httpClientBuilder;
+ })
+ .setRequestConfigCallback(requestConfigBuilder -> {
+ requestConfigBuilder.setConnectTimeout(connectTimeout);
+ requestConfigBuilder.setSocketTimeout(readTimeout);
+ return requestConfigBuilder;
+ })
+ .setMaxRetryTimeoutMillis(retryTimeout);
this.client = builder.build();
}
private Response runQuery(String endpoint, String query, String index, String type) {
StringBuilder sb = new StringBuilder()
- .append("/")
- .append(index);
+ .append("/").append(index);
if (type != null && !type.equals("")) {
- sb.append("/")
- .append(type);
+ sb.append("/").append(type);
}
sb.append(String.format("/%s", endpoint));
@@ -181,11 +178,11 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
final int code = response.getStatusLine().getStatusCode();
try {
- if (code >= 200 & code < 300) {
+ if (code >= 200 && code < 300) {
InputStream inputStream = response.getEntity().getContent();
byte[] result = IOUtils.toByteArray(inputStream);
inputStream.close();
- return mapper.readValue(new String(result, charset), Map.class);
+ return (Map) mapper.readValue(new String(result, responseCharset), Map.class);
} else {
String errorMessage = String.format("ElasticSearch reported an error while trying to run the query: %s",
response.getStatusLine().getReasonPhrase());
@@ -198,7 +195,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
@Override
public IndexOperationResponse add(IndexOperationRequest operation) {
- return bulk(Arrays.asList(operation));
+ return bulk(Collections.singletonList(operation));
}
private String flatten(String str) {
@@ -216,8 +213,12 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
Map header = new HashMap() {{
put(operation, new HashMap() {{
put("_index", index);
- put("_id", id);
- put("_type", type);
+ if (StringUtils.isNotBlank(id)) {
+ put("_id", id);
+ }
+ if (StringUtils.isNotBlank(type)) {
+ put("_type", type);
+ }
}});
}};
@@ -256,8 +257,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
public IndexOperationResponse bulk(List operations) {
try {
StringBuilder payload = new StringBuilder();
- for (int index = 0; index < operations.size(); index++) {
- IndexOperationRequest or = operations.get(index);
+ for (final IndexOperationRequest or : operations) {
buildRequest(or, payload);
}
@@ -276,9 +276,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
getLogger().debug(String.format("Response was: %s", rawResponse));
}
- IndexOperationResponse retVal = IndexOperationResponse.fromJsonResponse(rawResponse);
-
- return retVal;
+ return IndexOperationResponse.fromJsonResponse(rawResponse);
} catch (Exception ex) {
throw new ElasticsearchError(ex);
}
@@ -294,15 +292,15 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
@Override
public DeleteOperationResponse deleteById(String index, String type, String id) {
- return deleteById(index, type, Arrays.asList(id));
+ return deleteById(index, type, Collections.singletonList(id));
}
@Override
public DeleteOperationResponse deleteById(String index, String type, List ids) {
try {
StringBuilder sb = new StringBuilder();
- for (int idx = 0; idx < ids.size(); idx++) {
- String header = buildBulkHeader("delete", index, type, ids.get(idx));
+ for (final String id : ids) {
+ String header = buildBulkHeader("delete", index, type, id);
sb.append(header).append("\n");
}
HttpEntity entity = new NStringEntity(sb.toString(), ContentType.APPLICATION_JSON);
@@ -316,9 +314,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8)));
}
- DeleteOperationResponse dor = new DeleteOperationResponse(watch.getDuration(TimeUnit.MILLISECONDS));
-
- return dor;
+ return new DeleteOperationResponse(watch.getDuration(TimeUnit.MILLISECONDS));
} catch (Exception ex) {
throw new RuntimeException(ex);
}
@@ -329,7 +325,9 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
long start = System.currentTimeMillis();
Response response = runQuery("_delete_by_query", query, index, type);
long end = System.currentTimeMillis();
- Map parsed = parseResponse(response);
+
+ // check for errors in response
+ parseResponse(response);
return new DeleteOperationResponse(end - start);
}
@@ -359,9 +357,9 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
*/
private int handleSearchCount(Object raw) {
if (raw instanceof Number) {
- return Integer.valueOf(raw.toString());
+ return Integer.parseInt(raw.toString());
} else if (raw instanceof Map) {
- return (Integer)((Map)raw).get("value");
+ return (Integer)((Map)raw).get("value");
} else {
throw new ProcessException("Unknown type for hit count.");
}
@@ -401,11 +399,11 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
@Override
public String getTransitUrl(String index, String type) {
return new StringBuilder()
- .append(this.url)
- .append(index != null && !index.equals("") ? "/" : "")
- .append(index != null ? index : "")
- .append(type != null && !type.equals("") ? "/" : "")
- .append(type != null ? type : "")
- .toString();
+ .append(this.url)
+ .append(StringUtils.isNotBlank(index) ? "/" : "")
+ .append(StringUtils.isNotBlank(index) ? index : "")
+ .append(StringUtils.isNotBlank(type) ? "/" : "")
+ .append(StringUtils.isNotBlank(type) ? type : "")
+ .toString();
}
}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java
index 291d99b4df..0374eb7dca 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java
@@ -36,11 +36,9 @@ import okhttp3.RequestBody;
import okhttp3.Response;
import okhttp3.Route;
import org.apache.commons.text.StringEscapeUtils;
-import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
@@ -57,13 +55,7 @@ import org.apache.nifi.util.StringUtils;
* A base class for Elasticsearch processors that use the HTTP API
*/
public abstract class AbstractElasticsearchHttpProcessor extends AbstractElasticsearchProcessor {
- enum ElasticsearchVersion {
- ES_7,
- ES_LESS_THAN_7
- }
-
- static final String FIELD_INCLUDE_QUERY_PARAM = "_source_include";
- static final String FIELD_INCLUDE_QUERY_PARAM_ES7 = "_source_includes";
+ static final String SOURCE_QUERY_PARAM = "_source";
static final String QUERY_QUERY_PARAM = "q";
static final String SORT_QUERY_PARAM = "sort";
static final String SIZE_QUERY_PARAM = "size";
@@ -134,18 +126,6 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
- public static final PropertyDescriptor ES_VERSION = new PropertyDescriptor.Builder()
- .name("elasticsearch-http-version")
- .displayName("Elasticsearch Version")
- .description("The major version of elasticsearch (this affects some HTTP query parameters and the way responses are parsed).")
- .required(true)
- .allowableValues(
- new AllowableValue(ElasticsearchVersion.ES_LESS_THAN_7.name(), "< 7.0", "Any version of Elasticsearch less than 7.0"),
- new AllowableValue(ElasticsearchVersion.ES_7.name(), "7.x", "Elasticsearch version 7.x"))
- .defaultValue(ElasticsearchVersion.ES_LESS_THAN_7.name())
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
-
private final AtomicReference okHttpClientAtomicReference = new AtomicReference<>();
@Override
@@ -167,7 +147,6 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic
static {
final List properties = new ArrayList<>();
properties.add(ES_URL);
- properties.add(ES_VERSION);
properties.add(PROP_SSL_CONTEXT_SERVICE);
properties.add(CHARSET);
properties.add(USERNAME);
@@ -308,12 +287,12 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic
sb.append("\": { \"_index\": \"");
sb.append(StringEscapeUtils.escapeJson(index));
sb.append("\"");
- if (!(StringUtils.isEmpty(docType) | docType == null)){
+ if (StringUtils.isNotBlank(docType)) {
sb.append(", \"_type\": \"");
sb.append(StringEscapeUtils.escapeJson(docType));
sb.append("\"");
}
- if (!StringUtils.isEmpty(id)) {
+ if (StringUtils.isNotBlank(id)) {
sb.append(", \"_id\": \"");
sb.append(StringEscapeUtils.escapeJson(id));
sb.append("\"");
@@ -325,7 +304,7 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic
sb.append("{\"update\": { \"_index\": \"");
sb.append(StringEscapeUtils.escapeJson(index));
sb.append("\"");
- if (!(StringUtils.isEmpty(docType) | docType == null)){
+ if (StringUtils.isNotBlank(docType)) {
sb.append(", \"_type\": \"");
sb.append(StringEscapeUtils.escapeJson(docType));
sb.append("\"");
@@ -342,47 +321,14 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic
sb.append("{\"delete\": { \"_index\": \"");
sb.append(StringEscapeUtils.escapeJson(index));
sb.append("\"");
- if (!(StringUtils.isEmpty(docType) | docType == null)){
+ if (StringUtils.isNotBlank(docType)) {
sb.append(", \"_type\": \"");
sb.append(StringEscapeUtils.escapeJson(docType));
sb.append("\"");
}
sb.append(", \"_id\": \"");
sb.append(StringEscapeUtils.escapeJson(id));
- sb.append("\" }}\n");
- }
- }
-
- protected String getFieldIncludeParameter(ElasticsearchVersion esVersion) {
- return esVersion.equals(ElasticsearchVersion.ES_LESS_THAN_7)
- ? FIELD_INCLUDE_QUERY_PARAM : FIELD_INCLUDE_QUERY_PARAM_ES7;
- }
-
- static class ElasticsearchTypeValidator implements Validator {
- private final boolean pre7TypeRequired;
-
- /**
- * Creates a validator for an ES type
- * @param pre7TypeRequired If true, 'type' will be required for ES
- * before version 7.0.
- */
- public ElasticsearchTypeValidator(boolean pre7TypeRequired) {
- this.pre7TypeRequired = pre7TypeRequired;
- }
-
- @Override
- public ValidationResult validate(String subject, String input, ValidationContext context) {
- ElasticsearchVersion esVersion = ElasticsearchVersion.valueOf(context
- .getProperty(ES_VERSION).getValue());
- if (esVersion == ElasticsearchVersion.ES_7) {
- return new ValidationResult.Builder().valid(org.apache.commons.lang3.StringUtils.isBlank(input) || "_doc".equals(input))
- .explanation("Elasticsearch no longer supports 'type' as of version 7.0. Please use '_doc' or leave blank.")
- .build();
- } else {
- return new ValidationResult.Builder().valid(!pre7TypeRequired || org.apache.commons.lang3.StringUtils.isNotBlank(input))
- .explanation("Elasticsearch prior to version 7.0 requires a 'type' to be set.")
- .build();
- }
+ sb.append("\" } }\n");
}
}
}
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java
index 50cad68594..dea039c9df 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java
@@ -40,7 +40,6 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
import java.io.ByteArrayInputStream;
import java.io.IOException;
@@ -121,11 +120,12 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
.name("fetch-es-type")
.displayName("Type")
- .description("The type of this document (if empty, the first document matching the identifier across all types will be retrieved). "
- + "This must be empty (check 'Set empty string') or '_doc' for Elasticsearch 7.0+.")
- .required(true)
+ .description("The type of document/fetch (if unset, the first document matching the "
+ + "identifier across _all types will be retrieved). "
+ + "This should be unset, '_doc' or '_source' for Elasticsearch 7.0+.")
+ .required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .addValidator(new ElasticsearchTypeValidator(false))
+ .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.build();
public static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder()
@@ -150,7 +150,6 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
relationships = Collections.unmodifiableSet(_rels);
final List descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS);
- descriptors.add(ES_VERSION);
descriptors.add(DOC_ID);
descriptors.add(INDEX);
descriptors.add(TYPE);
@@ -201,8 +200,6 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
final String fields = context.getProperty(FIELDS).isSet()
? context.getProperty(FIELDS).evaluateAttributeExpressions(flowFile).getValue()
: null;
- final ElasticsearchVersion esVersion = ElasticsearchVersion.valueOf(context.getProperty(ES_VERSION)
- .getValue());
// Authentication
final String username = context.getProperty(USERNAME).evaluateAttributeExpressions(flowFile).getValue();
@@ -218,7 +215,7 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
// read the url property from the context
final String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL).evaluateAttributeExpressions().getValue());
- final URL url = buildRequestURL(urlstr, docId, index, docType, fields, context, esVersion);
+ final URL url = buildRequestURL(urlstr, docId, index, docType, fields, context);
final long startNanos = System.nanoTime();
getResponse = sendRequestToElasticsearch(okHttpClient, url, username, password, "GET", null);
@@ -310,18 +307,17 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
}
}
- private URL buildRequestURL(String baseUrl, String docId, String index, String type, String fields, ProcessContext context, ElasticsearchVersion esVersion) throws MalformedURLException {
+ private URL buildRequestURL(String baseUrl, String docId, String index, String type, String fields, ProcessContext context) throws MalformedURLException {
if (StringUtils.isEmpty(baseUrl)) {
throw new MalformedURLException("Base URL cannot be null");
}
HttpUrl.Builder builder = HttpUrl.parse(baseUrl).newBuilder();
builder.addPathSegment(index);
- builder.addPathSegment((StringUtils.isEmpty(type)) ? "_all" : type);
+ builder.addPathSegment(StringUtils.isBlank(type) ? "_all" : type);
builder.addPathSegment(docId);
if (!StringUtils.isEmpty(fields)) {
String trimmedFields = Stream.of(fields.split(",")).map(String::trim).collect(Collectors.joining(","));
- final String fieldIncludeParameter = getFieldIncludeParameter(esVersion);
- builder.addQueryParameter(fieldIncludeParameter, trimmedFields);
+ builder.addQueryParameter(SOURCE_QUERY_PARAM, trimmedFields);
}
// Find the user-added properties and set them as query parameters on the URL
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
index b427b8ed4d..8c501c3d2e 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java
@@ -111,11 +111,11 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
.name("put-es-type")
.displayName("Type")
- .description("The type of this document (required by Elasticsearch versions < 7.0 for indexing and searching). "
- + "This must be empty (check 'Set empty string') or '_doc' for Elasticsearch 7.0+.")
- .required(true)
+ .description("The type of this document (required by Elasticsearch versions < 7.0 for indexing and searching). "
+ + "This must be unset or '_doc' for Elasticsearch 7.0+.")
+ .required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .addValidator(new ElasticsearchTypeValidator(true))
+ .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.build();
public static final PropertyDescriptor INDEX_OP = new PropertyDescriptor.Builder()
@@ -153,7 +153,6 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
relationships = Collections.unmodifiableSet(_rels);
final List descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS);
- descriptors.add(ES_VERSION);
descriptors.add(ID_ATTRIBUTE);
descriptors.add(INDEX);
descriptors.add(TYPE);
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
index 130bde86e5..fd28b4907c 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java
@@ -173,11 +173,11 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
.name("put-es-record-type")
.displayName("Type")
- .description("The type of this document (required by Elasticsearch versions < 7.0 for indexing and searching). "
- + "This must be empty (check 'Set empty string') or '_doc' for Elasticsearch 7.0+.")
- .required(true)
+ .description("The type of this document (required by Elasticsearch versions < 7.0 for indexing and searching). "
+ + "This must be unset or '_doc' for Elasticsearch 7.0+.")
+ .required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .addValidator(new ElasticsearchTypeValidator(true))
+ .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.build();
static final PropertyDescriptor INDEX_OP = new PropertyDescriptor.Builder()
@@ -261,7 +261,6 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess
relationships = Collections.unmodifiableSet(_rels);
final List descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS);
- descriptors.add(ES_VERSION);
descriptors.add(RECORD_READER);
descriptors.add(RECORD_WRITER);
descriptors.add(LOG_ALL_ERRORS);
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java
index 58bab07d9d..0a769d65e5 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java
@@ -145,7 +145,7 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
.name("query-es-index")
.displayName("Index")
- .description("The name of the index to read from. If the property is set "
+ .description("The name of the index to read from. If the property is unset or set "
+ "to _all, the query will match across all indexes.")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
@@ -155,11 +155,11 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
.name("query-es-type")
.displayName("Type")
- .description("The type of this document (if empty, searches across all types). "
- + "This must be empty (check 'Set empty string') or '_doc' for Elasticsearch 7.0+.")
- .required(true)
+ .description("The type of document (if unset, the query will be against all types in the _index). "
+ + "This should be unset or '_doc' for Elasticsearch 7.0+.")
+ .required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .addValidator(new ElasticsearchTypeValidator(false))
+ .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.build();
public static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder()
@@ -235,7 +235,6 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
static {
final List descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS);
descriptors.add(QUERY);
- descriptors.add(ES_VERSION);
descriptors.add(PAGE_SIZE);
descriptors.add(INDEX);
descriptors.add(TYPE);
@@ -307,17 +306,16 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(flowFile)
.getValue();
final int pageSize = context.getProperty(PAGE_SIZE).evaluateAttributeExpressions(flowFile)
- .asInteger().intValue();
+ .asInteger();
final Integer limit = context.getProperty(LIMIT).isSet() ? context.getProperty(LIMIT)
- .evaluateAttributeExpressions(flowFile).asInteger().intValue() : null;
+ .evaluateAttributeExpressions(flowFile).asInteger() : null;
final String fields = context.getProperty(FIELDS).isSet() ? context.getProperty(FIELDS)
.evaluateAttributeExpressions(flowFile).getValue() : null;
final String sort = context.getProperty(SORT).isSet() ? context.getProperty(SORT)
.evaluateAttributeExpressions(flowFile).getValue() : null;
final boolean targetIsContent = context.getProperty(TARGET).getValue()
.equals(TARGET_FLOW_FILE_CONTENT);
- final ElasticsearchVersion esVersion = ElasticsearchVersion.valueOf(context.getProperty(ES_VERSION)
- .getValue());
+
// Authentication
final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
@@ -345,7 +343,7 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
}
final URL queryUrl = buildRequestURL(urlstr, query, index, docType, fields, sort,
- mPageSize, fromIndex, context, esVersion);
+ mPageSize, fromIndex, context);
final Response getResponse = sendRequestToElasticsearch(okHttpClient, queryUrl,
username, password, "GET", null);
@@ -506,13 +504,13 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
}
private URL buildRequestURL(String baseUrl, String query, String index, String type, String fields,
- String sort, int pageSize, int fromIndex, ProcessContext context, ElasticsearchVersion esVersion) throws MalformedURLException {
+ String sort, int pageSize, int fromIndex, ProcessContext context) throws MalformedURLException {
if (StringUtils.isEmpty(baseUrl)) {
throw new MalformedURLException("Base URL cannot be null");
}
HttpUrl.Builder builder = HttpUrl.parse(baseUrl).newBuilder();
builder.addPathSegment((StringUtils.isEmpty(index)) ? "_all" : index);
- if (!StringUtils.isEmpty(type)) {
+ if (StringUtils.isNotBlank(type)) {
builder.addPathSegment(type);
}
builder.addPathSegment("_search");
@@ -521,8 +519,7 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
builder.addQueryParameter(FROM_QUERY_PARAM, String.valueOf(fromIndex));
if (!StringUtils.isEmpty(fields)) {
String trimmedFields = Stream.of(fields.split(",")).map(String::trim).collect(Collectors.joining(","));
- final String fieldIncludeParameter = getFieldIncludeParameter(esVersion);
- builder.addQueryParameter(fieldIncludeParameter, trimmedFields);
+ builder.addQueryParameter(SOURCE_QUERY_PARAM, trimmedFields);
}
if (!StringUtils.isEmpty(sort)) {
String trimmedFields = Stream.of(sort.split(",")).map(String::trim).collect(Collectors.joining(","));
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java
index 322f82f1fc..33c75d74e2 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java
@@ -46,8 +46,6 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchTypeValidator;
-import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
import java.io.ByteArrayInputStream;
import java.io.IOException;
@@ -137,11 +135,11 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
.name("scroll-es-type")
.displayName("Type")
- .description("The type of this document (if empty, searches across all types). "
- + "This must be empty (check 'Set empty string') or '_doc' for Elasticsearch 7.0+.")
- .required(true)
+ .description("The type of document (if unset, the query will be against all types in the _index). "
+ + "This should be unset or '_doc' for Elasticsearch 7.0+.")
+ .required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .addValidator(new ElasticsearchTypeValidator(false))
+ .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.build();
public static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder()
@@ -186,7 +184,6 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
relationships = Collections.unmodifiableSet(_rels);
final List descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS);
- descriptors.add(ES_VERSION);
descriptors.add(QUERY);
descriptors.add(SCROLL_DURATION);
descriptors.add(PAGE_SIZE);
@@ -239,15 +236,13 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(flowFile)
.getValue();
final int pageSize = context.getProperty(PAGE_SIZE).evaluateAttributeExpressions(flowFile)
- .asInteger().intValue();
+ .asInteger();
final String fields = context.getProperty(FIELDS).isSet() ? context.getProperty(FIELDS)
.evaluateAttributeExpressions(flowFile).getValue() : null;
final String sort = context.getProperty(SORT).isSet() ? context.getProperty(SORT)
.evaluateAttributeExpressions(flowFile).getValue() : null;
final String scroll = context.getProperty(SCROLL_DURATION).isSet() ? context
.getProperty(SCROLL_DURATION).evaluateAttributeExpressions(flowFile).getValue() : null;
- final ElasticsearchVersion esVersion = ElasticsearchVersion.valueOf(context.getProperty(ES_VERSION)
- .getValue());
// Authentication
final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
@@ -264,7 +259,7 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
.getValue());
if (scrollId != null) {
final URL scrollurl = buildRequestURL(urlstr, query, index, docType, fields, sort,
- scrollId, pageSize, scroll, context, esVersion);
+ scrollId, pageSize, scroll, context);
final long startNanos = System.nanoTime();
final String scrollBody = String.format("{ \"scroll\": \"%s\", \"scroll_id\": \"%s\" }", scroll,
@@ -282,7 +277,7 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
// read the url property from the context
final URL queryUrl = buildRequestURL(urlstr, query, index, docType, fields, sort,
- scrollId, pageSize, scroll, context, esVersion);
+ scrollId, pageSize, scroll, context);
final long startNanos = System.nanoTime();
final Response getResponse = sendRequestToElasticsearch(okHttpClient, queryUrl,
@@ -419,7 +414,7 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
}
private URL buildRequestURL(String baseUrl, String query, String index, String type, String fields,
- String sort, String scrollId, int pageSize, String scroll, ProcessContext context, ElasticsearchVersion esVersion) throws MalformedURLException {
+ String sort, String scrollId, int pageSize, String scroll, ProcessContext context) throws MalformedURLException {
if (StringUtils.isEmpty(baseUrl)) {
throw new MalformedURLException("Base URL cannot be null");
}
@@ -429,7 +424,7 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
builder.addPathSegment("scroll");
} else {
builder.addPathSegment((StringUtils.isEmpty(index)) ? "_all" : index);
- if (!StringUtils.isEmpty(type)) {
+ if (StringUtils.isNotBlank(type)) {
builder.addPathSegment(type);
}
builder.addPathSegment("_search");
@@ -437,8 +432,7 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor
builder.addQueryParameter(SIZE_QUERY_PARAM, String.valueOf(pageSize));
if (!StringUtils.isEmpty(fields)) {
String trimmedFields = Stream.of(fields.split(",")).map(String::trim).collect(Collectors.joining(","));
- final String fieldIncludeParameter = getFieldIncludeParameter(esVersion);
- builder.addQueryParameter(fieldIncludeParameter, trimmedFields);
+ builder.addQueryParameter(SOURCE_QUERY_PARAM, trimmedFields);
}
if (!StringUtils.isEmpty(sort)) {
String trimmedFields = Stream.of(sort.split(",")).map(String::trim).collect(Collectors.joining(","));
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITQueryElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITQueryElasticsearchHttp.java
index e3ace2439f..f60e2f93c2 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITQueryElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITQueryElasticsearchHttp.java
@@ -22,7 +22,6 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
-import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@@ -43,7 +42,6 @@ public class ITQueryElasticsearchHttp {
runner = TestRunners.newTestRunner(QueryElasticsearchHttp.class); // all docs are found
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL,
"http://localhost.internal:9200");
- runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.setProperty(QueryElasticsearchHttp.INDEX, "prod-accounting");
runner.assertNotValid();
@@ -70,7 +68,6 @@ public class ITQueryElasticsearchHttp {
runner = TestRunners.newTestRunner(QueryElasticsearchHttp.class); // all docs are found
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL,
"http://localhost.internal:9200");
- runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.setProperty(QueryElasticsearchHttp.INDEX, "prod-accounting");
runner.assertNotValid();
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITScrollElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITScrollElasticsearchHttp.java
index b25e3e2722..57ee242ec7 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITScrollElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITScrollElasticsearchHttp.java
@@ -20,7 +20,6 @@ import static org.junit.Assert.assertNotNull;
import java.io.IOException;
-import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@@ -41,7 +40,6 @@ public class ITScrollElasticsearchHttp {
runner = TestRunners.newTestRunner(ScrollElasticsearchHttp.class); // all docs are found
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL,
"http://ip-172-31-49-152.ec2.internal:9200");
- runner.setProperty(ScrollElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.setProperty(ScrollElasticsearchHttp.INDEX, "prod-accounting");
runner.assertNotValid();
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecordIT.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecordIT.java
index c13bf406c5..9a4a63fdf5 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecordIT.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecordIT.java
@@ -19,7 +19,6 @@ package org.apache.nifi.processors.elasticsearch;
import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.MockRecordParser;
@@ -63,7 +62,6 @@ public class PutElasticsearchHttpRecordIT {
FETCH_RUNNER.setProperty(FetchElasticsearchHttp.INDEX, "people_test");
FETCH_RUNNER.setProperty(FetchElasticsearchHttp.TYPE, "person");
FETCH_RUNNER.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
- FETCH_RUNNER.setProperty(FetchElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
FETCH_RUNNER.assertValid();
}
@@ -77,7 +75,6 @@ public class PutElasticsearchHttpRecordIT {
runner = TestRunners.newTestRunner(PutElasticsearchHttpRecord.class);
runner.addControllerService("reader", recordReader);
runner.enableControllerService(recordReader);
- runner.setProperty(PutElasticsearchHttpRecord.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.setProperty(PutElasticsearchHttpRecord.RECORD_READER, "reader");
runner.setProperty(PutElasticsearchHttpRecord.ES_URL, "http://localhost:9200");
runner.setProperty(PutElasticsearchHttpRecord.INDEX, "people_test");
@@ -213,7 +210,6 @@ public class PutElasticsearchHttpRecordIT {
// Undo some stuff from setup()
runner.setProperty(PutElasticsearchHttpRecord.INDEX, "people\"test");
runner.setProperty(PutElasticsearchHttpRecord.TYPE, "person");
- runner.setProperty(PutElasticsearchHttpRecord.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
recordReader.addRecord(1, new MapRecord(personSchema, new HashMap() {{
put("name", "John Doe");
put("age", 48);
@@ -237,7 +233,6 @@ public class PutElasticsearchHttpRecordIT {
// Undo some stuff from setup()
runner.setProperty(PutElasticsearchHttpRecord.INDEX, "people}test");
runner.setProperty(PutElasticsearchHttpRecord.TYPE, "person");
- runner.setProperty(PutElasticsearchHttpRecord.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
recordReader.addRecord(1, new MapRecord(personSchema, new HashMap() {{
put("name", "John Doe");
put("age", 48);
@@ -261,7 +256,6 @@ public class PutElasticsearchHttpRecordIT {
// Undo some stuff from setup()
runner.setProperty(PutElasticsearchHttpRecord.INDEX, "people_test2");
runner.setProperty(PutElasticsearchHttpRecord.TYPE, "per\"son");
- runner.setProperty(PutElasticsearchHttpRecord.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
recordReader.addRecord(1, new MapRecord(personSchema, new HashMap() {{
put("name", "John Doe");
put("age", 48);
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java
index fb8b164b3a..babdb3b18e 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java
@@ -26,11 +26,12 @@ import static org.mockito.Mockito.when;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
+import java.nio.charset.StandardCharsets;
import java.util.HashMap;
+import org.apache.commons.io.IOUtils;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
@@ -75,17 +76,16 @@ public class TestFetchElasticsearchHttp {
runner.setProperty(FetchElasticsearchHttp.INDEX, "doc");
runner.assertNotValid();
runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
- runner.setProperty(FetchElasticsearchHttp.TYPE, "");
+ runner.removeProperty(FetchElasticsearchHttp.TYPE);
runner.assertValid();
- runner.setProperty(FetchElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.setProperty(FetchElasticsearchHttp.TYPE, "");
- runner.assertValid(); // Valid because type is not required prior to 7.0
+ runner.assertNotValid();
runner.setProperty(FetchElasticsearchHttp.TYPE, "status");
runner.assertValid();
- runner.setProperty(FetchElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_7.name());
- runner.assertNotValid(); // Not valid because type must be _doc or empty for 7.0+
+ runner.setProperty(FetchElasticsearchHttp.TYPE, "${type}");
+ runner.assertValid();
runner.setProperty(FetchElasticsearchHttp.TYPE, "_doc");
- runner.assertValid(); // Valid because type is not required prior to 7.0
+ runner.assertValid(); // Valid because type can be _doc for 7.0+
runner.setProperty(AbstractElasticsearchHttpProcessor.CONNECT_TIMEOUT, "${connect.timeout}");
runner.assertValid();
@@ -135,7 +135,7 @@ public class TestFetchElasticsearchHttp {
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, ES_URL);
runner.setProperty(FetchElasticsearchHttp.INDEX, "doc");
- runner.setProperty(FetchElasticsearchHttp.TYPE, "");
+ runner.removeProperty(FetchElasticsearchHttp.TYPE);
runner.assertNotValid();
runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
runner.assertValid();
@@ -184,7 +184,7 @@ public class TestFetchElasticsearchHttp {
runner = TestRunners.newTestRunner(new FetchElasticsearchHttpTestProcessor(false)); // simulate doc not found
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(FetchElasticsearchHttp.INDEX, "doc");
- runner.setProperty(FetchElasticsearchHttp.TYPE, "");
+ runner.removeProperty(FetchElasticsearchHttp.TYPE);
runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
runner.setIncomingConnection(true);
@@ -284,7 +284,7 @@ public class TestFetchElasticsearchHttp {
runner.setProperty(FetchElasticsearchHttp.PROP_SSL_CONTEXT_SERVICE, "ssl-context");
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(FetchElasticsearchHttp.INDEX, "doc");
- runner.setProperty(FetchElasticsearchHttp.TYPE, "");
+ runner.removeProperty(FetchElasticsearchHttp.TYPE);
runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
// Allow time for the controller service to fully initialize
@@ -300,7 +300,7 @@ public class TestFetchElasticsearchHttp {
@Test
public void testFetchElasticsearchOnTriggerQueryParameter() throws IOException {
FetchElasticsearchHttpTestProcessor p = new FetchElasticsearchHttpTestProcessor(true); // all docs are found
- p.setExpectedUrl("http://127.0.0.1:9200/doc/status/28039652140?_source_include=id&myparam=myvalue");
+ p.setExpectedUrl("http://127.0.0.1:9200/doc/status/28039652140?_source=id&myparam=myvalue");
runner = TestRunners.newTestRunner(p);
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
@@ -323,6 +323,32 @@ public class TestFetchElasticsearchHttp {
out.assertAttributeEquals("doc_id", "28039652140");
}
+ @Test
+ public void testFetchElasticsearchOnTriggerQueryParameterNoType() throws IOException {
+ FetchElasticsearchHttpTestProcessor p = new FetchElasticsearchHttpTestProcessor(true); // all docs are found
+ p.setExpectedUrl("http://127.0.0.1:9200/doc/_all/28039652140?_source=id&myparam=myvalue");
+ runner = TestRunners.newTestRunner(p);
+ runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
+
+ runner.setProperty(FetchElasticsearchHttp.INDEX, "doc");
+ runner.removeProperty(FetchElasticsearchHttp.TYPE);
+ runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
+ runner.setProperty(FetchElasticsearchHttp.FIELDS, "id");
+
+ // Set dynamic property, to be added to the URL as a query parameter
+ runner.setProperty("myparam", "myvalue");
+
+ runner.enqueue(docExample, new HashMap() {{
+ put("doc_id", "28039652140");
+ }});
+ runner.run(1, true, true);
+
+ runner.assertAllFlowFilesTransferred(FetchElasticsearchHttp.REL_SUCCESS, 1);
+ final MockFlowFile out = runner.getFlowFilesForRelationship(FetchElasticsearchHttp.REL_SUCCESS).get(0);
+ assertNotNull(out);
+ out.assertAttributeEquals("doc_id", "28039652140");
+ }
+
/**
* A Test class that extends the processor in order to inject/mock behavior
*/
@@ -412,15 +438,25 @@ public class TestFetchElasticsearchHttp {
*/
@Test
@Ignore("Comment this out if you want to run against local or test ES")
- public void testFetchElasticsearchBasic() {
+ public void testFetchElasticsearchBasic() throws IOException {
System.out.println("Starting test " + new Object() {
}.getClass().getEnclosingMethod().getName());
final TestRunner runner = TestRunners.newTestRunner(new FetchElasticsearchHttp());
+ // add data to ES instance
+ new OkHttpClient.Builder().build().newCall(
+ new Request.Builder().url("http://127.0.0.1:9200/doc/_doc/28039652140")
+ .addHeader("Content-Type", "application/json")
+ .put(
+ RequestBody.create(MediaType.get("application/json"),
+ IOUtils.toString(docExample, StandardCharsets.UTF_8))
+ ).build()
+ ).execute();
+
//Local Cluster - Mac pulled from brew
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
runner.setProperty(FetchElasticsearchHttp.INDEX, "doc");
- runner.setProperty(FetchElasticsearchHttp.TYPE, "status");
+ runner.removeProperty(FetchElasticsearchHttp.TYPE);
runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
runner.assertValid();
@@ -433,31 +469,6 @@ public class TestFetchElasticsearchHttp {
runner.assertAllFlowFilesTransferred(FetchElasticsearchHttp.REL_SUCCESS, 1);
}
- @Test
- @Ignore("Comment this out if you want to run against local or test ES")
- public void testFetchElasticsearchBatch() throws IOException {
- System.out.println("Starting test " + new Object() {
- }.getClass().getEnclosingMethod().getName());
- final TestRunner runner = TestRunners.newTestRunner(new FetchElasticsearchHttp());
-
- //Local Cluster - Mac pulled from brew
- runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
- runner.setProperty(FetchElasticsearchHttp.INDEX, "doc");
- runner.setProperty(FetchElasticsearchHttp.TYPE, "status");
- runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}");
- runner.assertValid();
-
- for (int i = 0; i < 100; i++) {
- long newId = 28039652140L + i;
- final String newStrId = Long.toString(newId);
- runner.enqueue(docExample, new HashMap() {{
- put("doc_id", newStrId);
- }});
- }
- runner.run(100);
- runner.assertAllFlowFilesTransferred(FetchElasticsearchHttp.REL_SUCCESS, 100);
- }
-
@Test
@Ignore("Un-authenticated proxy : Comment this out if you want to run against local proxied ES.")
public void testFetchElasticsearchBasicBehindProxy() {
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java
index 896b9810e8..a9371d13ba 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java
@@ -30,7 +30,6 @@ import java.util.HashMap;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@@ -112,10 +111,9 @@ public class TestPutElasticsearchHttp {
public void testPutElasticSearchOnTriggerIndex_withoutType() throws IOException {
runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(false)); // no failures
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
- runner.setProperty(PutElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_7.name());
runner.setProperty(PutElasticsearchHttp.INDEX, "doc");
- runner.setProperty(PutElasticsearchHttp.TYPE, "");
+ runner.removeProperty(PutElasticsearchHttp.TYPE);
runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1");
runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id");
@@ -340,17 +338,16 @@ public class TestPutElasticsearchHttp {
runner.assertNotValid();
runner.setProperty(PutElasticsearchHttp.INDEX, "doc");
- runner.setProperty(PutElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.setProperty(PutElasticsearchHttp.TYPE, "");
- runner.assertNotValid(); // Not valid because type is required prior to 7.0
+ runner.assertNotValid();
runner.setProperty(PutElasticsearchHttp.TYPE, " ");
- runner.assertNotValid(); // Not valid because type is required prior to 7.0
+ runner.assertValid();
runner.removeProperty(PutElasticsearchHttp.TYPE);
- runner.assertNotValid(); // Not valid because type is required prior to 7.0
+ runner.assertValid();
runner.setProperty(PutElasticsearchHttp.TYPE, "status");
runner.assertValid();
- runner.setProperty(PutElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_7.name());
- runner.assertNotValid(); // Not valid because type must be _doc or empty for 7.0+
+ runner.setProperty(PutElasticsearchHttp.TYPE, "${type}");
+ runner.assertValid();
runner.setProperty(PutElasticsearchHttp.TYPE, "_doc");
runner.assertValid();
runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1");
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
index a651aefdf8..bea99954ba 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java
@@ -28,7 +28,6 @@ import okio.Buffer;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.reporting.InitializationException;
@@ -197,10 +196,9 @@ public class TestPutElasticsearchHttpRecord {
runner = TestRunners.newTestRunner(processor); // no failures
generateTestData();
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
- runner.setProperty(AbstractElasticsearchHttpProcessor.ES_VERSION, ElasticsearchVersion.ES_7.name());
runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
- runner.setProperty(PutElasticsearchHttpRecord.TYPE, "");
+ runner.removeProperty(PutElasticsearchHttpRecord.TYPE);
runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
runner.setProperty(PutElasticsearchHttpRecord.DATE_FORMAT, "d/M/yyyy");
runner.setProperty(PutElasticsearchHttpRecord.TIME_FORMAT, "h:m a");
@@ -248,10 +246,9 @@ public class TestPutElasticsearchHttpRecord {
runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecordTestProcessor(false)); // no failures
generateTestData();
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
- runner.setProperty(AbstractElasticsearchHttpProcessor.ES_VERSION, ElasticsearchVersion.ES_7.name());
runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
- runner.setProperty(PutElasticsearchHttpRecord.TYPE, "");
+ runner.removeProperty(PutElasticsearchHttpRecord.TYPE);
runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
runner.setProperty(PutElasticsearchHttpRecord.INDEX_OP, "Update");
runner.enqueue(new byte[0], new HashMap() {{
@@ -291,10 +288,9 @@ public class TestPutElasticsearchHttpRecord {
runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecordTestProcessor(false)); // no failures
generateTestData();
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
- runner.setProperty(AbstractElasticsearchHttpProcessor.ES_VERSION, ElasticsearchVersion.ES_7.name());
runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc");
- runner.setProperty(PutElasticsearchHttpRecord.TYPE, "");
+ runner.removeProperty(PutElasticsearchHttpRecord.TYPE);
runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id");
runner.setProperty(PutElasticsearchHttpRecord.INDEX_OP, "DELETE");
runner.enqueue(new byte[0], new HashMap() {{
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java
index 0f6b6e44fd..4cdb035eca 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java
@@ -32,7 +32,6 @@ import java.util.List;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
@@ -65,7 +64,6 @@ public class TestQueryElasticsearchHttp {
public void testQueryElasticsearchOnTrigger_withInput() throws IOException {
runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
- runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
runner.assertNotValid();
@@ -85,7 +83,6 @@ public class TestQueryElasticsearchHttp {
runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
runner.setValidateExpressionUsage(true);
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
- runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
runner.assertNotValid();
@@ -109,20 +106,15 @@ public class TestQueryElasticsearchHttp {
runner.assertNotValid();
runner.setProperty(QueryElasticsearchHttp.QUERY,
"source:Twitter AND identifier:\"${identifier}\"");
- runner.setProperty(QueryElasticsearchHttp.TYPE, "");
+ runner.removeProperty(QueryElasticsearchHttp.TYPE);
runner.assertValid();
- runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
- runner.setProperty(QueryElasticsearchHttp.TYPE, "");
- runner.assertValid(); // Valid because type is not required prior to 7.0
runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
runner.assertValid();
- runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_7.name());
- runner.assertNotValid(); // Not valid because type must be _doc or empty for 7.0+
- runner.setProperty(QueryElasticsearchHttp.TYPE, "_doc");
- runner.assertValid();
- runner.removeProperty(QueryElasticsearchHttp.TYPE);
- runner.assertNotValid();
runner.setProperty(QueryElasticsearchHttp.TYPE, "");
+ runner.assertNotValid();
+ runner.setProperty(QueryElasticsearchHttp.TYPE, "${type}");
+ runner.assertValid();
+ runner.setProperty(QueryElasticsearchHttp.TYPE, "_doc");
runner.assertValid();
runner.setProperty(QueryElasticsearchHttp.PAGE_SIZE, "2");
runner.assertValid();
@@ -138,7 +130,6 @@ public class TestQueryElasticsearchHttp {
public void testQueryElasticsearchOnTrigger_withInput_attributeTarget() throws IOException {
runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
- runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
runner.assertNotValid();
@@ -165,7 +156,6 @@ public class TestQueryElasticsearchHttp {
public void testQueryElasticsearchOnTrigger_withNoInput() throws IOException {
runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
- runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
runner.assertNotValid();
@@ -211,7 +201,6 @@ public class TestQueryElasticsearchHttp {
public void testQueryElasticsearchOnTriggerWithFields() throws IOException {
runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
- runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
runner.assertNotValid();
@@ -231,7 +220,6 @@ public class TestQueryElasticsearchHttp {
public void testQueryElasticsearchOnTriggerWithLimit() throws IOException {
runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor());
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
- runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
runner.assertNotValid();
@@ -254,7 +242,6 @@ public class TestQueryElasticsearchHttp {
processor.setStatus(500, "Server error");
runner = TestRunners.newTestRunner(processor); // simulate doc not found
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
- runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}");
@@ -280,7 +267,6 @@ public class TestQueryElasticsearchHttp {
processor.setStatus(100, "Should fail");
runner = TestRunners.newTestRunner(processor); // simulate doc not found
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
- runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}");
@@ -306,7 +292,6 @@ public class TestQueryElasticsearchHttp {
processor.setExceptionToThrow(new IOException("Error reading from disk"));
runner = TestRunners.newTestRunner(processor); // simulate doc not found
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
- runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}");
@@ -332,7 +317,6 @@ public class TestQueryElasticsearchHttp {
processor.setStatus(100, "Should fail", 2);
runner = TestRunners.newTestRunner(processor); // simulate doc not found
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
- runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}");
@@ -359,7 +343,6 @@ public class TestQueryElasticsearchHttp {
processor.setStatus(100, "Should fail", 1);
runner = TestRunners.newTestRunner(processor); // simulate doc not found
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
- runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}");
@@ -382,9 +365,8 @@ public class TestQueryElasticsearchHttp {
runner.enableControllerService(sslService);
runner.setProperty(QueryElasticsearchHttp.PROP_SSL_CONTEXT_SERVICE, "ssl-context");
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
- runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
- runner.setProperty(QueryElasticsearchHttp.TYPE, "");
+ runner.removeProperty(QueryElasticsearchHttp.TYPE);
runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}");
// Allow time for the controller service to fully initialize
@@ -421,7 +403,6 @@ public class TestQueryElasticsearchHttp {
runner.setProperty(QueryElasticsearchHttp.PROXY_HOST, "localhost");
runner.setProperty(QueryElasticsearchHttp.PROXY_PORT, "3228");
runner.setProperty(QueryElasticsearchHttp.ES_URL, "http://172.18.0.2:9200");
- runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.enqueue("".getBytes(), new HashMap() {{
put("doc_id", "28039652140");
@@ -449,7 +430,6 @@ public class TestQueryElasticsearchHttp {
runner.setProperty(QueryElasticsearchHttp.PROXY_USERNAME, "squid");
runner.setProperty(QueryElasticsearchHttp.PROXY_PASSWORD, "changeme");
runner.setProperty(QueryElasticsearchHttp.ES_URL, "http://172.18.0.2:9200");
- runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.enqueue("".getBytes(), new HashMap() {{
put("doc_id", "28039652140");
@@ -465,7 +445,6 @@ public class TestQueryElasticsearchHttp {
p.setExpectedParam("myparam=myvalue");
runner = TestRunners.newTestRunner(p);
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
- runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
@@ -478,30 +457,15 @@ public class TestQueryElasticsearchHttp {
@Test
public void testQueryElasticsearchOnTrigger_sourceIncludes() throws IOException {
QueryElasticsearchHttpTestProcessor p = new QueryElasticsearchHttpTestProcessor();
- p.setExpectedParam("_source_include=test"); // < ES 7.0 expects this param
+ p.setExpectedParam("_source=test");
runner = TestRunners.newTestRunner(p);
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
- runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
runner.setProperty(QueryElasticsearchHttp.QUERY, "source:Twitter");
runner.setProperty(QueryElasticsearchHttp.FIELDS, "test");
runAndVerifySuccess(true);
-
- // Now test with ES 7.x
-
- p = new QueryElasticsearchHttpTestProcessor();
- p.setExpectedParam("_source_includes=test"); // >= ES 7.0 expects this param
- runner = TestRunners.newTestRunner(p);
- runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
- runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_7.name());
-
- runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
- runner.setProperty(QueryElasticsearchHttp.TYPE, "");
- runner.setProperty(QueryElasticsearchHttp.QUERY, "source:Twitter");
- runner.setProperty(QueryElasticsearchHttp.FIELDS, "test");
- runAndVerifySuccess(true);
}
/**
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestScrollElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestScrollElasticsearchHttp.java
index 74ac031b72..f790256de8 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestScrollElasticsearchHttp.java
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestScrollElasticsearchHttp.java
@@ -32,7 +32,6 @@ import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
@@ -83,30 +82,15 @@ public class TestScrollElasticsearchHttp {
@Test
public void testScrollElasticsearchOnTrigger_sourceIncludes() throws IOException {
ScrollElasticsearchHttpTestProcessor p = new ScrollElasticsearchHttpTestProcessor();
- p.setExpectedParam("_source_include=test"); // < ES 7.0 expects this param
+ p.setExpectedParam("_source=test");
runner = TestRunners.newTestRunner(p);
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
- runner.setProperty(ScrollElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.setProperty(ScrollElasticsearchHttp.INDEX, "doc");
runner.setProperty(ScrollElasticsearchHttp.TYPE, "status");
runner.setProperty(ScrollElasticsearchHttp.QUERY, "source:Twitter");
runner.setProperty(ScrollElasticsearchHttp.FIELDS, "test");
runAndVerifySuccess();
-
- // Now test with ES 7.x
-
- p = new ScrollElasticsearchHttpTestProcessor();
- p.setExpectedParam("_source_includes=test"); // >= ES 7.0 expects this param
- runner = TestRunners.newTestRunner(p);
- runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
- runner.setProperty(ScrollElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_7.name());
-
- runner.setProperty(ScrollElasticsearchHttp.INDEX, "doc");
- runner.setProperty(ScrollElasticsearchHttp.TYPE, "");
- runner.setProperty(ScrollElasticsearchHttp.QUERY, "source:Twitter");
- runner.setProperty(ScrollElasticsearchHttp.FIELDS, "test");
- runAndVerifySuccess();
}
@Test
@@ -163,20 +147,18 @@ public class TestScrollElasticsearchHttp {
runner.setProperty(ScrollElasticsearchHttp.INDEX, "doc");
runner.assertNotValid();
- runner.setProperty(ScrollElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name());
runner.setProperty(ScrollElasticsearchHttp.QUERY, "${doc_id}");
- runner.assertNotValid();
- runner.setProperty(ScrollElasticsearchHttp.TYPE, "");
- runner.assertValid(); // Valid because type is not required prior to 7.0
- runner.setProperty(ScrollElasticsearchHttp.TYPE, "status");
- runner.assertValid();
- runner.setProperty(ScrollElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_7.name());
- runner.assertNotValid(); // Not valid because type must be _doc or empty for 7.0+
- runner.setProperty(ScrollElasticsearchHttp.TYPE, "_doc");
runner.assertValid();
runner.removeProperty(ScrollElasticsearchHttp.TYPE);
- runner.assertNotValid();
+ runner.assertValid();
+ runner.setProperty(ScrollElasticsearchHttp.TYPE, "status");
+ runner.assertValid();
+ runner.setProperty(ScrollElasticsearchHttp.TYPE, "${type}");
+ runner.assertValid();
runner.setProperty(ScrollElasticsearchHttp.TYPE, "");
+ runner.assertNotValid();
+ runner.setProperty(ScrollElasticsearchHttp.TYPE, "_doc");
+ runner.assertValid();
runner.setProperty(ScrollElasticsearchHttp.FIELDS, "id,, userinfo.location");
runner.assertValid();
runner.setProperty(ScrollElasticsearchHttp.SORT, "timestamp:asc,identifier:desc");
@@ -286,9 +268,8 @@ public class TestScrollElasticsearchHttp {
runner.enableControllerService(sslService);
runner.setProperty(ScrollElasticsearchHttp.PROP_SSL_CONTEXT_SERVICE, "ssl-context");
runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");
- runner.setProperty(ScrollElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_7.name());
runner.setProperty(ScrollElasticsearchHttp.INDEX, "doc");
- runner.setProperty(ScrollElasticsearchHttp.TYPE, "");
+ runner.removeProperty(ScrollElasticsearchHttp.TYPE);
runner.setProperty(ScrollElasticsearchHttp.QUERY, "${doc_id}");
runner.setIncomingConnection(false);
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
index 214f15e54a..1eb2893134 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
@@ -28,6 +28,7 @@ import org.apache.nifi.schema.access.SchemaAccessUtils
import org.apache.nifi.serialization.RecordReaderFactory
import org.apache.nifi.serialization.record.MockRecordParser
import org.apache.nifi.serialization.record.MockSchemaRegistry
+import org.apache.nifi.util.StringUtils
import org.apache.nifi.util.TestRunner
import org.apache.nifi.util.TestRunners
import org.junit.Assert
@@ -206,6 +207,42 @@ class PutElasticsearchRecordTest {
"schema.name": "recordPathTest"
])
runner.run()
+ runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1)
+ runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0)
+ runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
+
+ runner.clearTransferState()
+
+ flowFileContents = prettyPrint(toJson([
+ [ msg: "Hello" ],
+ [ id: null, type: null, msg: "Hello" ],
+ [ id: "rec-3", msg: "Hello" ],
+ [ id: "rec-4", msg: "Hello" ],
+ [ id: "rec-5", msg: "Hello" ],
+ [ id: "rec-6", type: "message", msg: "Hello" ]
+ ]))
+
+ evalClosure = { List items ->
+ def nullTypeCount = items.findAll { it.type == null }.size()
+ def messageTypeCount = items.findAll { it.type == "message" }.size()
+ def nullIdCount = items.findAll { it.id == null }.size()
+ def recIdCount = items.findAll { StringUtils.startsWith(it.id, "rec-") }.size()
+ Assert.assertEquals("null type", 5, nullTypeCount)
+ Assert.assertEquals("message type", 1, messageTypeCount)
+ Assert.assertEquals("null id", 2, nullIdCount)
+ Assert.assertEquals("rec- id", 4, recIdCount)
+ }
+
+ clientService.evalClosure = evalClosure
+
+ runner.removeProperty(PutElasticsearchRecord.TYPE)
+ runner.enqueue(flowFileContents, [
+ "schema.name": "recordPathTest"
+ ])
+ runner.run()
+ runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1)
+ runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0)
+ runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
runner.clearTransferState()