diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/DeleteOperationResponse.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/DeleteOperationResponse.java index 13e2aa335e..a9dd13cf5f 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/DeleteOperationResponse.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/DeleteOperationResponse.java @@ -17,13 +17,14 @@ package org.apache.nifi.elasticsearch; -public class DeleteOperationResponse { +public class DeleteOperationResponse implements OperationResponse { private final long took; public DeleteOperationResponse(final long took) { this.took = took; } + @Override public long getTook() { return took; } diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java index d232f2bca9..d186306516 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientService.java @@ -26,7 +26,6 @@ import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.ssl.SSLContextService; -import java.io.IOException; import java.util.List; import java.util.Map; @@ -83,6 +82,11 @@ public interface ElasticSearchClientService extends ControllerService { .defaultValue("60000") .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .build(); + /** + * @deprecated this setting is no longer used and will be removed in a future version. + * Property retained for now to prevent existing Flows with this processor from breaking upon upgrade. + */ + @Deprecated PropertyDescriptor RETRY_TIMEOUT = new PropertyDescriptor.Builder() .name("el-cs-retry-timeout") .displayName("Retry timeout") @@ -119,71 +123,85 @@ public interface ElasticSearchClientService extends ControllerService { * Index a document. * * @param operation A document to index. + * @param requestParameters A collection of URL request parameters. Optional. * @return IndexOperationResponse if successful - * @throws IOException thrown when there is an error. */ - IndexOperationResponse add(IndexOperationRequest operation); + IndexOperationResponse add(IndexOperationRequest operation, Map requestParameters); /** * Bulk process multiple documents. * * @param operations A list of index operations. + * @param requestParameters A collection of URL request parameters. Optional. * @return IndexOperationResponse if successful. - * @throws IOException thrown when there is an error. */ - IndexOperationResponse bulk(List operations); + IndexOperationResponse bulk(List operations, Map requestParameters); /** * Count the documents that match the criteria. * * @param query A query in the JSON DSL syntax * @param index The index to target. - * @param type The type to target. - * @return + * @param type The type to target. Will not be used in future versions of Elasticsearch. + * @param requestParameters A collection of URL request parameters. Optional. + * @return number of documents matching the query */ - Long count(String query, String index, String type); + Long count(String query, String index, String type, Map requestParameters); /** * Delete a document by its ID from an index. * * @param index The index to target. - * @param type The type to target. Optional. + * @param type The type to target. Optional. Will not be used in future versions of Elasticsearch. * @param id The document ID to remove from the selected index. + * @param requestParameters A collection of URL request parameters. Optional. * @return A DeleteOperationResponse object if successful. */ - DeleteOperationResponse deleteById(String index, String type, String id); + DeleteOperationResponse deleteById(String index, String type, String id, Map requestParameters); /** * Delete multiple documents by ID from an index. * @param index The index to target. - * @param type The type to target. Optional. + * @param type The type to target. Optional. Will not be used in future versions of Elasticsearch. * @param ids A list of document IDs to remove from the selected index. + * @param requestParameters A collection of URL request parameters. Optional. * @return A DeleteOperationResponse object if successful. - * @throws IOException thrown when there is an error. */ - DeleteOperationResponse deleteById(String index, String type, List ids); + DeleteOperationResponse deleteById(String index, String type, List ids, Map requestParameters); /** * Delete documents by query. * * @param query A valid JSON query to be used for finding documents to delete. * @param index The index to target. - * @param type The type to target within the index. Optional. + * @param type The type to target within the index. Optional. Will not be used in future versions of Elasticsearch. + * @param requestParameters A collection of URL request parameters. Optional. * @return A DeleteOperationResponse object if successful. */ - DeleteOperationResponse deleteByQuery(String query, String index, String type); + DeleteOperationResponse deleteByQuery(String query, String index, String type, Map requestParameters); + + /** + * Update documents by query. + * + * @param query A valid JSON query to be used for finding documents to update. + * @param index The index to target. + * @param type The type to target within the index. Optional. Will not be used in future versions of Elasticsearch. + * @param requestParameters A collection of URL request parameters. Optional. + * @return An UpdateOperationResponse object if successful. + */ + UpdateOperationResponse updateByQuery(String query, String index, String type, Map requestParameters); /** * Get a document by ID. * * @param index The index that holds the document. - * @param type The document type. Optional. + * @param type The document type. Optional. Will not be used in future versions of Elasticsearch. * @param id The document ID + * @param requestParameters A collection of URL request parameters. Optional. * @return Map if successful, null if not found. - * @throws IOException thrown when there is an error. */ - Map get(String index, String type, String id); + Map get(String index, String type, String id, Map requestParameters); /** * Perform a search using the JSON DSL. diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java index be79416cdf..07cdb3cad1 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationRequest.java @@ -21,8 +21,9 @@ import java.util.Arrays; import java.util.Map; /** - * A POJO that represents an "operation on an index." It should not be confused with just indexing documents, as it + * A POJO that represents an "operation on an index". It should not be confused with just indexing documents, as it * covers all CRUD-related operations that can be executed against an Elasticsearch index with documents. + * Type is optional and will not be used in future versions of Elasticsearch. */ public class IndexOperationRequest { private final String index; diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationResponse.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationResponse.java index 8d19ca8972..5aa390640a 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationResponse.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/IndexOperationResponse.java @@ -23,7 +23,7 @@ import java.io.IOException; import java.util.List; import java.util.Map; -public class IndexOperationResponse { +public class IndexOperationResponse implements OperationResponse { private final long took; private boolean hasErrors; private List> items; @@ -34,6 +34,7 @@ public class IndexOperationResponse { this.took = took; } + @Override public long getTook() { return took; } diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/OperationResponse.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/OperationResponse.java new file mode 100644 index 0000000000..c349768035 --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/OperationResponse.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.elasticsearch; + +public interface OperationResponse { + /** + * @return the time in millisecond taken to complete the Elasticsearch operation + */ + long getTook(); +} diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/SearchResponse.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/SearchResponse.java index e6bfb15771..2a79a7a552 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/SearchResponse.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/SearchResponse.java @@ -20,7 +20,7 @@ package org.apache.nifi.elasticsearch; import java.util.List; import java.util.Map; -public class SearchResponse { +public class SearchResponse implements OperationResponse { private final List> hits; private final Map aggregations; private final long numberOfHits; @@ -73,7 +73,8 @@ public class SearchResponse { return timedOut; } - public int getTook() { + @Override + public long getTook() { return took; } diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/UpdateOperationResponse.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/UpdateOperationResponse.java new file mode 100644 index 0000000000..53c861aaa0 --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service-api/src/main/java/org/apache/nifi/elasticsearch/UpdateOperationResponse.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.elasticsearch; + +public class UpdateOperationResponse implements OperationResponse { + private final long took; + + public UpdateOperationResponse(long took) { + this.took = took; + } + + @Override + public long getTook() { + return took; + } +} diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml index d1a02b33c9..031c918f8c 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/pom.xml @@ -30,6 +30,7 @@ 5.6.16 setup-5.script faketype + src/test/resources/conf-5/ testCluster 9500 9400 @@ -41,14 +42,14 @@ org.apache.nifi nifi-lookup-service-api - provided 1.15.0-SNAPSHOT + provided org.apache.nifi nifi-api - provided 1.15.0-SNAPSHOT + provided org.apache.nifi @@ -67,6 +68,35 @@ 1.15.0-SNAPSHOT provided + + org.apache.nifi + nifi-ssl-context-service-api + 1.15.0-SNAPSHOT + compile + + + org.apache.nifi + nifi-elasticsearch-client-service-api + 1.15.0-SNAPSHOT + provided + + + org.apache.nifi + nifi-avro-record-utils + 1.15.0-SNAPSHOT + compile + + + org.apache.nifi + nifi-schema-registry-service-api + compile + + + org.apache.nifi + nifi-record-path + 1.15.0-SNAPSHOT + compile + com.fasterxml.jackson.core @@ -77,7 +107,7 @@ commons-io commons-io - 2.10.0 + 2.11.0 @@ -88,27 +118,44 @@ org.apache.commons commons-lang3 - 3.11 + 3.12.0 org.slf4j log4j-over-slf4j + + com.jayway.jsonpath + json-path + 2.6.0 + + + org.apache.httpcomponents + httpclient + 4.5.13 + + + + org.elasticsearch.client + elasticsearch-rest-client + + 7.10.2 + compile + + + com.vividsolutions + jts + + + - - org.apache.nifi - nifi-mock - 1.15.0-SNAPSHOT - test - - - org.apache.nifi - nifi-ssl-context-service-api - 1.15.0-SNAPSHOT - compile - org.apache.nifi nifi-standard-web-test-utils @@ -123,54 +170,9 @@ org.apache.nifi - nifi-elasticsearch-client-service-api + nifi-mock 1.15.0-SNAPSHOT - provided - - - org.elasticsearch - elasticsearch - 5.6.16 - compile - - - org.elasticsearch.client - elasticsearch-rest-high-level-client - 5.6.16 - compile - - - com.vividsolutions - jts - - - - - org.apache.nifi - nifi-avro-record-utils - 1.15.0-SNAPSHOT - compile - - - org.apache.nifi - nifi-schema-registry-service-api - compile - - - com.jayway.jsonpath - json-path - 2.6.0 - - - org.apache.httpcomponents - httpclient - 4.5.13 - - - org.apache.nifi - nifi-record-path - 1.15.0-SNAPSHOT - compile + test org.apache.nifi @@ -189,31 +191,33 @@ ${nifi.groovy.version} test - - org.elasticsearch.client - elasticsearch-rest-client - 5.6.16 - - - - - - org.apache.maven.plugins - maven-failsafe-plugin - 3.0.0-M3 - - - com.github.alexcojocaru - elasticsearch-maven-plugin - 6.19 - - - - - + + integration-tests + + false + + + + + + org.apache.maven.plugins + maven-failsafe-plugin + + 3.0.0-M3 + + + com.github.alexcojocaru + elasticsearch-maven-plugin + 6.19 + + + + + + elasticsearch-6 @@ -221,9 +225,10 @@ false - 6.8.13 + 6.8.19 _doc setup-6.script + @@ -236,6 +241,7 @@ 7.10.2 setup-7.script + @@ -270,6 +276,7 @@ ${es.int.logLevel} ${project.basedir}/src/test/resources/${es.int.script.name} false + ${es.int.path.conf} diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java index eb461022d6..65750508a2 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchClientServiceImpl.java @@ -29,7 +29,6 @@ import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; import org.apache.http.entity.ContentType; import org.apache.http.impl.client.BasicCredentialsProvider; -import org.apache.http.message.BasicHeader; import org.apache.http.nio.entity.NStringEntity; import org.apache.nifi.annotation.lifecycle.OnDisabled; import org.apache.nifi.annotation.lifecycle.OnEnabled; @@ -41,6 +40,7 @@ import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.util.StopWatch; import org.apache.nifi.util.StringUtils; +import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; import org.elasticsearch.client.ResponseException; import org.elasticsearch.client.RestClient; @@ -58,6 +58,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -73,7 +74,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im private Charset responseCharset; static { - List props = new ArrayList<>(); + final List props = new ArrayList<>(); props.add(ElasticSearchClientService.HTTP_HOSTS); props.add(ElasticSearchClientService.USERNAME); props.add(ElasticSearchClientService.PASSWORD); @@ -104,7 +105,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); mapper.setSerializationInclusion(JsonInclude.Include.NON_EMPTY); } - } catch (Exception ex) { + } catch (final Exception ex) { getLogger().error("Could not initialize ElasticSearch client.", ex); throw new InitializationException(ex); } @@ -118,7 +119,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im private void setupClient(final ConfigurationContext context) throws MalformedURLException, InitializationException { final String hosts = context.getProperty(HTTP_HOSTS).evaluateAttributeExpressions().getValue(); - String[] hostsSplit = hosts.split(",[\\s]*"); + final String[] hostsSplit = hosts.split(",[\\s]*"); this.url = hostsSplit[0]; final SSLContextService sslService = context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); @@ -127,7 +128,6 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im final Integer connectTimeout = context.getProperty(CONNECT_TIMEOUT).asInteger(); final Integer readTimeout = context.getProperty(SOCKET_TIMEOUT).asInteger(); - final Integer retryTimeout = context.getProperty(RETRY_TIMEOUT).asInteger(); final HttpHost[] hh = new HttpHost[hostsSplit.length]; for (int x = 0; x < hh.length; x++) { @@ -139,7 +139,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im try { sslContext = (sslService != null && (sslService.isKeyStoreConfigured() || sslService.isTrustStoreConfigured())) ? sslService.createContext() : null; - } catch (Exception e) { + } catch (final Exception e) { getLogger().error("Error building up SSL Context from the supplied configuration.", e); throw new InitializationException(e); } @@ -163,8 +163,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im requestConfigBuilder.setConnectTimeout(connectTimeout); requestConfigBuilder.setSocketTimeout(readTimeout); return requestConfigBuilder; - }) - .setMaxRetryTimeoutMillis(retryTimeout); + }); this.client = builder.build(); } @@ -182,7 +181,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im final HttpEntity queryEntity = new NStringEntity(query, ContentType.APPLICATION_JSON); try { - return client.performRequest("POST", sb.toString(), requestParameters != null ? requestParameters : Collections.emptyMap(), queryEntity); + return performRequest("POST", sb.toString(), requestParameters, queryEntity); } catch (final Exception e) { throw new ElasticsearchError(e); } @@ -197,7 +196,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im final InputStream inputStream = response.getEntity().getContent(); final byte[] result = IOUtils.toByteArray(inputStream); inputStream.close(); - return (Map) mapper.readValue(new String(result, responseCharset), Map.class); + return mapper.readValue(new String(result, responseCharset), Map.class); } else { final String errorMessage = String.format("ElasticSearch reported an error while trying to run the query: %s", response.getStatusLine().getReasonPhrase()); @@ -217,8 +216,8 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im } @Override - public IndexOperationResponse add(final IndexOperationRequest operation) { - return bulk(Collections.singletonList(operation)); + public IndexOperationResponse add(final IndexOperationRequest operation, final Map requestParameters) { + return bulk(Collections.singletonList(operation), requestParameters); } private String flatten(final String str) { @@ -277,7 +276,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im } @Override - public IndexOperationResponse bulk(final List operations) { + public IndexOperationResponse bulk(final List operations, final Map requestParameters) { try { final StringBuilder payload = new StringBuilder(); for (final IndexOperationRequest or : operations) { @@ -290,7 +289,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im final HttpEntity entity = new NStringEntity(payload.toString(), ContentType.APPLICATION_JSON); final StopWatch watch = new StopWatch(); watch.start(); - final Response response = client.performRequest("POST", "/_bulk", Collections.emptyMap(), entity); + final Response response = performRequest("POST", "_bulk", requestParameters, entity); watch.stop(); final String rawResponse = IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8); @@ -307,20 +306,20 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im } @Override - public Long count(final String query, final String index, final String type) { - final Response response = runQuery("_count", query, index, type, null); + public Long count(final String query, final String index, final String type, final Map requestParameters) { + final Response response = runQuery("_count", query, index, type, requestParameters); final Map parsed = parseResponse(response); return ((Integer)parsed.get("count")).longValue(); } @Override - public DeleteOperationResponse deleteById(final String index, final String type, final String id) { - return deleteById(index, type, Collections.singletonList(id)); + public DeleteOperationResponse deleteById(final String index, final String type, final String id, final Map requestParameters) { + return deleteById(index, type, Collections.singletonList(id), requestParameters); } @Override - public DeleteOperationResponse deleteById(final String index, final String type, final List ids) { + public DeleteOperationResponse deleteById(final String index, final String type, final List ids, final Map requestParameters) { try { final StringBuilder sb = new StringBuilder(); for (final String id : ids) { @@ -330,7 +329,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im final HttpEntity entity = new NStringEntity(sb.toString(), ContentType.APPLICATION_JSON); final StopWatch watch = new StopWatch(); watch.start(); - final Response response = client.performRequest("POST", "/_bulk", Collections.emptyMap(), entity); + final Response response = performRequest("POST", "_bulk", requestParameters, entity); watch.stop(); if (getLogger().isDebugEnabled()) { @@ -346,10 +345,10 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im } @Override - public DeleteOperationResponse deleteByQuery(final String query, final String index, final String type) { + public DeleteOperationResponse deleteByQuery(final String query, final String index, final String type, final Map requestParameters) { final StopWatch watch = new StopWatch(); watch.start(); - final Response response = runQuery("_delete_by_query", query, index, type, null); + final Response response = runQuery("_delete_by_query", query, index, type, requestParameters); watch.stop(); // check for errors in response @@ -359,9 +358,21 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im return new DeleteOperationResponse(watch.getDuration(TimeUnit.MILLISECONDS)); } + + public UpdateOperationResponse updateByQuery(final String query, final String index, final String type, final Map requestParameters) { + final long start = System.currentTimeMillis(); + final Response response = runQuery("_update_by_query", query, index, type, requestParameters); + final long end = System.currentTimeMillis(); + + // check for errors in response + parseResponse(response); + + return new UpdateOperationResponse(end - start); + } + @SuppressWarnings("unchecked") @Override - public Map get(final String index, final String type, final String id) { + public Map get(final String index, final String type, final String id, final Map requestParameters) { try { final StringBuilder endpoint = new StringBuilder(); endpoint.append(index); @@ -371,8 +382,8 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im endpoint.append("/_doc"); } endpoint.append("/").append(id); - final Response response = client.performRequest("GET", endpoint.toString(), new BasicHeader("Content-Type", "application/json")); + final Response response = performRequest("GET", endpoint.toString(), requestParameters, null); final String body = IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8); parseResponseWarningHeaders(response); @@ -402,7 +413,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im try { final Response response = runQuery("_search", query, index, type, requestParameters); return buildSearchResponse(response); - } catch (Exception ex) { + } catch (final Exception ex) { throw new RuntimeException(ex); } } @@ -411,10 +422,9 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im public SearchResponse scroll(final String scroll) { try { final HttpEntity scrollEntity = new NStringEntity(scroll, ContentType.APPLICATION_JSON); - final Response response = client.performRequest("POST", "/_search/scroll", Collections.emptyMap(), scrollEntity); - + final Response response = performRequest("POST", "/_search/scroll", Collections.emptyMap(), scrollEntity); return buildSearchResponse(response); - } catch (Exception ex) { + } catch (final Exception ex) { throw new RuntimeException(ex); } } @@ -427,7 +437,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im put("keep_alive", keepAlive); } }}; - final Response response = client.performRequest("POST", index + "/_pit", params); + final Response response = performRequest("POST", index + "/_pit", params, null); final String body = IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8); parseResponseWarningHeaders(response); @@ -447,7 +457,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im final HttpEntity pitEntity = new NStringEntity(String.format("{\"id\": \"%s\"}", pitId), ContentType.APPLICATION_JSON); final StopWatch watch = new StopWatch(true); - final Response response = client.performRequest("DELETE", "/_pit", Collections.emptyMap(), pitEntity); + final Response response = performRequest("DELETE", "/_pit", Collections.emptyMap(), pitEntity); watch.stop(); if (getLogger().isDebugEnabled()) { @@ -476,7 +486,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im final HttpEntity scrollBody = new NStringEntity(String.format("{\"scroll_id\": \"%s\"}", scrollId), ContentType.APPLICATION_JSON); final StopWatch watch = new StopWatch(true); - final Response response = client.performRequest("DELETE", "/_search/scroll", Collections.emptyMap(), scrollBody); + final Response response = performRequest("DELETE", "/_search/scroll", Collections.emptyMap(), scrollBody); watch.stop(); if (getLogger().isDebugEnabled()) { @@ -519,15 +529,15 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im if (getLogger().isDebugEnabled()) { final String searchSummary = "******************" + - String.format("Took: %d", took) + - String.format("Timed out: %s", timedOut) + - String.format("Aggregation count: %d", aggregations.size()) + - String.format("Hit count: %d", hits.size()) + - String.format("PIT Id: %s", pitId) + - String.format("Scroll Id: %s", scrollId) + - String.format("Search After: %s", searchAfter) + - String.format("Total found: %d", count) + - String.format("Warnings: %s", warnings) + + String.format(Locale.getDefault(), "Took: %d", took) + + String.format(Locale.getDefault(), "Timed out: %s", timedOut) + + String.format(Locale.getDefault(), "Aggregation count: %d", aggregations.size()) + + String.format(Locale.getDefault(), "Hit count: %d", hits.size()) + + String.format(Locale.getDefault(), "PIT Id: %s", pitId) + + String.format(Locale.getDefault(), "Scroll Id: %s", scrollId) + + String.format(Locale.getDefault(), "Search After: %s", searchAfter) + + String.format(Locale.getDefault(), "Total found: %d", count) + + String.format(Locale.getDefault(), "Warnings: %s", warnings) + "******************"; getLogger().debug(searchSummary); } @@ -554,4 +564,15 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im (StringUtils.isNotBlank(type) ? "/" : "") + (StringUtils.isNotBlank(type) ? type : ""); } + + private Response performRequest(final String method, final String endpoint, final Map parameters, final HttpEntity entity) throws IOException { + final Request request = new Request(method, endpoint); + if (parameters != null && !parameters.isEmpty()) { + request.addParameters(parameters); + } + if (entity != null) { + request.setEntity(entity); + } + return client.performRequest(request); + } } diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java index 5b476de8d6..abe2c2e607 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchLookupService.java @@ -83,19 +83,18 @@ public class ElasticSearchLookupService extends JsonInferenceSchemaRegistryServi private String type; private ObjectMapper mapper; - private final List DESCRIPTORS; + private volatile ConcurrentHashMap recordPathMappings; + + private final List descriptors; public ElasticSearchLookupService() { - List _desc = new ArrayList<>(); - _desc.addAll(super.getSupportedPropertyDescriptors()); - _desc.add(CLIENT_SERVICE); - _desc.add(INDEX); - _desc.add(TYPE); - DESCRIPTORS = Collections.unmodifiableList(_desc); + final List desc = new ArrayList<>(super.getSupportedPropertyDescriptors()); + desc.add(CLIENT_SERVICE); + desc.add(INDEX); + desc.add(TYPE); + descriptors = Collections.unmodifiableList(desc); } - private volatile ConcurrentHashMap mappings; - @Override @OnEnabled public void onEnabled(final ConfigurationContext context) { @@ -104,30 +103,29 @@ public class ElasticSearchLookupService extends JsonInferenceSchemaRegistryServi type = context.getProperty(TYPE).evaluateAttributeExpressions().getValue(); mapper = new ObjectMapper(); - List dynamic = context.getProperties().entrySet().stream() - .filter( e -> e.getKey().isDynamic()) - .map(e -> e.getKey()) + final List dynamicDescriptors = context.getProperties().keySet().stream() + .filter(PropertyDescriptor::isDynamic) .collect(Collectors.toList()); - Map _temp = new HashMap<>(); - for (PropertyDescriptor desc : dynamic) { - String value = context.getProperty(desc).getValue(); - String name = desc.getName(); - _temp.put(name, RecordPath.compile(value)); + final Map tempRecordPathMappings = new HashMap<>(); + for (final PropertyDescriptor desc : dynamicDescriptors) { + final String value = context.getProperty(desc).getValue(); + final String name = desc.getName(); + tempRecordPathMappings.put(name, RecordPath.compile(value)); } - mappings = new ConcurrentHashMap<>(_temp); + recordPathMappings = new ConcurrentHashMap<>(tempRecordPathMappings); super.onEnabled(context); } @Override protected List getSupportedPropertyDescriptors() { - return DESCRIPTORS; + return descriptors; } @Override - public PropertyDescriptor getSupportedDynamicPropertyDescriptor(String name) { + public PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String name) { return new PropertyDescriptor.Builder() .name(name) .addValidator((subject, input, context) -> { @@ -148,21 +146,21 @@ public class ElasticSearchLookupService extends JsonInferenceSchemaRegistryServi } @Override - public Optional lookup(Map coordinates) throws LookupFailureException { - Map context = coordinates.entrySet().stream() + public Optional lookup(final Map coordinates) throws LookupFailureException { + final Map context = coordinates.entrySet().stream() .collect(Collectors.toMap( - e -> e.getKey(), + Map.Entry::getKey, e -> e.getValue().toString() )); return lookup(coordinates, context); } @Override - public Optional lookup(Map coordinates, Map context) throws LookupFailureException { + public Optional lookup(final Map coordinates, final Map context) throws LookupFailureException { validateCoordinates(coordinates); try { - Record record; + final Record record; if (coordinates.containsKey("_id")) { record = getById((String)coordinates.get("_id"), context); } else { @@ -176,8 +174,8 @@ public class ElasticSearchLookupService extends JsonInferenceSchemaRegistryServi } } - private void validateCoordinates(Map coordinates) throws LookupFailureException { - List reasons = new ArrayList<>(); + private void validateCoordinates(final Map coordinates) throws LookupFailureException { + final List reasons = new ArrayList<>(); if (coordinates.containsKey("_id") && !(coordinates.get("_id") instanceof String)) { reasons.add("_id was supplied, but it was not a String."); @@ -187,14 +185,15 @@ public class ElasticSearchLookupService extends JsonInferenceSchemaRegistryServi reasons.add("When _id is used, it can be the only key used in the lookup."); } - if (reasons.size() > 0) { - String error = String.join("\n", reasons); + if (!reasons.isEmpty()) { + final String error = String.join("\n", reasons); throw new LookupFailureException(error); } } - private Record getById(final String _id, Map context) throws IOException, LookupFailureException, SchemaNotFoundException { - Map query = new HashMap(){{ + @SuppressWarnings("unchecked") + private Record getById(final String _id, final Map context) throws IOException, LookupFailureException, SchemaNotFoundException { + final Map query = new HashMap(){{ put("query", new HashMap() {{ put("match", new HashMap(){{ put("_id", _id); @@ -202,10 +201,9 @@ public class ElasticSearchLookupService extends JsonInferenceSchemaRegistryServi }}); }}; - String json = mapper.writeValueAsString(query); - - SearchResponse response = clientService.search(json, index, type, null); + final String json = mapper.writeValueAsString(query); + final SearchResponse response = clientService.search(json, index, type, null); if (response.getNumberOfHits() > 1) { throw new LookupFailureException(String.format("Expected 1 response, got %d for query %s", response.getNumberOfHits(), json)); @@ -213,21 +211,20 @@ public class ElasticSearchLookupService extends JsonInferenceSchemaRegistryServi return null; } - final Map source = (Map)response.getHits().get(0).get("_source"); + final Map source = (Map)response.getHits().get(0).get("_source"); - RecordSchema toUse = getSchema(context, source, null); + final RecordSchema toUse = getSchema(context, source, null); Record record = new MapRecord(toUse, source); - - if (mappings.size() > 0) { + if (recordPathMappings.size() > 0) { record = applyMappings(record, source); } return record; } - Map getNested(String key, Object value) { - String path = key.substring(0, key.lastIndexOf(".")); + Map getNested(final String key, final Object value) { + final String path = key.substring(0, key.lastIndexOf(".")); return new HashMap(){{ put("path", path); @@ -239,8 +236,8 @@ public class ElasticSearchLookupService extends JsonInferenceSchemaRegistryServi }}; } - private Map buildQuery(Map coordinates) { - Map query = new HashMap(){{ + private Map buildQuery(final Map coordinates) { + final Map query = new HashMap(){{ put("bool", new HashMap(){{ put("must", coordinates.entrySet().stream() .map(e -> new HashMap(){{ @@ -256,28 +253,25 @@ public class ElasticSearchLookupService extends JsonInferenceSchemaRegistryServi }}); }}; - Map outter = new HashMap(){{ + return new HashMap(){{ put("size", 1); put("query", query); }}; - - return outter; } - private Record getByQuery(final Map query, Map context) throws LookupFailureException { + @SuppressWarnings("unchecked") + private Record getByQuery(final Map query, final Map context) throws LookupFailureException { try { final String json = mapper.writeValueAsString(buildQuery(query)); - SearchResponse response = clientService.search(json, index, type, null); - + final SearchResponse response = clientService.search(json, index, type, null); if (response.getNumberOfHits() == 0) { return null; } else { - final Map source = (Map)response.getHits().get(0).get("_source"); - RecordSchema toUse = getSchema(context, source, null); + final Map source = (Map)response.getHits().get(0).get("_source"); + final RecordSchema toUse = getSchema(context, source, null); Record record = new MapRecord(toUse, source); - - if (mappings.size() > 0) { + if (recordPathMappings.size() > 0) { record = applyMappings(record, source); } @@ -289,23 +283,20 @@ public class ElasticSearchLookupService extends JsonInferenceSchemaRegistryServi } } - private Record applyMappings(Record record, Map source) { - Record _rec = new MapRecord(record.getSchema(), new HashMap<>()); + private Record applyMappings(final Record record, final Map source) { + final Record rec = new MapRecord(record.getSchema(), new HashMap<>()); - mappings.entrySet().forEach(entry -> { + recordPathMappings.forEach((key, path) -> { try { - Object o = JsonPath.read(source, entry.getKey()); - RecordPath path = entry.getValue(); - Optional first = path.evaluate(_rec).getSelectedFields().findFirst(); - if (first.isPresent()) { - first.get().updateValue(o); - } + final Object o = JsonPath.read(source, key); + final Optional first = path.evaluate(rec).getSelectedFields().findFirst(); + first.ifPresent(fieldValue -> fieldValue.updateValue(o)); } catch (Exception ex) { throw new RuntimeException(ex); } }); - return _rec; + return rec; } @Override diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchStringLookupService.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchStringLookupService.java index b833e79cb1..c408f5f3d5 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchStringLookupService.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/main/java/org/apache/nifi/elasticsearch/ElasticSearchStringLookupService.java @@ -87,7 +87,7 @@ public class ElasticSearchStringLookupService extends AbstractControllerService public Optional lookup(Map coordinates) throws LookupFailureException { try { final String id = (String) coordinates.get(ID); - final Map enums = esClient.get(index, type, id); + final Map enums = esClient.get(index, type, id, null); if (enums == null) { return Optional.empty(); } else { diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/SearchResponseTest.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/SearchResponseTest.groovy index 2fe9a6b601..6045ce4ff5 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/SearchResponseTest.groovy +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/SearchResponseTest.groovy @@ -34,6 +34,7 @@ class SearchResponseTest { def warnings = ["auth"] def response = new SearchResponse(results, aggs as Map, pitId, scrollId, searchAfter, num, took, timeout, warnings) def str = response.toString() + Assert.assertEquals(results, response.hits) Assert.assertEquals(aggs, response.aggregations) Assert.assertEquals(pitId, response.pitId) diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.groovy index 0ac8cd9561..9cb9885202 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.groovy +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearchClientService_IT.groovy @@ -25,6 +25,7 @@ import org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl import org.apache.nifi.elasticsearch.IndexOperationRequest import org.apache.nifi.elasticsearch.IndexOperationResponse import org.apache.nifi.elasticsearch.SearchResponse +import org.apache.nifi.elasticsearch.UpdateOperationResponse import org.apache.nifi.security.util.StandardTlsConfiguration import org.apache.nifi.security.util.TemporaryKeyStoreBuilder import org.apache.nifi.security.util.TlsConfiguration @@ -109,6 +110,7 @@ class ElasticSearchClientService_IT { runner.setProperty(service, ElasticSearchClientService.SOCKET_TIMEOUT, "60000") runner.setProperty(service, ElasticSearchClientService.RETRY_TIMEOUT, "60000") runner.setProperty(service, ElasticSearchClientService.SUPPRESS_NULLS, ElasticSearchClientService.ALWAYS_SUPPRESS.getValue()) + try { runner.enableControllerService(service) } catch (Exception ex) { @@ -125,21 +127,21 @@ class ElasticSearchClientService_IT { @Test void testBasicSearch() throws Exception { String query = prettyPrint(toJson([ - size: 10, - query: [ - match_all: [:] - ], - aggs: [ - term_counts: [ - terms: [ - field: "msg", - size: 5 - ] + size: 10, + query: [ + match_all: [:] + ], + aggs: [ + term_counts: [ + terms: [ + field: "msg", + size: 5 + ] + ] ] - ] ])) - - + + SearchResponse response = service.search(query, INDEX, TYPE, null) Assert.assertNotNull("Response was null", response) @@ -158,11 +160,11 @@ class ElasticSearchClientService_IT { def buckets = termCounts.get("buckets") Assert.assertNotNull("Buckets branch was empty", buckets) def expected = [ - "one": 1, - "two": 2, - "three": 3, - "four": 4, - "five": 5 + "one": 1, + "two": 2, + "three": 3, + "four": 4, + "five": 5 ] buckets.each { aggRes -> @@ -172,6 +174,53 @@ class ElasticSearchClientService_IT { } } + @Test + void testBasicSearchRequestParameters() throws Exception { + String query = prettyPrint(toJson([ + size: 10, + query: [ + match_all: [:] + ], + aggs: [ + term_counts: [ + terms: [ + field: "msg", + size: 5 + ] + ] + ] + ])) + + + SearchResponse response = service.search(query, "messages", TYPE, [preference: "_local"]) + Assert.assertNotNull("Response was null", response) + + Assert.assertEquals("Wrong count", 15, response.numberOfHits) + Assert.assertFalse("Timed out", response.isTimedOut()) + Assert.assertNotNull("Hits was null", response.getHits()) + Assert.assertEquals("Wrong number of hits", 10, response.hits.size()) + Assert.assertNotNull("Aggregations are missing", response.aggregations) + Assert.assertEquals("Aggregation count is wrong", 1, response.aggregations.size()) + + Map termCounts = response.aggregations.get("term_counts") as Map + Assert.assertNotNull("Term counts was missing", termCounts) + def buckets = termCounts.get("buckets") + Assert.assertNotNull("Buckets branch was empty", buckets) + def expected = [ + "one": 1, + "two": 2, + "three": 3, + "four": 4, + "five": 5 + ] + + buckets.each { aggRes -> + String key = aggRes["key"] + def docCount = aggRes["doc_count"] + Assert.assertEquals("${key} did not match.", expected[key], docCount) + } + } + @Test void testSearchWarnings() { String query @@ -361,13 +410,55 @@ class ElasticSearchClientService_IT { @Test void testDeleteByQuery() throws Exception { String query = prettyPrint(toJson([ - query: [ - match: [ - msg: "five" + query: [ + match: [ + msg: "five" + ] ] - ] ])) - DeleteOperationResponse response = service.deleteByQuery(query, INDEX, TYPE) + DeleteOperationResponse response = service.deleteByQuery(query, INDEX, TYPE, null) + Assert.assertNotNull(response) + Assert.assertTrue(response.getTook() > 0) + } + + @Test + void testDeleteByQueryRequestParameters() throws Exception { + String query = prettyPrint(toJson([ + query: [ + match: [ + msg: "six" + ] + ] + ])) + DeleteOperationResponse response = service.deleteByQuery(query, INDEX, TYPE, [refresh: "true"]) + Assert.assertNotNull(response) + Assert.assertTrue(response.getTook() > 0) + } + + @Test + void testUpdateByQuery() throws Exception { + String query = prettyPrint(toJson([ + query: [ + match: [ + msg: "four" + ] + ] + ])) + UpdateOperationResponse response = service.updateByQuery(query, INDEX, TYPE, null) + Assert.assertNotNull(response) + Assert.assertTrue(response.getTook() > 0) + } + + @Test + void testUpdateByQueryRequestParameters() throws Exception { + String query = prettyPrint(toJson([ + query: [ + match: [ + msg: "four" + ] + ] + ])) + UpdateOperationResponse response = service.updateByQuery(query, INDEX, TYPE, [refresh: "true", slices: "1"]) Assert.assertNotNull(response) Assert.assertTrue(response.getTook() > 0) } @@ -375,17 +466,17 @@ class ElasticSearchClientService_IT { @Test void testDeleteById() throws Exception { final String ID = "1" - final def originalDoc = service.get(INDEX, TYPE, ID) - DeleteOperationResponse response = service.deleteById(INDEX, TYPE, ID) + final def originalDoc = service.get(INDEX, TYPE, ID, null) + DeleteOperationResponse response = service.deleteById(INDEX, TYPE, ID, null) Assert.assertNotNull(response) Assert.assertTrue(response.getTook() > 0) - def doc = service.get(INDEX, TYPE, ID) + def doc = service.get(INDEX, TYPE, ID, null) Assert.assertNull(doc) - doc = service.get(INDEX, TYPE, "2") + doc = service.get(INDEX, TYPE, "2", null) Assert.assertNotNull(doc) // replace the deleted doc - service.add(new IndexOperationRequest(INDEX, TYPE, "1", originalDoc, IndexOperationRequest.Operation.Index)) + service.add(new IndexOperationRequest(INDEX, TYPE, "1", originalDoc, IndexOperationRequest.Operation.Index), null) waitForIndexRefresh() // (affects later tests using _search or _bulk) } @@ -394,7 +485,7 @@ class ElasticSearchClientService_IT { Map old 1.upto(15) { index -> String id = String.valueOf(index) - def doc = service.get(INDEX, TYPE, id) + def doc = service.get(INDEX, TYPE, id, null) Assert.assertNotNull("Doc was null", doc) Assert.assertNotNull("${doc.toString()}\t${doc.keySet().toString()}", doc.get("msg")) old = doc @@ -434,22 +525,22 @@ class ElasticSearchClientService_IT { // index with nulls suppressNulls(false) - IndexOperationResponse response = service.bulk([new IndexOperationRequest("nulls", TYPE, "1", doc, IndexOperationRequest.Operation.Index)]) + IndexOperationResponse response = service.bulk([new IndexOperationRequest("nulls", TYPE, "1", doc, IndexOperationRequest.Operation.Index)], null) Assert.assertNotNull(response) Assert.assertTrue(response.getTook() > 0) waitForIndexRefresh() - Map result = service.get("nulls", TYPE, "1") + Map result = service.get("nulls", TYPE, "1", null) Assert.assertEquals(doc, result) // suppress nulls suppressNulls(true) - response = service.bulk([new IndexOperationRequest("nulls", TYPE, "2", doc, IndexOperationRequest.Operation.Index)]) + response = service.bulk([new IndexOperationRequest("nulls", TYPE, "2", doc, IndexOperationRequest.Operation.Index)], null) Assert.assertNotNull(response) Assert.assertTrue(response.getTook() > 0) waitForIndexRefresh() - result = service.get("nulls", TYPE, "2") + result = service.get("nulls", TYPE, "2", null) Assert.assertTrue("Non-nulls (present): " + result.toString(), result.keySet().containsAll(["msg", "is_blank"])) Assert.assertFalse("is_null (should be omitted): " + result.toString(), result.keySet().contains("is_null")) Assert.assertFalse("is_empty (should be omitted): " + result.toString(), result.keySet().contains("is_empty")) @@ -479,7 +570,7 @@ class ElasticSearchClientService_IT { put("msg", "test") }}, IndexOperationRequest.Operation.Index)) } - IndexOperationResponse response = service.bulk(payload) + IndexOperationResponse response = service.bulk(payload, [refresh: "true"]) Assert.assertNotNull(response) Assert.assertTrue(response.getTook() > 0) waitForIndexRefresh() @@ -488,9 +579,9 @@ class ElasticSearchClientService_IT { * Now, check to ensure that both indexes got populated appropriately. */ String query = "{ \"query\": { \"match_all\": {}}}" - Long indexA = service.count(query, "bulk_a", TYPE) - Long indexB = service.count(query, "bulk_b", TYPE) - Long indexC = service.count(query, "bulk_c", TYPE) + Long indexA = service.count(query, "bulk_a", TYPE, null) + Long indexB = service.count(query, "bulk_b", TYPE, null) + Long indexC = service.count(query, "bulk_c", TYPE, null) Assert.assertNotNull(indexA) Assert.assertNotNull(indexB) @@ -500,7 +591,46 @@ class ElasticSearchClientService_IT { Assert.assertEquals(10, indexB.intValue()) Assert.assertEquals(5, indexC.intValue()) - Long total = service.count(query, "bulk_*", TYPE) + Long total = service.count(query, "bulk_*", TYPE, null) + Assert.assertNotNull(total) + Assert.assertEquals(25, total.intValue()) + } + + @Test + void testBulkRequestParameters() throws Exception { + List payload = new ArrayList<>() + for (int x = 0; x < 20; x++) { + String index = x % 2 == 0 ? "bulk_a": "bulk_b" + payload.add(new IndexOperationRequest(index, TYPE, String.valueOf(x), new HashMap(){{ + put("msg", "test") + }}, IndexOperationRequest.Operation.Index)) + } + for (int x = 0; x < 5; x++) { + payload.add(new IndexOperationRequest("bulk_c", TYPE, String.valueOf(x), new HashMap(){{ + put("msg", "test") + }}, IndexOperationRequest.Operation.Index)) + } + IndexOperationResponse response = service.bulk(payload, [refresh: "true"]) + Assert.assertNotNull(response) + Assert.assertTrue(response.getTook() > 0) + + /* + * Now, check to ensure that both indexes got populated and refreshed appropriately. + */ + String query = "{ \"query\": { \"match_all\": {}}}" + Long indexA = service.count(query, "bulk_a", TYPE, null) + Long indexB = service.count(query, "bulk_b", TYPE, null) + Long indexC = service.count(query, "bulk_c", TYPE, null) + + Assert.assertNotNull(indexA) + Assert.assertNotNull(indexB) + Assert.assertNotNull(indexC) + Assert.assertEquals(indexA, indexB) + Assert.assertEquals(10, indexA.intValue()) + Assert.assertEquals(10, indexB.intValue()) + Assert.assertEquals(5, indexC.intValue()) + + Long total = service.count(query, "bulk_*", TYPE, null) Assert.assertNotNull(total) Assert.assertEquals(25, total.intValue()) } @@ -510,8 +640,8 @@ class ElasticSearchClientService_IT { final String TEST_ID = "update-test" Map doc = new HashMap<>() doc.put("msg", "Buongiorno, mondo") - service.add(new IndexOperationRequest(INDEX, TYPE, TEST_ID, doc, IndexOperationRequest.Operation.Index)) - Map result = service.get(INDEX, TYPE, TEST_ID) + service.add(new IndexOperationRequest(INDEX, TYPE, TEST_ID, doc, IndexOperationRequest.Operation.Index), [refresh: "true"]) + Map result = service.get(INDEX, TYPE, TEST_ID, null) Assert.assertEquals("Not the same", doc, result) Map updates = new HashMap<>() @@ -520,8 +650,8 @@ class ElasticSearchClientService_IT { merged.putAll(updates) merged.putAll(doc) IndexOperationRequest request = new IndexOperationRequest(INDEX, TYPE, TEST_ID, updates, IndexOperationRequest.Operation.Update) - service.add(request) - result = service.get(INDEX, TYPE, TEST_ID) + service.add(request, [refresh: "true"]) + result = service.get(INDEX, TYPE, TEST_ID, null) Assert.assertTrue(result.containsKey("from")) Assert.assertTrue(result.containsKey("msg")) Assert.assertEquals("Not the same after update.", merged, result) @@ -532,17 +662,17 @@ class ElasticSearchClientService_IT { upsertItems.put("upsert_2", 1) upsertItems.put("upsert_3", true) request = new IndexOperationRequest(INDEX, TYPE, UPSERTED_ID, upsertItems, IndexOperationRequest.Operation.Upsert) - service.add(request) - result = service.get(INDEX, TYPE, UPSERTED_ID) + service.add(request, [refresh: "true"]) + result = service.get(INDEX, TYPE, UPSERTED_ID, null) Assert.assertEquals(upsertItems, result) List deletes = new ArrayList<>() deletes.add(new IndexOperationRequest(INDEX, TYPE, TEST_ID, null, IndexOperationRequest.Operation.Delete)) deletes.add(new IndexOperationRequest(INDEX, TYPE, UPSERTED_ID, null, IndexOperationRequest.Operation.Delete)) - Assert.assertFalse(service.bulk(deletes).hasErrors()) + Assert.assertFalse(service.bulk(deletes, [refresh: "true"]).hasErrors()) waitForIndexRefresh() // wait 1s for index refresh (doesn't prevent GET but affects later tests using _search or _bulk) - Assert.assertNull(service.get(INDEX, TYPE, TEST_ID)) - Assert.assertNull(service.get(INDEX, TYPE, UPSERTED_ID)) + Assert.assertNull(service.get(INDEX, TYPE, TEST_ID, null)) + Assert.assertNull(service.get(INDEX, TYPE, UPSERTED_ID, null)) } @Test @@ -552,7 +682,7 @@ class ElasticSearchClientService_IT { new IndexOperationRequest(INDEX, TYPE, "2", [ "msg": "two", intField: 1], IndexOperationRequest.Operation.Create), // already exists new IndexOperationRequest(INDEX, TYPE, "1", [ "msg": "one", intField: "notaninteger"], IndexOperationRequest.Operation.Index) // can't parse int field ] - def response = service.bulk(ops) + def response = service.bulk(ops, [refresh: "true"]) assert response.hasErrors() assert response.items.findAll { def key = it.keySet().stream().findFirst().get() @@ -563,4 +693,4 @@ class ElasticSearchClientService_IT { private static void waitForIndexRefresh() { Thread.sleep(1000) } -} +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearchLookupServiceTest.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearchLookupServiceTest.groovy index 27452a34ac..419ab757a9 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearchLookupServiceTest.groovy +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearchLookupServiceTest.groovy @@ -56,7 +56,7 @@ class ElasticSearchLookupServiceTest { void simpleLookupTest() throws Exception { def coordinates = ["_id": "12345" ] - Optional result = lookupService.lookup(coordinates) + Optional result = lookupService.lookup(coordinates) as Optional Assert.assertNotNull(result) Assert.assertTrue(result.isPresent()) diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearchLookupService_IT.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearchLookupService_IT.groovy index 2b6c0602d9..d92aed34fb 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearchLookupService_IT.groovy +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/ElasticSearchLookupService_IT.groovy @@ -61,7 +61,6 @@ class ElasticSearchLookupService_IT { runner.setProperty(service, ElasticSearchClientService.HTTP_HOSTS, "http://localhost:9400") runner.setProperty(service, ElasticSearchClientService.CONNECT_TIMEOUT, "10000") runner.setProperty(service, ElasticSearchClientService.SOCKET_TIMEOUT, "60000") - runner.setProperty(service, ElasticSearchClientService.RETRY_TIMEOUT, "60000") runner.setProperty(TestControllerServiceProcessor.CLIENT_SERVICE, "Client Service") runner.setProperty(TestControllerServiceProcessor.LOOKUP_SERVICE, "Lookup Service") runner.setProperty(lookupService, ElasticSearchLookupService.CLIENT_SERVICE, "Client Service") diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/TestElasticSearchClientService.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/TestElasticSearchClientService.groovy index 81d36abc4a..ffb3e39a3b 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/TestElasticSearchClientService.groovy +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/groovy/org/apache/nifi/elasticsearch/integration/TestElasticSearchClientService.groovy @@ -23,6 +23,7 @@ import org.apache.nifi.elasticsearch.ElasticSearchClientService import org.apache.nifi.elasticsearch.IndexOperationRequest import org.apache.nifi.elasticsearch.IndexOperationResponse import org.apache.nifi.elasticsearch.SearchResponse +import org.apache.nifi.elasticsearch.UpdateOperationResponse class TestElasticSearchClientService extends AbstractControllerService implements ElasticSearchClientService { Map data = [ @@ -33,37 +34,42 @@ class TestElasticSearchClientService extends AbstractControllerService implement ] @Override - IndexOperationResponse add(IndexOperationRequest operation) { + IndexOperationResponse add(IndexOperationRequest operation, Map requestParameters) { return null } @Override - IndexOperationResponse bulk(List operations) { + IndexOperationResponse bulk(List operations, Map requestParameters) { return null } @Override - Long count(String query, String index, String type) { + Long count(String query, String index, String type, Map requestParameters) { return null } @Override - DeleteOperationResponse deleteById(String index, String type, String id) { + DeleteOperationResponse deleteById(String index, String type, String id, Map requestParameters) { return null } @Override - DeleteOperationResponse deleteById(String index, String type, List ids) { + DeleteOperationResponse deleteById(String index, String type, List ids, Map requestParameters) { return null } @Override - DeleteOperationResponse deleteByQuery(String query, String index, String type) { + DeleteOperationResponse deleteByQuery(String query, String index, String type, Map requestParameters) { return null } @Override - Map get(String index, String type, String id) { + UpdateOperationResponse updateByQuery(String query, String index, String type, Map requestParameters) { + return null + } + + @Override + Map get(String index, String type, String id, Map requestParameters) { return data } diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/conf-5/jvm.options b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/conf-5/jvm.options new file mode 100644 index 0000000000..a1fae3beef --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/conf-5/jvm.options @@ -0,0 +1,127 @@ +## +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +## + +## JVM configuration + +################################################################ +## IMPORTANT: JVM heap size +################################################################ +## +## You should always set the min and max JVM heap +## size to the same value. For example, to set +## the heap to 4 GB, set: +## +## -Xms4g +## -Xmx4g +## +## See https://www.elastic.co/guide/en/elasticsearch/reference/current/heap-size.html +## for more information +## +################################################################ + +# Xms represents the initial size of total heap space +# Xmx represents the maximum size of total heap space + +-Xms2g +-Xmx2g + +################################################################ +## Expert settings +################################################################ +## +## All settings below this section are considered +## expert settings. Don't tamper with them unless +## you understand what you are doing +## +################################################################ + +## GC configuration +#-XX:+UseConcMarkSweepGC +#-XX:CMSInitiatingOccupancyFraction=75 +#-XX:+UseCMSInitiatingOccupancyOnly +-XX:+UseSerialGC + +## optimizations + +# pre-touch memory pages used by the JVM during initialization +-XX:+AlwaysPreTouch + +## basic + +# force the server VM (remove on 32-bit client JVMs) +-server + +# explicitly set the stack size (reduce to 320k on 32-bit client JVMs) +-Xss1m + +# set to headless, just in case +-Djava.awt.headless=true + +# ensure UTF-8 encoding by default (e.g. filenames) +-Dfile.encoding=UTF-8 + +# use our provided JNA always versus the system one +-Djna.nosys=true + +# use old-style file permissions on JDK9 +-Djdk.io.permissionsUseCanonicalPath=true + +# flags to configure Netty +-Dio.netty.noUnsafe=true +-Dio.netty.noKeySetOptimization=true +-Dio.netty.recycler.maxCapacityPerThread=0 + +# log4j 2 +-Dlog4j.shutdownHookEnabled=false +-Dlog4j2.disable.jmx=true +-Dlog4j.skipJansi=true + +## heap dumps + +# generate a heap dump when an allocation from the Java heap fails +# heap dumps are created in the working directory of the JVM +-XX:+HeapDumpOnOutOfMemoryError + +# specify an alternative path for heap dumps +# ensure the directory exists and has sufficient space +#-XX:HeapDumpPath=${heap.dump.path} + +## GC logging + +#-XX:+PrintGCDetails +#-XX:+PrintGCTimeStamps +#-XX:+PrintGCDateStamps +#-XX:+PrintClassHistogram +#-XX:+PrintTenuringDistribution +#-XX:+PrintGCApplicationStoppedTime + +# log GC status to a file with time stamps +# ensure the directory exists +#-Xloggc:${loggc} + +# By default, the GC log file will not rotate. +# By uncommenting the lines below, the GC log file +# will be rotated every 128MB at most 32 times. +#-XX:+UseGCLogFileRotation +#-XX:NumberOfGCLogFiles=32 +#-XX:GCLogFileSize=128M + +# Elasticsearch 5.0.0 will throw an exception on unquoted field names in JSON. +# If documents were already indexed with unquoted fields in a previous version +# of Elasticsearch, some operations may throw errors. +# +# WARNING: This option will be removed in Elasticsearch 6.0.0 and is provided +# only for migration purposes. +#-Delasticsearch.json.allow_unquoted_field_names=true diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup-5.script b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup-5.script index bac18d49b9..429a74ddc9 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup-5.script +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup-5.script @@ -12,6 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + #create mapping PUT:user_details/:{ "mappings":{"faketype":{ "properties":{ "email":{"type":"keyword"},"phone":{"type": "keyword"},"accessKey":{"type": "keyword"}}}}} PUT:messages/:{ "mappings":{"faketype":{ "properties":{ "msg":{"type":"keyword"}}}}} @@ -21,6 +22,7 @@ PUT:bulk_a/:{ "mappings":{"faketype":{ "properties":{ "msg":{"type":"keyword"}}} PUT:bulk_b/:{ "mappings":{"faketype":{ "properties":{ "msg":{"type":"keyword"}}}}} PUT:bulk_c/:{ "mappings":{"faketype":{ "properties":{ "msg":{"type":"keyword"}}}}} PUT:error_handler:{ "mappings": { "faketype": { "properties": { "msg": { "type": "keyword" }, "intField": { "type": "integer" }}}}} + #add document PUT:messages/faketype/1:{ "msg":"one" } PUT:messages/faketype/2:{ "msg":"two" } @@ -42,4 +44,7 @@ PUT:user_details/faketype/1:{ "email": "john.smith@company.com", "phone": "123-4 PUT:user_details/faketype/2:{ "email": "jane.doe@company.com", "phone": "098-765-4321", "accessKey": "GHIJK"} PUT:nested/faketype/1:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"No one should see this!","deepest":{"super_secret":"Got nothin to hide"}}}} PUT:nested/faketype/2:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"Hello, world!","deepest":{"super_secret":"I could tell, but then I would have to kill you"}}}} -PUT:nested/faketype/3:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"Buongiorno, mondo!!","deepest":{"super_secret":"The sky is blue"}}}} \ No newline at end of file +PUT:nested/faketype/3:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"Buongiorno, mondo!!","deepest":{"super_secret":"The sky is blue"}}}} + +# refresh all indices before testing +POST:_refresh:{} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup-6.script b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup-6.script index 703ec18e2c..739c0f2cab 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup-6.script +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup-6.script @@ -12,6 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + #create mapping PUT:user_details/:{ "mappings":{"_doc":{ "properties":{ "email":{"type":"keyword"},"phone":{"type": "keyword"},"accessKey":{"type": "keyword"}}}}} PUT:messages/:{ "mappings":{"_doc":{ "properties":{ "msg":{"type":"keyword"}}}}} @@ -43,4 +44,7 @@ PUT:user_details/_doc/1:{ "email": "john.smith@company.com", "phone": "123-456-7 PUT:user_details/_doc/2:{ "email": "jane.doe@company.com", "phone": "098-765-4321", "accessKey": "GHIJK"} PUT:nested/_doc/1:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"No one should see this!","deepest":{"super_secret":"Got nothin to hide"}}}} PUT:nested/_doc/2:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"Hello, world!","deepest":{"super_secret":"I could tell, but then I would have to kill you"}}}} -PUT:nested/_doc/3:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"Buongiorno, mondo!!","deepest":{"super_secret":"The sky is blue"}}}} \ No newline at end of file +PUT:nested/_doc/3:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"Buongiorno, mondo!!","deepest":{"super_secret":"The sky is blue"}}}} + +# refresh all indices before testing +POST:_refresh:{} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup-7.script b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup-7.script index 0b240ebdba..4328390abc 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup-7.script +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-client-service/src/test/resources/setup-7.script @@ -12,6 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + #create mapping PUT:user_details/:{ "mappings":{ "properties":{ "email":{"type":"keyword"},"phone":{"type": "keyword"},"accessKey":{"type": "keyword"}}}} PUT:messages/:{ "mappings":{ "properties":{ "msg":{"type":"keyword"}}}} @@ -21,6 +22,7 @@ PUT:bulk_a/:{ "mappings":{ "properties":{ "msg":{"type":"keyword"}}}} PUT:bulk_b/:{ "mappings":{ "properties":{ "msg":{"type":"keyword"}}}} PUT:bulk_c/:{ "mappings":{ "properties":{ "msg":{"type":"keyword"}}}} PUT:error_handler:{ "mappings": { "properties": { "msg": { "type": "keyword" }, "intField": { "type": "integer" }}}} + #add document POST:messages/_doc/1:{ "msg":"one" } POST:messages/_doc/2:{ "msg":"two" } @@ -42,4 +44,7 @@ POST:user_details/_doc/1:{ "email": "john.smith@company.com", "phone": "123-456- POST:user_details/_doc/2:{ "email": "jane.doe@company.com", "phone": "098-765-4321", "accessKey": "GHIJK"} POST:nested/_doc/1:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"No one should see this!","deepest":{"super_secret":"Got nothin to hide"}}}} POST:nested/_doc/2:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"Hello, world!","deepest":{"super_secret":"I could tell, but then I would have to kill you"}}}} -POST:nested/_doc/3:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"Buongiorno, mondo!!","deepest":{"super_secret":"The sky is blue"}}}} \ No newline at end of file +POST:nested/_doc/3:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"Buongiorno, mondo!!","deepest":{"super_secret":"The sky is blue"}}}} + +# refresh all indices before testing +POST:_refresh:{} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java index dae3d4f11b..8609b0fffe 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchHttpProcessor.java @@ -129,6 +129,8 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic private final AtomicReference okHttpClientAtomicReference = new AtomicReference<>(); + final ObjectMapper mapper = new ObjectMapper(); + @Override protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) { return new PropertyDescriptor.Builder() @@ -278,7 +280,6 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic } protected JsonNode parseJsonResponse(InputStream in) throws IOException { - final ObjectMapper mapper = new ObjectMapper(); return mapper.readTree(in); } diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java index 8c501c3d2e..5e9bdf3870 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java @@ -17,7 +17,6 @@ package org.apache.nifi.processors.elasticsearch; import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; import okhttp3.HttpUrl; import okhttp3.MediaType; @@ -143,8 +142,6 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor { private static final Set relationships; private static final List propertyDescriptors; - private static final ObjectMapper mapper = new ObjectMapper(); - static { final Set _rels = new HashSet<>(); _rels.add(REL_SUCCESS); diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractByQueryElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractByQueryElasticsearch.java new file mode 100644 index 0000000000..90bd461f4f --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractByQueryElasticsearch.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.elasticsearch; + +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.elasticsearch.ElasticSearchClientService; +import org.apache.nifi.elasticsearch.OperationResponse; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.StringUtils; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public abstract class AbstractByQueryElasticsearch extends AbstractProcessor implements ElasticsearchRestProcessor { + public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") + .description("If the \"by query\" operation fails, and a flowfile was read, it will be sent to this relationship.") + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") + .description("If the \"by query\" operation succeeds, and a flowfile was read, it will be sent to this relationship.") + .build(); + + private static final Set relationships; + private static final List propertyDescriptors; + + private volatile ElasticSearchClientService clientService; + + static { + final Set rels = new HashSet<>(); + rels.add(REL_SUCCESS); + rels.add(REL_FAILURE); + relationships = Collections.unmodifiableSet(rels); + + final List descriptors = new ArrayList<>(); + descriptors.add(QUERY); + descriptors.add(QUERY_ATTRIBUTE); + descriptors.add(INDEX); + descriptors.add(TYPE); + descriptors.add(CLIENT_SERVICE); + + propertyDescriptors = Collections.unmodifiableList(descriptors); + } + + abstract String getTookAttribute(); + + abstract String getErrorAttribute(); + + abstract OperationResponse performOperation(final ElasticSearchClientService clientService, final String query, + final String index, final String type, + final Map requestParameters); + + @Override + public Set getRelationships() { + return relationships; + } + + @Override + public final List getSupportedPropertyDescriptors() { + return propertyDescriptors; + } + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .dynamic(true) + .build(); + } + + @OnScheduled + public void onScheduled(final ProcessContext context) { + clientService = context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) { + FlowFile input = null; + if (context.hasIncomingConnection()) { + input = session.get(); + + if (input == null && context.hasNonLoopConnection()) { + return; + } + } + + try { + final String query = getQuery(input, context, session); + final String index = context.getProperty(INDEX).evaluateAttributeExpressions(input).getValue(); + final String type = context.getProperty(TYPE).isSet() + ? context.getProperty(TYPE).evaluateAttributeExpressions(input).getValue() + : null; + final String queryAttr = context.getProperty(QUERY_ATTRIBUTE).isSet() + ? context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(input).getValue() + : null; + + final OperationResponse or = performOperation(this.clientService, query, index, type, getUrlQueryParameters(context, input)); + + if (input == null) { + input = session.create(); + } + + final Map attrs = new HashMap<>(); + attrs.put(getTookAttribute(), String.valueOf(or.getTook())); + if (!StringUtils.isBlank(queryAttr)) { + attrs.put(queryAttr, query); + } + + input = session.putAllAttributes(input, attrs); + + session.transfer(input, REL_SUCCESS); + } catch (final Exception e) { + if (input != null) { + input = session.putAttribute(input, getErrorAttribute(), e.getMessage()); + session.transfer(input, REL_FAILURE); + } + getLogger().error("Error running \"by query\" operation: ", e); + context.yield(); + } + } +} diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java index 705fcd877a..7f1bb31767 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearch.java @@ -30,6 +30,7 @@ import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.elasticsearch.api.JsonQueryParameters; import org.apache.nifi.util.StopWatch; import org.apache.nifi.util.StringUtils; @@ -124,6 +125,17 @@ public abstract class AbstractJsonQueryElasticsearch(context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class)); diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java index b95587714a..e5b47eed54 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPaginatedJsonQueryElasticsearch.java @@ -141,14 +141,17 @@ public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJs if (!newQuery && PAGINATION_SCROLL.getValue().equals(paginationType)) { response = clientService.get().scroll(queryJson); } else { + final Map requestParameters = getUrlQueryParameters(context, input); + if (PAGINATION_SCROLL.getValue().equals(paginationType)) { + requestParameters.put("scroll", paginatedJsonQueryParameters.getKeepAlive()); + } + response = clientService.get().search( queryJson, // Point in Time uses general /_search API not /index/_search PAGINATION_POINT_IN_TIME.getValue().equals(paginationType) ? null : paginatedJsonQueryParameters.getIndex(), paginatedJsonQueryParameters.getType(), - PAGINATION_SCROLL.getValue().equals(paginationType) - ? Collections.singletonMap("scroll", paginatedJsonQueryParameters.getKeepAlive()) - : null + requestParameters ); paginatedJsonQueryParameters.setPitId(response.getPitId()); paginatedJsonQueryParameters.setSearchAfter(response.getSearchAfter()); diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearch.java index f1917781d3..18d5de820a 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearch.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/DeleteByQueryElasticsearch.java @@ -17,129 +17,49 @@ package org.apache.nifi.processors.elasticsearch; +import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.elasticsearch.DeleteOperationResponse; import org.apache.nifi.elasticsearch.ElasticSearchClientService; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.processor.AbstractProcessor; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.util.StringUtils; +import org.apache.nifi.elasticsearch.OperationResponse; +import org.apache.nifi.expression.ExpressionLanguageScope; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; import java.util.Map; -import java.util.Set; +@WritesAttributes({ + @WritesAttribute(attribute = "elasticsearch.delete.took", description = "The amount of time that it took to complete the delete operation in ms."), + @WritesAttribute(attribute = "elasticsearch.delete.error", description = "The error message provided by Elasticsearch if there is an error running the delete.") +}) @InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED) +@Tags({ "elastic", "elasticsearch", "delete", "query"}) @CapabilityDescription("Delete from an Elasticsearch index using a query. The query can be loaded from a flowfile body " + "or from the Query parameter.") -@Tags({ "elastic", "elasticsearch", "delete", "query"}) -@WritesAttributes({ - @WritesAttribute(attribute = "elasticsearch.delete.took", description = "The amount of time that it took to complete the delete operation in ms."), - @WritesAttribute(attribute = "elasticsearch.delete.error", description = "The error message provided by Elasticsearch if there is an error running the delete.") -}) -public class DeleteByQueryElasticsearch extends AbstractProcessor implements ElasticsearchRestProcessor { - public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") - .description("If the delete by query fails, and a flowfile was read, it will be sent to this relationship.").build(); - - public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") - .description("If the delete by query succeeds, and a flowfile was read, it will be sent to this relationship.") - .build(); - - +@DynamicProperty( + name = "The name of a URL query parameter to add", + value = "The value of the URL query parameter", + expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, + description = "Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing. " + + "These parameters will override any matching parameters in the query request body") +public class DeleteByQueryElasticsearch extends AbstractByQueryElasticsearch { static final String TOOK_ATTRIBUTE = "elasticsearch.delete.took"; static final String ERROR_ATTRIBUTE = "elasticsearch.delete.error"; - private static final Set relationships; - private static final List propertyDescriptors; - - private volatile ElasticSearchClientService clientService; - - static { - final Set _rels = new HashSet<>(); - _rels.add(REL_SUCCESS); - _rels.add(REL_FAILURE); - relationships = Collections.unmodifiableSet(_rels); - - final List descriptors = new ArrayList<>(); - descriptors.add(QUERY); - descriptors.add(QUERY_ATTRIBUTE); - descriptors.add(INDEX); - descriptors.add(TYPE); - descriptors.add(CLIENT_SERVICE); - - propertyDescriptors = Collections.unmodifiableList(descriptors); + @Override + String getTookAttribute() { + return TOOK_ATTRIBUTE; } @Override - public Set getRelationships() { - return relationships; + String getErrorAttribute() { + return ERROR_ATTRIBUTE; } @Override - public final List getSupportedPropertyDescriptors() { - return propertyDescriptors; - } - - @OnScheduled - public void onScheduled(final ProcessContext context) { - clientService = context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class); - } - - @Override - public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { - FlowFile input = null; - if (context.hasIncomingConnection()) { - input = session.get(); - - if (input == null && context.hasNonLoopConnection()) { - return; - } - } - - try { - final String query = getQuery(input, context, session); - final String index = context.getProperty(INDEX).evaluateAttributeExpressions(input).getValue(); - final String type = context.getProperty(TYPE).isSet() - ? context.getProperty(TYPE).evaluateAttributeExpressions(input).getValue() - : null; - final String queryAttr = context.getProperty(QUERY_ATTRIBUTE).isSet() - ? context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(input).getValue() - : null; - DeleteOperationResponse dor = clientService.deleteByQuery(query, index, type); - - if (input == null) { - input = session.create(); - } - - Map attrs = new HashMap<>(); - attrs.put(TOOK_ATTRIBUTE, String.valueOf(dor.getTook())); - if (!StringUtils.isBlank(queryAttr)) { - attrs.put(queryAttr, query); - } - - input = session.putAllAttributes(input, attrs); - - session.transfer(input, REL_SUCCESS); - } catch (Exception e) { - if (input != null) { - input = session.putAttribute(input, ERROR_ATTRIBUTE, e.getMessage()); - session.transfer(input, REL_FAILURE); - } - getLogger().error("Error running delete by query: ", e); - context.yield(); - } + OperationResponse performOperation(final ElasticSearchClientService clientService, final String query, + final String index, final String type, final Map requestParameters) { + return clientService.deleteByQuery(query, index, type, requestParameters); } } diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ElasticsearchRestProcessor.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ElasticsearchRestProcessor.java index 75f1fc3fdf..886279ebdf 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ElasticsearchRestProcessor.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/ElasticsearchRestProcessor.java @@ -30,6 +30,8 @@ import org.apache.nifi.processor.util.StandardValidators; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.util.Map; +import java.util.stream.Collectors; public interface ElasticsearchRestProcessor { String ATTR_RECORD_COUNT = "record.count"; @@ -121,4 +123,15 @@ public interface ElasticsearchRestProcessor { return retVal; } + + default Map getUrlQueryParameters(final ProcessContext context, final FlowFile flowFile) { + return context.getProperties().entrySet().stream() + // filter non-null dynamic properties + .filter(e -> e.getKey().isDynamic() && e.getValue() != null) + // convert to Map of URL parameter keys and values + .collect(Collectors.toMap( + e -> e.getKey().getName(), + e -> context.getProperty(e.getKey()).evaluateAttributeExpressions(flowFile).getValue() + )); + } } diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearch.java index 6f350372e9..a460c3a19e 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearch.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/JsonQueryElasticsearch.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.processors.elasticsearch; +import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.WritesAttribute; @@ -23,6 +24,7 @@ import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.elasticsearch.SearchResponse; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; @@ -46,11 +48,16 @@ import java.util.concurrent.TimeUnit; "Elasticsearch JSON DSL. It does not automatically paginate queries for the user. If an incoming relationship is added to this " + "processor, it will use the flowfile's content for the query. Care should be taken on the size of the query because the entire response " + "from Elasticsearch will be loaded into memory all at once and converted into the resulting flowfiles.") +@DynamicProperty( + name = "The name of a URL query parameter to add", + value = "The value of the URL query parameter", + expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, + description = "Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing. " + + "These parameters will override any matching parameters in the query request body") public class JsonQueryElasticsearch extends AbstractJsonQueryElasticsearch { @Override JsonQueryParameters buildJsonQueryParameters(final FlowFile input, final ProcessContext context, final ProcessSession session) throws IOException { - final JsonQueryParameters jsonQueryParameters = new JsonQueryParameters(); populateCommonJsonQueryParameters(jsonQueryParameters, input, context, session); return jsonQueryParameters; @@ -64,7 +71,7 @@ public class JsonQueryElasticsearch extends AbstractJsonQueryElasticsearch ALLOWED_INDEX_OPERATIONS = Collections.unmodifiableList(Arrays.asList( + IndexOperationRequest.Operation.Create.getValue().toLowerCase(), + IndexOperationRequest.Operation.Delete.getValue().toLowerCase(), + IndexOperationRequest.Operation.Index.getValue().toLowerCase(), + IndexOperationRequest.Operation.Update.getValue().toLowerCase(), + IndexOperationRequest.Operation.Upsert.getValue().toLowerCase() + )); + + private RecordPathCache recordPathCache; + private RecordReaderFactory readerFactory; + private RecordSetWriterFactory writerFactory; + private boolean logErrors; + private ObjectMapper errorMapper; + + private volatile ElasticSearchClientService clientService; + private volatile String dateFormat; + private volatile String timeFormat; + private volatile String timestampFormat; + @Override public Set getRelationships() { return RELATIONSHIPS; @@ -254,17 +281,19 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic return DESCRIPTORS; } - private RecordReaderFactory readerFactory; - private RecordPathCache recordPathCache; - private ElasticSearchClientService clientService; - private RecordSetWriterFactory writerFactory; - private boolean logErrors; - private volatile String dateFormat; - private volatile String timeFormat; - private volatile String timestampFormat; + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .dynamic(true) + .build(); + } @OnScheduled - public void onScheduled(ProcessContext context) { + public void onScheduled(final ProcessContext context) { this.readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); this.clientService = context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class); this.recordPathCache = new RecordPathCache(16); @@ -283,18 +312,15 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic if (this.timestampFormat == null) { this.timestampFormat = RecordFieldType.TIMESTAMP.getDefaultFormat(); } + + if (errorMapper == null && (logErrors || getLogger().isDebugEnabled())) { + errorMapper = new ObjectMapper(); + errorMapper.enable(SerializationFeature.INDENT_OUTPUT); + } } - static final List ALLOWED_INDEX_OPERATIONS = Collections.unmodifiableList(Arrays.asList( - IndexOperationRequest.Operation.Create.getValue().toLowerCase(), - IndexOperationRequest.Operation.Delete.getValue().toLowerCase(), - IndexOperationRequest.Operation.Index.getValue().toLowerCase(), - IndexOperationRequest.Operation.Update.getValue().toLowerCase(), - IndexOperationRequest.Operation.Upsert.getValue().toLowerCase() - )); - @Override - protected Collection customValidate(ValidationContext validationContext) { + protected Collection customValidate(final ValidationContext validationContext) { final List validationResults = new ArrayList<>(); final PropertyValue indexOp = validationContext.getProperty(INDEX_OP); @@ -319,8 +345,8 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic } @Override - public void onTrigger(ProcessContext context, ProcessSession session) { - FlowFile input = session.get(); + public void onTrigger(final ProcessContext context, final ProcessSession session) { + final FlowFile input = session.get(); if (input == null) { return; } @@ -336,25 +362,26 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic final String typePath = context.getProperty(TYPE_RECORD_PATH).evaluateAttributeExpressions(input).getValue(); final String atTimestampPath = context.getProperty(AT_TIMESTAMP_RECORD_PATH).evaluateAttributeExpressions(input).getValue(); - RecordPath ioPath = indexOpPath != null ? recordPathCache.getCompiled(indexOpPath) : null; - RecordPath path = idPath != null ? recordPathCache.getCompiled(idPath) : null; - RecordPath iPath = indexPath != null ? recordPathCache.getCompiled(indexPath) : null; - RecordPath tPath = typePath != null ? recordPathCache.getCompiled(typePath) : null; - RecordPath atPath = atTimestampPath != null ? recordPathCache.getCompiled(atTimestampPath) : null; + final RecordPath ioPath = indexOpPath != null ? recordPathCache.getCompiled(indexOpPath) : null; + final RecordPath path = idPath != null ? recordPathCache.getCompiled(idPath) : null; + final RecordPath iPath = indexPath != null ? recordPathCache.getCompiled(indexPath) : null; + final RecordPath tPath = typePath != null ? recordPathCache.getCompiled(typePath) : null; + final RecordPath atPath = atTimestampPath != null ? recordPathCache.getCompiled(atTimestampPath) : null; - boolean retainId = context.getProperty(RETAIN_ID_FIELD).evaluateAttributeExpressions(input).asBoolean(); - boolean retainTimestamp = context.getProperty(RETAIN_AT_TIMESTAMP_FIELD).evaluateAttributeExpressions(input).asBoolean(); + final boolean retainId = context.getProperty(RETAIN_ID_FIELD).evaluateAttributeExpressions(input).asBoolean(); + final boolean retainTimestamp = context.getProperty(RETAIN_AT_TIMESTAMP_FIELD).evaluateAttributeExpressions(input).asBoolean(); - int batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions(input).asInteger(); - List badRecords = new ArrayList<>(); + final int batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions(input).asInteger(); + final List badRecords = new ArrayList<>(); try (final InputStream inStream = session.read(input); - final RecordReader reader = readerFactory.createRecordReader(input, inStream, getLogger())) { - Record record; - List operationList = new ArrayList<>(); - List originals = new ArrayList<>(); + final RecordReader reader = readerFactory.createRecordReader(input, inStream, getLogger())) { + final PushBackRecordSet recordSet = new PushBackRecordSet(reader.createRecordSet()); + final List operationList = new ArrayList<>(); + final List originals = new ArrayList<>(); - while ((record = reader.nextRecord()) != null) { + Record record; + while ((record = recordSet.next()) != null) { final String idx = getFromRecordPath(record, iPath, index, false); final String t = getFromRecordPath(record, tPath, type, false); final IndexOperationRequest.Operation o = IndexOperationRequest.Operation.forValue(getFromRecordPath(record, ioPath, indexOp, false)); @@ -369,9 +396,9 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic operationList.add(new IndexOperationRequest(idx, t, id, contentMap, o)); originals.add(record); - if (operationList.size() == batchSize) { - BulkOperation bundle = new BulkOperation(operationList, originals, reader.getSchema()); - FlowFile bad = indexDocuments(bundle, session, input); + if (operationList.size() == batchSize || !recordSet.isAnotherRecord()) { + final BulkOperation bundle = new BulkOperation(operationList, originals, reader.getSchema()); + final FlowFile bad = indexDocuments(bundle, context, session, input); if (bad != null) { badRecords.add(bad); } @@ -382,22 +409,22 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic } if (!operationList.isEmpty()) { - BulkOperation bundle = new BulkOperation(operationList, originals, reader.getSchema()); - FlowFile bad = indexDocuments(bundle, session, input); + final BulkOperation bundle = new BulkOperation(operationList, originals, reader.getSchema()); + final FlowFile bad = indexDocuments(bundle, context, session, input); if (bad != null) { badRecords.add(bad); } } - } catch (ElasticsearchError ese) { - String msg = String.format("Encountered a server-side problem with Elasticsearch. %s", + } catch (final ElasticsearchError ese) { + final String msg = String.format("Encountered a server-side problem with Elasticsearch. %s", ese.isElastic() ? "Moving to retry." : "Moving to failure"); getLogger().error(msg, ese); - Relationship rel = ese.isElastic() ? REL_RETRY : REL_FAILURE; + final Relationship rel = ese.isElastic() ? REL_RETRY : REL_FAILURE; session.penalize(input); session.transfer(input, rel); removeBadRecordFlowFiles(badRecords, session); return; - } catch (Exception ex) { + } catch (final Exception ex) { getLogger().error("Could not index documents.", ex); session.transfer(input, REL_FAILURE); removeBadRecordFlowFiles(badRecords, session); @@ -406,22 +433,20 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic session.transfer(input, REL_SUCCESS); } - private void removeBadRecordFlowFiles(List bad, ProcessSession session) { - for (FlowFile badFlowFile : bad) { + private void removeBadRecordFlowFiles(final List bad, final ProcessSession session) { + for (final FlowFile badFlowFile : bad) { session.remove(badFlowFile); } bad.clear(); } - private FlowFile indexDocuments(BulkOperation bundle, ProcessSession session, FlowFile input) throws Exception { - IndexOperationResponse response = clientService.bulk(bundle.getOperationList()); + private FlowFile indexDocuments(final BulkOperation bundle, final ProcessContext context, final ProcessSession session, final FlowFile input) throws Exception { + final IndexOperationResponse response = clientService.bulk(bundle.getOperationList(), getUrlQueryParameters(context, input)); if (response.hasErrors()) { - if(logErrors || getLogger().isDebugEnabled()) { - List> errors = response.getItems(); - ObjectMapper mapper = new ObjectMapper(); - mapper.enable(SerializationFeature.INDENT_OUTPUT); - String output = String.format("An error was encountered while processing bulk operations. Server response below:%n%n%s", mapper.writeValueAsString(errors)); + if (logErrors || getLogger().isDebugEnabled()) { + final List> errors = response.getItems(); + final String output = String.format("An error was encountered while processing bulk operations. Server response below:%n%n%s", errorMapper.writeValueAsString(errors)); if (logErrors) { getLogger().error(output); @@ -434,16 +459,16 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic FlowFile errorFF = session.create(input); try { int added = 0; - try (OutputStream os = session.write(errorFF); - RecordSetWriter writer = writerFactory.createWriter(getLogger(), bundle.getSchema(), os, errorFF )) { + try (final OutputStream os = session.write(errorFF); + final RecordSetWriter writer = writerFactory.createWriter(getLogger(), bundle.getSchema(), os, errorFF )) { writer.beginRecordSet(); for (int index = 0; index < response.getItems().size(); index++) { - Map current = response.getItems().get(index); + final Map current = response.getItems().get(index); if (!current.isEmpty()) { - String key = current.keySet().stream().findFirst().orElse(null); + final String key = current.keySet().stream().findFirst().orElse(null); @SuppressWarnings("unchecked") - Map inner = (Map) current.get(key); + final Map inner = (Map) current.get(key); if (inner != null && inner.containsKey("error")) { writer.write(bundle.getOriginalRecords().get(index)); added++; @@ -458,7 +483,7 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic session.transfer(errorFF, REL_FAILED_RECORDS); return errorFF; - } catch (Exception ex) { + } catch (final Exception ex) { getLogger().error("", ex); session.remove(errorFF); throw ex; @@ -474,10 +499,10 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic return fallback; } - RecordPathResult result = path.evaluate(record); - Optional value = result.getSelectedFields().findFirst(); + final RecordPathResult result = path.evaluate(record); + final Optional value = result.getSelectedFields().findFirst(); if (value.isPresent() && value.get().getValue() != null) { - FieldValue fieldValue = value.get(); + final FieldValue fieldValue = value.get(); if (!fieldValue.getField().getDataType().getFieldType().equals(RecordFieldType.STRING) ) { throw new ProcessException( String.format("Field referenced by %s must be a string.", path.getPath()) diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/SearchElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/SearchElasticsearch.java index 40976c062b..073f779906 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/SearchElasticsearch.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/SearchElasticsearch.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.processors.elasticsearch; +import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.PrimaryNodeOnly; import org.apache.nifi.annotation.behavior.Stateful; @@ -65,6 +66,12 @@ import java.util.Set; "Search After/Point in Time queries must include a valid \"sort\" field. The processor will retrieve multiple pages of results " + "until either no more results are available or the Pagination Keep Alive expiration is reached, after which the query will " + "restart with the first page of results being retrieved.") +@DynamicProperty( + name = "The name of a URL query parameter to add", + value = "The value of the URL query parameter", + expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, + description = "Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing. " + + "These parameters will override any matching parameters in the query request body") @Stateful(scopes = Scope.LOCAL, description = "The pagination state (scrollId, searchAfter, pitId, hitCount, pageCount, pageExpirationTimestamp) " + "is retained in between invocations of this processor until the Scroll/PiT has expired " + "(when the current time is later than the last query execution plus the Pagination Keep Alive interval).") diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/UpdateByQueryElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/UpdateByQueryElasticsearch.java new file mode 100644 index 0000000000..ba98025e47 --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/UpdateByQueryElasticsearch.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.elasticsearch; + +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.elasticsearch.ElasticSearchClientService; +import org.apache.nifi.elasticsearch.OperationResponse; +import org.apache.nifi.expression.ExpressionLanguageScope; + +import java.util.Map; + +@WritesAttributes({ + @WritesAttribute(attribute = "elasticsearch.update.took", description = "The amount of time that it took to complete the update operation in ms."), + @WritesAttribute(attribute = "elasticsearch.update.error", description = "The error message provided by Elasticsearch if there is an error running the update.") +}) +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED) +@Tags({ "elastic", "elasticsearch", "update", "query"}) +@CapabilityDescription("Update documents in an Elasticsearch index using a query. The query can be loaded from a flowfile body " + + "or from the Query parameter.") +@DynamicProperty( + name = "The name of a URL query parameter to add", + value = "The value of the URL query parameter", + expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES, + description = "Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing. " + + "These parameters will override any matching parameters in the query request body") +public class UpdateByQueryElasticsearch extends AbstractByQueryElasticsearch { + static final String TOOK_ATTRIBUTE = "elasticsearch.update.took"; + static final String ERROR_ATTRIBUTE = "elasticsearch.update.error"; + + @Override + String getTookAttribute() { + return TOOK_ATTRIBUTE; + } + + @Override + String getErrorAttribute() { + return ERROR_ATTRIBUTE; + } + + @Override + OperationResponse performOperation(final ElasticSearchClientService clientService, final String query, + final String index, final String type, final Map requestParameters) { + return clientService.updateByQuery(query, index, type, requestParameters); + } +} diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index eafd0011ee..749ba53fec 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -17,4 +17,5 @@ org.apache.nifi.processors.elasticsearch.DeleteByQueryElasticsearch org.apache.nifi.processors.elasticsearch.JsonQueryElasticsearch org.apache.nifi.processors.elasticsearch.PaginatedJsonQueryElasticsearch org.apache.nifi.processors.elasticsearch.SearchElasticsearch -org.apache.nifi.processors.elasticsearch.PutElasticsearchRecord \ No newline at end of file +org.apache.nifi.processors.elasticsearch.PutElasticsearchRecord +org.apache.nifi.processors.elasticsearch.UpdateByQueryElasticsearch diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.PutElasticsearchRecord/additionalDetails.html b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.PutElasticsearchRecord/additionalDetails.html index 0ac19b2aab..76e862e80d 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.PutElasticsearchRecord/additionalDetails.html +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.PutElasticsearchRecord/additionalDetails.html @@ -37,5 +37,48 @@ not too large for it to handle. When failures do occur, this processor is capable of attempting to write the records that failed to an output record writer so that only failed records can be processed downstream or replayed.

+

+ The index, operation and (optional) type fields are configured with default values that can be overridden using + record path operations that find an index or type value in the record set. + The ID and operation type (create, index, update, upsert or delete) can also be extracted in a similar fashion from + the record set. The following is an example of a document exercising all of these features: +

+
+    {
+        "metadata": {
+            "id": "12345",
+            "index": "test",
+            "type": "message",
+            "operation": "index"
+        },
+        "message": "Hello, world",
+        "from": "john.smith"
+    }
+
+
+    {
+        "metadata": {
+            "id": "12345",
+            "index": "test",
+            "type": "message",
+            "operation": "delete"
+        }
+    }
+
+

The record path operations below would extract the relevant data:

+
    +
  • /metadata/id
  • +
  • /metadata/index
  • +
  • metadata/type
  • +
  • metadata/operation
  • +
+

Valid values for "operation" are:

+
    +
  • create
  • +
  • delete
  • +
  • index
  • +
  • update
  • +
  • upsert
  • +
\ No newline at end of file diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.UpdateByQueryElasticsearch/additionalDetails.html b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.UpdateByQueryElasticsearch/additionalDetails.html new file mode 100644 index 0000000000..f747a30a4c --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/resources/docs/org.apache.nifi.processors.elasticsearch.UpdateByQueryElasticsearch/additionalDetails.html @@ -0,0 +1,48 @@ + + + + + + UpdateByQueryElasticsearch + + + +

This processor executes an update operation against one or more indices using the _update_by_query handler. The + query should be a valid Elasticsearch JSON DSL query (Lucene syntax is not supported). An optional Elasticsearch + script can be specified to execute against the matched documents. An example query with script:

+
+        {
+            "script": {
+                "source": "ctx._source.count++",
+                "lang": "painless"
+            },
+            "query": {
+                "match": {
+                    "username.keyword": "john.smith"
+                }
+            }
+        }
+    
+

To update all of the contents of an index, this could be used:

+
+        {
+            "query": {
+                "match_all": {}
+            }
+        }
+    
+ + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractByQueryElasticsearchTest.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractByQueryElasticsearchTest.groovy new file mode 100644 index 0000000000..33be374750 --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractByQueryElasticsearchTest.groovy @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License") you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.elasticsearch + +import org.apache.nifi.util.MockFlowFile +import org.apache.nifi.util.TestRunner +import org.junit.Assert +import org.junit.Test + +abstract class AbstractByQueryElasticsearchTest { + private static final String INDEX = "test_idx" + private static final String TYPE = "test_type" + private static final String CLIENT_NAME = "clientService" + + private TestElasticsearchClientService client + + abstract String queryAttr() + + abstract String tookAttr() + + abstract String errorAttr() + + abstract TestRunner setupRunner() + + abstract void expectError(final TestElasticsearchClientService client) + + private void initClient(TestRunner runner) throws Exception { + client = new TestElasticsearchClientService(true) + runner.addControllerService(CLIENT_NAME, client) + runner.enableControllerService(client) + runner.setProperty(AbstractByQueryElasticsearch.CLIENT_SERVICE, CLIENT_NAME) + } + + private void postTest(TestRunner runner, String queryParam) { + runner.assertTransferCount(AbstractByQueryElasticsearch.REL_FAILURE, 0) + runner.assertTransferCount(AbstractByQueryElasticsearch.REL_SUCCESS, 1) + + final List flowFiles = runner.getFlowFilesForRelationship(AbstractByQueryElasticsearch.REL_SUCCESS) + final String attr = flowFiles.get(0).getAttribute(tookAttr()) + final String query = flowFiles.get(0).getAttribute(queryAttr()) + Assert.assertNotNull(attr) + Assert.assertEquals(attr, "100") + Assert.assertNotNull(query) + Assert.assertEquals(queryParam, query) + } + + @Test + void testWithFlowfileInput() throws Exception { + final String query = "{ \"query\": { \"match_all\": {} }}" + final TestRunner runner = setupRunner() + runner.setProperty(AbstractByQueryElasticsearch.INDEX, INDEX) + runner.setProperty(AbstractByQueryElasticsearch.TYPE, TYPE) + runner.setProperty(AbstractByQueryElasticsearch.QUERY_ATTRIBUTE, queryAttr()) + initClient(runner) + runner.assertValid() + runner.enqueue(query) + runner.run() + + postTest(runner, query) + + Assert.assertTrue(client.getRequestParameters().isEmpty()) + } + + @Test + void testWithFlowfileInputAndRequestParameters() throws Exception { + final String query = "{ \"query\": { \"match_all\": {} }}" + final TestRunner runner = setupRunner() + runner.setProperty(AbstractByQueryElasticsearch.INDEX, INDEX) + runner.setProperty(AbstractByQueryElasticsearch.TYPE, TYPE) + runner.setProperty(AbstractByQueryElasticsearch.QUERY_ATTRIBUTE, queryAttr()) + runner.setProperty("refresh", "true") + runner.setProperty("slices", '${slices}') + initClient(runner) + runner.assertValid() + runner.enqueue(query, [slices: "auto"]) + runner.run() + + postTest(runner, query) + + Assert.assertEquals(2, client.getRequestParameters().size()) + Assert.assertEquals("true", client.getRequestParameters().get("refresh")) + Assert.assertEquals("auto", client.getRequestParameters().get("slices")) + } + + @Test + void testWithQuery() throws Exception { + final String query = "{\n" + + "\t\"query\": {\n" + + "\t\t\"match\": {\n" + + "\t\t\t\"\${field.name}.keyword\": \"test\"\n" + + "\t\t}\n" + + "\t}\n" + + "}" + final Map attrs = new HashMap(){{ + put("field.name", "test_field") + }} + final TestRunner runner = setupRunner() + runner.setProperty(AbstractByQueryElasticsearch.QUERY, query) + runner.setProperty(AbstractByQueryElasticsearch.INDEX, INDEX) + runner.setProperty(AbstractByQueryElasticsearch.TYPE, TYPE) + runner.setProperty(AbstractByQueryElasticsearch.QUERY_ATTRIBUTE, queryAttr()) + initClient(runner) + runner.assertValid() + runner.enqueue("", attrs) + runner.run() + + postTest(runner, query.replace('${field.name}', "test_field")) + + runner.clearTransferState() + + final String query2 = "{\n" + + "\t\"query\": {\n" + + "\t\t\"match\": {\n" + + "\t\t\t\"test_field.keyword\": \"test\"\n" + + "\t\t}\n" + + "\t}\n" + + "}" + runner.setProperty(AbstractByQueryElasticsearch.QUERY, query2) + runner.setIncomingConnection(false) + runner.assertValid() + runner.run() + postTest(runner, query2) + } + + @Test + void testErrorAttribute() throws Exception { + final String query = "{ \"query\": { \"match_all\": {} }}" + final TestRunner runner = setupRunner() + runner.setProperty(AbstractByQueryElasticsearch.QUERY, query) + runner.setProperty(AbstractByQueryElasticsearch.INDEX, INDEX) + runner.setProperty(AbstractByQueryElasticsearch.TYPE, TYPE) + runner.setProperty(AbstractByQueryElasticsearch.QUERY_ATTRIBUTE, queryAttr()) + initClient(runner) + expectError(client) + runner.assertValid() + runner.enqueue("") + runner.run() + + runner.assertTransferCount(AbstractByQueryElasticsearch.REL_SUCCESS, 0) + runner.assertTransferCount(AbstractByQueryElasticsearch.REL_FAILURE, 1) + + final MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(AbstractByQueryElasticsearch.REL_FAILURE).get(0) + final String attr = mockFlowFile.getAttribute(errorAttr()) + Assert.assertNotNull(attr) + } + + @Test + void testInputHandling() { + final String query = "{ \"query\": { \"match_all\": {} }}" + final TestRunner runner = setupRunner() + runner.setProperty(AbstractByQueryElasticsearch.QUERY, query) + runner.setProperty(AbstractByQueryElasticsearch.INDEX, INDEX) + runner.setProperty(AbstractByQueryElasticsearch.TYPE, TYPE) + runner.setProperty(AbstractByQueryElasticsearch.QUERY_ATTRIBUTE, queryAttr()) + initClient(runner) + runner.assertValid() + runner.run() + + runner.assertTransferCount(AbstractByQueryElasticsearch.REL_SUCCESS, 0) + runner.assertTransferCount(AbstractByQueryElasticsearch.REL_FAILURE, 0) + } + + @Test + void testNoInputHandling() { + final String query = "{ \"query\": { \"match_all\": {} }}" + final TestRunner runner = setupRunner() + runner.setProperty(AbstractByQueryElasticsearch.QUERY, query) + runner.setProperty(AbstractByQueryElasticsearch.INDEX, INDEX) + runner.setProperty(AbstractByQueryElasticsearch.TYPE, TYPE) + runner.setProperty(AbstractByQueryElasticsearch.QUERY_ATTRIBUTE, queryAttr()) + initClient(runner) + runner.setIncomingConnection(false) + runner.assertValid() + runner.run() + + postTest(runner, query) + + Assert.assertTrue(client.getRequestParameters().isEmpty()) + } + + @Test + void testNoInputHandlingWithRequestParameters() { + final String query = "{ \"query\": { \"match_all\": {} }}" + final TestRunner runner = setupRunner() + runner.setProperty(AbstractByQueryElasticsearch.QUERY, query) + runner.setProperty(AbstractByQueryElasticsearch.INDEX, INDEX) + runner.setProperty(AbstractByQueryElasticsearch.TYPE, TYPE) + runner.setProperty(AbstractByQueryElasticsearch.QUERY_ATTRIBUTE, queryAttr()) + runner.setProperty("refresh", "true") + runner.setProperty("slices", '${slices}') + runner.setVariable("slices", "auto") + initClient(runner) + runner.setIncomingConnection(false) + runner.assertValid() + runner.run() + + postTest(runner, query) + + Assert.assertEquals(2, client.getRequestParameters().size()) + Assert.assertEquals("true", client.getRequestParameters().get("refresh")) + Assert.assertEquals("auto", client.getRequestParameters().get("slices")) + } +} diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearchTest.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearchTest.groovy index b114eb4783..51fdec15d8 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearchTest.groovy +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractJsonQueryElasticsearchTest.groovy @@ -29,8 +29,8 @@ import org.junit.Test import static groovy.json.JsonOutput.prettyPrint import static groovy.json.JsonOutput.toJson import static org.hamcrest.CoreMatchers.equalTo -import static org.hamcrest.MatcherAssert.assertThat import static org.hamcrest.CoreMatchers.is +import static org.hamcrest.MatcherAssert.assertThat import static org.junit.Assert.assertThrows abstract class AbstractJsonQueryElasticsearchTest

{ @@ -269,6 +269,27 @@ abstract class AbstractJsonQueryElasticsearchTest

flowFiles = runner.getFlowFilesForRelationship(DeleteByQueryElasticsearch.REL_SUCCESS) - String attr = flowFiles.get(0).getAttribute(DeleteByQueryElasticsearch.TOOK_ATTRIBUTE) - String query = flowFiles.get(0).getAttribute(QUERY_ATTR) - Assert.assertNotNull(attr) - Assert.assertEquals(attr, "100") - Assert.assertNotNull(query) - Assert.assertEquals(queryParam, query) + @Override + String tookAttr() { + return DeleteByQueryElasticsearch.TOOK_ATTRIBUTE } - @Test - void testWithFlowfileInput() throws Exception { - String query = "{ \"query\": { \"match_all\": {} }}" - TestRunner runner = TestRunners.newTestRunner(DeleteByQueryElasticsearch.class) - runner.setProperty(DeleteByQueryElasticsearch.INDEX, INDEX) - runner.setProperty(DeleteByQueryElasticsearch.TYPE, TYPE) - runner.setProperty(DeleteByQueryElasticsearch.QUERY_ATTRIBUTE, QUERY_ATTR) - initClient(runner) - runner.assertValid() - runner.enqueue(query) - runner.run() - - postTest(runner, query) + @Override + String errorAttr() { + return DeleteByQueryElasticsearch.ERROR_ATTRIBUTE } - @Test - void testWithQuery() throws Exception { - String query = "{\n" + - "\t\"query\": {\n" + - "\t\t\"match\": {\n" + - "\t\t\t\"\${field.name}.keyword\": \"test\"\n" + - "\t\t}\n" + - "\t}\n" + - "}" - Map attrs = new HashMap(){{ - put("field.name", "test_field") - }} - TestRunner runner = TestRunners.newTestRunner(DeleteByQueryElasticsearch.class) - runner.setProperty(DeleteByQueryElasticsearch.QUERY, query) - runner.setProperty(DeleteByQueryElasticsearch.INDEX, INDEX) - runner.setProperty(DeleteByQueryElasticsearch.TYPE, TYPE) - runner.setProperty(DeleteByQueryElasticsearch.QUERY_ATTRIBUTE, QUERY_ATTR) - initClient(runner) - runner.assertValid() - runner.enqueue("", attrs) - runner.run() - - postTest(runner, query.replace('${field.name}', "test_field")) - - runner.clearTransferState() - - query = "{\n" + - "\t\"query\": {\n" + - "\t\t\"match\": {\n" + - "\t\t\t\"test_field.keyword\": \"test\"\n" + - "\t\t}\n" + - "\t}\n" + - "}" - runner.setProperty(DeleteByQueryElasticsearch.QUERY, query) - runner.setIncomingConnection(false) - runner.assertValid() - runner.run() - postTest(runner, query) + @Override + TestRunner setupRunner() { + return TestRunners.newTestRunner(DeleteByQueryElasticsearch.class) } - @Test - void testErrorAttribute() throws Exception { - String query = "{ \"query\": { \"match_all\": {} }}" - TestRunner runner = TestRunners.newTestRunner(DeleteByQueryElasticsearch.class) - runner.setProperty(DeleteByQueryElasticsearch.QUERY, query) - runner.setProperty(DeleteByQueryElasticsearch.INDEX, INDEX) - runner.setProperty(DeleteByQueryElasticsearch.TYPE, TYPE) - runner.setProperty(DeleteByQueryElasticsearch.QUERY_ATTRIBUTE, QUERY_ATTR) - initClient(runner) + @Override + void expectError(final TestElasticsearchClientService client) { client.setThrowErrorInDelete(true) - runner.assertValid() - runner.enqueue("") - runner.run() - - runner.assertTransferCount(DeleteByQueryElasticsearch.REL_SUCCESS, 0) - runner.assertTransferCount(DeleteByQueryElasticsearch.REL_FAILURE, 1) - - MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(DeleteByQueryElasticsearch.REL_FAILURE).get(0) - String attr = mockFlowFile.getAttribute(DeleteByQueryElasticsearch.ERROR_ATTRIBUTE) - Assert.assertNotNull(attr) - } - - @Test - void testInputHandling() { - String query = "{ \"query\": { \"match_all\": {} }}" - TestRunner runner = TestRunners.newTestRunner(DeleteByQueryElasticsearch.class) - runner.setProperty(DeleteByQueryElasticsearch.QUERY, query) - runner.setProperty(DeleteByQueryElasticsearch.INDEX, INDEX) - runner.setProperty(DeleteByQueryElasticsearch.TYPE, TYPE) - runner.setProperty(DeleteByQueryElasticsearch.QUERY_ATTRIBUTE, QUERY_ATTR) - initClient(runner) - runner.assertValid() - runner.run() } } diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy index 12df823dba..85a9f2ba60 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy @@ -125,7 +125,11 @@ class PutElasticsearchRecordTest { void basicTest(int failure, int retry, int success, Closure evalClosure) { clientService.evalClosure = evalClosure - runner.enqueue(flowFileContents, [ "schema.name": "simple" ]) + basicTest(failure, retry, success, [ "schema.name": "simple" ]) + } + + void basicTest(int failure, int retry, int success, Map attr) { + runner.enqueue(flowFileContents, attr) runner.run() runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, failure) @@ -135,6 +139,11 @@ class PutElasticsearchRecordTest { @Test void simpleTest() { + def evalParametersClosure = { Map params -> + Assert.assertTrue(params.isEmpty()) + } + clientService.evalParametersClosure = evalParametersClosure + basicTest(0, 0, 1) } @@ -149,6 +158,41 @@ class PutElasticsearchRecordTest { basicTest(0, 0, 1, evalClosure) } + @Test + void simpleTestWithRequestParameters() { + runner.setProperty("refresh", "true") + runner.setProperty("slices", '${slices}') + runner.setVariable("slices", "auto") + runner.assertValid() + + def evalParametersClosure = { Map params -> + Assert.assertEquals(2, params.size()) + Assert.assertEquals("true", params.get("refresh")) + Assert.assertEquals("auto", params.get("slices")) + } + + clientService.evalParametersClosure = evalParametersClosure + + basicTest(0, 0, 1) + } + + @Test + void simpleTestWithRequestParametersFlowFileEL() { + runner.setProperty("refresh", "true") + runner.setProperty("slices", '${slices}') + runner.assertValid() + + def evalParametersClosure = { Map params -> + Assert.assertEquals(2, params.size()) + Assert.assertEquals("true", params.get("refresh")) + Assert.assertEquals("auto", params.get("slices")) + } + + clientService.evalParametersClosure = evalParametersClosure + + basicTest(0, 0, 1, ["schema.name": "simple", slices: "auto"]) + } + @Test void simpleTestWithMockReader() { reader = new MockRecordParser() diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/SearchElasticsearchTest.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/SearchElasticsearchTest.groovy index 68b146385f..c606d369d9 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/SearchElasticsearchTest.groovy +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/SearchElasticsearchTest.groovy @@ -27,8 +27,8 @@ import java.time.Instant import static groovy.json.JsonOutput.prettyPrint import static groovy.json.JsonOutput.toJson -import static org.hamcrest.MatcherAssert.assertThat import static org.hamcrest.CoreMatchers.is +import static org.hamcrest.MatcherAssert.assertThat import static org.junit.Assert.fail class SearchElasticsearchTest extends AbstractPaginatedJsonQueryElasticsearchTest { diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/TestElasticsearchClientService.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/TestElasticsearchClientService.groovy index e4e5c7e409..9bd4cb6302 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/TestElasticsearchClientService.groovy +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/TestElasticsearchClientService.groovy @@ -24,63 +24,78 @@ import org.apache.nifi.elasticsearch.ElasticSearchClientService import org.apache.nifi.elasticsearch.IndexOperationRequest import org.apache.nifi.elasticsearch.IndexOperationResponse import org.apache.nifi.elasticsearch.SearchResponse +import org.apache.nifi.elasticsearch.UpdateOperationResponse class TestElasticsearchClientService extends AbstractControllerService implements ElasticSearchClientService { private boolean returnAggs private boolean throwErrorInSearch private boolean throwErrorInDelete private boolean throwErrorInPit + private boolean throwErrorInUpdate private int pageCount = 0 private int maxPages = 1 private String query + private Map requestParameters TestElasticsearchClientService(boolean returnAggs) { this.returnAggs = returnAggs } - @Override - IndexOperationResponse add(IndexOperationRequest operation) { - return bulk(Arrays.asList(operation) as List) + private void common(boolean throwError, Map requestParameters) { + if (throwError) { + throw new IOException("Simulated IOException") + } + this.requestParameters = requestParameters } @Override - IndexOperationResponse bulk(List operations) { + IndexOperationResponse add(IndexOperationRequest operation, Map requestParameters) { + return bulk(Arrays.asList(operation) as List, requestParameters) + } + + @Override + IndexOperationResponse bulk(List operations, Map requestParameters) { + common(false, requestParameters) return new IndexOperationResponse(100L) } @Override - Long count(String query, String index, String type) { + Long count(String query, String index, String type, Map requestParameters) { + common(false, requestParameters) return null } @Override - DeleteOperationResponse deleteById(String index, String type, String id) { - return deleteById(index, type, Arrays.asList(id)) + DeleteOperationResponse deleteById(String index, String type, String id, Map requestParameters) { + return deleteById(index, type, Arrays.asList(id), requestParameters) } @Override - DeleteOperationResponse deleteById(String index, String type, List ids) { - if (throwErrorInDelete) { - throw new IOException("Simulated IOException") - } + DeleteOperationResponse deleteById(String index, String type, List ids, Map requestParameters) { + common(throwErrorInDelete, requestParameters) return new DeleteOperationResponse(100L) } @Override - DeleteOperationResponse deleteByQuery(String query, String index, String type) { - return deleteById(index, type, Arrays.asList("1")) + DeleteOperationResponse deleteByQuery(String query, String index, String type, Map requestParameters) { + return deleteById(index, type, Arrays.asList("1"), requestParameters) } @Override - Map get(String index, String type, String id) { + UpdateOperationResponse updateByQuery(String query, String index, String type, Map requestParameters) { + common(throwErrorInUpdate, requestParameters) + return new UpdateOperationResponse(100L) + } + + @Override + Map get(String index, String type, String id, Map requestParameters) { + common(false, requestParameters) return [ "msg": "one" ] } @Override SearchResponse search(String query, String index, String type, Map requestParameters) { - if (throwErrorInSearch) { - throw new IOException("Simulated IOException") - } + common(throwErrorInSearch, requestParameters) this.query = query final SearchResponse response @@ -101,7 +116,7 @@ class TestElasticsearchClientService extends AbstractControllerService implement throw new IOException("Simulated IOException - scroll") } - return search(null, null, null, null) + return search(null, null, null, requestParameters) } @Override @@ -295,6 +310,10 @@ class TestElasticsearchClientService extends AbstractControllerService implement this.throwErrorInPit = throwErrorInPit } + void setThrowErrorInUpdate(boolean throwErrorInUpdate) { + this.throwErrorInUpdate = throwErrorInUpdate + } + void resetPageCount() { this.pageCount = 0 } @@ -302,4 +321,8 @@ class TestElasticsearchClientService extends AbstractControllerService implement void setMaxPages(int maxPages) { this.maxPages = maxPages } + + Map getRequestParameters() { + return this.requestParameters + } } diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/UpdateByQueryElasticsearchTest.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/UpdateByQueryElasticsearchTest.groovy new file mode 100644 index 0000000000..405d3c50a2 --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/UpdateByQueryElasticsearchTest.groovy @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License") you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.elasticsearch + +import org.apache.nifi.util.TestRunner +import org.apache.nifi.util.TestRunners + +class UpdateByQueryElasticsearchTest extends AbstractByQueryElasticsearchTest { + @Override + String queryAttr() { + return "es.update.query" + } + + @Override + String tookAttr() { + return UpdateByQueryElasticsearch.TOOK_ATTRIBUTE + } + + @Override + String errorAttr() { + return UpdateByQueryElasticsearch.ERROR_ATTRIBUTE + } + + @Override + TestRunner setupRunner() { + return TestRunners.newTestRunner(UpdateByQueryElasticsearch.class) + } + + @Override + void expectError(final TestElasticsearchClientService client) { + client.setThrowErrorInUpdate(true) + } +} diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/AbstractMockElasticsearchClient.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/AbstractMockElasticsearchClient.groovy index 33c62b26c6..e590419800 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/AbstractMockElasticsearchClient.groovy +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/AbstractMockElasticsearchClient.groovy @@ -23,43 +23,49 @@ import org.apache.nifi.elasticsearch.ElasticSearchClientService import org.apache.nifi.elasticsearch.IndexOperationRequest import org.apache.nifi.elasticsearch.IndexOperationResponse import org.apache.nifi.elasticsearch.SearchResponse +import org.apache.nifi.elasticsearch.UpdateOperationResponse class AbstractMockElasticsearchClient extends AbstractControllerService implements ElasticSearchClientService { boolean throwRetriableError boolean throwFatalError @Override - IndexOperationResponse add(IndexOperationRequest operation) { + IndexOperationResponse add(IndexOperationRequest operation, Map requestParameters) { return null } @Override - IndexOperationResponse bulk(List operations) { + IndexOperationResponse bulk(List operations, Map requestParameters) { return null } @Override - Long count(String query, String index, String type) { + Long count(String query, String index, String type, Map requestParameters) { return null } @Override - DeleteOperationResponse deleteById(String index, String type, String id) { + DeleteOperationResponse deleteById(String index, String type, String id, Map requestParameters) { return null } @Override - DeleteOperationResponse deleteById(String index, String type, List ids) { + DeleteOperationResponse deleteById(String index, String type, List ids, Map requestParameters) { return null } @Override - DeleteOperationResponse deleteByQuery(String query, String index, String type) { + DeleteOperationResponse deleteByQuery(String query, String index, String type, Map requestParameters) { return null } @Override - Map get(String index, String type, String id) { + UpdateOperationResponse updateByQuery(String query, String index, String type, Map requestParameters) { + return null + } + + @Override + Map get(String index, String type, String id, Map requestParameters) { return null } diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/MockBulkLoadClientService.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/MockBulkLoadClientService.groovy index 3c4d9022e0..21a3a8ff14 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/MockBulkLoadClientService.groovy +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/mock/MockBulkLoadClientService.groovy @@ -24,9 +24,10 @@ import org.apache.nifi.elasticsearch.IndexOperationResponse class MockBulkLoadClientService extends AbstractMockElasticsearchClient { IndexOperationResponse response Closure evalClosure + Closure evalParametersClosure @Override - IndexOperationResponse bulk(List items) { + IndexOperationResponse bulk(List items, Map requestParameters) { if (throwRetriableError) { throw new MockElasticsearchError(true) } else if (throwFatalError) { @@ -37,6 +38,10 @@ class MockBulkLoadClientService extends AbstractMockElasticsearchClient { evalClosure.call(items) } + if (evalParametersClosure) { + evalParametersClosure.call(requestParameters) + } + response }