From 124cdbd3fe5446bff08346e5a28f2883f13e2848 Mon Sep 17 00:00:00 2001 From: Chris Sampson Date: Mon, 16 Nov 2020 13:03:50 +0000 Subject: [PATCH] NIFI-6403 and NIFI-6404: Elasticsearch 7 support Addressed PR#4153 comments; removed ES Version property and made Type optional in all ES HTTP/Record processors, applying sensible default values where required; use _source queyr parameter instead of _source_include/s as it's compatible between ES versions Fix unit test compilation to use JDK8-compatible library/method Better optional type and id handling for PutElasticsearchRecord; update nifi-elasticsearch-client-service build dependencies to use latest versions of Elasticsearch in each supported major version (5/6/7); addressed several warnings in ElasticSearchClientServiceImpl This closes #4667. Signed-off-by: Koji Kawamura --- .../nifi-elasticsearch-client-service/pom.xml | 16 +-- .../ElasticSearchClientServiceImpl.java | 122 +++++++++--------- .../AbstractElasticsearchHttpProcessor.java | 66 +--------- .../elasticsearch/FetchElasticsearchHttp.java | 22 ++-- .../elasticsearch/PutElasticsearchHttp.java | 9 +- .../PutElasticsearchHttpRecord.java | 9 +- .../elasticsearch/QueryElasticsearchHttp.java | 27 ++-- .../ScrollElasticsearchHttp.java | 26 ++-- .../ITQueryElasticsearchHttp.java | 3 - .../ITScrollElasticsearchHttp.java | 2 - .../PutElasticsearchHttpRecordIT.java | 6 - .../TestFetchElasticsearchHttp.java | 87 +++++++------ .../TestPutElasticsearchHttp.java | 15 +-- .../TestPutElasticsearchHttpRecord.java | 10 +- .../TestQueryElasticsearchHttp.java | 50 +------ .../TestScrollElasticsearchHttp.java | 39 ++---- .../PutElasticsearchRecordTest.groovy | 37 ++++++ 17 files changed, 225 insertions(+), 321 deletions(-) diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml index 7d056091da..baf8f7a0ab 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml @@ -26,7 +26,7 @@ jar - 5.6.15 + 5.6.16 setup-5.script faketype @@ -71,7 +71,7 @@ commons-io commons-io - 2.6 + 2.8.0 @@ -82,7 +82,7 @@ org.apache.commons commons-lang3 - 3.9 + 3.11 org.slf4j @@ -146,7 +146,7 @@ org.apache.httpcomponents httpclient - 4.5.10 + 4.5.13 org.apache.nifi @@ -176,7 +176,7 @@ integration-6 - 6.7.1 + 6.8.13 _doc setup-6.script @@ -184,7 +184,7 @@ integration-7 - 7.0.0 + 7.10.0 setup-7.script _doc @@ -196,7 +196,7 @@ org.apache.maven.plugins maven-failsafe-plugin - 3.0.0-M3 + 3.0.0-M5 ${type.name} @@ -206,7 +206,7 @@ com.github.alexcojocaru elasticsearch-maven-plugin - 6.13 + 6.19 testCluster 9500 diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java index 3a686af239..bad70bafe2 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java @@ -26,7 +26,6 @@ import java.net.URL; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -61,25 +60,25 @@ import org.elasticsearch.client.RestClientBuilder; public class ElasticSearchClientServiceImpl extends AbstractControllerService implements ElasticSearchClientService { private final ObjectMapper mapper = new ObjectMapper(); - static final private List properties; + private static final List properties; private RestClient client; private String url; - private Charset charset; + private Charset responseCharset; static { - List _props = new ArrayList(); - _props.add(ElasticSearchClientService.HTTP_HOSTS); - _props.add(ElasticSearchClientService.USERNAME); - _props.add(ElasticSearchClientService.PASSWORD); - _props.add(ElasticSearchClientService.PROP_SSL_CONTEXT_SERVICE); - _props.add(ElasticSearchClientService.CONNECT_TIMEOUT); - _props.add(ElasticSearchClientService.SOCKET_TIMEOUT); - _props.add(ElasticSearchClientService.RETRY_TIMEOUT); - _props.add(ElasticSearchClientService.CHARSET); + List props = new ArrayList<>(); + props.add(ElasticSearchClientService.HTTP_HOSTS); + props.add(ElasticSearchClientService.USERNAME); + props.add(ElasticSearchClientService.PASSWORD); + props.add(ElasticSearchClientService.PROP_SSL_CONTEXT_SERVICE); + props.add(ElasticSearchClientService.CONNECT_TIMEOUT); + props.add(ElasticSearchClientService.SOCKET_TIMEOUT); + props.add(ElasticSearchClientService.RETRY_TIMEOUT); + props.add(ElasticSearchClientService.CHARSET); - properties = Collections.unmodifiableList(_props); + properties = Collections.unmodifiableList(props); } @Override @@ -91,7 +90,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im public void onEnabled(final ConfigurationContext context) throws InitializationException { try { setupClient(context); - charset = Charset.forName(context.getProperty(CHARSET).getValue()); + responseCharset = Charset.forName(context.getProperty(CHARSET).getValue()); } catch (Exception ex) { getLogger().error("Could not initialize ElasticSearch client.", ex); throw new InitializationException(ex); @@ -126,44 +125,42 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im final SSLContext sslContext; try { sslContext = (sslService != null && (sslService.isKeyStoreConfigured() || sslService.isTrustStoreConfigured())) - ? sslService.createSSLContext(ClientAuth.NONE) : null; + ? sslService.createSSLContext(ClientAuth.NONE) : null; } catch (Exception e) { getLogger().error("Error building up SSL Context from the supplied configuration.", e); throw new InitializationException(e); } RestClientBuilder builder = RestClient.builder(hh) - .setHttpClientConfigCallback(httpClientBuilder -> { - if (sslContext != null) { - httpClientBuilder = httpClientBuilder.setSSLContext(sslContext); - } + .setHttpClientConfigCallback(httpClientBuilder -> { + if (sslContext != null) { + httpClientBuilder.setSSLContext(sslContext); + } - if (username != null && password != null) { - final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); - credentialsProvider.setCredentials(AuthScope.ANY, - new UsernamePasswordCredentials(username, password)); - httpClientBuilder = httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider); - } + if (username != null && password != null) { + final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials(AuthScope.ANY, + new UsernamePasswordCredentials(username, password)); + httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider); + } - return httpClientBuilder; - }) - .setRequestConfigCallback(requestConfigBuilder -> { - requestConfigBuilder.setConnectTimeout(connectTimeout); - requestConfigBuilder.setSocketTimeout(readTimeout); - return requestConfigBuilder; - }) - .setMaxRetryTimeoutMillis(retryTimeout); + return httpClientBuilder; + }) + .setRequestConfigCallback(requestConfigBuilder -> { + requestConfigBuilder.setConnectTimeout(connectTimeout); + requestConfigBuilder.setSocketTimeout(readTimeout); + return requestConfigBuilder; + }) + .setMaxRetryTimeoutMillis(retryTimeout); this.client = builder.build(); } private Response runQuery(String endpoint, String query, String index, String type) { StringBuilder sb = new StringBuilder() - .append("/") - .append(index); + .append("/").append(index); if (type != null && !type.equals("")) { - sb.append("/") - .append(type); + sb.append("/").append(type); } sb.append(String.format("/%s", endpoint)); @@ -181,11 +178,11 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im final int code = response.getStatusLine().getStatusCode(); try { - if (code >= 200 & code < 300) { + if (code >= 200 && code < 300) { InputStream inputStream = response.getEntity().getContent(); byte[] result = IOUtils.toByteArray(inputStream); inputStream.close(); - return mapper.readValue(new String(result, charset), Map.class); + return (Map) mapper.readValue(new String(result, responseCharset), Map.class); } else { String errorMessage = String.format("ElasticSearch reported an error while trying to run the query: %s", response.getStatusLine().getReasonPhrase()); @@ -198,7 +195,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im @Override public IndexOperationResponse add(IndexOperationRequest operation) { - return bulk(Arrays.asList(operation)); + return bulk(Collections.singletonList(operation)); } private String flatten(String str) { @@ -216,8 +213,12 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im Map header = new HashMap() {{ put(operation, new HashMap() {{ put("_index", index); - put("_id", id); - put("_type", type); + if (StringUtils.isNotBlank(id)) { + put("_id", id); + } + if (StringUtils.isNotBlank(type)) { + put("_type", type); + } }}); }}; @@ -256,8 +257,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im public IndexOperationResponse bulk(List operations) { try { StringBuilder payload = new StringBuilder(); - for (int index = 0; index < operations.size(); index++) { - IndexOperationRequest or = operations.get(index); + for (final IndexOperationRequest or : operations) { buildRequest(or, payload); } @@ -276,9 +276,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im getLogger().debug(String.format("Response was: %s", rawResponse)); } - IndexOperationResponse retVal = IndexOperationResponse.fromJsonResponse(rawResponse); - - return retVal; + return IndexOperationResponse.fromJsonResponse(rawResponse); } catch (Exception ex) { throw new ElasticsearchError(ex); } @@ -294,15 +292,15 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im @Override public DeleteOperationResponse deleteById(String index, String type, String id) { - return deleteById(index, type, Arrays.asList(id)); + return deleteById(index, type, Collections.singletonList(id)); } @Override public DeleteOperationResponse deleteById(String index, String type, List ids) { try { StringBuilder sb = new StringBuilder(); - for (int idx = 0; idx < ids.size(); idx++) { - String header = buildBulkHeader("delete", index, type, ids.get(idx)); + for (final String id : ids) { + String header = buildBulkHeader("delete", index, type, id); sb.append(header).append("\n"); } HttpEntity entity = new NStringEntity(sb.toString(), ContentType.APPLICATION_JSON); @@ -316,9 +314,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8))); } - DeleteOperationResponse dor = new DeleteOperationResponse(watch.getDuration(TimeUnit.MILLISECONDS)); - - return dor; + return new DeleteOperationResponse(watch.getDuration(TimeUnit.MILLISECONDS)); } catch (Exception ex) { throw new RuntimeException(ex); } @@ -329,7 +325,9 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im long start = System.currentTimeMillis(); Response response = runQuery("_delete_by_query", query, index, type); long end = System.currentTimeMillis(); - Map parsed = parseResponse(response); + + // check for errors in response + parseResponse(response); return new DeleteOperationResponse(end - start); } @@ -359,9 +357,9 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im */ private int handleSearchCount(Object raw) { if (raw instanceof Number) { - return Integer.valueOf(raw.toString()); + return Integer.parseInt(raw.toString()); } else if (raw instanceof Map) { - return (Integer)((Map)raw).get("value"); + return (Integer)((Map)raw).get("value"); } else { throw new ProcessException("Unknown type for hit count."); } @@ -401,11 +399,11 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im @Override public String getTransitUrl(String index, String type) { return new StringBuilder() - .append(this.url) - .append(index != null && !index.equals("") ? "/" : "") - .append(index != null ? index : "") - .append(type != null && !type.equals("") ? "/" : "") - .append(type != null ? type : "") - .toString(); + .append(this.url) + .append(StringUtils.isNotBlank(index) ? "/" : "") + .append(StringUtils.isNotBlank(index) ? index : "") + .append(StringUtils.isNotBlank(type) ? "/" : "") + .append(StringUtils.isNotBlank(type) ? type : "") + .toString(); } } diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java index 291d99b4df..0374eb7dca 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java @@ -36,11 +36,9 @@ import okhttp3.RequestBody; import okhttp3.Response; import okhttp3.Route; import org.apache.commons.text.StringEscapeUtils; -import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.components.Validator; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.ProcessContext; @@ -57,13 +55,7 @@ import org.apache.nifi.util.StringUtils; * A base class for Elasticsearch processors that use the HTTP API */ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElasticsearchProcessor { - enum ElasticsearchVersion { - ES_7, - ES_LESS_THAN_7 - } - - static final String FIELD_INCLUDE_QUERY_PARAM = "_source_include"; - static final String FIELD_INCLUDE_QUERY_PARAM_ES7 = "_source_includes"; + static final String SOURCE_QUERY_PARAM = "_source"; static final String QUERY_QUERY_PARAM = "q"; static final String SORT_QUERY_PARAM = "sort"; static final String SIZE_QUERY_PARAM = "size"; @@ -134,18 +126,6 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .build(); - public static final PropertyDescriptor ES_VERSION = new PropertyDescriptor.Builder() - .name("elasticsearch-http-version") - .displayName("Elasticsearch Version") - .description("The major version of elasticsearch (this affects some HTTP query parameters and the way responses are parsed).") - .required(true) - .allowableValues( - new AllowableValue(ElasticsearchVersion.ES_LESS_THAN_7.name(), "< 7.0", "Any version of Elasticsearch less than 7.0"), - new AllowableValue(ElasticsearchVersion.ES_7.name(), "7.x", "Elasticsearch version 7.x")) - .defaultValue(ElasticsearchVersion.ES_LESS_THAN_7.name()) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - private final AtomicReference okHttpClientAtomicReference = new AtomicReference<>(); @Override @@ -167,7 +147,6 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic static { final List properties = new ArrayList<>(); properties.add(ES_URL); - properties.add(ES_VERSION); properties.add(PROP_SSL_CONTEXT_SERVICE); properties.add(CHARSET); properties.add(USERNAME); @@ -308,12 +287,12 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic sb.append("\": { \"_index\": \""); sb.append(StringEscapeUtils.escapeJson(index)); sb.append("\""); - if (!(StringUtils.isEmpty(docType) | docType == null)){ + if (StringUtils.isNotBlank(docType)) { sb.append(", \"_type\": \""); sb.append(StringEscapeUtils.escapeJson(docType)); sb.append("\""); } - if (!StringUtils.isEmpty(id)) { + if (StringUtils.isNotBlank(id)) { sb.append(", \"_id\": \""); sb.append(StringEscapeUtils.escapeJson(id)); sb.append("\""); @@ -325,7 +304,7 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic sb.append("{\"update\": { \"_index\": \""); sb.append(StringEscapeUtils.escapeJson(index)); sb.append("\""); - if (!(StringUtils.isEmpty(docType) | docType == null)){ + if (StringUtils.isNotBlank(docType)) { sb.append(", \"_type\": \""); sb.append(StringEscapeUtils.escapeJson(docType)); sb.append("\""); @@ -342,47 +321,14 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic sb.append("{\"delete\": { \"_index\": \""); sb.append(StringEscapeUtils.escapeJson(index)); sb.append("\""); - if (!(StringUtils.isEmpty(docType) | docType == null)){ + if (StringUtils.isNotBlank(docType)) { sb.append(", \"_type\": \""); sb.append(StringEscapeUtils.escapeJson(docType)); sb.append("\""); } sb.append(", \"_id\": \""); sb.append(StringEscapeUtils.escapeJson(id)); - sb.append("\" }}\n"); - } - } - - protected String getFieldIncludeParameter(ElasticsearchVersion esVersion) { - return esVersion.equals(ElasticsearchVersion.ES_LESS_THAN_7) - ? FIELD_INCLUDE_QUERY_PARAM : FIELD_INCLUDE_QUERY_PARAM_ES7; - } - - static class ElasticsearchTypeValidator implements Validator { - private final boolean pre7TypeRequired; - - /** - * Creates a validator for an ES type - * @param pre7TypeRequired If true, 'type' will be required for ES - * before version 7.0. - */ - public ElasticsearchTypeValidator(boolean pre7TypeRequired) { - this.pre7TypeRequired = pre7TypeRequired; - } - - @Override - public ValidationResult validate(String subject, String input, ValidationContext context) { - ElasticsearchVersion esVersion = ElasticsearchVersion.valueOf(context - .getProperty(ES_VERSION).getValue()); - if (esVersion == ElasticsearchVersion.ES_7) { - return new ValidationResult.Builder().valid(org.apache.commons.lang3.StringUtils.isBlank(input) || "_doc".equals(input)) - .explanation("Elasticsearch no longer supports 'type' as of version 7.0. Please use '_doc' or leave blank.") - .build(); - } else { - return new ValidationResult.Builder().valid(!pre7TypeRequired || org.apache.commons.lang3.StringUtils.isNotBlank(input)) - .explanation("Elasticsearch prior to version 7.0 requires a 'type' to be set.") - .build(); - } + sb.append("\" } }\n"); } } } diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java index 50cad68594..dea039c9df 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearchHttp.java @@ -40,7 +40,6 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -121,11 +120,12 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor { public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder() .name("fetch-es-type") .displayName("Type") - .description("The type of this document (if empty, the first document matching the identifier across all types will be retrieved). " - + "This must be empty (check 'Set empty string') or '_doc' for Elasticsearch 7.0+.") - .required(true) + .description("The type of document/fetch (if unset, the first document matching the " + + "identifier across _all types will be retrieved). " + + "This should be unset, '_doc' or '_source' for Elasticsearch 7.0+.") + .required(false) .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .addValidator(new ElasticsearchTypeValidator(false)) + .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) .build(); public static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder() @@ -150,7 +150,6 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor { relationships = Collections.unmodifiableSet(_rels); final List descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS); - descriptors.add(ES_VERSION); descriptors.add(DOC_ID); descriptors.add(INDEX); descriptors.add(TYPE); @@ -201,8 +200,6 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor { final String fields = context.getProperty(FIELDS).isSet() ? context.getProperty(FIELDS).evaluateAttributeExpressions(flowFile).getValue() : null; - final ElasticsearchVersion esVersion = ElasticsearchVersion.valueOf(context.getProperty(ES_VERSION) - .getValue()); // Authentication final String username = context.getProperty(USERNAME).evaluateAttributeExpressions(flowFile).getValue(); @@ -218,7 +215,7 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor { // read the url property from the context final String urlstr = StringUtils.trimToEmpty(context.getProperty(ES_URL).evaluateAttributeExpressions().getValue()); - final URL url = buildRequestURL(urlstr, docId, index, docType, fields, context, esVersion); + final URL url = buildRequestURL(urlstr, docId, index, docType, fields, context); final long startNanos = System.nanoTime(); getResponse = sendRequestToElasticsearch(okHttpClient, url, username, password, "GET", null); @@ -310,18 +307,17 @@ public class FetchElasticsearchHttp extends AbstractElasticsearchHttpProcessor { } } - private URL buildRequestURL(String baseUrl, String docId, String index, String type, String fields, ProcessContext context, ElasticsearchVersion esVersion) throws MalformedURLException { + private URL buildRequestURL(String baseUrl, String docId, String index, String type, String fields, ProcessContext context) throws MalformedURLException { if (StringUtils.isEmpty(baseUrl)) { throw new MalformedURLException("Base URL cannot be null"); } HttpUrl.Builder builder = HttpUrl.parse(baseUrl).newBuilder(); builder.addPathSegment(index); - builder.addPathSegment((StringUtils.isEmpty(type)) ? "_all" : type); + builder.addPathSegment(StringUtils.isBlank(type) ? "_all" : type); builder.addPathSegment(docId); if (!StringUtils.isEmpty(fields)) { String trimmedFields = Stream.of(fields.split(",")).map(String::trim).collect(Collectors.joining(",")); - final String fieldIncludeParameter = getFieldIncludeParameter(esVersion); - builder.addQueryParameter(fieldIncludeParameter, trimmedFields); + builder.addQueryParameter(SOURCE_QUERY_PARAM, trimmedFields); } // Find the user-added properties and set them as query parameters on the URL diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java index b427b8ed4d..8c501c3d2e 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java @@ -111,11 +111,11 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor { public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder() .name("put-es-type") .displayName("Type") - .description("The type of this document (required by Elasticsearch versions < 7.0 for indexing and searching). " - + "This must be empty (check 'Set empty string') or '_doc' for Elasticsearch 7.0+.") - .required(true) + .description("The type of this document (required by Elasticsearch versions < 7.0 for indexing and searching). " + + "This must be unset or '_doc' for Elasticsearch 7.0+.") + .required(false) .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .addValidator(new ElasticsearchTypeValidator(true)) + .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) .build(); public static final PropertyDescriptor INDEX_OP = new PropertyDescriptor.Builder() @@ -153,7 +153,6 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor { relationships = Collections.unmodifiableSet(_rels); final List descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS); - descriptors.add(ES_VERSION); descriptors.add(ID_ATTRIBUTE); descriptors.add(INDEX); descriptors.add(TYPE); diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java index 130bde86e5..fd28b4907c 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecord.java @@ -173,11 +173,11 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder() .name("put-es-record-type") .displayName("Type") - .description("The type of this document (required by Elasticsearch versions < 7.0 for indexing and searching). " - + "This must be empty (check 'Set empty string') or '_doc' for Elasticsearch 7.0+.") - .required(true) + .description("The type of this document (required by Elasticsearch versions < 7.0 for indexing and searching). " + + "This must be unset or '_doc' for Elasticsearch 7.0+.") + .required(false) .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .addValidator(new ElasticsearchTypeValidator(true)) + .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) .build(); static final PropertyDescriptor INDEX_OP = new PropertyDescriptor.Builder() @@ -261,7 +261,6 @@ public class PutElasticsearchHttpRecord extends AbstractElasticsearchHttpProcess relationships = Collections.unmodifiableSet(_rels); final List descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS); - descriptors.add(ES_VERSION); descriptors.add(RECORD_READER); descriptors.add(RECORD_WRITER); descriptors.add(LOG_ALL_ERRORS); diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java index 58bab07d9d..0a769d65e5 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java @@ -145,7 +145,7 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor { public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder() .name("query-es-index") .displayName("Index") - .description("The name of the index to read from. If the property is set " + .description("The name of the index to read from. If the property is unset or set " + "to _all, the query will match across all indexes.") .required(true) .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) @@ -155,11 +155,11 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor { public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder() .name("query-es-type") .displayName("Type") - .description("The type of this document (if empty, searches across all types). " - + "This must be empty (check 'Set empty string') or '_doc' for Elasticsearch 7.0+.") - .required(true) + .description("The type of document (if unset, the query will be against all types in the _index). " + + "This should be unset or '_doc' for Elasticsearch 7.0+.") + .required(false) .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .addValidator(new ElasticsearchTypeValidator(false)) + .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) .build(); public static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder() @@ -235,7 +235,6 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor { static { final List descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS); descriptors.add(QUERY); - descriptors.add(ES_VERSION); descriptors.add(PAGE_SIZE); descriptors.add(INDEX); descriptors.add(TYPE); @@ -307,17 +306,16 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor { final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(flowFile) .getValue(); final int pageSize = context.getProperty(PAGE_SIZE).evaluateAttributeExpressions(flowFile) - .asInteger().intValue(); + .asInteger(); final Integer limit = context.getProperty(LIMIT).isSet() ? context.getProperty(LIMIT) - .evaluateAttributeExpressions(flowFile).asInteger().intValue() : null; + .evaluateAttributeExpressions(flowFile).asInteger() : null; final String fields = context.getProperty(FIELDS).isSet() ? context.getProperty(FIELDS) .evaluateAttributeExpressions(flowFile).getValue() : null; final String sort = context.getProperty(SORT).isSet() ? context.getProperty(SORT) .evaluateAttributeExpressions(flowFile).getValue() : null; final boolean targetIsContent = context.getProperty(TARGET).getValue() .equals(TARGET_FLOW_FILE_CONTENT); - final ElasticsearchVersion esVersion = ElasticsearchVersion.valueOf(context.getProperty(ES_VERSION) - .getValue()); + // Authentication final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue(); final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue(); @@ -345,7 +343,7 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor { } final URL queryUrl = buildRequestURL(urlstr, query, index, docType, fields, sort, - mPageSize, fromIndex, context, esVersion); + mPageSize, fromIndex, context); final Response getResponse = sendRequestToElasticsearch(okHttpClient, queryUrl, username, password, "GET", null); @@ -506,13 +504,13 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor { } private URL buildRequestURL(String baseUrl, String query, String index, String type, String fields, - String sort, int pageSize, int fromIndex, ProcessContext context, ElasticsearchVersion esVersion) throws MalformedURLException { + String sort, int pageSize, int fromIndex, ProcessContext context) throws MalformedURLException { if (StringUtils.isEmpty(baseUrl)) { throw new MalformedURLException("Base URL cannot be null"); } HttpUrl.Builder builder = HttpUrl.parse(baseUrl).newBuilder(); builder.addPathSegment((StringUtils.isEmpty(index)) ? "_all" : index); - if (!StringUtils.isEmpty(type)) { + if (StringUtils.isNotBlank(type)) { builder.addPathSegment(type); } builder.addPathSegment("_search"); @@ -521,8 +519,7 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor { builder.addQueryParameter(FROM_QUERY_PARAM, String.valueOf(fromIndex)); if (!StringUtils.isEmpty(fields)) { String trimmedFields = Stream.of(fields.split(",")).map(String::trim).collect(Collectors.joining(",")); - final String fieldIncludeParameter = getFieldIncludeParameter(esVersion); - builder.addQueryParameter(fieldIncludeParameter, trimmedFields); + builder.addQueryParameter(SOURCE_QUERY_PARAM, trimmedFields); } if (!StringUtils.isEmpty(sort)) { String trimmedFields = Stream.of(sort.split(",")).map(String::trim).collect(Collectors.joining(",")); diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java index 322f82f1fc..33c75d74e2 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ScrollElasticsearchHttp.java @@ -46,8 +46,6 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchTypeValidator; -import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -137,11 +135,11 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder() .name("scroll-es-type") .displayName("Type") - .description("The type of this document (if empty, searches across all types). " - + "This must be empty (check 'Set empty string') or '_doc' for Elasticsearch 7.0+.") - .required(true) + .description("The type of document (if unset, the query will be against all types in the _index). " + + "This should be unset or '_doc' for Elasticsearch 7.0+.") + .required(false) .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .addValidator(new ElasticsearchTypeValidator(false)) + .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) .build(); public static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder() @@ -186,7 +184,6 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor relationships = Collections.unmodifiableSet(_rels); final List descriptors = new ArrayList<>(COMMON_PROPERTY_DESCRIPTORS); - descriptors.add(ES_VERSION); descriptors.add(QUERY); descriptors.add(SCROLL_DURATION); descriptors.add(PAGE_SIZE); @@ -239,15 +236,13 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor final String docType = context.getProperty(TYPE).evaluateAttributeExpressions(flowFile) .getValue(); final int pageSize = context.getProperty(PAGE_SIZE).evaluateAttributeExpressions(flowFile) - .asInteger().intValue(); + .asInteger(); final String fields = context.getProperty(FIELDS).isSet() ? context.getProperty(FIELDS) .evaluateAttributeExpressions(flowFile).getValue() : null; final String sort = context.getProperty(SORT).isSet() ? context.getProperty(SORT) .evaluateAttributeExpressions(flowFile).getValue() : null; final String scroll = context.getProperty(SCROLL_DURATION).isSet() ? context .getProperty(SCROLL_DURATION).evaluateAttributeExpressions(flowFile).getValue() : null; - final ElasticsearchVersion esVersion = ElasticsearchVersion.valueOf(context.getProperty(ES_VERSION) - .getValue()); // Authentication final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue(); @@ -264,7 +259,7 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor .getValue()); if (scrollId != null) { final URL scrollurl = buildRequestURL(urlstr, query, index, docType, fields, sort, - scrollId, pageSize, scroll, context, esVersion); + scrollId, pageSize, scroll, context); final long startNanos = System.nanoTime(); final String scrollBody = String.format("{ \"scroll\": \"%s\", \"scroll_id\": \"%s\" }", scroll, @@ -282,7 +277,7 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor // read the url property from the context final URL queryUrl = buildRequestURL(urlstr, query, index, docType, fields, sort, - scrollId, pageSize, scroll, context, esVersion); + scrollId, pageSize, scroll, context); final long startNanos = System.nanoTime(); final Response getResponse = sendRequestToElasticsearch(okHttpClient, queryUrl, @@ -419,7 +414,7 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor } private URL buildRequestURL(String baseUrl, String query, String index, String type, String fields, - String sort, String scrollId, int pageSize, String scroll, ProcessContext context, ElasticsearchVersion esVersion) throws MalformedURLException { + String sort, String scrollId, int pageSize, String scroll, ProcessContext context) throws MalformedURLException { if (StringUtils.isEmpty(baseUrl)) { throw new MalformedURLException("Base URL cannot be null"); } @@ -429,7 +424,7 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor builder.addPathSegment("scroll"); } else { builder.addPathSegment((StringUtils.isEmpty(index)) ? "_all" : index); - if (!StringUtils.isEmpty(type)) { + if (StringUtils.isNotBlank(type)) { builder.addPathSegment(type); } builder.addPathSegment("_search"); @@ -437,8 +432,7 @@ public class ScrollElasticsearchHttp extends AbstractElasticsearchHttpProcessor builder.addQueryParameter(SIZE_QUERY_PARAM, String.valueOf(pageSize)); if (!StringUtils.isEmpty(fields)) { String trimmedFields = Stream.of(fields.split(",")).map(String::trim).collect(Collectors.joining(",")); - final String fieldIncludeParameter = getFieldIncludeParameter(esVersion); - builder.addQueryParameter(fieldIncludeParameter, trimmedFields); + builder.addQueryParameter(SOURCE_QUERY_PARAM, trimmedFields); } if (!StringUtils.isEmpty(sort)) { String trimmedFields = Stream.of(sort.split(",")).map(String::trim).collect(Collectors.joining(",")); diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITQueryElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITQueryElasticsearchHttp.java index e3ace2439f..f60e2f93c2 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITQueryElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITQueryElasticsearchHttp.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; -import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -43,7 +42,6 @@ public class ITQueryElasticsearchHttp { runner = TestRunners.newTestRunner(QueryElasticsearchHttp.class); // all docs are found runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://localhost.internal:9200"); - runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name()); runner.setProperty(QueryElasticsearchHttp.INDEX, "prod-accounting"); runner.assertNotValid(); @@ -70,7 +68,6 @@ public class ITQueryElasticsearchHttp { runner = TestRunners.newTestRunner(QueryElasticsearchHttp.class); // all docs are found runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://localhost.internal:9200"); - runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name()); runner.setProperty(QueryElasticsearchHttp.INDEX, "prod-accounting"); runner.assertNotValid(); diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITScrollElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITScrollElasticsearchHttp.java index b25e3e2722..57ee242ec7 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITScrollElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITScrollElasticsearchHttp.java @@ -20,7 +20,6 @@ import static org.junit.Assert.assertNotNull; import java.io.IOException; -import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -41,7 +40,6 @@ public class ITScrollElasticsearchHttp { runner = TestRunners.newTestRunner(ScrollElasticsearchHttp.class); // all docs are found runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://ip-172-31-49-152.ec2.internal:9200"); - runner.setProperty(ScrollElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name()); runner.setProperty(ScrollElasticsearchHttp.INDEX, "prod-accounting"); runner.assertNotValid(); diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecordIT.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecordIT.java index c13bf406c5..9a4a63fdf5 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecordIT.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttpRecordIT.java @@ -19,7 +19,6 @@ package org.apache.nifi.processors.elasticsearch; import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion; import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.record.MapRecord; import org.apache.nifi.serialization.record.MockRecordParser; @@ -63,7 +62,6 @@ public class PutElasticsearchHttpRecordIT { FETCH_RUNNER.setProperty(FetchElasticsearchHttp.INDEX, "people_test"); FETCH_RUNNER.setProperty(FetchElasticsearchHttp.TYPE, "person"); FETCH_RUNNER.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}"); - FETCH_RUNNER.setProperty(FetchElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name()); FETCH_RUNNER.assertValid(); } @@ -77,7 +75,6 @@ public class PutElasticsearchHttpRecordIT { runner = TestRunners.newTestRunner(PutElasticsearchHttpRecord.class); runner.addControllerService("reader", recordReader); runner.enableControllerService(recordReader); - runner.setProperty(PutElasticsearchHttpRecord.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name()); runner.setProperty(PutElasticsearchHttpRecord.RECORD_READER, "reader"); runner.setProperty(PutElasticsearchHttpRecord.ES_URL, "http://localhost:9200"); runner.setProperty(PutElasticsearchHttpRecord.INDEX, "people_test"); @@ -213,7 +210,6 @@ public class PutElasticsearchHttpRecordIT { // Undo some stuff from setup() runner.setProperty(PutElasticsearchHttpRecord.INDEX, "people\"test"); runner.setProperty(PutElasticsearchHttpRecord.TYPE, "person"); - runner.setProperty(PutElasticsearchHttpRecord.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name()); recordReader.addRecord(1, new MapRecord(personSchema, new HashMap() {{ put("name", "John Doe"); put("age", 48); @@ -237,7 +233,6 @@ public class PutElasticsearchHttpRecordIT { // Undo some stuff from setup() runner.setProperty(PutElasticsearchHttpRecord.INDEX, "people}test"); runner.setProperty(PutElasticsearchHttpRecord.TYPE, "person"); - runner.setProperty(PutElasticsearchHttpRecord.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name()); recordReader.addRecord(1, new MapRecord(personSchema, new HashMap() {{ put("name", "John Doe"); put("age", 48); @@ -261,7 +256,6 @@ public class PutElasticsearchHttpRecordIT { // Undo some stuff from setup() runner.setProperty(PutElasticsearchHttpRecord.INDEX, "people_test2"); runner.setProperty(PutElasticsearchHttpRecord.TYPE, "per\"son"); - runner.setProperty(PutElasticsearchHttpRecord.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name()); recordReader.addRecord(1, new MapRecord(personSchema, new HashMap() {{ put("name", "John Doe"); put("age", 48); diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java index fb8b164b3a..babdb3b18e 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestFetchElasticsearchHttp.java @@ -26,11 +26,12 @@ import static org.mockito.Mockito.when; import java.io.IOException; import java.io.InputStream; import java.net.URL; +import java.nio.charset.StandardCharsets; import java.util.HashMap; +import org.apache.commons.io.IOUtils; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion; import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; @@ -75,17 +76,16 @@ public class TestFetchElasticsearchHttp { runner.setProperty(FetchElasticsearchHttp.INDEX, "doc"); runner.assertNotValid(); runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}"); - runner.setProperty(FetchElasticsearchHttp.TYPE, ""); + runner.removeProperty(FetchElasticsearchHttp.TYPE); runner.assertValid(); - runner.setProperty(FetchElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name()); runner.setProperty(FetchElasticsearchHttp.TYPE, ""); - runner.assertValid(); // Valid because type is not required prior to 7.0 + runner.assertNotValid(); runner.setProperty(FetchElasticsearchHttp.TYPE, "status"); runner.assertValid(); - runner.setProperty(FetchElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_7.name()); - runner.assertNotValid(); // Not valid because type must be _doc or empty for 7.0+ + runner.setProperty(FetchElasticsearchHttp.TYPE, "${type}"); + runner.assertValid(); runner.setProperty(FetchElasticsearchHttp.TYPE, "_doc"); - runner.assertValid(); // Valid because type is not required prior to 7.0 + runner.assertValid(); // Valid because type can be _doc for 7.0+ runner.setProperty(AbstractElasticsearchHttpProcessor.CONNECT_TIMEOUT, "${connect.timeout}"); runner.assertValid(); @@ -135,7 +135,7 @@ public class TestFetchElasticsearchHttp { runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, ES_URL); runner.setProperty(FetchElasticsearchHttp.INDEX, "doc"); - runner.setProperty(FetchElasticsearchHttp.TYPE, ""); + runner.removeProperty(FetchElasticsearchHttp.TYPE); runner.assertNotValid(); runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}"); runner.assertValid(); @@ -184,7 +184,7 @@ public class TestFetchElasticsearchHttp { runner = TestRunners.newTestRunner(new FetchElasticsearchHttpTestProcessor(false)); // simulate doc not found runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); runner.setProperty(FetchElasticsearchHttp.INDEX, "doc"); - runner.setProperty(FetchElasticsearchHttp.TYPE, ""); + runner.removeProperty(FetchElasticsearchHttp.TYPE); runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}"); runner.setIncomingConnection(true); @@ -284,7 +284,7 @@ public class TestFetchElasticsearchHttp { runner.setProperty(FetchElasticsearchHttp.PROP_SSL_CONTEXT_SERVICE, "ssl-context"); runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); runner.setProperty(FetchElasticsearchHttp.INDEX, "doc"); - runner.setProperty(FetchElasticsearchHttp.TYPE, ""); + runner.removeProperty(FetchElasticsearchHttp.TYPE); runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}"); // Allow time for the controller service to fully initialize @@ -300,7 +300,7 @@ public class TestFetchElasticsearchHttp { @Test public void testFetchElasticsearchOnTriggerQueryParameter() throws IOException { FetchElasticsearchHttpTestProcessor p = new FetchElasticsearchHttpTestProcessor(true); // all docs are found - p.setExpectedUrl("http://127.0.0.1:9200/doc/status/28039652140?_source_include=id&myparam=myvalue"); + p.setExpectedUrl("http://127.0.0.1:9200/doc/status/28039652140?_source=id&myparam=myvalue"); runner = TestRunners.newTestRunner(p); runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); @@ -323,6 +323,32 @@ public class TestFetchElasticsearchHttp { out.assertAttributeEquals("doc_id", "28039652140"); } + @Test + public void testFetchElasticsearchOnTriggerQueryParameterNoType() throws IOException { + FetchElasticsearchHttpTestProcessor p = new FetchElasticsearchHttpTestProcessor(true); // all docs are found + p.setExpectedUrl("http://127.0.0.1:9200/doc/_all/28039652140?_source=id&myparam=myvalue"); + runner = TestRunners.newTestRunner(p); + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + + runner.setProperty(FetchElasticsearchHttp.INDEX, "doc"); + runner.removeProperty(FetchElasticsearchHttp.TYPE); + runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}"); + runner.setProperty(FetchElasticsearchHttp.FIELDS, "id"); + + // Set dynamic property, to be added to the URL as a query parameter + runner.setProperty("myparam", "myvalue"); + + runner.enqueue(docExample, new HashMap() {{ + put("doc_id", "28039652140"); + }}); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(FetchElasticsearchHttp.REL_SUCCESS, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(FetchElasticsearchHttp.REL_SUCCESS).get(0); + assertNotNull(out); + out.assertAttributeEquals("doc_id", "28039652140"); + } + /** * A Test class that extends the processor in order to inject/mock behavior */ @@ -412,15 +438,25 @@ public class TestFetchElasticsearchHttp { */ @Test @Ignore("Comment this out if you want to run against local or test ES") - public void testFetchElasticsearchBasic() { + public void testFetchElasticsearchBasic() throws IOException { System.out.println("Starting test " + new Object() { }.getClass().getEnclosingMethod().getName()); final TestRunner runner = TestRunners.newTestRunner(new FetchElasticsearchHttp()); + // add data to ES instance + new OkHttpClient.Builder().build().newCall( + new Request.Builder().url("http://127.0.0.1:9200/doc/_doc/28039652140") + .addHeader("Content-Type", "application/json") + .put( + RequestBody.create(MediaType.get("application/json"), + IOUtils.toString(docExample, StandardCharsets.UTF_8)) + ).build() + ).execute(); + //Local Cluster - Mac pulled from brew runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); runner.setProperty(FetchElasticsearchHttp.INDEX, "doc"); - runner.setProperty(FetchElasticsearchHttp.TYPE, "status"); + runner.removeProperty(FetchElasticsearchHttp.TYPE); runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}"); runner.assertValid(); @@ -433,31 +469,6 @@ public class TestFetchElasticsearchHttp { runner.assertAllFlowFilesTransferred(FetchElasticsearchHttp.REL_SUCCESS, 1); } - @Test - @Ignore("Comment this out if you want to run against local or test ES") - public void testFetchElasticsearchBatch() throws IOException { - System.out.println("Starting test " + new Object() { - }.getClass().getEnclosingMethod().getName()); - final TestRunner runner = TestRunners.newTestRunner(new FetchElasticsearchHttp()); - - //Local Cluster - Mac pulled from brew - runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); - runner.setProperty(FetchElasticsearchHttp.INDEX, "doc"); - runner.setProperty(FetchElasticsearchHttp.TYPE, "status"); - runner.setProperty(FetchElasticsearchHttp.DOC_ID, "${doc_id}"); - runner.assertValid(); - - for (int i = 0; i < 100; i++) { - long newId = 28039652140L + i; - final String newStrId = Long.toString(newId); - runner.enqueue(docExample, new HashMap() {{ - put("doc_id", newStrId); - }}); - } - runner.run(100); - runner.assertAllFlowFilesTransferred(FetchElasticsearchHttp.REL_SUCCESS, 100); - } - @Test @Ignore("Un-authenticated proxy : Comment this out if you want to run against local proxied ES.") public void testFetchElasticsearchBasicBehindProxy() { diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java index 896b9810e8..a9371d13ba 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttp.java @@ -30,7 +30,6 @@ import java.util.HashMap; import org.apache.commons.io.IOUtils; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -112,10 +111,9 @@ public class TestPutElasticsearchHttp { public void testPutElasticSearchOnTriggerIndex_withoutType() throws IOException { runner = TestRunners.newTestRunner(new PutElasticsearchTestProcessor(false)); // no failures runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); - runner.setProperty(PutElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_7.name()); runner.setProperty(PutElasticsearchHttp.INDEX, "doc"); - runner.setProperty(PutElasticsearchHttp.TYPE, ""); + runner.removeProperty(PutElasticsearchHttp.TYPE); runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1"); runner.setProperty(PutElasticsearchHttp.ID_ATTRIBUTE, "doc_id"); @@ -340,17 +338,16 @@ public class TestPutElasticsearchHttp { runner.assertNotValid(); runner.setProperty(PutElasticsearchHttp.INDEX, "doc"); - runner.setProperty(PutElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name()); runner.setProperty(PutElasticsearchHttp.TYPE, ""); - runner.assertNotValid(); // Not valid because type is required prior to 7.0 + runner.assertNotValid(); runner.setProperty(PutElasticsearchHttp.TYPE, " "); - runner.assertNotValid(); // Not valid because type is required prior to 7.0 + runner.assertValid(); runner.removeProperty(PutElasticsearchHttp.TYPE); - runner.assertNotValid(); // Not valid because type is required prior to 7.0 + runner.assertValid(); runner.setProperty(PutElasticsearchHttp.TYPE, "status"); runner.assertValid(); - runner.setProperty(PutElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_7.name()); - runner.assertNotValid(); // Not valid because type must be _doc or empty for 7.0+ + runner.setProperty(PutElasticsearchHttp.TYPE, "${type}"); + runner.assertValid(); runner.setProperty(PutElasticsearchHttp.TYPE, "_doc"); runner.assertValid(); runner.setProperty(PutElasticsearchHttp.BATCH_SIZE, "1"); diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java index a651aefdf8..bea99954ba 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestPutElasticsearchHttpRecord.java @@ -28,7 +28,6 @@ import okio.Buffer; import org.apache.nifi.json.JsonTreeReader; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.reporting.InitializationException; @@ -197,10 +196,9 @@ public class TestPutElasticsearchHttpRecord { runner = TestRunners.newTestRunner(processor); // no failures generateTestData(); runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); - runner.setProperty(AbstractElasticsearchHttpProcessor.ES_VERSION, ElasticsearchVersion.ES_7.name()); runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc"); - runner.setProperty(PutElasticsearchHttpRecord.TYPE, ""); + runner.removeProperty(PutElasticsearchHttpRecord.TYPE); runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id"); runner.setProperty(PutElasticsearchHttpRecord.DATE_FORMAT, "d/M/yyyy"); runner.setProperty(PutElasticsearchHttpRecord.TIME_FORMAT, "h:m a"); @@ -248,10 +246,9 @@ public class TestPutElasticsearchHttpRecord { runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecordTestProcessor(false)); // no failures generateTestData(); runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); - runner.setProperty(AbstractElasticsearchHttpProcessor.ES_VERSION, ElasticsearchVersion.ES_7.name()); runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc"); - runner.setProperty(PutElasticsearchHttpRecord.TYPE, ""); + runner.removeProperty(PutElasticsearchHttpRecord.TYPE); runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id"); runner.setProperty(PutElasticsearchHttpRecord.INDEX_OP, "Update"); runner.enqueue(new byte[0], new HashMap() {{ @@ -291,10 +288,9 @@ public class TestPutElasticsearchHttpRecord { runner = TestRunners.newTestRunner(new PutElasticsearchHttpRecordTestProcessor(false)); // no failures generateTestData(); runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); - runner.setProperty(AbstractElasticsearchHttpProcessor.ES_VERSION, ElasticsearchVersion.ES_7.name()); runner.setProperty(PutElasticsearchHttpRecord.INDEX, "doc"); - runner.setProperty(PutElasticsearchHttpRecord.TYPE, ""); + runner.removeProperty(PutElasticsearchHttpRecord.TYPE); runner.setProperty(PutElasticsearchHttpRecord.ID_RECORD_PATH, "/id"); runner.setProperty(PutElasticsearchHttpRecord.INDEX_OP, "DELETE"); runner.enqueue(new byte[0], new HashMap() {{ diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java index 0f6b6e44fd..4cdb035eca 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttp.java @@ -32,7 +32,6 @@ import java.util.List; import org.apache.commons.io.IOUtils; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion; import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; @@ -65,7 +64,6 @@ public class TestQueryElasticsearchHttp { public void testQueryElasticsearchOnTrigger_withInput() throws IOException { runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor()); runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); - runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name()); runner.setProperty(QueryElasticsearchHttp.INDEX, "doc"); runner.assertNotValid(); @@ -85,7 +83,6 @@ public class TestQueryElasticsearchHttp { runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor()); runner.setValidateExpressionUsage(true); runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); - runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name()); runner.setProperty(QueryElasticsearchHttp.INDEX, "doc"); runner.assertNotValid(); @@ -109,20 +106,15 @@ public class TestQueryElasticsearchHttp { runner.assertNotValid(); runner.setProperty(QueryElasticsearchHttp.QUERY, "source:Twitter AND identifier:\"${identifier}\""); - runner.setProperty(QueryElasticsearchHttp.TYPE, ""); + runner.removeProperty(QueryElasticsearchHttp.TYPE); runner.assertValid(); - runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name()); - runner.setProperty(QueryElasticsearchHttp.TYPE, ""); - runner.assertValid(); // Valid because type is not required prior to 7.0 runner.setProperty(QueryElasticsearchHttp.TYPE, "status"); runner.assertValid(); - runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_7.name()); - runner.assertNotValid(); // Not valid because type must be _doc or empty for 7.0+ - runner.setProperty(QueryElasticsearchHttp.TYPE, "_doc"); - runner.assertValid(); - runner.removeProperty(QueryElasticsearchHttp.TYPE); - runner.assertNotValid(); runner.setProperty(QueryElasticsearchHttp.TYPE, ""); + runner.assertNotValid(); + runner.setProperty(QueryElasticsearchHttp.TYPE, "${type}"); + runner.assertValid(); + runner.setProperty(QueryElasticsearchHttp.TYPE, "_doc"); runner.assertValid(); runner.setProperty(QueryElasticsearchHttp.PAGE_SIZE, "2"); runner.assertValid(); @@ -138,7 +130,6 @@ public class TestQueryElasticsearchHttp { public void testQueryElasticsearchOnTrigger_withInput_attributeTarget() throws IOException { runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor()); runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); - runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name()); runner.setProperty(QueryElasticsearchHttp.INDEX, "doc"); runner.assertNotValid(); @@ -165,7 +156,6 @@ public class TestQueryElasticsearchHttp { public void testQueryElasticsearchOnTrigger_withNoInput() throws IOException { runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor()); runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); - runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name()); runner.setProperty(QueryElasticsearchHttp.INDEX, "doc"); runner.assertNotValid(); @@ -211,7 +201,6 @@ public class TestQueryElasticsearchHttp { public void testQueryElasticsearchOnTriggerWithFields() throws IOException { runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor()); runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); - runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name()); runner.setProperty(QueryElasticsearchHttp.INDEX, "doc"); runner.assertNotValid(); @@ -231,7 +220,6 @@ public class TestQueryElasticsearchHttp { public void testQueryElasticsearchOnTriggerWithLimit() throws IOException { runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor()); runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); - runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name()); runner.setProperty(QueryElasticsearchHttp.INDEX, "doc"); runner.assertNotValid(); @@ -254,7 +242,6 @@ public class TestQueryElasticsearchHttp { processor.setStatus(500, "Server error"); runner = TestRunners.newTestRunner(processor); // simulate doc not found runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); - runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name()); runner.setProperty(QueryElasticsearchHttp.INDEX, "doc"); runner.setProperty(QueryElasticsearchHttp.TYPE, "status"); runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}"); @@ -280,7 +267,6 @@ public class TestQueryElasticsearchHttp { processor.setStatus(100, "Should fail"); runner = TestRunners.newTestRunner(processor); // simulate doc not found runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); - runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name()); runner.setProperty(QueryElasticsearchHttp.INDEX, "doc"); runner.setProperty(QueryElasticsearchHttp.TYPE, "status"); runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}"); @@ -306,7 +292,6 @@ public class TestQueryElasticsearchHttp { processor.setExceptionToThrow(new IOException("Error reading from disk")); runner = TestRunners.newTestRunner(processor); // simulate doc not found runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); - runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name()); runner.setProperty(QueryElasticsearchHttp.INDEX, "doc"); runner.setProperty(QueryElasticsearchHttp.TYPE, "status"); runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}"); @@ -332,7 +317,6 @@ public class TestQueryElasticsearchHttp { processor.setStatus(100, "Should fail", 2); runner = TestRunners.newTestRunner(processor); // simulate doc not found runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); - runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name()); runner.setProperty(QueryElasticsearchHttp.INDEX, "doc"); runner.setProperty(QueryElasticsearchHttp.TYPE, "status"); runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}"); @@ -359,7 +343,6 @@ public class TestQueryElasticsearchHttp { processor.setStatus(100, "Should fail", 1); runner = TestRunners.newTestRunner(processor); // simulate doc not found runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); - runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name()); runner.setProperty(QueryElasticsearchHttp.INDEX, "doc"); runner.setProperty(QueryElasticsearchHttp.TYPE, "status"); runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}"); @@ -382,9 +365,8 @@ public class TestQueryElasticsearchHttp { runner.enableControllerService(sslService); runner.setProperty(QueryElasticsearchHttp.PROP_SSL_CONTEXT_SERVICE, "ssl-context"); runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); - runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name()); runner.setProperty(QueryElasticsearchHttp.INDEX, "doc"); - runner.setProperty(QueryElasticsearchHttp.TYPE, ""); + runner.removeProperty(QueryElasticsearchHttp.TYPE); runner.setProperty(QueryElasticsearchHttp.QUERY, "${doc_id}"); // Allow time for the controller service to fully initialize @@ -421,7 +403,6 @@ public class TestQueryElasticsearchHttp { runner.setProperty(QueryElasticsearchHttp.PROXY_HOST, "localhost"); runner.setProperty(QueryElasticsearchHttp.PROXY_PORT, "3228"); runner.setProperty(QueryElasticsearchHttp.ES_URL, "http://172.18.0.2:9200"); - runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name()); runner.enqueue("".getBytes(), new HashMap() {{ put("doc_id", "28039652140"); @@ -449,7 +430,6 @@ public class TestQueryElasticsearchHttp { runner.setProperty(QueryElasticsearchHttp.PROXY_USERNAME, "squid"); runner.setProperty(QueryElasticsearchHttp.PROXY_PASSWORD, "changeme"); runner.setProperty(QueryElasticsearchHttp.ES_URL, "http://172.18.0.2:9200"); - runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name()); runner.enqueue("".getBytes(), new HashMap() {{ put("doc_id", "28039652140"); @@ -465,7 +445,6 @@ public class TestQueryElasticsearchHttp { p.setExpectedParam("myparam=myvalue"); runner = TestRunners.newTestRunner(p); runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); - runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name()); runner.setProperty(QueryElasticsearchHttp.INDEX, "doc"); runner.setProperty(QueryElasticsearchHttp.TYPE, "status"); @@ -478,30 +457,15 @@ public class TestQueryElasticsearchHttp { @Test public void testQueryElasticsearchOnTrigger_sourceIncludes() throws IOException { QueryElasticsearchHttpTestProcessor p = new QueryElasticsearchHttpTestProcessor(); - p.setExpectedParam("_source_include=test"); // < ES 7.0 expects this param + p.setExpectedParam("_source=test"); runner = TestRunners.newTestRunner(p); runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); - runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name()); runner.setProperty(QueryElasticsearchHttp.INDEX, "doc"); runner.setProperty(QueryElasticsearchHttp.TYPE, "status"); runner.setProperty(QueryElasticsearchHttp.QUERY, "source:Twitter"); runner.setProperty(QueryElasticsearchHttp.FIELDS, "test"); runAndVerifySuccess(true); - - // Now test with ES 7.x - - p = new QueryElasticsearchHttpTestProcessor(); - p.setExpectedParam("_source_includes=test"); // >= ES 7.0 expects this param - runner = TestRunners.newTestRunner(p); - runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); - runner.setProperty(QueryElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_7.name()); - - runner.setProperty(QueryElasticsearchHttp.INDEX, "doc"); - runner.setProperty(QueryElasticsearchHttp.TYPE, ""); - runner.setProperty(QueryElasticsearchHttp.QUERY, "source:Twitter"); - runner.setProperty(QueryElasticsearchHttp.FIELDS, "test"); - runAndVerifySuccess(true); } /** diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestScrollElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestScrollElasticsearchHttp.java index 74ac031b72..f790256de8 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestScrollElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestScrollElasticsearchHttp.java @@ -32,7 +32,6 @@ import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processors.elasticsearch.AbstractElasticsearchHttpProcessor.ElasticsearchVersion; import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; @@ -83,30 +82,15 @@ public class TestScrollElasticsearchHttp { @Test public void testScrollElasticsearchOnTrigger_sourceIncludes() throws IOException { ScrollElasticsearchHttpTestProcessor p = new ScrollElasticsearchHttpTestProcessor(); - p.setExpectedParam("_source_include=test"); // < ES 7.0 expects this param + p.setExpectedParam("_source=test"); runner = TestRunners.newTestRunner(p); runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); - runner.setProperty(ScrollElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name()); runner.setProperty(ScrollElasticsearchHttp.INDEX, "doc"); runner.setProperty(ScrollElasticsearchHttp.TYPE, "status"); runner.setProperty(ScrollElasticsearchHttp.QUERY, "source:Twitter"); runner.setProperty(ScrollElasticsearchHttp.FIELDS, "test"); runAndVerifySuccess(); - - // Now test with ES 7.x - - p = new ScrollElasticsearchHttpTestProcessor(); - p.setExpectedParam("_source_includes=test"); // >= ES 7.0 expects this param - runner = TestRunners.newTestRunner(p); - runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); - runner.setProperty(ScrollElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_7.name()); - - runner.setProperty(ScrollElasticsearchHttp.INDEX, "doc"); - runner.setProperty(ScrollElasticsearchHttp.TYPE, ""); - runner.setProperty(ScrollElasticsearchHttp.QUERY, "source:Twitter"); - runner.setProperty(ScrollElasticsearchHttp.FIELDS, "test"); - runAndVerifySuccess(); } @Test @@ -163,20 +147,18 @@ public class TestScrollElasticsearchHttp { runner.setProperty(ScrollElasticsearchHttp.INDEX, "doc"); runner.assertNotValid(); - runner.setProperty(ScrollElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_LESS_THAN_7.name()); runner.setProperty(ScrollElasticsearchHttp.QUERY, "${doc_id}"); - runner.assertNotValid(); - runner.setProperty(ScrollElasticsearchHttp.TYPE, ""); - runner.assertValid(); // Valid because type is not required prior to 7.0 - runner.setProperty(ScrollElasticsearchHttp.TYPE, "status"); - runner.assertValid(); - runner.setProperty(ScrollElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_7.name()); - runner.assertNotValid(); // Not valid because type must be _doc or empty for 7.0+ - runner.setProperty(ScrollElasticsearchHttp.TYPE, "_doc"); runner.assertValid(); runner.removeProperty(ScrollElasticsearchHttp.TYPE); - runner.assertNotValid(); + runner.assertValid(); + runner.setProperty(ScrollElasticsearchHttp.TYPE, "status"); + runner.assertValid(); + runner.setProperty(ScrollElasticsearchHttp.TYPE, "${type}"); + runner.assertValid(); runner.setProperty(ScrollElasticsearchHttp.TYPE, ""); + runner.assertNotValid(); + runner.setProperty(ScrollElasticsearchHttp.TYPE, "_doc"); + runner.assertValid(); runner.setProperty(ScrollElasticsearchHttp.FIELDS, "id,, userinfo.location"); runner.assertValid(); runner.setProperty(ScrollElasticsearchHttp.SORT, "timestamp:asc,identifier:desc"); @@ -286,9 +268,8 @@ public class TestScrollElasticsearchHttp { runner.enableControllerService(sslService); runner.setProperty(ScrollElasticsearchHttp.PROP_SSL_CONTEXT_SERVICE, "ssl-context"); runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); - runner.setProperty(ScrollElasticsearchHttp.ES_VERSION, ElasticsearchVersion.ES_7.name()); runner.setProperty(ScrollElasticsearchHttp.INDEX, "doc"); - runner.setProperty(ScrollElasticsearchHttp.TYPE, ""); + runner.removeProperty(ScrollElasticsearchHttp.TYPE); runner.setProperty(ScrollElasticsearchHttp.QUERY, "${doc_id}"); runner.setIncomingConnection(false); diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy index 214f15e54a..1eb2893134 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy @@ -28,6 +28,7 @@ import org.apache.nifi.schema.access.SchemaAccessUtils import org.apache.nifi.serialization.RecordReaderFactory import org.apache.nifi.serialization.record.MockRecordParser import org.apache.nifi.serialization.record.MockSchemaRegistry +import org.apache.nifi.util.StringUtils import org.apache.nifi.util.TestRunner import org.apache.nifi.util.TestRunners import org.junit.Assert @@ -206,6 +207,42 @@ class PutElasticsearchRecordTest { "schema.name": "recordPathTest" ]) runner.run() + runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1) + runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0) + runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0) + + runner.clearTransferState() + + flowFileContents = prettyPrint(toJson([ + [ msg: "Hello" ], + [ id: null, type: null, msg: "Hello" ], + [ id: "rec-3", msg: "Hello" ], + [ id: "rec-4", msg: "Hello" ], + [ id: "rec-5", msg: "Hello" ], + [ id: "rec-6", type: "message", msg: "Hello" ] + ])) + + evalClosure = { List items -> + def nullTypeCount = items.findAll { it.type == null }.size() + def messageTypeCount = items.findAll { it.type == "message" }.size() + def nullIdCount = items.findAll { it.id == null }.size() + def recIdCount = items.findAll { StringUtils.startsWith(it.id, "rec-") }.size() + Assert.assertEquals("null type", 5, nullTypeCount) + Assert.assertEquals("message type", 1, messageTypeCount) + Assert.assertEquals("null id", 2, nullIdCount) + Assert.assertEquals("rec- id", 4, recIdCount) + } + + clientService.evalClosure = evalClosure + + runner.removeProperty(PutElasticsearchRecord.TYPE) + runner.enqueue(flowFileContents, [ + "schema.name": "recordPathTest" + ]) + runner.run() + runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1) + runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0) + runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0) runner.clearTransferState()