- NIFI-8001: Dynamic Properties as Request Parameters for Elasticsearch Client Service processors

- NIFI-8003: UpdateByQueryElasticsearchProcessor
- Addressed various warnings and inefficiencies found in existing processor code

Signed-off-by: Joe Gresock <jgresock@gmail.com>

This closes #4693.
This commit is contained in:
Chris Sampson 2021-10-07 11:17:28 +01:00 committed by Joe Gresock
parent 8d0eec2d62
commit f50c002692
No known key found for this signature in database
GPG Key ID: 37F5B9B6E258C8B7
44 changed files with 1530 additions and 586 deletions

View File

@ -17,13 +17,14 @@
package org.apache.nifi.elasticsearch;
public class DeleteOperationResponse {
public class DeleteOperationResponse implements OperationResponse {
private final long took;
public DeleteOperationResponse(final long took) {
this.took = took;
}
@Override
public long getTook() {
return took;
}

View File

@ -26,7 +26,6 @@ import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.ssl.SSLContextService;
import java.io.IOException;
import java.util.List;
import java.util.Map;
@ -83,6 +82,11 @@ public interface ElasticSearchClientService extends ControllerService {
.defaultValue("60000")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
/**
* @deprecated this setting is no longer used and will be removed in a future version.
* The property is retained for now so that existing flows which configure this Controller Service do not break upon upgrade.
*/
@Deprecated
PropertyDescriptor RETRY_TIMEOUT = new PropertyDescriptor.Builder()
.name("el-cs-retry-timeout")
.displayName("Retry timeout")
@ -119,71 +123,85 @@ public interface ElasticSearchClientService extends ControllerService {
* Index a document.
*
* @param operation A document to index.
* @param requestParameters A collection of URL request parameters. Optional.
* @return IndexOperationResponse if successful
* @throws IOException thrown when there is an error.
*/
IndexOperationResponse add(IndexOperationRequest operation);
IndexOperationResponse add(IndexOperationRequest operation, Map<String, String> requestParameters);
/**
* Bulk process multiple documents.
*
* @param operations A list of index operations.
* @param requestParameters A collection of URL request parameters. Optional.
* @return IndexOperationResponse if successful.
* @throws IOException thrown when there is an error.
*/
IndexOperationResponse bulk(List<IndexOperationRequest> operations);
IndexOperationResponse bulk(List<IndexOperationRequest> operations, Map<String, String> requestParameters);
/**
* Count the documents that match the criteria.
*
* @param query A query in the JSON DSL syntax
* @param index The index to target.
* @param type The type to target.
* @return
* @param type The type to target. Will not be used in future versions of Elasticsearch.
* @param requestParameters A collection of URL request parameters. Optional.
* @return number of documents matching the query
*/
Long count(String query, String index, String type);
Long count(String query, String index, String type, Map<String, String> requestParameters);
/**
* Delete a document by its ID from an index.
*
* @param index The index to target.
* @param type The type to target. Optional.
* @param type The type to target. Optional. Will not be used in future versions of Elasticsearch.
* @param id The document ID to remove from the selected index.
* @param requestParameters A collection of URL request parameters. Optional.
* @return A DeleteOperationResponse object if successful.
*/
DeleteOperationResponse deleteById(String index, String type, String id);
DeleteOperationResponse deleteById(String index, String type, String id, Map<String, String> requestParameters);
/**
* Delete multiple documents by ID from an index.
* @param index The index to target.
* @param type The type to target. Optional.
* @param type The type to target. Optional. Will not be used in future versions of Elasticsearch.
* @param ids A list of document IDs to remove from the selected index.
* @param requestParameters A collection of URL request parameters. Optional.
* @return A DeleteOperationResponse object if successful.
* @throws IOException thrown when there is an error.
*/
DeleteOperationResponse deleteById(String index, String type, List<String> ids);
DeleteOperationResponse deleteById(String index, String type, List<String> ids, Map<String, String> requestParameters);
/**
* Delete documents by query.
*
* @param query A valid JSON query to be used for finding documents to delete.
* @param index The index to target.
* @param type The type to target within the index. Optional.
* @param type The type to target within the index. Optional. Will not be used in future versions of Elasticsearch.
* @param requestParameters A collection of URL request parameters. Optional.
* @return A DeleteOperationResponse object if successful.
*/
DeleteOperationResponse deleteByQuery(String query, String index, String type);
DeleteOperationResponse deleteByQuery(String query, String index, String type, Map<String, String> requestParameters);
/**
* Update documents by query.
*
* @param query A valid JSON query to be used for finding documents to update.
* @param index The index to target.
* @param type The type to target within the index. Optional. Will not be used in future versions of Elasticsearch.
* @param requestParameters A collection of URL request parameters. Optional.
* @return An UpdateOperationResponse object if successful.
*/
UpdateOperationResponse updateByQuery(String query, String index, String type, Map<String, String> requestParameters);
/**
* Get a document by ID.
*
* @param index The index that holds the document.
* @param type The document type. Optional.
* @param type The document type. Optional. Will not be used in future versions of Elasticsearch.
* @param id The document ID
* @param requestParameters A collection of URL request parameters. Optional.
* @return Map if successful, null if not found.
* @throws IOException thrown when there is an error.
*/
Map<String, Object> get(String index, String type, String id);
Map<String, Object> get(String index, String type, String id, Map<String, String> requestParameters);
/**
* Perform a search using the JSON DSL.

View File

@ -21,8 +21,9 @@ import java.util.Arrays;
import java.util.Map;
/**
* A POJO that represents an "operation on an index." It should not be confused with just indexing documents, as it
* A POJO that represents an "operation on an index". It should not be confused with just indexing documents, as it
* covers all CRUD-related operations that can be executed against an Elasticsearch index with documents.
* Type is optional and will not be used in future versions of Elasticsearch.
*/
public class IndexOperationRequest {
private final String index;

View File

@ -23,7 +23,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
public class IndexOperationResponse {
public class IndexOperationResponse implements OperationResponse {
private final long took;
private boolean hasErrors;
private List<Map<String, Object>> items;
@ -34,6 +34,7 @@ public class IndexOperationResponse {
this.took = took;
}
@Override
public long getTook() {
return took;
}

View File

@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.elasticsearch;
/**
* Common contract for the response objects returned by Elasticsearch client operations
* (for example index, delete, update and search responses), exposing how long the operation took.
*/
public interface OperationResponse {
/**
* @return the time in milliseconds taken to complete the Elasticsearch operation
*/
long getTook();
}

View File

@ -20,7 +20,7 @@ package org.apache.nifi.elasticsearch;
import java.util.List;
import java.util.Map;
public class SearchResponse {
public class SearchResponse implements OperationResponse {
private final List<Map<String, Object>> hits;
private final Map<String, Object> aggregations;
private final long numberOfHits;
@ -73,7 +73,8 @@ public class SearchResponse {
return timedOut;
}
public int getTook() {
@Override
public long getTook() {
return took;
}

View File

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.elasticsearch;
/**
 * Response returned for an Elasticsearch "update by query" operation.
 * It carries only the time the operation took.
 */
public class UpdateOperationResponse implements OperationResponse {
    private final long took;

    /**
     * @param took the time in milliseconds taken to complete the update operation
     */
    public UpdateOperationResponse(final long took) {
        this.took = took;
    }

    /**
     * @return the time in milliseconds taken to complete the update operation
     */
    @Override
    public long getTook() {
        return took;
    }
}

View File

@ -30,6 +30,7 @@
<es.int.version>5.6.16</es.int.version>
<es.int.script.name>setup-5.script</es.int.script.name>
<es.int.type.name>faketype</es.int.type.name>
<es.int.path.conf>src/test/resources/conf-5/</es.int.path.conf>
<es.int.clusterName>testCluster</es.int.clusterName>
<es.int.transportPort>9500</es.int.transportPort>
<es.int.httpPort>9400</es.int.httpPort>
@ -41,14 +42,14 @@
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-lookup-service-api</artifactId>
<scope>provided</scope>
<version>1.15.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
<scope>provided</scope>
<version>1.15.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
@ -67,6 +68,35 @@
<version>1.15.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service-api</artifactId>
<version>1.15.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-elasticsearch-client-service-api</artifactId>
<version>1.15.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-avro-record-utils</artifactId>
<version>1.15.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-schema-registry-service-api</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-path</artifactId>
<version>1.15.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
@ -77,7 +107,7 @@
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.10.0</version>
<version>2.11.0</version>
</dependency>
<dependency>
@ -88,27 +118,44 @@
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.11</version>
<version>3.12.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.13</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
<!-- stopped at 7.13.4 as it is the last version prior to Elastic forcing the client to check it is connecting
to an Elastic-provided Elasticsearch instead of an instance provided by someone else (e.g. AWS OpenSearch)
see: https://opensearch.org/blog/community/2021/08/community-clients/ for more info.
Note: the low-level elasticsearch-rest-client remains licensed with Apache 2.0 even after the move
of the main Elasticsearch product and elasticsearch-rest-high-level-client to Elastic 2.0/SSPL 1.0 in v7.11.0+ -->
<!-- keep this version in sync with the pin documented above; do not upgrade past 7.13.4 -->
<version>7.13.4</version>
<scope>compile</scope>
<exclusions>
<exclusion>
<groupId>com.vividsolutions</groupId>
<artifactId>jts</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- test dependencies -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<version>1.15.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service-api</artifactId>
<version>1.15.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-web-test-utils</artifactId>
@ -123,54 +170,9 @@
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-elasticsearch-client-service-api</artifactId>
<artifactId>nifi-mock</artifactId>
<version>1.15.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>5.6.16</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>5.6.16</version>
<scope>compile</scope>
<exclusions>
<exclusion>
<groupId>com.vividsolutions</groupId>
<artifactId>jts</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-avro-record-utils</artifactId>
<version>1.15.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-schema-registry-service-api</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.13</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-path</artifactId>
<version>1.15.0-SNAPSHOT</version>
<scope>compile</scope>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
@ -189,31 +191,33 @@
<version>${nifi.groovy.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
<version>5.6.16</version>
</dependency>
</dependencies>
<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<version>3.0.0-M3</version>
</plugin>
<plugin>
<groupId>com.github.alexcojocaru</groupId>
<artifactId>elasticsearch-maven-plugin</artifactId>
<version>6.19</version>
</plugin>
</plugins>
</pluginManagement>
</build>
<profiles>
<profile>
<id>integration-tests</id>
<activation>
<activeByDefault>false</activeByDefault>
</activation>
<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<!-- use 3.0.0-M3 due to a classpath/class loader issue in -M5, expected to be fixed in M6+ -->
<version>3.0.0-M3</version>
</plugin>
<plugin>
<groupId>com.github.alexcojocaru</groupId>
<artifactId>elasticsearch-maven-plugin</artifactId>
<version>6.19</version>
</plugin>
</plugins>
</pluginManagement>
</build>
</profile>
<profile>
<!-- use with elasticsearch-oss only (-default can be used if x-pack-ml permissions fixed) -->
<id>elasticsearch-6</id>
@ -221,9 +225,10 @@
<activeByDefault>false</activeByDefault>
</activation>
<properties>
<es.int.version>6.8.13</es.int.version>
<es.int.version>6.8.19</es.int.version>
<es.int.type.name>_doc</es.int.type.name>
<es.int.script.name>setup-6.script</es.int.script.name>
<es.int.path.conf/>
</properties>
</profile>
<profile>
@ -236,6 +241,7 @@
<es.int.version>7.10.2</es.int.version>
<es.int.script.name>setup-7.script</es.int.script.name>
<es.int.type.name/>
<es.int.path.conf/>
</properties>
</profile>
@ -270,6 +276,7 @@
<logLevel>${es.int.logLevel}</logLevel>
<pathInitScript>${project.basedir}/src/test/resources/${es.int.script.name}</pathInitScript>
<keepExistingData>false</keepExistingData>
<pathConf>${es.int.path.conf}</pathConf>
</configuration>
<executions>
<execution>

View File

@ -29,7 +29,6 @@ import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.message.BasicHeader;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
@ -41,6 +40,7 @@ import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.StringUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
@ -58,6 +58,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@ -73,7 +74,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
private Charset responseCharset;
static {
List<PropertyDescriptor> props = new ArrayList<>();
final List<PropertyDescriptor> props = new ArrayList<>();
props.add(ElasticSearchClientService.HTTP_HOSTS);
props.add(ElasticSearchClientService.USERNAME);
props.add(ElasticSearchClientService.PASSWORD);
@ -104,7 +105,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
mapper.setSerializationInclusion(JsonInclude.Include.NON_EMPTY);
}
} catch (Exception ex) {
} catch (final Exception ex) {
getLogger().error("Could not initialize ElasticSearch client.", ex);
throw new InitializationException(ex);
}
@ -118,7 +119,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
private void setupClient(final ConfigurationContext context) throws MalformedURLException, InitializationException {
final String hosts = context.getProperty(HTTP_HOSTS).evaluateAttributeExpressions().getValue();
String[] hostsSplit = hosts.split(",[\\s]*");
final String[] hostsSplit = hosts.split(",[\\s]*");
this.url = hostsSplit[0];
final SSLContextService sslService =
context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
@ -127,7 +128,6 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
final Integer connectTimeout = context.getProperty(CONNECT_TIMEOUT).asInteger();
final Integer readTimeout = context.getProperty(SOCKET_TIMEOUT).asInteger();
final Integer retryTimeout = context.getProperty(RETRY_TIMEOUT).asInteger();
final HttpHost[] hh = new HttpHost[hostsSplit.length];
for (int x = 0; x < hh.length; x++) {
@ -139,7 +139,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
try {
sslContext = (sslService != null && (sslService.isKeyStoreConfigured() || sslService.isTrustStoreConfigured()))
? sslService.createContext() : null;
} catch (Exception e) {
} catch (final Exception e) {
getLogger().error("Error building up SSL Context from the supplied configuration.", e);
throw new InitializationException(e);
}
@ -163,8 +163,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
requestConfigBuilder.setConnectTimeout(connectTimeout);
requestConfigBuilder.setSocketTimeout(readTimeout);
return requestConfigBuilder;
})
.setMaxRetryTimeoutMillis(retryTimeout);
});
this.client = builder.build();
}
@ -182,7 +181,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
final HttpEntity queryEntity = new NStringEntity(query, ContentType.APPLICATION_JSON);
try {
return client.performRequest("POST", sb.toString(), requestParameters != null ? requestParameters : Collections.emptyMap(), queryEntity);
return performRequest("POST", sb.toString(), requestParameters, queryEntity);
} catch (final Exception e) {
throw new ElasticsearchError(e);
}
@ -197,7 +196,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
final InputStream inputStream = response.getEntity().getContent();
final byte[] result = IOUtils.toByteArray(inputStream);
inputStream.close();
return (Map<String, Object>) mapper.readValue(new String(result, responseCharset), Map.class);
return mapper.readValue(new String(result, responseCharset), Map.class);
} else {
final String errorMessage = String.format("ElasticSearch reported an error while trying to run the query: %s",
response.getStatusLine().getReasonPhrase());
@ -217,8 +216,8 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
}
@Override
public IndexOperationResponse add(final IndexOperationRequest operation) {
return bulk(Collections.singletonList(operation));
public IndexOperationResponse add(final IndexOperationRequest operation, final Map<String, String> requestParameters) {
return bulk(Collections.singletonList(operation), requestParameters);
}
private String flatten(final String str) {
@ -277,7 +276,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
}
@Override
public IndexOperationResponse bulk(final List<IndexOperationRequest> operations) {
public IndexOperationResponse bulk(final List<IndexOperationRequest> operations, final Map<String, String> requestParameters) {
try {
final StringBuilder payload = new StringBuilder();
for (final IndexOperationRequest or : operations) {
@ -290,7 +289,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
final HttpEntity entity = new NStringEntity(payload.toString(), ContentType.APPLICATION_JSON);
final StopWatch watch = new StopWatch();
watch.start();
final Response response = client.performRequest("POST", "/_bulk", Collections.emptyMap(), entity);
final Response response = performRequest("POST", "_bulk", requestParameters, entity);
watch.stop();
final String rawResponse = IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8);
@ -307,20 +306,20 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
}
@Override
public Long count(final String query, final String index, final String type) {
final Response response = runQuery("_count", query, index, type, null);
public Long count(final String query, final String index, final String type, final Map<String, String> requestParameters) {
    // Run the query against the _count endpoint and parse the JSON body into a Map.
    final Response response = runQuery("_count", query, index, type, requestParameters);
    final Map<String, Object> parsed = parseResponse(response);

    // The parsed body is an untyped Map, so the numeric "count" value may be an Integer or a Long
    // depending on its magnitude. Go through Number instead of casting to Integer, which would
    // fail with a ClassCastException for counts larger than Integer.MAX_VALUE.
    return ((Number) parsed.get("count")).longValue();
}
@Override
public DeleteOperationResponse deleteById(final String index, final String type, final String id) {
return deleteById(index, type, Collections.singletonList(id));
public DeleteOperationResponse deleteById(final String index, final String type, final String id, final Map<String, String> requestParameters) {
return deleteById(index, type, Collections.singletonList(id), requestParameters);
}
@Override
public DeleteOperationResponse deleteById(final String index, final String type, final List<String> ids) {
public DeleteOperationResponse deleteById(final String index, final String type, final List<String> ids, final Map<String, String> requestParameters) {
try {
final StringBuilder sb = new StringBuilder();
for (final String id : ids) {
@ -330,7 +329,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
final HttpEntity entity = new NStringEntity(sb.toString(), ContentType.APPLICATION_JSON);
final StopWatch watch = new StopWatch();
watch.start();
final Response response = client.performRequest("POST", "/_bulk", Collections.emptyMap(), entity);
final Response response = performRequest("POST", "_bulk", requestParameters, entity);
watch.stop();
if (getLogger().isDebugEnabled()) {
@ -346,10 +345,10 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
}
@Override
public DeleteOperationResponse deleteByQuery(final String query, final String index, final String type) {
public DeleteOperationResponse deleteByQuery(final String query, final String index, final String type, final Map<String, String> requestParameters) {
final StopWatch watch = new StopWatch();
watch.start();
final Response response = runQuery("_delete_by_query", query, index, type, null);
final Response response = runQuery("_delete_by_query", query, index, type, requestParameters);
watch.stop();
// check for errors in response
@ -359,9 +358,21 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
return new DeleteOperationResponse(watch.getDuration(TimeUnit.MILLISECONDS));
}
/**
 * Update documents by query by posting the supplied query to the "_update_by_query" endpoint.
 *
 * @param query the JSON query used to select the documents to update
 * @param index the index to target
 * @param type the type to target within the index; passed through to the query runner unchanged
 * @param requestParameters URL request parameters to send with the request; passed through unchanged
 * @return an UpdateOperationResponse holding the elapsed time of the request in milliseconds
 */
@Override
public UpdateOperationResponse updateByQuery(final String query, final String index, final String type, final Map<String, String> requestParameters) {
    // time the request the same way deleteByQuery does, so all operation responses report "took" consistently
    final StopWatch watch = new StopWatch();
    watch.start();
    final Response response = runQuery("_update_by_query", query, index, type, requestParameters);
    watch.stop();

    // check for errors in response
    parseResponse(response);

    return new UpdateOperationResponse(watch.getDuration(TimeUnit.MILLISECONDS));
}
@SuppressWarnings("unchecked")
@Override
public Map<String, Object> get(final String index, final String type, final String id) {
public Map<String, Object> get(final String index, final String type, final String id, final Map<String, String> requestParameters) {
try {
final StringBuilder endpoint = new StringBuilder();
endpoint.append(index);
@ -371,8 +382,8 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
endpoint.append("/_doc");
}
endpoint.append("/").append(id);
final Response response = client.performRequest("GET", endpoint.toString(), new BasicHeader("Content-Type", "application/json"));
final Response response = performRequest("GET", endpoint.toString(), requestParameters, null);
final String body = IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8);
parseResponseWarningHeaders(response);
@ -402,7 +413,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
try {
final Response response = runQuery("_search", query, index, type, requestParameters);
return buildSearchResponse(response);
} catch (Exception ex) {
} catch (final Exception ex) {
throw new RuntimeException(ex);
}
}
@ -411,10 +422,9 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
public SearchResponse scroll(final String scroll) {
try {
final HttpEntity scrollEntity = new NStringEntity(scroll, ContentType.APPLICATION_JSON);
final Response response = client.performRequest("POST", "/_search/scroll", Collections.emptyMap(), scrollEntity);
final Response response = performRequest("POST", "/_search/scroll", Collections.emptyMap(), scrollEntity);
return buildSearchResponse(response);
} catch (Exception ex) {
} catch (final Exception ex) {
throw new RuntimeException(ex);
}
}
@ -427,7 +437,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
put("keep_alive", keepAlive);
}
}};
final Response response = client.performRequest("POST", index + "/_pit", params);
final Response response = performRequest("POST", index + "/_pit", params, null);
final String body = IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8);
parseResponseWarningHeaders(response);
@ -447,7 +457,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
final HttpEntity pitEntity = new NStringEntity(String.format("{\"id\": \"%s\"}", pitId), ContentType.APPLICATION_JSON);
final StopWatch watch = new StopWatch(true);
final Response response = client.performRequest("DELETE", "/_pit", Collections.emptyMap(), pitEntity);
final Response response = performRequest("DELETE", "/_pit", Collections.emptyMap(), pitEntity);
watch.stop();
if (getLogger().isDebugEnabled()) {
@ -476,7 +486,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
final HttpEntity scrollBody = new NStringEntity(String.format("{\"scroll_id\": \"%s\"}", scrollId), ContentType.APPLICATION_JSON);
final StopWatch watch = new StopWatch(true);
final Response response = client.performRequest("DELETE", "/_search/scroll", Collections.emptyMap(), scrollBody);
final Response response = performRequest("DELETE", "/_search/scroll", Collections.emptyMap(), scrollBody);
watch.stop();
if (getLogger().isDebugEnabled()) {
@ -519,15 +529,15 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
if (getLogger().isDebugEnabled()) {
final String searchSummary = "******************" +
String.format("Took: %d", took) +
String.format("Timed out: %s", timedOut) +
String.format("Aggregation count: %d", aggregations.size()) +
String.format("Hit count: %d", hits.size()) +
String.format("PIT Id: %s", pitId) +
String.format("Scroll Id: %s", scrollId) +
String.format("Search After: %s", searchAfter) +
String.format("Total found: %d", count) +
String.format("Warnings: %s", warnings) +
String.format(Locale.getDefault(), "Took: %d", took) +
String.format(Locale.getDefault(), "Timed out: %s", timedOut) +
String.format(Locale.getDefault(), "Aggregation count: %d", aggregations.size()) +
String.format(Locale.getDefault(), "Hit count: %d", hits.size()) +
String.format(Locale.getDefault(), "PIT Id: %s", pitId) +
String.format(Locale.getDefault(), "Scroll Id: %s", scrollId) +
String.format(Locale.getDefault(), "Search After: %s", searchAfter) +
String.format(Locale.getDefault(), "Total found: %d", count) +
String.format(Locale.getDefault(), "Warnings: %s", warnings) +
"******************";
getLogger().debug(searchSummary);
}
@ -554,4 +564,15 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
(StringUtils.isNotBlank(type) ? "/" : "") +
(StringUtils.isNotBlank(type) ? type : "");
}
/**
 * Builds a client Request for the given HTTP method and endpoint and executes it.
 *
 * @param method the HTTP method, e.g. "GET", "POST" or "DELETE"
 * @param endpoint the endpoint path to call
 * @param parameters URL request parameters; only added when the map is neither null nor empty
 * @param entity the request body; only set when not null
 * @return the Response returned by the client
 * @throws IOException if the client fails to perform the request
 */
private Response performRequest(final String method, final String endpoint, final Map<String, String> parameters, final HttpEntity entity) throws IOException {
    final Request esRequest = new Request(method, endpoint);

    // the body and the URL parameters are both optional and independent of each other
    final boolean hasEntity = entity != null;
    if (hasEntity) {
        esRequest.setEntity(entity);
    }

    final boolean hasParameters = parameters != null && !parameters.isEmpty();
    if (hasParameters) {
        esRequest.addParameters(parameters);
    }

    final Response esResponse = client.performRequest(esRequest);
    return esResponse;
}
}

View File

@ -83,19 +83,18 @@ public class ElasticSearchLookupService extends JsonInferenceSchemaRegistryServi
private String type;
private ObjectMapper mapper;
private final List<PropertyDescriptor> DESCRIPTORS;
private volatile ConcurrentHashMap<String, RecordPath> recordPathMappings;
private final List<PropertyDescriptor> descriptors;
public ElasticSearchLookupService() {
List<PropertyDescriptor> _desc = new ArrayList<>();
_desc.addAll(super.getSupportedPropertyDescriptors());
_desc.add(CLIENT_SERVICE);
_desc.add(INDEX);
_desc.add(TYPE);
DESCRIPTORS = Collections.unmodifiableList(_desc);
final List<PropertyDescriptor> desc = new ArrayList<>(super.getSupportedPropertyDescriptors());
desc.add(CLIENT_SERVICE);
desc.add(INDEX);
desc.add(TYPE);
descriptors = Collections.unmodifiableList(desc);
}
private volatile ConcurrentHashMap<String, RecordPath> mappings;
@Override
@OnEnabled
public void onEnabled(final ConfigurationContext context) {
@ -104,30 +103,29 @@ public class ElasticSearchLookupService extends JsonInferenceSchemaRegistryServi
type = context.getProperty(TYPE).evaluateAttributeExpressions().getValue();
mapper = new ObjectMapper();
List<PropertyDescriptor> dynamic = context.getProperties().entrySet().stream()
.filter( e -> e.getKey().isDynamic())
.map(e -> e.getKey())
final List<PropertyDescriptor> dynamicDescriptors = context.getProperties().keySet().stream()
.filter(PropertyDescriptor::isDynamic)
.collect(Collectors.toList());
Map<String, RecordPath> _temp = new HashMap<>();
for (PropertyDescriptor desc : dynamic) {
String value = context.getProperty(desc).getValue();
String name = desc.getName();
_temp.put(name, RecordPath.compile(value));
final Map<String, RecordPath> tempRecordPathMappings = new HashMap<>();
for (final PropertyDescriptor desc : dynamicDescriptors) {
final String value = context.getProperty(desc).getValue();
final String name = desc.getName();
tempRecordPathMappings.put(name, RecordPath.compile(value));
}
mappings = new ConcurrentHashMap<>(_temp);
recordPathMappings = new ConcurrentHashMap<>(tempRecordPathMappings);
super.onEnabled(context);
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return DESCRIPTORS;
return descriptors;
}
@Override
public PropertyDescriptor getSupportedDynamicPropertyDescriptor(String name) {
public PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String name) {
return new PropertyDescriptor.Builder()
.name(name)
.addValidator((subject, input, context) -> {
@ -148,21 +146,21 @@ public class ElasticSearchLookupService extends JsonInferenceSchemaRegistryServi
}
@Override
public Optional<Record> lookup(Map<String, Object> coordinates) throws LookupFailureException {
Map<String, String> context = coordinates.entrySet().stream()
public Optional<Record> lookup(final Map<String, Object> coordinates) throws LookupFailureException {
final Map<String, String> context = coordinates.entrySet().stream()
.collect(Collectors.toMap(
e -> e.getKey(),
Map.Entry::getKey,
e -> e.getValue().toString()
));
return lookup(coordinates, context);
}
@Override
public Optional<Record> lookup(Map<String, Object> coordinates, Map<String, String> context) throws LookupFailureException {
public Optional<Record> lookup(final Map<String, Object> coordinates, final Map<String, String> context) throws LookupFailureException {
validateCoordinates(coordinates);
try {
Record record;
final Record record;
if (coordinates.containsKey("_id")) {
record = getById((String)coordinates.get("_id"), context);
} else {
@ -176,8 +174,8 @@ public class ElasticSearchLookupService extends JsonInferenceSchemaRegistryServi
}
}
private void validateCoordinates(Map coordinates) throws LookupFailureException {
List<String> reasons = new ArrayList<>();
private void validateCoordinates(final Map<String, Object> coordinates) throws LookupFailureException {
final List<String> reasons = new ArrayList<>();
if (coordinates.containsKey("_id") && !(coordinates.get("_id") instanceof String)) {
reasons.add("_id was supplied, but it was not a String.");
@ -187,14 +185,15 @@ public class ElasticSearchLookupService extends JsonInferenceSchemaRegistryServi
reasons.add("When _id is used, it can be the only key used in the lookup.");
}
if (reasons.size() > 0) {
String error = String.join("\n", reasons);
if (!reasons.isEmpty()) {
final String error = String.join("\n", reasons);
throw new LookupFailureException(error);
}
}
private Record getById(final String _id, Map<String, String> context) throws IOException, LookupFailureException, SchemaNotFoundException {
Map<String, Object> query = new HashMap<String, Object>(){{
@SuppressWarnings("unchecked")
private Record getById(final String _id, final Map<String, String> context) throws IOException, LookupFailureException, SchemaNotFoundException {
final Map<String, Object> query = new HashMap<String, Object>(){{
put("query", new HashMap<String, Object>() {{
put("match", new HashMap<String, String>(){{
put("_id", _id);
@ -202,10 +201,9 @@ public class ElasticSearchLookupService extends JsonInferenceSchemaRegistryServi
}});
}};
String json = mapper.writeValueAsString(query);
SearchResponse response = clientService.search(json, index, type, null);
final String json = mapper.writeValueAsString(query);
final SearchResponse response = clientService.search(json, index, type, null);
if (response.getNumberOfHits() > 1) {
throw new LookupFailureException(String.format("Expected 1 response, got %d for query %s",
response.getNumberOfHits(), json));
@ -213,21 +211,20 @@ public class ElasticSearchLookupService extends JsonInferenceSchemaRegistryServi
return null;
}
final Map<String, Object> source = (Map)response.getHits().get(0).get("_source");
final Map<String, Object> source = (Map<String, Object>)response.getHits().get(0).get("_source");
RecordSchema toUse = getSchema(context, source, null);
final RecordSchema toUse = getSchema(context, source, null);
Record record = new MapRecord(toUse, source);
if (mappings.size() > 0) {
if (recordPathMappings.size() > 0) {
record = applyMappings(record, source);
}
return record;
}
Map<String, Object> getNested(String key, Object value) {
String path = key.substring(0, key.lastIndexOf("."));
Map<String, Object> getNested(final String key, final Object value) {
final String path = key.substring(0, key.lastIndexOf("."));
return new HashMap<String, Object>(){{
put("path", path);
@ -239,8 +236,8 @@ public class ElasticSearchLookupService extends JsonInferenceSchemaRegistryServi
}};
}
private Map<String, Object> buildQuery(Map<String, Object> coordinates) {
Map<String, Object> query = new HashMap<String, Object>(){{
private Map<String, Object> buildQuery(final Map<String, Object> coordinates) {
final Map<String, Object> query = new HashMap<String, Object>(){{
put("bool", new HashMap<String, Object>(){{
put("must", coordinates.entrySet().stream()
.map(e -> new HashMap<String, Object>(){{
@ -256,28 +253,25 @@ public class ElasticSearchLookupService extends JsonInferenceSchemaRegistryServi
}});
}};
Map<String, Object> outter = new HashMap<String, Object>(){{
return new HashMap<String, Object>(){{
put("size", 1);
put("query", query);
}};
return outter;
}
private Record getByQuery(final Map<String, Object> query, Map<String, String> context) throws LookupFailureException {
@SuppressWarnings("unchecked")
private Record getByQuery(final Map<String, Object> query, final Map<String, String> context) throws LookupFailureException {
try {
final String json = mapper.writeValueAsString(buildQuery(query));
SearchResponse response = clientService.search(json, index, type, null);
final SearchResponse response = clientService.search(json, index, type, null);
if (response.getNumberOfHits() == 0) {
return null;
} else {
final Map<String, Object> source = (Map)response.getHits().get(0).get("_source");
RecordSchema toUse = getSchema(context, source, null);
final Map<String, Object> source = (Map<String, Object>)response.getHits().get(0).get("_source");
final RecordSchema toUse = getSchema(context, source, null);
Record record = new MapRecord(toUse, source);
if (mappings.size() > 0) {
if (recordPathMappings.size() > 0) {
record = applyMappings(record, source);
}
@ -289,23 +283,20 @@ public class ElasticSearchLookupService extends JsonInferenceSchemaRegistryServi
}
}
private Record applyMappings(Record record, Map<String, Object> source) {
Record _rec = new MapRecord(record.getSchema(), new HashMap<>());
private Record applyMappings(final Record record, final Map<String, Object> source) {
final Record rec = new MapRecord(record.getSchema(), new HashMap<>());
mappings.entrySet().forEach(entry -> {
recordPathMappings.forEach((key, path) -> {
try {
Object o = JsonPath.read(source, entry.getKey());
RecordPath path = entry.getValue();
Optional<FieldValue> first = path.evaluate(_rec).getSelectedFields().findFirst();
if (first.isPresent()) {
first.get().updateValue(o);
}
final Object o = JsonPath.read(source, key);
final Optional<FieldValue> first = path.evaluate(rec).getSelectedFields().findFirst();
first.ifPresent(fieldValue -> fieldValue.updateValue(o));
} catch (Exception ex) {
throw new RuntimeException(ex);
}
});
return _rec;
return rec;
}
@Override

View File

@ -87,7 +87,7 @@ public class ElasticSearchStringLookupService extends AbstractControllerService
public Optional<String> lookup(Map<String, Object> coordinates) throws LookupFailureException {
try {
final String id = (String) coordinates.get(ID);
final Map<String, Object> enums = esClient.get(index, type, id);
final Map<String, Object> enums = esClient.get(index, type, id, null);
if (enums == null) {
return Optional.empty();
} else {

View File

@ -34,6 +34,7 @@ class SearchResponseTest {
def warnings = ["auth"]
def response = new SearchResponse(results, aggs as Map<String, Object>, pitId, scrollId, searchAfter, num, took, timeout, warnings)
def str = response.toString()
Assert.assertEquals(results, response.hits)
Assert.assertEquals(aggs, response.aggregations)
Assert.assertEquals(pitId, response.pitId)

View File

@ -25,6 +25,7 @@ import org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl
import org.apache.nifi.elasticsearch.IndexOperationRequest
import org.apache.nifi.elasticsearch.IndexOperationResponse
import org.apache.nifi.elasticsearch.SearchResponse
import org.apache.nifi.elasticsearch.UpdateOperationResponse
import org.apache.nifi.security.util.StandardTlsConfiguration
import org.apache.nifi.security.util.TemporaryKeyStoreBuilder
import org.apache.nifi.security.util.TlsConfiguration
@ -109,6 +110,7 @@ class ElasticSearchClientService_IT {
runner.setProperty(service, ElasticSearchClientService.SOCKET_TIMEOUT, "60000")
runner.setProperty(service, ElasticSearchClientService.RETRY_TIMEOUT, "60000")
runner.setProperty(service, ElasticSearchClientService.SUPPRESS_NULLS, ElasticSearchClientService.ALWAYS_SUPPRESS.getValue())
try {
runner.enableControllerService(service)
} catch (Exception ex) {
@ -125,21 +127,21 @@ class ElasticSearchClientService_IT {
@Test
void testBasicSearch() throws Exception {
String query = prettyPrint(toJson([
size: 10,
query: [
match_all: [:]
],
aggs: [
term_counts: [
terms: [
field: "msg",
size: 5
]
size: 10,
query: [
match_all: [:]
],
aggs: [
term_counts: [
terms: [
field: "msg",
size: 5
]
]
]
]
]))
SearchResponse response = service.search(query, INDEX, TYPE, null)
Assert.assertNotNull("Response was null", response)
@ -158,11 +160,11 @@ class ElasticSearchClientService_IT {
def buckets = termCounts.get("buckets")
Assert.assertNotNull("Buckets branch was empty", buckets)
def expected = [
"one": 1,
"two": 2,
"three": 3,
"four": 4,
"five": 5
"one": 1,
"two": 2,
"three": 3,
"four": 4,
"five": 5
]
buckets.each { aggRes ->
@ -172,6 +174,53 @@ class ElasticSearchClientService_IT {
}
}
/**
 * Runs a match_all search with a "term_counts" terms aggregation against the "messages" index
 * while passing a "preference" request parameter, then checks the hit counts and the
 * per-term document counts of the aggregation buckets.
 */
@Test
void testBasicSearchRequestParameters() throws Exception {
    // request body: up to 10 hits plus the 5 largest "msg" term buckets
    def termsAggregation = [terms: [field: "msg", size: 5]]
    def searchBody = [
        size : 10,
        query: [match_all: [:]],
        aggs : [term_counts: termsAggregation]
    ]
    String query = prettyPrint(toJson(searchBody))
    Map<String, String> requestParameters = [preference: "_local"]

    SearchResponse response = service.search(query, "messages", TYPE, requestParameters)

    // top-level response checks
    Assert.assertNotNull("Response was null", response)
    Assert.assertEquals("Wrong count", 15, response.numberOfHits)
    Assert.assertFalse("Timed out", response.isTimedOut())
    Assert.assertNotNull("Hits was null", response.getHits())
    Assert.assertEquals("Wrong number of hits", 10, response.hits.size())

    // aggregation checks
    Assert.assertNotNull("Aggregations are missing", response.aggregations)
    Assert.assertEquals("Aggregation count is wrong", 1, response.aggregations.size())
    Map termCounts = response.aggregations.get("term_counts") as Map
    Assert.assertNotNull("Term counts was missing", termCounts)
    def buckets = termCounts.get("buckets")
    Assert.assertNotNull("Buckets branch was empty", buckets)

    // expected document count for each "msg" term
    def expected = ["one": 1, "two": 2, "three": 3, "four": 4, "five": 5]
    for (aggRes in buckets) {
        String key = aggRes["key"]
        def docCount = aggRes["doc_count"]
        Assert.assertEquals("${key} did not match.", expected[key], docCount)
    }
}
@Test
void testSearchWarnings() {
String query
@ -361,13 +410,55 @@ class ElasticSearchClientService_IT {
@Test
void testDeleteByQuery() throws Exception {
String query = prettyPrint(toJson([
query: [
match: [
msg: "five"
query: [
match: [
msg: "five"
]
]
]
]))
DeleteOperationResponse response = service.deleteByQuery(query, INDEX, TYPE)
DeleteOperationResponse response = service.deleteByQuery(query, INDEX, TYPE, null)
Assert.assertNotNull(response)
Assert.assertTrue(response.getTook() > 0)
}
/**
 * Calls deleteByQuery for documents whose "msg" matches "six", passing a "refresh" request
 * parameter, and checks that a response with a positive "took" value is returned.
 */
@Test
void testDeleteByQueryRequestParameters() throws Exception {
    def matchSix = [match: [msg: "six"]]
    String query = prettyPrint(toJson([query: matchSix]))
    Map<String, String> requestParameters = [refresh: "true"]

    DeleteOperationResponse response = service.deleteByQuery(query, INDEX, TYPE, requestParameters)

    Assert.assertNotNull(response)
    long took = response.getTook()
    Assert.assertTrue(took > 0)
}
/**
 * Calls updateByQuery without request parameters for documents whose "msg" matches "four"
 * and checks that a response with a positive "took" value is returned.
 */
@Test
void testUpdateByQuery() throws Exception {
    def matchFour = [match: [msg: "four"]]
    String query = prettyPrint(toJson([query: matchFour]))

    UpdateOperationResponse response = service.updateByQuery(query, INDEX, TYPE, null)

    Assert.assertNotNull(response)
    long took = response.getTook()
    Assert.assertTrue(took > 0)
}
/**
 * Calls updateByQuery for documents whose "msg" matches "four", passing "refresh" and "slices"
 * request parameters, and checks that a response with a positive "took" value is returned.
 */
@Test
void testUpdateByQueryRequestParameters() throws Exception {
    def matchFour = [match: [msg: "four"]]
    String query = prettyPrint(toJson([query: matchFour]))
    Map<String, String> requestParameters = [refresh: "true", slices: "1"]

    UpdateOperationResponse response = service.updateByQuery(query, INDEX, TYPE, requestParameters)

    Assert.assertNotNull(response)
    long took = response.getTook()
    Assert.assertTrue(took > 0)
}
@ -375,17 +466,17 @@ class ElasticSearchClientService_IT {
@Test
void testDeleteById() throws Exception {
final String ID = "1"
final def originalDoc = service.get(INDEX, TYPE, ID)
DeleteOperationResponse response = service.deleteById(INDEX, TYPE, ID)
final def originalDoc = service.get(INDEX, TYPE, ID, null)
DeleteOperationResponse response = service.deleteById(INDEX, TYPE, ID, null)
Assert.assertNotNull(response)
Assert.assertTrue(response.getTook() > 0)
def doc = service.get(INDEX, TYPE, ID)
def doc = service.get(INDEX, TYPE, ID, null)
Assert.assertNull(doc)
doc = service.get(INDEX, TYPE, "2")
doc = service.get(INDEX, TYPE, "2", null)
Assert.assertNotNull(doc)
// replace the deleted doc
service.add(new IndexOperationRequest(INDEX, TYPE, "1", originalDoc, IndexOperationRequest.Operation.Index))
service.add(new IndexOperationRequest(INDEX, TYPE, "1", originalDoc, IndexOperationRequest.Operation.Index), null)
waitForIndexRefresh() // (affects later tests using _search or _bulk)
}
@ -394,7 +485,7 @@ class ElasticSearchClientService_IT {
Map old
1.upto(15) { index ->
String id = String.valueOf(index)
def doc = service.get(INDEX, TYPE, id)
def doc = service.get(INDEX, TYPE, id, null)
Assert.assertNotNull("Doc was null", doc)
Assert.assertNotNull("${doc.toString()}\t${doc.keySet().toString()}", doc.get("msg"))
old = doc
@ -434,22 +525,22 @@ class ElasticSearchClientService_IT {
// index with nulls
suppressNulls(false)
IndexOperationResponse response = service.bulk([new IndexOperationRequest("nulls", TYPE, "1", doc, IndexOperationRequest.Operation.Index)])
IndexOperationResponse response = service.bulk([new IndexOperationRequest("nulls", TYPE, "1", doc, IndexOperationRequest.Operation.Index)], null)
Assert.assertNotNull(response)
Assert.assertTrue(response.getTook() > 0)
waitForIndexRefresh()
Map<String, Object> result = service.get("nulls", TYPE, "1")
Map<String, Object> result = service.get("nulls", TYPE, "1", null)
Assert.assertEquals(doc, result)
// suppress nulls
suppressNulls(true)
response = service.bulk([new IndexOperationRequest("nulls", TYPE, "2", doc, IndexOperationRequest.Operation.Index)])
response = service.bulk([new IndexOperationRequest("nulls", TYPE, "2", doc, IndexOperationRequest.Operation.Index)], null)
Assert.assertNotNull(response)
Assert.assertTrue(response.getTook() > 0)
waitForIndexRefresh()
result = service.get("nulls", TYPE, "2")
result = service.get("nulls", TYPE, "2", null)
Assert.assertTrue("Non-nulls (present): " + result.toString(), result.keySet().containsAll(["msg", "is_blank"]))
Assert.assertFalse("is_null (should be omitted): " + result.toString(), result.keySet().contains("is_null"))
Assert.assertFalse("is_empty (should be omitted): " + result.toString(), result.keySet().contains("is_empty"))
@ -479,7 +570,7 @@ class ElasticSearchClientService_IT {
put("msg", "test")
}}, IndexOperationRequest.Operation.Index))
}
IndexOperationResponse response = service.bulk(payload)
IndexOperationResponse response = service.bulk(payload, [refresh: "true"])
Assert.assertNotNull(response)
Assert.assertTrue(response.getTook() > 0)
waitForIndexRefresh()
@ -488,9 +579,9 @@ class ElasticSearchClientService_IT {
* Now, check to ensure that both indexes got populated appropriately.
*/
String query = "{ \"query\": { \"match_all\": {}}}"
Long indexA = service.count(query, "bulk_a", TYPE)
Long indexB = service.count(query, "bulk_b", TYPE)
Long indexC = service.count(query, "bulk_c", TYPE)
Long indexA = service.count(query, "bulk_a", TYPE, null)
Long indexB = service.count(query, "bulk_b", TYPE, null)
Long indexC = service.count(query, "bulk_c", TYPE, null)
Assert.assertNotNull(indexA)
Assert.assertNotNull(indexB)
@ -500,7 +591,46 @@ class ElasticSearchClientService_IT {
Assert.assertEquals(10, indexB.intValue())
Assert.assertEquals(5, indexC.intValue())
Long total = service.count(query, "bulk_*", TYPE)
Long total = service.count(query, "bulk_*", TYPE, null)
Assert.assertNotNull(total)
Assert.assertEquals(25, total.intValue())
}
/**
 * Sends 25 index operations in one bulk call with a "refresh" request parameter
 * (10 each to "bulk_a" and "bulk_b", 5 to "bulk_c") and then verifies the document
 * counts per index and across "bulk_*" without waiting in between.
 */
@Test
void testBulkRequestParameters() throws Exception {
    // every document carries the same single field; a fresh map is built per request
    def newDocument = { -> new HashMap<String, Object>([msg: "test"]) }

    List<IndexOperationRequest> payload = new ArrayList<>()
    // even ids go to bulk_a, odd ids to bulk_b
    (0..<20).each { int docNumber ->
        String index = docNumber % 2 == 0 ? "bulk_a" : "bulk_b"
        payload.add(new IndexOperationRequest(index, TYPE, String.valueOf(docNumber), newDocument(), IndexOperationRequest.Operation.Index))
    }
    (0..<5).each { int docNumber ->
        payload.add(new IndexOperationRequest("bulk_c", TYPE, String.valueOf(docNumber), newDocument(), IndexOperationRequest.Operation.Index))
    }

    Map<String, String> requestParameters = [refresh: "true"]
    IndexOperationResponse response = service.bulk(payload, requestParameters)
    Assert.assertNotNull(response)
    Assert.assertTrue(response.getTook() > 0)

    /*
     * Now, check to ensure that all three indexes got populated and refreshed appropriately.
     */
    String query = "{ \"query\": { \"match_all\": {}}}"
    Long indexA = service.count(query, "bulk_a", TYPE, null)
    Long indexB = service.count(query, "bulk_b", TYPE, null)
    Long indexC = service.count(query, "bulk_c", TYPE, null)

    Assert.assertNotNull(indexA)
    Assert.assertNotNull(indexB)
    Assert.assertNotNull(indexC)
    Assert.assertEquals(indexA, indexB)
    Assert.assertEquals(10, indexA.intValue())
    Assert.assertEquals(10, indexB.intValue())
    Assert.assertEquals(5, indexC.intValue())

    // the wildcard count covers all three indexes together
    Long total = service.count(query, "bulk_*", TYPE, null)
    Assert.assertNotNull(total)
    Assert.assertEquals(25, total.intValue())
}
@ -510,8 +640,8 @@ class ElasticSearchClientService_IT {
final String TEST_ID = "update-test"
Map<String, Object> doc = new HashMap<>()
doc.put("msg", "Buongiorno, mondo")
service.add(new IndexOperationRequest(INDEX, TYPE, TEST_ID, doc, IndexOperationRequest.Operation.Index))
Map<String, Object> result = service.get(INDEX, TYPE, TEST_ID)
service.add(new IndexOperationRequest(INDEX, TYPE, TEST_ID, doc, IndexOperationRequest.Operation.Index), [refresh: "true"])
Map<String, Object> result = service.get(INDEX, TYPE, TEST_ID, null)
Assert.assertEquals("Not the same", doc, result)
Map<String, Object> updates = new HashMap<>()
@ -520,8 +650,8 @@ class ElasticSearchClientService_IT {
merged.putAll(updates)
merged.putAll(doc)
IndexOperationRequest request = new IndexOperationRequest(INDEX, TYPE, TEST_ID, updates, IndexOperationRequest.Operation.Update)
service.add(request)
result = service.get(INDEX, TYPE, TEST_ID)
service.add(request, [refresh: "true"])
result = service.get(INDEX, TYPE, TEST_ID, null)
Assert.assertTrue(result.containsKey("from"))
Assert.assertTrue(result.containsKey("msg"))
Assert.assertEquals("Not the same after update.", merged, result)
@ -532,17 +662,17 @@ class ElasticSearchClientService_IT {
upsertItems.put("upsert_2", 1)
upsertItems.put("upsert_3", true)
request = new IndexOperationRequest(INDEX, TYPE, UPSERTED_ID, upsertItems, IndexOperationRequest.Operation.Upsert)
service.add(request)
result = service.get(INDEX, TYPE, UPSERTED_ID)
service.add(request, [refresh: "true"])
result = service.get(INDEX, TYPE, UPSERTED_ID, null)
Assert.assertEquals(upsertItems, result)
List<IndexOperationRequest> deletes = new ArrayList<>()
deletes.add(new IndexOperationRequest(INDEX, TYPE, TEST_ID, null, IndexOperationRequest.Operation.Delete))
deletes.add(new IndexOperationRequest(INDEX, TYPE, UPSERTED_ID, null, IndexOperationRequest.Operation.Delete))
Assert.assertFalse(service.bulk(deletes).hasErrors())
Assert.assertFalse(service.bulk(deletes, [refresh: "true"]).hasErrors())
waitForIndexRefresh() // wait 1s for index refresh (doesn't prevent GET but affects later tests using _search or _bulk)
Assert.assertNull(service.get(INDEX, TYPE, TEST_ID))
Assert.assertNull(service.get(INDEX, TYPE, UPSERTED_ID))
Assert.assertNull(service.get(INDEX, TYPE, TEST_ID, null))
Assert.assertNull(service.get(INDEX, TYPE, UPSERTED_ID, null))
}
@Test
@ -552,7 +682,7 @@ class ElasticSearchClientService_IT {
new IndexOperationRequest(INDEX, TYPE, "2", [ "msg": "two", intField: 1], IndexOperationRequest.Operation.Create), // already exists
new IndexOperationRequest(INDEX, TYPE, "1", [ "msg": "one", intField: "notaninteger"], IndexOperationRequest.Operation.Index) // can't parse int field
]
def response = service.bulk(ops)
def response = service.bulk(ops, [refresh: "true"])
assert response.hasErrors()
assert response.items.findAll {
def key = it.keySet().stream().findFirst().get()
@ -563,4 +693,4 @@ class ElasticSearchClientService_IT {
/**
 * Blocks the calling thread for one second; tests call this after writes so that
 * later _search or _bulk based checks run after the pause.
 */
private static void waitForIndexRefresh() {
    final long pauseMillis = 1000
    Thread.sleep(pauseMillis)
}
}
}

View File

@ -56,7 +56,7 @@ class ElasticSearchLookupServiceTest {
void simpleLookupTest() throws Exception {
def coordinates = ["_id": "12345" ]
Optional<MapRecord> result = lookupService.lookup(coordinates)
Optional<MapRecord> result = lookupService.lookup(coordinates) as Optional<MapRecord>
Assert.assertNotNull(result)
Assert.assertTrue(result.isPresent())

View File

@ -61,7 +61,6 @@ class ElasticSearchLookupService_IT {
runner.setProperty(service, ElasticSearchClientService.HTTP_HOSTS, "http://localhost:9400")
runner.setProperty(service, ElasticSearchClientService.CONNECT_TIMEOUT, "10000")
runner.setProperty(service, ElasticSearchClientService.SOCKET_TIMEOUT, "60000")
runner.setProperty(service, ElasticSearchClientService.RETRY_TIMEOUT, "60000")
runner.setProperty(TestControllerServiceProcessor.CLIENT_SERVICE, "Client Service")
runner.setProperty(TestControllerServiceProcessor.LOOKUP_SERVICE, "Lookup Service")
runner.setProperty(lookupService, ElasticSearchLookupService.CLIENT_SERVICE, "Client Service")

View File

@ -23,6 +23,7 @@ import org.apache.nifi.elasticsearch.ElasticSearchClientService
import org.apache.nifi.elasticsearch.IndexOperationRequest
import org.apache.nifi.elasticsearch.IndexOperationResponse
import org.apache.nifi.elasticsearch.SearchResponse
import org.apache.nifi.elasticsearch.UpdateOperationResponse
class TestElasticSearchClientService extends AbstractControllerService implements ElasticSearchClientService {
Map data = [
@ -33,37 +34,42 @@ class TestElasticSearchClientService extends AbstractControllerService implement
]
@Override
IndexOperationResponse add(IndexOperationRequest operation) {
IndexOperationResponse add(IndexOperationRequest operation, Map<String, String> requestParameters) {
return null
}
@Override
IndexOperationResponse bulk(List<IndexOperationRequest> operations) {
IndexOperationResponse bulk(List<IndexOperationRequest> operations, Map<String, String> requestParameters) {
return null
}
@Override
Long count(String query, String index, String type) {
Long count(String query, String index, String type, Map<String, String> requestParameters) {
return null
}
@Override
DeleteOperationResponse deleteById(String index, String type, String id) {
DeleteOperationResponse deleteById(String index, String type, String id, Map<String, String> requestParameters) {
return null
}
@Override
DeleteOperationResponse deleteById(String index, String type, List<String> ids) {
DeleteOperationResponse deleteById(String index, String type, List<String> ids, Map<String, String> requestParameters) {
return null
}
@Override
DeleteOperationResponse deleteByQuery(String query, String index, String type) {
DeleteOperationResponse deleteByQuery(String query, String index, String type, Map<String, String> requestParameters) {
return null
}
@Override
Map<String, Object> get(String index, String type, String id) {
UpdateOperationResponse updateByQuery(String query, String index, String type, Map<String, String> requestParameters) {
return null
}
@Override
Map<String, Object> get(String index, String type, String id, Map<String, String> requestParameters) {
return data
}

View File

@ -0,0 +1,127 @@
##
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##
## JVM configuration
################################################################
## IMPORTANT: JVM heap size
################################################################
##
## You should always set the min and max JVM heap
## size to the same value. For example, to set
## the heap to 4 GB, set:
##
## -Xms4g
## -Xmx4g
##
## See https://www.elastic.co/guide/en/elasticsearch/reference/current/heap-size.html
## for more information
##
################################################################
# Xms represents the initial size of total heap space
# Xmx represents the maximum size of total heap space
-Xms2g
-Xmx2g
################################################################
## Expert settings
################################################################
##
## All settings below this section are considered
## expert settings. Don't tamper with them unless
## you understand what you are doing
##
################################################################
## GC configuration
#-XX:+UseConcMarkSweepGC
#-XX:CMSInitiatingOccupancyFraction=75
#-XX:+UseCMSInitiatingOccupancyOnly
-XX:+UseSerialGC
## optimizations
# pre-touch memory pages used by the JVM during initialization
-XX:+AlwaysPreTouch
## basic
# force the server VM (remove on 32-bit client JVMs)
-server
# explicitly set the stack size (reduce to 320k on 32-bit client JVMs)
-Xss1m
# set to headless, just in case
-Djava.awt.headless=true
# ensure UTF-8 encoding by default (e.g. filenames)
-Dfile.encoding=UTF-8
# use our provided JNA always versus the system one
-Djna.nosys=true
# use old-style file permissions on JDK9
-Djdk.io.permissionsUseCanonicalPath=true
# flags to configure Netty
-Dio.netty.noUnsafe=true
-Dio.netty.noKeySetOptimization=true
-Dio.netty.recycler.maxCapacityPerThread=0
# log4j 2
-Dlog4j.shutdownHookEnabled=false
-Dlog4j2.disable.jmx=true
-Dlog4j.skipJansi=true
## heap dumps
# generate a heap dump when an allocation from the Java heap fails
# heap dumps are created in the working directory of the JVM
-XX:+HeapDumpOnOutOfMemoryError
# specify an alternative path for heap dumps
# ensure the directory exists and has sufficient space
#-XX:HeapDumpPath=${heap.dump.path}
## GC logging
#-XX:+PrintGCDetails
#-XX:+PrintGCTimeStamps
#-XX:+PrintGCDateStamps
#-XX:+PrintClassHistogram
#-XX:+PrintTenuringDistribution
#-XX:+PrintGCApplicationStoppedTime
# log GC status to a file with time stamps
# ensure the directory exists
#-Xloggc:${loggc}
# By default, the GC log file will not rotate.
# By uncommenting the lines below, the GC log file
# will be rotated every 128MB at most 32 times.
#-XX:+UseGCLogFileRotation
#-XX:NumberOfGCLogFiles=32
#-XX:GCLogFileSize=128M
# Elasticsearch 5.0.0 will throw an exception on unquoted field names in JSON.
# If documents were already indexed with unquoted fields in a previous version
# of Elasticsearch, some operations may throw errors.
#
# WARNING: This option will be removed in Elasticsearch 6.0.0 and is provided
# only for migration purposes.
#-Delasticsearch.json.allow_unquoted_field_names=true

View File

@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#create mapping
PUT:user_details/:{ "mappings":{"faketype":{ "properties":{ "email":{"type":"keyword"},"phone":{"type": "keyword"},"accessKey":{"type": "keyword"}}}}}
PUT:messages/:{ "mappings":{"faketype":{ "properties":{ "msg":{"type":"keyword"}}}}}
@ -21,6 +22,7 @@ PUT:bulk_a/:{ "mappings":{"faketype":{ "properties":{ "msg":{"type":"keyword"}}}
PUT:bulk_b/:{ "mappings":{"faketype":{ "properties":{ "msg":{"type":"keyword"}}}}}
PUT:bulk_c/:{ "mappings":{"faketype":{ "properties":{ "msg":{"type":"keyword"}}}}}
PUT:error_handler:{ "mappings": { "faketype": { "properties": { "msg": { "type": "keyword" }, "intField": { "type": "integer" }}}}}
#add document
PUT:messages/faketype/1:{ "msg":"one" }
PUT:messages/faketype/2:{ "msg":"two" }
@ -42,4 +44,7 @@ PUT:user_details/faketype/1:{ "email": "john.smith@company.com", "phone": "123-4
PUT:user_details/faketype/2:{ "email": "jane.doe@company.com", "phone": "098-765-4321", "accessKey": "GHIJK"}
PUT:nested/faketype/1:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"No one should see this!","deepest":{"super_secret":"Got nothin to hide"}}}}
PUT:nested/faketype/2:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"Hello, world!","deepest":{"super_secret":"I could tell, but then I would have to kill you"}}}}
PUT:nested/faketype/3:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"Buongiorno, mondo!!","deepest":{"super_secret":"The sky is blue"}}}}
PUT:nested/faketype/3:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"Buongiorno, mondo!!","deepest":{"super_secret":"The sky is blue"}}}}
# refresh all indices before testing
POST:_refresh:{}

View File

@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#create mapping
PUT:user_details/:{ "mappings":{"_doc":{ "properties":{ "email":{"type":"keyword"},"phone":{"type": "keyword"},"accessKey":{"type": "keyword"}}}}}
PUT:messages/:{ "mappings":{"_doc":{ "properties":{ "msg":{"type":"keyword"}}}}}
@ -43,4 +44,7 @@ PUT:user_details/_doc/1:{ "email": "john.smith@company.com", "phone": "123-456-7
PUT:user_details/_doc/2:{ "email": "jane.doe@company.com", "phone": "098-765-4321", "accessKey": "GHIJK"}
PUT:nested/_doc/1:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"No one should see this!","deepest":{"super_secret":"Got nothin to hide"}}}}
PUT:nested/_doc/2:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"Hello, world!","deepest":{"super_secret":"I could tell, but then I would have to kill you"}}}}
PUT:nested/_doc/3:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"Buongiorno, mondo!!","deepest":{"super_secret":"The sky is blue"}}}}
PUT:nested/_doc/3:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"Buongiorno, mondo!!","deepest":{"super_secret":"The sky is blue"}}}}
# refresh all indices before testing
POST:_refresh:{}

View File

@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#create mapping
PUT:user_details/:{ "mappings":{ "properties":{ "email":{"type":"keyword"},"phone":{"type": "keyword"},"accessKey":{"type": "keyword"}}}}
PUT:messages/:{ "mappings":{ "properties":{ "msg":{"type":"keyword"}}}}
@ -21,6 +22,7 @@ PUT:bulk_a/:{ "mappings":{ "properties":{ "msg":{"type":"keyword"}}}}
PUT:bulk_b/:{ "mappings":{ "properties":{ "msg":{"type":"keyword"}}}}
PUT:bulk_c/:{ "mappings":{ "properties":{ "msg":{"type":"keyword"}}}}
PUT:error_handler:{ "mappings": { "properties": { "msg": { "type": "keyword" }, "intField": { "type": "integer" }}}}
#add document
POST:messages/_doc/1:{ "msg":"one" }
POST:messages/_doc/2:{ "msg":"two" }
@ -42,4 +44,7 @@ POST:user_details/_doc/1:{ "email": "john.smith@company.com", "phone": "123-456-
POST:user_details/_doc/2:{ "email": "jane.doe@company.com", "phone": "098-765-4321", "accessKey": "GHIJK"}
POST:nested/_doc/1:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"No one should see this!","deepest":{"super_secret":"Got nothin to hide"}}}}
POST:nested/_doc/2:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"Hello, world!","deepest":{"super_secret":"I could tell, but then I would have to kill you"}}}}
POST:nested/_doc/3:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"Buongiorno, mondo!!","deepest":{"super_secret":"The sky is blue"}}}}
POST:nested/_doc/3:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"Buongiorno, mondo!!","deepest":{"super_secret":"The sky is blue"}}}}
# refresh all indices before testing
POST:_refresh:{}

View File

@ -129,6 +129,8 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic
private final AtomicReference<OkHttpClient> okHttpClientAtomicReference = new AtomicReference<>();
final ObjectMapper mapper = new ObjectMapper();
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
@ -278,7 +280,6 @@ public abstract class AbstractElasticsearchHttpProcessor extends AbstractElastic
}
/**
 * Parses the content of the given stream into a JSON tree.
 *
 * @param in the stream containing the JSON document to parse
 * @return the root node of the parsed JSON document
 * @throws IOException if the stream cannot be read or parsed by the mapper
 */
protected JsonNode parseJsonResponse(InputStream in) throws IOException {
    // Use the processor's shared mapper field instead of allocating a new ObjectMapper for every response
    return mapper.readTree(in);
}

View File

@ -17,7 +17,6 @@
package org.apache.nifi.processors.elasticsearch;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import okhttp3.HttpUrl;
import okhttp3.MediaType;
@ -143,8 +142,6 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
private static final Set<Relationship> relationships;
private static final List<PropertyDescriptor> propertyDescriptors;
private static final ObjectMapper mapper = new ObjectMapper();
static {
final Set<Relationship> _rels = new HashSet<>();
_rels.add(REL_SUCCESS);

View File

@ -0,0 +1,150 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.elasticsearch;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.elasticsearch.ElasticSearchClientService;
import org.apache.nifi.elasticsearch.OperationResponse;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StringUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
 * Common implementation for processors that run a "by query" operation through an
 * {@link ElasticSearchClientService}. Subclasses supply the operation itself and the names of the
 * attributes used to report its duration and any error message.
 */
public abstract class AbstractByQueryElasticsearch extends AbstractProcessor implements ElasticsearchRestProcessor {
    public static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description("If the \"by query\" operation fails, and a flowfile was read, it will be sent to this relationship.")
            .build();

    public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("If the \"by query\" operation succeeds, and a flowfile was read, it will be sent to this relationship.")
            .build();

    private static final Set<Relationship> relationships = createRelationships();
    private static final List<PropertyDescriptor> propertyDescriptors = createPropertyDescriptors();

    private volatile ElasticSearchClientService clientService;

    /** Builds the unmodifiable set of relationships exposed by this processor. */
    private static Set<Relationship> createRelationships() {
        final Set<Relationship> all = new HashSet<>();
        all.add(REL_SUCCESS);
        all.add(REL_FAILURE);
        return Collections.unmodifiableSet(all);
    }

    /** Builds the unmodifiable, ordered list of static properties exposed by this processor. */
    private static List<PropertyDescriptor> createPropertyDescriptors() {
        final List<PropertyDescriptor> all = new ArrayList<>();
        all.add(QUERY);
        all.add(QUERY_ATTRIBUTE);
        all.add(INDEX);
        all.add(TYPE);
        all.add(CLIENT_SERVICE);
        return Collections.unmodifiableList(all);
    }

    /** @return the name of the attribute that receives the operation's "took" value */
    abstract String getTookAttribute();

    /** @return the name of the attribute that receives the exception message when the operation fails */
    abstract String getErrorAttribute();

    /**
     * Runs the concrete "by query" operation.
     *
     * @param clientService the client service obtained when the processor was scheduled
     * @param query the query to run
     * @param index the evaluated value of the index property
     * @param type the evaluated value of the type property, or {@code null} when that property is not set
     * @param requestParameters the URL request parameters derived from the dynamic properties
     * @return the response, whose "took" value is written to the outgoing flowfile
     */
    abstract OperationResponse performOperation(final ElasticSearchClientService clientService, final String query,
                                                final String index, final String type,
                                                final Map<String, String> requestParameters);

    @Override
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return propertyDescriptors;
    }

    /**
     * Describes a dynamic property: optional, validated as non-empty and supporting
     * Expression Language evaluated against flowfile attributes.
     */
    @Override
    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
        final PropertyDescriptor.Builder dynamicProperty = new PropertyDescriptor.Builder()
                .name(propertyDescriptorName)
                .required(false)
                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
                .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
                .dynamic(true);
        return dynamicProperty.build();
    }

    /** Looks up the configured client service once per scheduling. */
    @OnScheduled
    public void onScheduled(final ProcessContext context) {
        clientService = context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class);
    }

    /**
     * Runs the operation for the incoming flowfile, or for a newly created one when no flowfile was
     * read, and routes the result to {@link #REL_SUCCESS}. Any exception is logged, the context is
     * yielded and, if a flowfile is available, it is routed to {@link #REL_FAILURE} with the error attribute set.
     */
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) {
        FlowFile input = null;
        if (context.hasIncomingConnection()) {
            input = session.get();
            // Nothing was queued and the incoming connections are not just loops: wait for real input.
            if (input == null && context.hasNonLoopConnection()) {
                return;
            }
        }

        try {
            final String query = getQuery(input, context, session);
            final String index = context.getProperty(INDEX).evaluateAttributeExpressions(input).getValue();
            final String type = evaluateIfSet(context, TYPE, input);
            final String queryAttr = evaluateIfSet(context, QUERY_ATTRIBUTE, input);

            final OperationResponse response =
                    performOperation(this.clientService, query, index, type, getUrlQueryParameters(context, input));

            // Assign back to "input" so the catch block can still route a flowfile created here.
            input = (input != null) ? input : session.create();

            final Map<String, String> attributes = new HashMap<>();
            attributes.put(getTookAttribute(), String.valueOf(response.getTook()));
            if (!StringUtils.isBlank(queryAttr)) {
                attributes.put(queryAttr, query);
            }

            input = session.putAllAttributes(input, attributes);
            session.transfer(input, REL_SUCCESS);
        } catch (final Exception e) {
            routeFailure(context, session, input, e);
        }
    }

    /** Evaluates the property against the flowfile when the property is set; otherwise returns {@code null}. */
    private static String evaluateIfSet(final ProcessContext context, final PropertyDescriptor descriptor, final FlowFile flowFile) {
        if (!context.getProperty(descriptor).isSet()) {
            return null;
        }
        return context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue();
    }

    /** Sends an available flowfile to failure with the error attribute, then logs the exception and yields. */
    private void routeFailure(final ProcessContext context, final ProcessSession session, final FlowFile flowFile, final Exception e) {
        if (flowFile != null) {
            final FlowFile failed = session.putAttribute(flowFile, getErrorAttribute(), e.getMessage());
            session.transfer(failed, REL_FAILURE);
        }
        getLogger().error("Error running \"by query\" operation: ", e);
        context.yield();
    }
}

View File

@ -30,6 +30,7 @@ import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.elasticsearch.api.JsonQueryParameters;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.StringUtils;
@ -124,6 +125,17 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete
return propertyDescriptors;
}
/**
 * Describes a dynamic property with the given name: optional, validated as non-empty and
 * supporting Expression Language evaluated against flowfile attributes.
 *
 * @param propertyDescriptorName the name of the dynamic property
 * @return the descriptor for that dynamic property
 */
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    final PropertyDescriptor.Builder dynamicProperty = new PropertyDescriptor.Builder()
            .name(propertyDescriptorName)
            .required(false)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .dynamic(true);
    return dynamicProperty.build();
}
@OnScheduled
public void onScheduled(final ProcessContext context) {
clientService = new AtomicReference<>(context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class));

View File

@ -141,14 +141,17 @@ public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJs
if (!newQuery && PAGINATION_SCROLL.getValue().equals(paginationType)) {
response = clientService.get().scroll(queryJson);
} else {
final Map<String, String> requestParameters = getUrlQueryParameters(context, input);
if (PAGINATION_SCROLL.getValue().equals(paginationType)) {
requestParameters.put("scroll", paginatedJsonQueryParameters.getKeepAlive());
}
response = clientService.get().search(
queryJson,
// Point in Time uses general /_search API not /index/_search
PAGINATION_POINT_IN_TIME.getValue().equals(paginationType) ? null : paginatedJsonQueryParameters.getIndex(),
paginatedJsonQueryParameters.getType(),
PAGINATION_SCROLL.getValue().equals(paginationType)
? Collections.singletonMap("scroll", paginatedJsonQueryParameters.getKeepAlive())
: null
requestParameters
);
paginatedJsonQueryParameters.setPitId(response.getPitId());
paginatedJsonQueryParameters.setSearchAfter(response.getSearchAfter());

View File

@ -17,129 +17,49 @@
package org.apache.nifi.processors.elasticsearch;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.elasticsearch.DeleteOperationResponse;
import org.apache.nifi.elasticsearch.ElasticSearchClientService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.StringUtils;
import org.apache.nifi.elasticsearch.OperationResponse;
import org.apache.nifi.expression.ExpressionLanguageScope;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@WritesAttributes({
@WritesAttribute(attribute = "elasticsearch.delete.took", description = "The amount of time that it took to complete the delete operation in ms."),
@WritesAttribute(attribute = "elasticsearch.delete.error", description = "The error message provided by Elasticsearch if there is an error running the delete.")
})
@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
@Tags({ "elastic", "elasticsearch", "delete", "query"})
@CapabilityDescription("Delete from an Elasticsearch index using a query. The query can be loaded from a flowfile body " +
"or from the Query parameter.")
@Tags({ "elastic", "elasticsearch", "delete", "query"})
@WritesAttributes({
@WritesAttribute(attribute = "elasticsearch.delete.took", description = "The amount of time that it took to complete the delete operation in ms."),
@WritesAttribute(attribute = "elasticsearch.delete.error", description = "The error message provided by Elasticsearch if there is an error running the delete.")
})
public class DeleteByQueryElasticsearch extends AbstractProcessor implements ElasticsearchRestProcessor {
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
.description("If the delete by query fails, and a flowfile was read, it will be sent to this relationship.").build();
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
.description("If the delete by query succeeds, and a flowfile was read, it will be sent to this relationship.")
.build();
@DynamicProperty(
name = "The name of a URL query parameter to add",
value = "The value of the URL query parameter",
expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
description = "Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing. " +
"These parameters will override any matching parameters in the query request body")
public class DeleteByQueryElasticsearch extends AbstractByQueryElasticsearch {
static final String TOOK_ATTRIBUTE = "elasticsearch.delete.took";
static final String ERROR_ATTRIBUTE = "elasticsearch.delete.error";
private static final Set<Relationship> relationships;
private static final List<PropertyDescriptor> propertyDescriptors;
private volatile ElasticSearchClientService clientService;
static {
final Set<Relationship> _rels = new HashSet<>();
_rels.add(REL_SUCCESS);
_rels.add(REL_FAILURE);
relationships = Collections.unmodifiableSet(_rels);
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(QUERY);
descriptors.add(QUERY_ATTRIBUTE);
descriptors.add(INDEX);
descriptors.add(TYPE);
descriptors.add(CLIENT_SERVICE);
propertyDescriptors = Collections.unmodifiableList(descriptors);
@Override
String getTookAttribute() {
return TOOK_ATTRIBUTE;
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
String getErrorAttribute() {
return ERROR_ATTRIBUTE;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propertyDescriptors;
}
@OnScheduled
public void onScheduled(final ProcessContext context) {
clientService = context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class);
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile input = null;
if (context.hasIncomingConnection()) {
input = session.get();
if (input == null && context.hasNonLoopConnection()) {
return;
}
}
try {
final String query = getQuery(input, context, session);
final String index = context.getProperty(INDEX).evaluateAttributeExpressions(input).getValue();
final String type = context.getProperty(TYPE).isSet()
? context.getProperty(TYPE).evaluateAttributeExpressions(input).getValue()
: null;
final String queryAttr = context.getProperty(QUERY_ATTRIBUTE).isSet()
? context.getProperty(QUERY_ATTRIBUTE).evaluateAttributeExpressions(input).getValue()
: null;
DeleteOperationResponse dor = clientService.deleteByQuery(query, index, type);
if (input == null) {
input = session.create();
}
Map<String, String> attrs = new HashMap<>();
attrs.put(TOOK_ATTRIBUTE, String.valueOf(dor.getTook()));
if (!StringUtils.isBlank(queryAttr)) {
attrs.put(queryAttr, query);
}
input = session.putAllAttributes(input, attrs);
session.transfer(input, REL_SUCCESS);
} catch (Exception e) {
if (input != null) {
input = session.putAttribute(input, ERROR_ATTRIBUTE, e.getMessage());
session.transfer(input, REL_FAILURE);
}
getLogger().error("Error running delete by query: ", e);
context.yield();
}
OperationResponse performOperation(final ElasticSearchClientService clientService, final String query,
final String index, final String type, final Map<String, String> requestParameters) {
return clientService.deleteByQuery(query, index, type, requestParameters);
}
}

View File

@ -30,6 +30,8 @@ import org.apache.nifi.processor.util.StandardValidators;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
public interface ElasticsearchRestProcessor {
String ATTR_RECORD_COUNT = "record.count";
@ -121,4 +123,15 @@ public interface ElasticsearchRestProcessor {
return retVal;
}
/**
 * Collects the processor's dynamic properties as URL query parameters.
 *
 * <p>Only dynamic properties with a non-null configured value are included. Each value is
 * evaluated with Expression Language against the given flowfile.</p>
 *
 * @param context the process context providing the configured properties
 * @param flowFile the flowfile whose attributes are used for Expression Language evaluation
 * @return a new, mutable map of parameter name to evaluated value; callers may add further entries to it
 */
default Map<String, String> getUrlQueryParameters(final ProcessContext context, final FlowFile flowFile) {
    return context.getProperties().entrySet().stream()
            // keep only dynamic properties that have a configured value
            .filter(e -> e.getKey().isDynamic() && e.getValue() != null)
            // convert to Map of URL parameter keys and values
            .collect(Collectors.toMap(
                    e -> e.getKey().getName(),
                    e -> context.getProperty(e.getKey()).evaluateAttributeExpressions(flowFile).getValue(),
                    // same failure as the two-argument toMap if two properties ever share a name
                    (first, second) -> {
                        throw new IllegalStateException("Duplicate URL query parameter value: " + first);
                    },
                    // explicit supplier: the two-argument toMap does not guarantee a mutable map
                    HashMap::new
            ));
}
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.nifi.processors.elasticsearch;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
@ -23,6 +24,7 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.elasticsearch.SearchResponse;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
@ -46,11 +48,16 @@ import java.util.concurrent.TimeUnit;
"Elasticsearch JSON DSL. It does not automatically paginate queries for the user. If an incoming relationship is added to this " +
"processor, it will use the flowfile's content for the query. Care should be taken on the size of the query because the entire response " +
"from Elasticsearch will be loaded into memory all at once and converted into the resulting flowfiles.")
@DynamicProperty(
name = "The name of a URL query parameter to add",
value = "The value of the URL query parameter",
expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
description = "Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing. " +
"These parameters will override any matching parameters in the query request body")
public class JsonQueryElasticsearch extends AbstractJsonQueryElasticsearch<JsonQueryParameters> {
@Override
JsonQueryParameters buildJsonQueryParameters(final FlowFile input, final ProcessContext context, final ProcessSession session)
throws IOException {
final JsonQueryParameters jsonQueryParameters = new JsonQueryParameters();
populateCommonJsonQueryParameters(jsonQueryParameters, input, context, session);
return jsonQueryParameters;
@ -64,7 +71,7 @@ public class JsonQueryElasticsearch extends AbstractJsonQueryElasticsearch<JsonQ
queryJsonParameters.getQuery(),
queryJsonParameters.getIndex(),
queryJsonParameters.getType(),
null
getUrlQueryParameters(context, input)
);
if (input != null) {
session.getProvenanceReporter().send(

View File

@ -16,6 +16,7 @@
*/
package org.apache.nifi.processors.elasticsearch;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SystemResource;
@ -26,6 +27,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.elasticsearch.SearchResponse;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
@ -48,6 +50,12 @@ import java.util.List;
@CapabilityDescription("A processor that allows the user to run a paginated query (with aggregations) written with the Elasticsearch JSON DSL. " +
"It will use the flowfile's content for the query unless the QUERY attribute is populated. " +
"Search After/Point in Time queries must include a valid \"sort\" field.")
@DynamicProperty(
name = "The name of a URL query parameter to add",
value = "The value of the URL query parameter",
expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
description = "Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing. " +
"These parameters will override any matching parameters in the query request body")
@SystemResourceConsideration(resource = SystemResource.MEMORY, description = "Care should be taken on the size of each page because each response " +
"from Elasticsearch will be loaded into memory all at once and converted into the resulting flowfiles.")
public class PaginatedJsonQueryElasticsearch extends AbstractPaginatedJsonQueryElasticsearch implements ElasticsearchRestProcessor {

View File

@ -19,6 +19,7 @@ package org.apache.nifi.processors.elasticsearch;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
@ -52,6 +53,7 @@ import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.SimpleDateFormatValidator;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.PushBackRecordSet;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.type.ChoiceDataType;
@ -72,6 +74,12 @@ import java.util.Set;
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"json", "elasticsearch", "elasticsearch5", "elasticsearch6", "put", "index", "record"})
@CapabilityDescription("A record-aware Elasticsearch put processor that uses the official Elastic REST client libraries.")
@DynamicProperty(
name = "The name of a URL query parameter to add",
value = "The value of the URL query parameter",
expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
description = "Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing. " +
"These parameters will override any matching parameters in the query request body")
public class PutElasticsearchRecord extends AbstractProcessor implements ElasticsearchRestProcessor {
static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
.name("put-es-record-reader")
@ -244,6 +252,25 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
REL_SUCCESS, REL_FAILURE, REL_RETRY, REL_FAILED_RECORDS
)));
static final List<String> ALLOWED_INDEX_OPERATIONS = Collections.unmodifiableList(Arrays.asList(
IndexOperationRequest.Operation.Create.getValue().toLowerCase(),
IndexOperationRequest.Operation.Delete.getValue().toLowerCase(),
IndexOperationRequest.Operation.Index.getValue().toLowerCase(),
IndexOperationRequest.Operation.Update.getValue().toLowerCase(),
IndexOperationRequest.Operation.Upsert.getValue().toLowerCase()
));
private RecordPathCache recordPathCache;
private RecordReaderFactory readerFactory;
private RecordSetWriterFactory writerFactory;
private boolean logErrors;
private ObjectMapper errorMapper;
private volatile ElasticSearchClientService clientService;
private volatile String dateFormat;
private volatile String timeFormat;
private volatile String timestampFormat;
@Override
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
@ -254,17 +281,19 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
return DESCRIPTORS;
}
private RecordReaderFactory readerFactory;
private RecordPathCache recordPathCache;
private ElasticSearchClientService clientService;
private RecordSetWriterFactory writerFactory;
private boolean logErrors;
private volatile String dateFormat;
private volatile String timeFormat;
private volatile String timestampFormat;
/**
 * Builds the descriptor for a dynamic property: not required, validated as non-empty and
 * allowing Expression Language evaluated against flowfile attributes.
 *
 * @param propertyDescriptorName the name of the dynamic property
 * @return the descriptor for the named dynamic property
 */
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    final PropertyDescriptor descriptor = new PropertyDescriptor.Builder()
            .name(propertyDescriptorName)
            .required(false)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .dynamic(true)
            .build();
    return descriptor;
}
@OnScheduled
public void onScheduled(ProcessContext context) {
public void onScheduled(final ProcessContext context) {
this.readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
this.clientService = context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class);
this.recordPathCache = new RecordPathCache(16);
@ -283,18 +312,15 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
if (this.timestampFormat == null) {
this.timestampFormat = RecordFieldType.TIMESTAMP.getDefaultFormat();
}
if (errorMapper == null && (logErrors || getLogger().isDebugEnabled())) {
errorMapper = new ObjectMapper();
errorMapper.enable(SerializationFeature.INDENT_OUTPUT);
}
}
static final List<String> ALLOWED_INDEX_OPERATIONS = Collections.unmodifiableList(Arrays.asList(
IndexOperationRequest.Operation.Create.getValue().toLowerCase(),
IndexOperationRequest.Operation.Delete.getValue().toLowerCase(),
IndexOperationRequest.Operation.Index.getValue().toLowerCase(),
IndexOperationRequest.Operation.Update.getValue().toLowerCase(),
IndexOperationRequest.Operation.Upsert.getValue().toLowerCase()
));
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
final List<ValidationResult> validationResults = new ArrayList<>();
final PropertyValue indexOp = validationContext.getProperty(INDEX_OP);
@ -319,8 +345,8 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) {
FlowFile input = session.get();
public void onTrigger(final ProcessContext context, final ProcessSession session) {
final FlowFile input = session.get();
if (input == null) {
return;
}
@ -336,25 +362,26 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
final String typePath = context.getProperty(TYPE_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
final String atTimestampPath = context.getProperty(AT_TIMESTAMP_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
RecordPath ioPath = indexOpPath != null ? recordPathCache.getCompiled(indexOpPath) : null;
RecordPath path = idPath != null ? recordPathCache.getCompiled(idPath) : null;
RecordPath iPath = indexPath != null ? recordPathCache.getCompiled(indexPath) : null;
RecordPath tPath = typePath != null ? recordPathCache.getCompiled(typePath) : null;
RecordPath atPath = atTimestampPath != null ? recordPathCache.getCompiled(atTimestampPath) : null;
final RecordPath ioPath = indexOpPath != null ? recordPathCache.getCompiled(indexOpPath) : null;
final RecordPath path = idPath != null ? recordPathCache.getCompiled(idPath) : null;
final RecordPath iPath = indexPath != null ? recordPathCache.getCompiled(indexPath) : null;
final RecordPath tPath = typePath != null ? recordPathCache.getCompiled(typePath) : null;
final RecordPath atPath = atTimestampPath != null ? recordPathCache.getCompiled(atTimestampPath) : null;
boolean retainId = context.getProperty(RETAIN_ID_FIELD).evaluateAttributeExpressions(input).asBoolean();
boolean retainTimestamp = context.getProperty(RETAIN_AT_TIMESTAMP_FIELD).evaluateAttributeExpressions(input).asBoolean();
final boolean retainId = context.getProperty(RETAIN_ID_FIELD).evaluateAttributeExpressions(input).asBoolean();
final boolean retainTimestamp = context.getProperty(RETAIN_AT_TIMESTAMP_FIELD).evaluateAttributeExpressions(input).asBoolean();
int batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions(input).asInteger();
List<FlowFile> badRecords = new ArrayList<>();
final int batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions(input).asInteger();
final List<FlowFile> badRecords = new ArrayList<>();
try (final InputStream inStream = session.read(input);
final RecordReader reader = readerFactory.createRecordReader(input, inStream, getLogger())) {
Record record;
List<IndexOperationRequest> operationList = new ArrayList<>();
List<Record> originals = new ArrayList<>();
final RecordReader reader = readerFactory.createRecordReader(input, inStream, getLogger())) {
final PushBackRecordSet recordSet = new PushBackRecordSet(reader.createRecordSet());
final List<IndexOperationRequest> operationList = new ArrayList<>();
final List<Record> originals = new ArrayList<>();
while ((record = reader.nextRecord()) != null) {
Record record;
while ((record = recordSet.next()) != null) {
final String idx = getFromRecordPath(record, iPath, index, false);
final String t = getFromRecordPath(record, tPath, type, false);
final IndexOperationRequest.Operation o = IndexOperationRequest.Operation.forValue(getFromRecordPath(record, ioPath, indexOp, false));
@ -369,9 +396,9 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
operationList.add(new IndexOperationRequest(idx, t, id, contentMap, o));
originals.add(record);
if (operationList.size() == batchSize) {
BulkOperation bundle = new BulkOperation(operationList, originals, reader.getSchema());
FlowFile bad = indexDocuments(bundle, session, input);
if (operationList.size() == batchSize || !recordSet.isAnotherRecord()) {
final BulkOperation bundle = new BulkOperation(operationList, originals, reader.getSchema());
final FlowFile bad = indexDocuments(bundle, context, session, input);
if (bad != null) {
badRecords.add(bad);
}
@ -382,22 +409,22 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
}
if (!operationList.isEmpty()) {
BulkOperation bundle = new BulkOperation(operationList, originals, reader.getSchema());
FlowFile bad = indexDocuments(bundle, session, input);
final BulkOperation bundle = new BulkOperation(operationList, originals, reader.getSchema());
final FlowFile bad = indexDocuments(bundle, context, session, input);
if (bad != null) {
badRecords.add(bad);
}
}
} catch (ElasticsearchError ese) {
String msg = String.format("Encountered a server-side problem with Elasticsearch. %s",
} catch (final ElasticsearchError ese) {
final String msg = String.format("Encountered a server-side problem with Elasticsearch. %s",
ese.isElastic() ? "Moving to retry." : "Moving to failure");
getLogger().error(msg, ese);
Relationship rel = ese.isElastic() ? REL_RETRY : REL_FAILURE;
final Relationship rel = ese.isElastic() ? REL_RETRY : REL_FAILURE;
session.penalize(input);
session.transfer(input, rel);
removeBadRecordFlowFiles(badRecords, session);
return;
} catch (Exception ex) {
} catch (final Exception ex) {
getLogger().error("Could not index documents.", ex);
session.transfer(input, REL_FAILURE);
removeBadRecordFlowFiles(badRecords, session);
@ -406,22 +433,20 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
session.transfer(input, REL_SUCCESS);
}
private void removeBadRecordFlowFiles(List<FlowFile> bad, ProcessSession session) {
for (FlowFile badFlowFile : bad) {
private void removeBadRecordFlowFiles(final List<FlowFile> bad, final ProcessSession session) {
for (final FlowFile badFlowFile : bad) {
session.remove(badFlowFile);
}
bad.clear();
}
private FlowFile indexDocuments(BulkOperation bundle, ProcessSession session, FlowFile input) throws Exception {
IndexOperationResponse response = clientService.bulk(bundle.getOperationList());
private FlowFile indexDocuments(final BulkOperation bundle, final ProcessContext context, final ProcessSession session, final FlowFile input) throws Exception {
final IndexOperationResponse response = clientService.bulk(bundle.getOperationList(), getUrlQueryParameters(context, input));
if (response.hasErrors()) {
if(logErrors || getLogger().isDebugEnabled()) {
List<Map<String, Object>> errors = response.getItems();
ObjectMapper mapper = new ObjectMapper();
mapper.enable(SerializationFeature.INDENT_OUTPUT);
String output = String.format("An error was encountered while processing bulk operations. Server response below:%n%n%s", mapper.writeValueAsString(errors));
if (logErrors || getLogger().isDebugEnabled()) {
final List<Map<String, Object>> errors = response.getItems();
final String output = String.format("An error was encountered while processing bulk operations. Server response below:%n%n%s", errorMapper.writeValueAsString(errors));
if (logErrors) {
getLogger().error(output);
@ -434,16 +459,16 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
FlowFile errorFF = session.create(input);
try {
int added = 0;
try (OutputStream os = session.write(errorFF);
RecordSetWriter writer = writerFactory.createWriter(getLogger(), bundle.getSchema(), os, errorFF )) {
try (final OutputStream os = session.write(errorFF);
final RecordSetWriter writer = writerFactory.createWriter(getLogger(), bundle.getSchema(), os, errorFF )) {
writer.beginRecordSet();
for (int index = 0; index < response.getItems().size(); index++) {
Map<String, Object> current = response.getItems().get(index);
final Map<String, Object> current = response.getItems().get(index);
if (!current.isEmpty()) {
String key = current.keySet().stream().findFirst().orElse(null);
final String key = current.keySet().stream().findFirst().orElse(null);
@SuppressWarnings("unchecked")
Map<String, Object> inner = (Map<String, Object>) current.get(key);
final Map<String, Object> inner = (Map<String, Object>) current.get(key);
if (inner != null && inner.containsKey("error")) {
writer.write(bundle.getOriginalRecords().get(index));
added++;
@ -458,7 +483,7 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
session.transfer(errorFF, REL_FAILED_RECORDS);
return errorFF;
} catch (Exception ex) {
} catch (final Exception ex) {
getLogger().error("", ex);
session.remove(errorFF);
throw ex;
@ -474,10 +499,10 @@ public class PutElasticsearchRecord extends AbstractProcessor implements Elastic
return fallback;
}
RecordPathResult result = path.evaluate(record);
Optional<FieldValue> value = result.getSelectedFields().findFirst();
final RecordPathResult result = path.evaluate(record);
final Optional<FieldValue> value = result.getSelectedFields().findFirst();
if (value.isPresent() && value.get().getValue() != null) {
FieldValue fieldValue = value.get();
final FieldValue fieldValue = value.get();
if (!fieldValue.getField().getDataType().getFieldType().equals(RecordFieldType.STRING) ) {
throw new ProcessException(
String.format("Field referenced by %s must be a string.", path.getPath())

View File

@ -16,6 +16,7 @@
*/
package org.apache.nifi.processors.elasticsearch;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
import org.apache.nifi.annotation.behavior.Stateful;
@ -65,6 +66,12 @@ import java.util.Set;
"Search After/Point in Time queries must include a valid \"sort\" field. The processor will retrieve multiple pages of results " +
"until either no more results are available or the Pagination Keep Alive expiration is reached, after which the query will " +
"restart with the first page of results being retrieved.")
@DynamicProperty(
name = "The name of a URL query parameter to add",
value = "The value of the URL query parameter",
expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
description = "Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing. " +
"These parameters will override any matching parameters in the query request body")
@Stateful(scopes = Scope.LOCAL, description = "The pagination state (scrollId, searchAfter, pitId, hitCount, pageCount, pageExpirationTimestamp) " +
"is retained in between invocations of this processor until the Scroll/PiT has expired " +
"(when the current time is later than the last query execution plus the Pagination Keep Alive interval).")

View File

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.elasticsearch;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.elasticsearch.ElasticSearchClientService;
import org.apache.nifi.elasticsearch.OperationResponse;
import org.apache.nifi.expression.ExpressionLanguageScope;
import java.util.Map;
/**
 * Processor that runs an Elasticsearch update-by-query operation.
 *
 * <p>The class only supplies the update-specific pieces required by {@link AbstractByQueryElasticsearch}:
 * the names of the "took" and "error" FlowFile attributes and the client service call to execute.
 * NOTE(review): query loading, relationship routing and dynamic-property handling are presumably implemented
 * in the base class, which is not visible from this file.</p>
 */
@WritesAttributes({
@WritesAttribute(attribute = "elasticsearch.update.took", description = "The amount of time that it took to complete the update operation in ms."),
@WritesAttribute(attribute = "elasticsearch.update.error", description = "The error message provided by Elasticsearch if there is an error running the update.")
})
@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
@Tags({ "elastic", "elasticsearch", "update", "query"})
@CapabilityDescription("Update documents in an Elasticsearch index using a query. The query can be loaded from a flowfile body " +
"or from the Query parameter.")
@DynamicProperty(
name = "The name of a URL query parameter to add",
value = "The value of the URL query parameter",
expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
description = "Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing. " +
"These parameters will override any matching parameters in the query request body")
public class UpdateByQueryElasticsearch extends AbstractByQueryElasticsearch {
/** Name of the attribute documented above as holding the duration of the update operation in ms. */
static final String TOOK_ATTRIBUTE = "elasticsearch.update.took";
/** Name of the attribute documented above as holding the Elasticsearch error message. */
static final String ERROR_ATTRIBUTE = "elasticsearch.update.error";
/**
 * @return {@link #TOOK_ATTRIBUTE}
 */
@Override
String getTookAttribute() {
return TOOK_ATTRIBUTE;
}
/**
 * @return {@link #ERROR_ATTRIBUTE}
 */
@Override
String getErrorAttribute() {
return ERROR_ATTRIBUTE;
}
/**
 * Delegates to {@code ElasticSearchClientService#updateByQuery}, passing every argument through unchanged.
 *
 * @param clientService the client service that executes the request
 * @param query the query to run
 * @param index the index argument forwarded to the client service
 * @param type the type argument forwarded to the client service
 * @param requestParameters URL request parameters forwarded to the client service
 * @return the response returned by the client service
 */
@Override
OperationResponse performOperation(final ElasticSearchClientService clientService, final String query,
final String index, final String type, final Map<String, String> requestParameters) {
return clientService.updateByQuery(query, index, type, requestParameters);
}
}

View File

@ -17,4 +17,5 @@ org.apache.nifi.processors.elasticsearch.DeleteByQueryElasticsearch
org.apache.nifi.processors.elasticsearch.JsonQueryElasticsearch
org.apache.nifi.processors.elasticsearch.PaginatedJsonQueryElasticsearch
org.apache.nifi.processors.elasticsearch.SearchElasticsearch
org.apache.nifi.processors.elasticsearch.PutElasticsearchRecord
org.apache.nifi.processors.elasticsearch.PutElasticsearchRecord
org.apache.nifi.processors.elasticsearch.UpdateByQueryElasticsearch

View File

@ -37,5 +37,48 @@
not too large for it to handle. When failures do occur, this processor is capable of attempting to write the records
that failed to an output record writer so that only failed records can be processed downstream or replayed.
</p>
<p>
The index, operation and (optional) type fields are configured with default values that can be overridden using
record path operations that find an index or type value in the record set.
The ID and operation type (create, index, update, upsert or delete) can also be extracted in a similar fashion from
the record set. The following is an example of a document exercising all of these features:
</p>
<pre>
{
"metadata": {
"id": "12345",
"index": "test",
"type": "message",
"operation": "index"
},
"message": "Hello, world",
"from": "john.smith"
}
</pre>
<pre>
{
"metadata": {
"id": "12345",
"index": "test",
"type": "message",
"operation": "delete"
}
}
</pre>
<p>The record path operations below would extract the relevant data:</p>
<ul>
<li>/metadata/id</li>
<li>/metadata/index</li>
<li>/metadata/type</li>
<li>/metadata/operation</li>
</ul>
<p>Valid values for "operation" are:</p>
<ul>
<li>create</li>
<li>delete</li>
<li>index</li>
<li>update</li>
<li>upsert</li>
</ul>
</body>
</html>

View File

@ -0,0 +1,48 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8" />
<title>UpdateByQueryElasticsearch</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
</head>
<body>
<p>This processor executes an update operation against one or more indices using the _update_by_query handler. The
query should be a valid Elasticsearch JSON DSL query (Lucene syntax is not supported). An optional Elasticsearch
script can be specified to execute against the matched documents. An example query with script:</p>
<pre>
{
"script": {
"source": "ctx._source.count++",
"lang": "painless"
},
"query": {
"match": {
"username.keyword": "john.smith"
}
}
}
</pre>
<p>To update all of the contents of an index, this could be used:</p>
<pre>
{
"query": {
"match_all": {}
}
}
</pre>
</body>
</html>

View File

@ -0,0 +1,218 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.elasticsearch
import org.apache.nifi.util.MockFlowFile
import org.apache.nifi.util.TestRunner
import org.junit.Assert
import org.junit.Test
/**
 * Shared test cases for processors that extend {@link AbstractByQueryElasticsearch}.
 *
 * Subclasses provide the processor under test through {@link #setupRunner()}, the names of the attributes the
 * processor writes, and a way to make the mock client service fail for the operation being tested.
 */
abstract class AbstractByQueryElasticsearchTest {
    private static final String INDEX = "test_idx"
    private static final String TYPE = "test_type"
    private static final String CLIENT_NAME = "clientService"
    private static final String MATCH_ALL_QUERY = "{ \"query\": { \"match_all\": {} }}"

    private TestElasticsearchClientService client

    /** @return the name of the attribute the executed query is written to (used as the Query Attribute property) */
    abstract String queryAttr()

    /** @return the name of the attribute holding the operation's "took" time */
    abstract String tookAttr()

    /** @return the name of the attribute holding the error message on failure */
    abstract String errorAttr()

    /** @return a new TestRunner wrapping the processor under test */
    abstract TestRunner setupRunner()

    /** Configures the mock client service so that the operation under test throws an error. */
    abstract void expectError(final TestElasticsearchClientService client)

    /** Registers and enables the mock client service, then points the processor at it. */
    private void initClient(TestRunner runner) throws Exception {
        client = new TestElasticsearchClientService(true)
        runner.addControllerService(CLIENT_NAME, client)
        runner.enableControllerService(client)
        runner.setProperty(AbstractByQueryElasticsearch.CLIENT_SERVICE, CLIENT_NAME)
    }

    /**
     * Creates a runner with the index, type and query attribute properties every test needs.
     *
     * @param query value for the Query property, or null to leave it unset so the query is read from the FlowFile
     */
    private TestRunner configuredRunner(final String query) {
        final TestRunner runner = setupRunner()
        if (query != null) {
            runner.setProperty(AbstractByQueryElasticsearch.QUERY, query)
        }
        runner.setProperty(AbstractByQueryElasticsearch.INDEX, INDEX)
        runner.setProperty(AbstractByQueryElasticsearch.TYPE, TYPE)
        runner.setProperty(AbstractByQueryElasticsearch.QUERY_ATTRIBUTE, queryAttr())
        return runner
    }

    /** Adds the two dynamic properties used by the request parameter tests; "slices" is resolved through EL. */
    private static void addRequestParameterProperties(final TestRunner runner) {
        runner.setProperty("refresh", "true")
        runner.setProperty("slices", '${slices}')
    }

    /** Asserts that exactly the "refresh" and "slices" parameters reached the client service. */
    private void assertRequestParametersSent() {
        Assert.assertEquals(2, client.getRequestParameters().size())
        Assert.assertEquals("true", client.getRequestParameters().get("refresh"))
        Assert.assertEquals("auto", client.getRequestParameters().get("slices"))
    }

    /** Asserts a single FlowFile went to success carrying the expected "took" and query attributes. */
    private void postTest(TestRunner runner, String queryParam) {
        runner.assertTransferCount(AbstractByQueryElasticsearch.REL_FAILURE, 0)
        runner.assertTransferCount(AbstractByQueryElasticsearch.REL_SUCCESS, 1)

        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(AbstractByQueryElasticsearch.REL_SUCCESS)
        final String attr = flowFiles.get(0).getAttribute(tookAttr())
        final String query = flowFiles.get(0).getAttribute(queryAttr())
        Assert.assertNotNull(attr)
        // JUnit's contract is assertEquals(expected, actual); the mock client service reports a took time of 100
        Assert.assertEquals("100", attr)
        Assert.assertNotNull(query)
        Assert.assertEquals(queryParam, query)
    }

    @Test
    void testWithFlowfileInput() throws Exception {
        final TestRunner runner = configuredRunner(null)
        initClient(runner)
        runner.assertValid()
        runner.enqueue(MATCH_ALL_QUERY)
        runner.run()

        postTest(runner, MATCH_ALL_QUERY)
        Assert.assertTrue(client.getRequestParameters().isEmpty())
    }

    @Test
    void testWithFlowfileInputAndRequestParameters() throws Exception {
        final TestRunner runner = configuredRunner(null)
        addRequestParameterProperties(runner)
        initClient(runner)
        runner.assertValid()
        // "slices" is supplied as a FlowFile attribute here
        runner.enqueue(MATCH_ALL_QUERY, [slices: "auto"])
        runner.run()

        postTest(runner, MATCH_ALL_QUERY)
        assertRequestParametersSent()
    }

    @Test
    void testWithQuery() throws Exception {
        final String query = "{\n" +
                "\t\"query\": {\n" +
                "\t\t\"match\": {\n" +
                "\t\t\t\"\${field.name}.keyword\": \"test\"\n" +
                "\t\t}\n" +
                "\t}\n" +
                "}"
        final Map<String, String> attrs = ["field.name": "test_field"]

        final TestRunner runner = configuredRunner(query)
        initClient(runner)
        runner.assertValid()
        runner.enqueue("", attrs)
        runner.run()

        postTest(runner, query.replace('${field.name}', "test_field"))

        runner.clearTransferState()

        // second pass: same processor, literal query, no incoming connection
        final String query2 = "{\n" +
                "\t\"query\": {\n" +
                "\t\t\"match\": {\n" +
                "\t\t\t\"test_field.keyword\": \"test\"\n" +
                "\t\t}\n" +
                "\t}\n" +
                "}"
        runner.setProperty(AbstractByQueryElasticsearch.QUERY, query2)
        runner.setIncomingConnection(false)
        runner.assertValid()
        runner.run()

        postTest(runner, query2)
    }

    @Test
    void testErrorAttribute() throws Exception {
        final TestRunner runner = configuredRunner(MATCH_ALL_QUERY)
        initClient(runner)
        expectError(client)
        runner.assertValid()
        runner.enqueue("")
        runner.run()

        runner.assertTransferCount(AbstractByQueryElasticsearch.REL_SUCCESS, 0)
        runner.assertTransferCount(AbstractByQueryElasticsearch.REL_FAILURE, 1)

        final MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(AbstractByQueryElasticsearch.REL_FAILURE).get(0)
        final String attr = mockFlowFile.getAttribute(errorAttr())
        Assert.assertNotNull(attr)
    }

    @Test
    void testInputHandling() {
        final TestRunner runner = configuredRunner(MATCH_ALL_QUERY)
        initClient(runner)
        runner.assertValid()

        // nothing is enqueued, so nothing may be transferred
        runner.run()

        runner.assertTransferCount(AbstractByQueryElasticsearch.REL_SUCCESS, 0)
        runner.assertTransferCount(AbstractByQueryElasticsearch.REL_FAILURE, 0)
    }

    @Test
    void testNoInputHandling() {
        final TestRunner runner = configuredRunner(MATCH_ALL_QUERY)
        initClient(runner)
        runner.setIncomingConnection(false)
        runner.assertValid()
        runner.run()

        postTest(runner, MATCH_ALL_QUERY)
        Assert.assertTrue(client.getRequestParameters().isEmpty())
    }

    @Test
    void testNoInputHandlingWithRequestParameters() {
        final TestRunner runner = configuredRunner(MATCH_ALL_QUERY)
        addRequestParameterProperties(runner)
        // no FlowFile in this scenario, so "slices" comes from a variable
        runner.setVariable("slices", "auto")
        initClient(runner)
        runner.setIncomingConnection(false)
        runner.assertValid()
        runner.run()

        postTest(runner, MATCH_ALL_QUERY)
        assertRequestParametersSent()
    }
}

View File

@ -29,8 +29,8 @@ import org.junit.Test
import static groovy.json.JsonOutput.prettyPrint
import static groovy.json.JsonOutput.toJson
import static org.hamcrest.CoreMatchers.equalTo
import static org.hamcrest.MatcherAssert.assertThat
import static org.hamcrest.CoreMatchers.is
import static org.hamcrest.MatcherAssert.assertThat
import static org.junit.Assert.assertThrows
abstract class AbstractJsonQueryElasticsearchTest<P extends AbstractJsonQueryElasticsearch> {
@ -269,6 +269,27 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends AbstractJsonQueryEla
testCounts(runner, 0, 1, 0, 0)
}
/**
 * Dynamic properties must reach the client service as request parameters. The Search and Paginated
 * processors are expected to send one extra "scroll" parameter in addition to the two configured here.
 */
@Test
void testRequestParameters() {
    final TestRunner runner = createRunner(false)
    runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([query: [ match_all: [:] ]])))
    runner.setProperty("refresh", "true")
    runner.setProperty("slices", '${slices}')
    runner.setVariable("slices", "auto")
    runOnce(runner)

    final TestElasticsearchClientService esService = runner.getControllerService("esService") as TestElasticsearchClientService
    final Map<String, String> sent = esService.getRequestParameters()
    final boolean paginated = getProcessor() instanceof SearchElasticsearch || getProcessor() instanceof PaginatedJsonQueryElasticsearch

    Assert.assertEquals(paginated ? 3 : 2, sent.size())
    if (paginated) {
        Assert.assertEquals("600s", sent.get("scroll"))
    }
    Assert.assertEquals("true", sent.get("refresh"))
    Assert.assertEquals("auto", sent.get("slices"))
}
static void testCounts(TestRunner runner, int original, int hits, int failure, int aggregations) {
runner.assertTransferCount(AbstractJsonQueryElasticsearch.REL_ORIGINAL, original)
runner.assertTransferCount(AbstractJsonQueryElasticsearch.REL_HITS, hits)

View File

@ -17,127 +17,32 @@
package org.apache.nifi.processors.elasticsearch
import org.apache.nifi.util.MockFlowFile
import org.apache.nifi.util.TestRunner
import org.apache.nifi.util.TestRunners
import org.junit.Assert
import org.junit.Test
class DeleteByQueryElasticsearchTest {
private static final String INDEX = "test_idx"
private static final String TYPE = "test_type"
private static final String QUERY_ATTR = "es.delete.query"
private static final String CLIENT_NAME = "clientService"
private TestElasticsearchClientService client
private void initClient(TestRunner runner) throws Exception {
client = new TestElasticsearchClientService(true)
runner.addControllerService(CLIENT_NAME, client)
runner.enableControllerService(client)
runner.setProperty(DeleteByQueryElasticsearch.CLIENT_SERVICE, CLIENT_NAME)
class DeleteByQueryElasticsearchTest extends AbstractByQueryElasticsearchTest {
@Override
String queryAttr() {
return "es.delete.query"
}
private static void postTest(TestRunner runner, String queryParam) {
runner.assertTransferCount(DeleteByQueryElasticsearch.REL_FAILURE, 0)
runner.assertTransferCount(DeleteByQueryElasticsearch.REL_SUCCESS, 1)
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(DeleteByQueryElasticsearch.REL_SUCCESS)
String attr = flowFiles.get(0).getAttribute(DeleteByQueryElasticsearch.TOOK_ATTRIBUTE)
String query = flowFiles.get(0).getAttribute(QUERY_ATTR)
Assert.assertNotNull(attr)
Assert.assertEquals(attr, "100")
Assert.assertNotNull(query)
Assert.assertEquals(queryParam, query)
@Override
String tookAttr() {
return DeleteByQueryElasticsearch.TOOK_ATTRIBUTE
}
@Test
void testWithFlowfileInput() throws Exception {
String query = "{ \"query\": { \"match_all\": {} }}"
TestRunner runner = TestRunners.newTestRunner(DeleteByQueryElasticsearch.class)
runner.setProperty(DeleteByQueryElasticsearch.INDEX, INDEX)
runner.setProperty(DeleteByQueryElasticsearch.TYPE, TYPE)
runner.setProperty(DeleteByQueryElasticsearch.QUERY_ATTRIBUTE, QUERY_ATTR)
initClient(runner)
runner.assertValid()
runner.enqueue(query)
runner.run()
postTest(runner, query)
@Override
String errorAttr() {
return DeleteByQueryElasticsearch.ERROR_ATTRIBUTE
}
@Test
void testWithQuery() throws Exception {
String query = "{\n" +
"\t\"query\": {\n" +
"\t\t\"match\": {\n" +
"\t\t\t\"\${field.name}.keyword\": \"test\"\n" +
"\t\t}\n" +
"\t}\n" +
"}"
Map<String, String> attrs = new HashMap<String, String>(){{
put("field.name", "test_field")
}}
TestRunner runner = TestRunners.newTestRunner(DeleteByQueryElasticsearch.class)
runner.setProperty(DeleteByQueryElasticsearch.QUERY, query)
runner.setProperty(DeleteByQueryElasticsearch.INDEX, INDEX)
runner.setProperty(DeleteByQueryElasticsearch.TYPE, TYPE)
runner.setProperty(DeleteByQueryElasticsearch.QUERY_ATTRIBUTE, QUERY_ATTR)
initClient(runner)
runner.assertValid()
runner.enqueue("", attrs)
runner.run()
postTest(runner, query.replace('${field.name}', "test_field"))
runner.clearTransferState()
query = "{\n" +
"\t\"query\": {\n" +
"\t\t\"match\": {\n" +
"\t\t\t\"test_field.keyword\": \"test\"\n" +
"\t\t}\n" +
"\t}\n" +
"}"
runner.setProperty(DeleteByQueryElasticsearch.QUERY, query)
runner.setIncomingConnection(false)
runner.assertValid()
runner.run()
postTest(runner, query)
@Override
TestRunner setupRunner() {
return TestRunners.newTestRunner(DeleteByQueryElasticsearch.class)
}
@Test
void testErrorAttribute() throws Exception {
String query = "{ \"query\": { \"match_all\": {} }}"
TestRunner runner = TestRunners.newTestRunner(DeleteByQueryElasticsearch.class)
runner.setProperty(DeleteByQueryElasticsearch.QUERY, query)
runner.setProperty(DeleteByQueryElasticsearch.INDEX, INDEX)
runner.setProperty(DeleteByQueryElasticsearch.TYPE, TYPE)
runner.setProperty(DeleteByQueryElasticsearch.QUERY_ATTRIBUTE, QUERY_ATTR)
initClient(runner)
@Override
void expectError(final TestElasticsearchClientService client) {
client.setThrowErrorInDelete(true)
runner.assertValid()
runner.enqueue("")
runner.run()
runner.assertTransferCount(DeleteByQueryElasticsearch.REL_SUCCESS, 0)
runner.assertTransferCount(DeleteByQueryElasticsearch.REL_FAILURE, 1)
MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(DeleteByQueryElasticsearch.REL_FAILURE).get(0)
String attr = mockFlowFile.getAttribute(DeleteByQueryElasticsearch.ERROR_ATTRIBUTE)
Assert.assertNotNull(attr)
}
@Test
void testInputHandling() {
String query = "{ \"query\": { \"match_all\": {} }}"
TestRunner runner = TestRunners.newTestRunner(DeleteByQueryElasticsearch.class)
runner.setProperty(DeleteByQueryElasticsearch.QUERY, query)
runner.setProperty(DeleteByQueryElasticsearch.INDEX, INDEX)
runner.setProperty(DeleteByQueryElasticsearch.TYPE, TYPE)
runner.setProperty(DeleteByQueryElasticsearch.QUERY_ATTRIBUTE, QUERY_ATTR)
initClient(runner)
runner.assertValid()
runner.run()
}
}

View File

@ -125,7 +125,11 @@ class PutElasticsearchRecordTest {
void basicTest(int failure, int retry, int success, Closure evalClosure) {
clientService.evalClosure = evalClosure
runner.enqueue(flowFileContents, [ "schema.name": "simple" ])
basicTest(failure, retry, success, [ "schema.name": "simple" ])
}
void basicTest(int failure, int retry, int success, Map<String, String> attr) {
runner.enqueue(flowFileContents, attr)
runner.run()
runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, failure)
@ -135,6 +139,11 @@ class PutElasticsearchRecordTest {
/** With no dynamic properties configured, the client service must receive an empty parameter map. */
@Test
void simpleTest() {
    clientService.evalParametersClosure = { Map<String, String> sent ->
        Assert.assertTrue(sent.isEmpty())
    }

    basicTest(0, 0, 1)
}
@ -149,6 +158,41 @@ class PutElasticsearchRecordTest {
basicTest(0, 0, 1, evalClosure)
}
/** Dynamic properties, one of them resolved from a variable, are passed on as request parameters. */
@Test
void simpleTestWithRequestParameters() {
    runner.setProperty("refresh", "true")
    runner.setProperty("slices", '${slices}')
    runner.setVariable("slices", "auto")
    runner.assertValid()

    clientService.evalParametersClosure = { Map<String, String> sent ->
        Assert.assertEquals(2, sent.size())
        Assert.assertEquals("true", sent.get("refresh"))
        Assert.assertEquals("auto", sent.get("slices"))
    }

    basicTest(0, 0, 1)
}
/** Same as the variable-based test, but "slices" is resolved from an attribute of the incoming FlowFile. */
@Test
void simpleTestWithRequestParametersFlowFileEL() {
    runner.setProperty("refresh", "true")
    runner.setProperty("slices", '${slices}')
    runner.assertValid()

    clientService.evalParametersClosure = { Map<String, String> sent ->
        Assert.assertEquals(2, sent.size())
        Assert.assertEquals("true", sent.get("refresh"))
        Assert.assertEquals("auto", sent.get("slices"))
    }

    basicTest(0, 0, 1, ["schema.name": "simple", slices: "auto"])
}
@Test
void simpleTestWithMockReader() {
reader = new MockRecordParser()

View File

@ -27,8 +27,8 @@ import java.time.Instant
import static groovy.json.JsonOutput.prettyPrint
import static groovy.json.JsonOutput.toJson
import static org.hamcrest.MatcherAssert.assertThat
import static org.hamcrest.CoreMatchers.is
import static org.hamcrest.MatcherAssert.assertThat
import static org.junit.Assert.fail
class SearchElasticsearchTest extends AbstractPaginatedJsonQueryElasticsearchTest {

View File

@ -24,63 +24,78 @@ import org.apache.nifi.elasticsearch.ElasticSearchClientService
import org.apache.nifi.elasticsearch.IndexOperationRequest
import org.apache.nifi.elasticsearch.IndexOperationResponse
import org.apache.nifi.elasticsearch.SearchResponse
import org.apache.nifi.elasticsearch.UpdateOperationResponse
class TestElasticsearchClientService extends AbstractControllerService implements ElasticSearchClientService {
private boolean returnAggs
private boolean throwErrorInSearch
private boolean throwErrorInDelete
private boolean throwErrorInPit
private boolean throwErrorInUpdate
private int pageCount = 0
private int maxPages = 1
private String query
private Map<String, String> requestParameters
TestElasticsearchClientService(boolean returnAggs) {
this.returnAggs = returnAggs
}
@Override
IndexOperationResponse add(IndexOperationRequest operation) {
return bulk(Arrays.asList(operation) as List<IndexOperationRequest>)
private void common(boolean throwError, Map<String, String> requestParameters) {
if (throwError) {
throw new IOException("Simulated IOException")
}
this.requestParameters = requestParameters
}
@Override
IndexOperationResponse bulk(List<IndexOperationRequest> operations) {
IndexOperationResponse add(IndexOperationRequest operation, Map<String, String> requestParameters) {
return bulk(Arrays.asList(operation) as List<IndexOperationRequest>, requestParameters)
}
@Override
IndexOperationResponse bulk(List<IndexOperationRequest> operations, Map<String, String> requestParameters) {
common(false, requestParameters)
return new IndexOperationResponse(100L)
}
@Override
Long count(String query, String index, String type) {
Long count(String query, String index, String type, Map<String, String> requestParameters) {
common(false, requestParameters)
return null
}
@Override
DeleteOperationResponse deleteById(String index, String type, String id) {
return deleteById(index, type, Arrays.asList(id))
DeleteOperationResponse deleteById(String index, String type, String id, Map<String, String> requestParameters) {
return deleteById(index, type, Arrays.asList(id), requestParameters)
}
@Override
DeleteOperationResponse deleteById(String index, String type, List<String> ids) {
if (throwErrorInDelete) {
throw new IOException("Simulated IOException")
}
DeleteOperationResponse deleteById(String index, String type, List<String> ids, Map<String, String> requestParameters) {
common(throwErrorInDelete, requestParameters)
return new DeleteOperationResponse(100L)
}
@Override
DeleteOperationResponse deleteByQuery(String query, String index, String type) {
return deleteById(index, type, Arrays.asList("1"))
DeleteOperationResponse deleteByQuery(String query, String index, String type, Map<String, String> requestParameters) {
return deleteById(index, type, Arrays.asList("1"), requestParameters)
}
// updateByQuery(query, index, type, requestParameters): calls
// common(throwErrorInUpdate, requestParameters), which throws the simulated IOException
// when the flag is set and otherwise stores the parameters, then returns an
// UpdateOperationResponse constructed with 100L.
@Override
Map<String, Object> get(String index, String type, String id) {
UpdateOperationResponse updateByQuery(String query, String index, String type, Map<String, String> requestParameters) {
common(throwErrorInUpdate, requestParameters)
return new UpdateOperationResponse(100L)
}
/**
 * Mocked document lookup: records the request parameters without simulating a failure
 * and returns a fixed single-entry map ("msg" mapped to "one").
 */
@Override
Map<String, Object> get(String index, String type, String id, Map<String, String> requestParameters) {
    common(false, requestParameters)
    final Map<String, Object> document = [ "msg": "one" ]
    return document
}
@Override
SearchResponse search(String query, String index, String type, Map<String, String> requestParameters) {
if (throwErrorInSearch) {
throw new IOException("Simulated IOException")
}
common(throwErrorInSearch, requestParameters)
this.query = query
final SearchResponse response
@ -101,7 +116,7 @@ class TestElasticsearchClientService extends AbstractControllerService implement
throw new IOException("Simulated IOException - scroll")
}
return search(null, null, null, null)
return search(null, null, null, requestParameters)
}
@Override
@ -295,6 +310,10 @@ class TestElasticsearchClientService extends AbstractControllerService implement
this.throwErrorInPit = throwErrorInPit
}
/**
 * Sets the throwErrorInUpdate flag, which updateByQuery passes to common(...) to decide
 * whether the simulated IOException is thrown.
 */
void setThrowErrorInUpdate(boolean throwErrorInUpdate) {
this.throwErrorInUpdate = throwErrorInUpdate
}
/** Sets the pageCount field back to 0. */
void resetPageCount() {
    pageCount = 0
}
@ -302,4 +321,8 @@ class TestElasticsearchClientService extends AbstractControllerService implement
/** Stores the given value in the maxPages field. */
void setMaxPages(int maxPages) {
this.maxPages = maxPages
}
/**
 * Returns the request parameters most recently stored by common(...), so a test can
 * inspect what was passed to the mock.
 */
Map<String, String> getRequestParameters() {
    requestParameters
}
}

View File

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.elasticsearch
import org.apache.nifi.util.TestRunner
import org.apache.nifi.util.TestRunners
/**
 * Supplies the UpdateByQueryElasticsearch-specific hooks that
 * AbstractByQueryElasticsearchTest declares: attribute names, the test runner
 * and the way to make the mock client fail.
 */
class UpdateByQueryElasticsearchTest extends AbstractByQueryElasticsearchTest {
    /** Name of the query attribute for the update-by-query processor. */
    @Override
    String queryAttr() {
        "es.update.query"
    }

    /** Name of the "took" attribute, as defined by the processor. */
    @Override
    String tookAttr() {
        UpdateByQueryElasticsearch.TOOK_ATTRIBUTE
    }

    /** Name of the error attribute, as defined by the processor. */
    @Override
    String errorAttr() {
        UpdateByQueryElasticsearch.ERROR_ATTRIBUTE
    }

    /** Creates a TestRunner for the UpdateByQueryElasticsearch processor class. */
    @Override
    TestRunner setupRunner() {
        TestRunners.newTestRunner(UpdateByQueryElasticsearch.class)
    }

    /** Switches the mock client into the mode where its update call throws. */
    @Override
    void expectError(final TestElasticsearchClientService client) {
        client.setThrowErrorInUpdate(true)
    }
}

View File

@ -23,43 +23,49 @@ import org.apache.nifi.elasticsearch.ElasticSearchClientService
import org.apache.nifi.elasticsearch.IndexOperationRequest
import org.apache.nifi.elasticsearch.IndexOperationResponse
import org.apache.nifi.elasticsearch.SearchResponse
import org.apache.nifi.elasticsearch.UpdateOperationResponse
class AbstractMockElasticsearchClient extends AbstractControllerService implements ElasticSearchClientService {
boolean throwRetriableError
boolean throwFatalError
// add(operation, requestParameters): no-op stub; always returns null.
@Override
IndexOperationResponse add(IndexOperationRequest operation) {
IndexOperationResponse add(IndexOperationRequest operation, Map<String, String> requestParameters) {
return null
}
// bulk(operations, requestParameters): no-op stub; always returns null.
// MockBulkLoadClientService overrides this method with its own behaviour.
@Override
IndexOperationResponse bulk(List<IndexOperationRequest> operations) {
IndexOperationResponse bulk(List<IndexOperationRequest> operations, Map<String, String> requestParameters) {
return null
}
// count(query, index, type, requestParameters): no-op stub; always returns null.
@Override
Long count(String query, String index, String type) {
Long count(String query, String index, String type, Map<String, String> requestParameters) {
return null
}
// deleteById(index, type, id, requestParameters): single-id no-op stub; always returns null.
@Override
DeleteOperationResponse deleteById(String index, String type, String id) {
DeleteOperationResponse deleteById(String index, String type, String id, Map<String, String> requestParameters) {
return null
}
// deleteById(index, type, ids, requestParameters): id-list no-op stub; always returns null.
@Override
DeleteOperationResponse deleteById(String index, String type, List<String> ids) {
DeleteOperationResponse deleteById(String index, String type, List<String> ids, Map<String, String> requestParameters) {
return null
}
// deleteByQuery(query, index, type, requestParameters): no-op stub; always returns null.
@Override
DeleteOperationResponse deleteByQuery(String query, String index, String type) {
DeleteOperationResponse deleteByQuery(String query, String index, String type, Map<String, String> requestParameters) {
return null
}
// updateByQuery(query, index, type, requestParameters): no-op stub; always returns null.
@Override
Map<String, Object> get(String index, String type, String id) {
UpdateOperationResponse updateByQuery(String query, String index, String type, Map<String, String> requestParameters) {
return null
}
/** No-op stub for document lookup; always yields null. */
@Override
Map<String, Object> get(String index, String type, String id, Map<String, String> requestParameters) {
    null
}

View File

@ -24,9 +24,10 @@ import org.apache.nifi.elasticsearch.IndexOperationResponse
class MockBulkLoadClientService extends AbstractMockElasticsearchClient {
IndexOperationResponse response
Closure evalClosure
Closure evalParametersClosure
@Override
IndexOperationResponse bulk(List<IndexOperationRequest> items) {
IndexOperationResponse bulk(List<IndexOperationRequest> items, Map<String, String> requestParameters) {
if (throwRetriableError) {
throw new MockElasticsearchError(true)
} else if (throwFatalError) {
@ -37,6 +38,10 @@ class MockBulkLoadClientService extends AbstractMockElasticsearchClient {
evalClosure.call(items)
}
if (evalParametersClosure) {
evalParametersClosure.call(requestParameters)
}
response
}