NIFI-5248 Added new Elasticsearch processors.

NIFI-5248 Fixed a few stray 1.7.0 references.
NIFI-5248 Removed build helper plugin.
NIFI-5248 Made changes requested in a review.
NIFI-5248 Updated dummy service.
NIFI-5248 Made a few changes from a code review.
NIFI-5248 Added logic for removing nulls so record paths can be removed when no longer needed.
NIFI-5248 Switched from variable registry to flowfile level EL.
NIFI-5248 Added JsonPath code to remove index, id and type path statements.
NIFI-5248 Updated validation.
NIFI-5248 Set the field to null instead of empty string when nulling records.
NIFI-5248 Fixed TestElasticSearchClientService.
NIFI-5248 Removed high level client and switched over to low level client for everything.
NIFI-5248 Added profiles for ES 6 and ES 7 integration testing.
NIFI-5248 Updated integration tests to support 5 and 6.
NIFI-5248 Fixed some style check breaks.
NIFI-5248 Added create operation type.
NIFI-5248 Updated documentation.
NIFI-5248 Added error handling to PutElasticsearchRecord.
NIFI-5248 Added error logging to PutElasticsearchJson.
NIFI-5248 Added split failed records option to PutElasticsearchJson.
NIFI-5248 Added documentation for PutElasticsearchRecord.
NIFI-5248 Updated import to not use * import.
NIFI-5248 Removed processor that is no longer relevant due to schema inference.
NIFI-5248 Renamed ElasticSearch instances to Elasticsearch where we can within API guidelines.

NIFI-5248 Added groovy-json test dependency.

NIFI-5248 Updated PutElasticsearchRecord to only do index operations.

NIFI-5248 Added batch size property and refactored the way relationships and properties are added.

NIFI-5248 Added batch processing support.

NIFI-5248 Updated error handling.

NIFI-5248 Updated to 1.11.0-SNAPSHOT.

NIFI-5248 Made changes requested in a code review.

NIFI-5248 Made a few more changes from a code review.

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #2861
This commit is contained in:
Mike Thomsen 2018-07-08 14:51:12 -04:00 committed by Matthew Burgess
parent f1be730c94
commit 4c79ff0576
No known key found for this signature in database
GPG Key ID: 05D3DEB8126DAD24
38 changed files with 2178 additions and 450 deletions

View File

@ -43,5 +43,11 @@
<version>1.11.0-SNAPSHOT</version> <version>1.11.0-SNAPSHOT</version>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.8</version>
<scope>compile</scope>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -30,12 +30,12 @@ import java.util.List;
import java.util.Map; import java.util.Map;
@Tags({"elasticsearch", "client"}) @Tags({"elasticsearch", "client"})
@CapabilityDescription("A controller service for accessing an ElasticSearch client.") @CapabilityDescription("A controller service for accessing an Elasticsearch client.")
public interface ElasticSearchClientService extends ControllerService { public interface ElasticSearchClientService extends ControllerService {
PropertyDescriptor HTTP_HOSTS = new PropertyDescriptor.Builder() PropertyDescriptor HTTP_HOSTS = new PropertyDescriptor.Builder()
.name("el-cs-http-hosts") .name("el-cs-http-hosts")
.displayName("HTTP Hosts") .displayName("HTTP Hosts")
.description("A comma-separated list of HTTP hosts that host ElasticSearch query nodes.") .description("A comma-separated list of HTTP hosts that host Elasticsearch query nodes.")
.required(true) .required(true)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
@ -93,7 +93,7 @@ public interface ElasticSearchClientService extends ControllerService {
PropertyDescriptor CHARSET = new PropertyDescriptor.Builder() PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
.name("el-cs-charset") .name("el-cs-charset")
.displayName("Charset") .displayName("Charset")
.description("The charset to use for interpreting the response from ElasticSearch.") .description("The charset to use for interpreting the response from Elasticsearch.")
.required(true) .required(true)
.defaultValue("UTF-8") .defaultValue("UTF-8")
.addValidator(StandardValidators.NON_BLANK_VALIDATOR) .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
@ -106,16 +106,26 @@ public interface ElasticSearchClientService extends ControllerService {
* @return IndexOperationResponse if successful * @return IndexOperationResponse if successful
* @throws IOException thrown when there is an error. * @throws IOException thrown when there is an error.
*/ */
IndexOperationResponse add(IndexOperationRequest operation) throws IOException; IndexOperationResponse add(IndexOperationRequest operation);
/** /**
* Index multiple documents. * Bulk process multiple documents.
* *
* @param operations A list of documents to index. * @param operations A list of index operations.
* @return IndexOperationResponse if successful. * @return IndexOperationResponse if successful.
* @throws IOException thrown when there is an error. * @throws IOException thrown when there is an error.
*/ */
IndexOperationResponse add(List<IndexOperationRequest> operations) throws IOException; IndexOperationResponse bulk(List<IndexOperationRequest> operations);
/**
* Count the documents that match the criteria.
*
* @param query A query in the JSON DSL syntax
* @param index The index to target.
* @param type The type to target.
* @return The number of documents that match the query.
*/
Long count(String query, String index, String type);
/** /**
* Delete a document by its ID from an index. * Delete a document by its ID from an index.
@ -125,7 +135,7 @@ public interface ElasticSearchClientService extends ControllerService {
* @param id The document ID to remove from the selected index. * @param id The document ID to remove from the selected index.
* @return A DeleteOperationResponse object if successful. * @return A DeleteOperationResponse object if successful.
*/ */
DeleteOperationResponse deleteById(String index, String type, String id) throws IOException; DeleteOperationResponse deleteById(String index, String type, String id);
/** /**
@ -136,7 +146,7 @@ public interface ElasticSearchClientService extends ControllerService {
* @return A DeleteOperationResponse object if successful. * @return A DeleteOperationResponse object if successful.
* @throws IOException thrown when there is an error. * @throws IOException thrown when there is an error.
*/ */
DeleteOperationResponse deleteById(String index, String type, List<String> ids) throws IOException; DeleteOperationResponse deleteById(String index, String type, List<String> ids);
/** /**
* Delete documents by query. * Delete documents by query.
@ -146,7 +156,7 @@ public interface ElasticSearchClientService extends ControllerService {
* @param type The type to target within the index. Optional. * @param type The type to target within the index. Optional.
* @return A DeleteOperationResponse object if successful. * @return A DeleteOperationResponse object if successful.
*/ */
DeleteOperationResponse deleteByQuery(String query, String index, String type) throws IOException; DeleteOperationResponse deleteByQuery(String query, String index, String type);
/** /**
* Get a document by ID. * Get a document by ID.
@ -157,23 +167,23 @@ public interface ElasticSearchClientService extends ControllerService {
* @return Map if successful, null if not found. * @return Map if successful, null if not found.
* @throws IOException thrown when there is an error. * @throws IOException thrown when there is an error.
*/ */
Map<String, Object> get(String index, String type, String id) throws IOException; Map<String, Object> get(String index, String type, String id);
/** /**
* Perform a search using the JSON DSL. * Perform a search using the JSON DSL.
* *
* @param query A JSON string representing the query. * @param query A JSON string representing the query.
* @param index The index to target. Optional. * @param index The index to target. Optional.
* @param type The type to target. Optional. Will not be used in future versions of ElasticSearch. * @param type The type to target. Optional. Will not be used in future versions of Elasticsearch.
* @return A SearchResponse object if successful. * @return A SearchResponse object if successful.
*/ */
SearchResponse search(String query, String index, String type) throws IOException; SearchResponse search(String query, String index, String type);
/** /**
* Build a transit URL to use with the provenance reporter. * Build a transit URL to use with the provenance reporter.
* @param index Index targeted. Optional. * @param index Index targeted. Optional.
* @param type Type targeted. Optional * @param type Type targeted. Optional
* @return a URL describing the ElasticSearch cluster. * @return a URL describing the Elasticsearch cluster.
*/ */
String getTransitUrl(String index, String type); String getTransitUrl(String index, String type);
} }

View File

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.elasticsearch;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
/**
 * Unchecked wrapper for any exception raised while talking to Elasticsearch.
 * It records whether the wrapped exception's simple class name is one of the
 * known names in {@link #ELASTIC_ERROR_NAMES}.
 */
public class ElasticsearchError extends RuntimeException {
    /**
     * These are names of common Elasticsearch exceptions where it is safe to assume
     * that it's OK to retry the operation instead of just sending it to an error relationship.
     * The set is read-only; attempts to modify it throw {@link UnsupportedOperationException}.
     */
    public static final Set<String> ELASTIC_ERROR_NAMES = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
            "NoNodeAvailableException",
            "ElasticsearchTimeoutException",
            "ReceiveTimeoutTransportException",
            "NodeClosedException")));

    /** True when the wrapped exception's simple class name is listed in {@link #ELASTIC_ERROR_NAMES}. */
    protected boolean isElastic;

    /**
     * Wraps the given exception as the cause of this error.
     *
     * @param ex the exception to wrap; a null cause is tolerated and is never classified as elastic
     */
    public ElasticsearchError(Exception ex) {
        super(ex);
        // Guard against a null cause: super(ex) accepts it, so the name lookup must not dereference it.
        isElastic = ex != null && ELASTIC_ERROR_NAMES.contains(ex.getClass().getSimpleName());
    }

    /**
     * @return true if the wrapped exception's simple class name is in {@link #ELASTIC_ERROR_NAMES}
     */
    public boolean isElastic() {
        return isElastic;
    }
}

View File

@ -19,17 +19,23 @@ package org.apache.nifi.elasticsearch;
import java.util.Map; import java.util.Map;
/**
* A POJO that represents an "operation on an index." It should not be confused with just indexing documents, as it
* covers all CRUD-related operations that can be executed against an Elasticsearch index with documents.
*/
public class IndexOperationRequest { public class IndexOperationRequest {
private String index; private String index;
private String type; private String type;
private String id; private String id;
private Map<String, Object> fields; private Map<String, Object> fields;
private Operation operation;
public IndexOperationRequest(String index, String type, String id, Map<String, Object> fields) { public IndexOperationRequest(String index, String type, String id, Map<String, Object> fields, Operation operation) {
this.index = index; this.index = index;
this.type = type; this.type = type;
this.id = id; this.id = id;
this.fields = fields; this.fields = fields;
this.operation = operation;
} }
public String getIndex() { public String getIndex() {
@ -47,4 +53,25 @@ public class IndexOperationRequest {
public Map<String, Object> getFields() { public Map<String, Object> getFields() {
return fields; return fields;
} }
/**
 * @return the operation that was passed to the constructor of this request
 */
public Operation getOperation() {
return operation;
}
/**
 * The kinds of operation a request can carry. Each constant holds the
 * lower-case string form of its name, exposed through {@link #getValue()}.
 */
public enum Operation {
    Create("create"),
    Delete("delete"),
    Index("index"),
    Update("update"),
    Upsert("upsert");

    // Enum constants are shared singletons, so their state must not be reassignable.
    private final String value;

    Operation(String value) {
        this.value = value;
    }

    /**
     * @return the lower-case string form of this operation
     */
    public String getValue() {
        return value;
    }
}
} }

View File

@ -17,20 +17,44 @@
package org.apache.nifi.elasticsearch; package org.apache.nifi.elasticsearch;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.List;
import java.util.Map;
public class IndexOperationResponse { public class IndexOperationResponse {
private long took; private long took;
private long ingestTook; private boolean hasErrors;
private List<Map<String, Object>> items;
public IndexOperationResponse(long took, long ingestTook) { public IndexOperationResponse(long took) {
this.took = took; this.took = took;
this.ingestTook = ingestTook;
} }
public long getTook() { public long getTook() {
return took; return took;
} }
public long getIngestTook() { public boolean hasErrors() {
return ingestTook; return hasErrors;
}
/**
 * Builds a response object from a raw JSON response body.
 * The body is expected to be a JSON object with a numeric "took" field, a boolean
 * "errors" field and an "items" list.
 *
 * @param response the raw JSON text to parse
 * @return a response populated with the took time, the error flag and the items
 * @throws IOException if the text cannot be parsed, is not a JSON object, or lacks a
 *                     numeric "took" or boolean "errors" field
 */
@SuppressWarnings("unchecked")
public static IndexOperationResponse fromJsonResponse(String response) throws IOException {
    ObjectMapper mapper = new ObjectMapper();
    Map<String, Object> parsedResponse = mapper.readValue(response, Map.class);
    if (parsedResponse == null) {
        // A literal JSON null parses successfully but yields no map to read from.
        throw new IOException("Response body did not contain a JSON object.");
    }

    // Jackson yields Integer or Long depending on magnitude, so accept any Number.
    Object took = parsedResponse.get("took");
    if (!(took instanceof Number)) {
        throw new IOException("Response is missing a numeric \"took\" field.");
    }

    Object errors = parsedResponse.get("errors");
    if (!(errors instanceof Boolean)) {
        throw new IOException("Response is missing a boolean \"errors\" field.");
    }

    IndexOperationResponse retVal = new IndexOperationResponse(((Number) took).longValue());
    retVal.hasErrors = (Boolean) errors;
    retVal.items = (List<Map<String, Object>>) parsedResponse.get("items");

    return retVal;
}
public List<Map<String, Object>> getItems() {
return items;
} }
} }

View File

@ -25,6 +25,12 @@
<artifactId>nifi-elasticsearch-client-service</artifactId> <artifactId>nifi-elasticsearch-client-service</artifactId>
<packaging>jar</packaging> <packaging>jar</packaging>
<properties>
<es.int.version>5.6.15</es.int.version>
<script.name>setup-5.script</script.name>
<type.name>faketype</type.name>
</properties>
<dependencies> <dependencies>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
@ -160,25 +166,56 @@
<version>${nifi.groovy.version}</version> <version>${nifi.groovy.version}</version>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
<version>5.6.16</version>
</dependency>
</dependencies> </dependencies>
<profiles> <profiles>
<profile>
<id>integration-6</id>
<properties>
<es.int.version>6.7.1</es.int.version>
<type.name>_doc</type.name>
<script.name>setup-6.script</script.name>
</properties>
</profile>
<profile>
<id>integration-7</id>
<properties>
<es.int.version>7.0.0</es.int.version>
<script.name>setup-7.script</script.name>
<type.name>_doc</type.name>
</properties>
</profile>
<profile> <profile>
<id>integration-tests</id> <id>integration-tests</id>
<build> <build>
<plugins> <plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<version>3.0.0-M3</version>
<configuration>
<systemPropertyVariables>
<type_name>${type.name}</type_name>
</systemPropertyVariables>
</configuration>
</plugin>
<plugin> <plugin>
<groupId>com.github.alexcojocaru</groupId> <groupId>com.github.alexcojocaru</groupId>
<artifactId>elasticsearch-maven-plugin</artifactId> <artifactId>elasticsearch-maven-plugin</artifactId>
<version>6.0</version> <version>6.13</version>
<configuration> <configuration>
<clusterName>testCluster</clusterName> <clusterName>testCluster</clusterName>
<transportPort>9500</transportPort> <transportPort>9500</transportPort>
<httpPort>9400</httpPort> <httpPort>9400</httpPort>
<version>5.6.2</version> <version>${es.int.version}</version>
<timeout>90</timeout> <timeout>90</timeout>
<logLevel>ERROR</logLevel> <logLevel>ERROR</logLevel>
<pathInitScript>${project.basedir}/src/test/resources/setup.script</pathInitScript> <pathInitScript>${project.basedir}/src/test/resources/${script.name}</pathInitScript>
</configuration> </configuration>
<executions> <executions>
<execution> <execution>

View File

@ -17,9 +17,9 @@
package org.apache.nifi.elasticsearch; package org.apache.nifi.elasticsearch;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.http.Header;
import org.apache.http.HttpEntity; import org.apache.http.HttpEntity;
import org.apache.http.HttpHost; import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope; import org.apache.http.auth.AuthScope;
@ -27,25 +27,21 @@ import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider; import org.apache.http.client.CredentialsProvider;
import org.apache.http.entity.ContentType; import org.apache.http.entity.ContentType;
import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.message.BasicHeader;
import org.apache.http.nio.entity.NStringEntity; import org.apache.http.nio.entity.NStringEntity;
import org.apache.nifi.annotation.lifecycle.OnDisabled; import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled; import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.ssl.SSLContextService;
import org.elasticsearch.action.bulk.BulkRequest; import org.apache.nifi.util.StopWatch;
import org.elasticsearch.action.bulk.BulkResponse; import org.apache.nifi.util.StringUtils;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Response; import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder; import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
import java.io.IOException; import java.io.IOException;
@ -59,6 +55,7 @@ import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit;
public class ElasticSearchClientServiceImpl extends AbstractControllerService implements ElasticSearchClientService { public class ElasticSearchClientServiceImpl extends AbstractControllerService implements ElasticSearchClientService {
private ObjectMapper mapper = new ObjectMapper(); private ObjectMapper mapper = new ObjectMapper();
@ -66,7 +63,6 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
static final private List<PropertyDescriptor> properties; static final private List<PropertyDescriptor> properties;
private RestClient client; private RestClient client;
private RestHighLevelClient highLevelClient;
private String url; private String url;
private Charset charset; private Charset charset;
@ -158,10 +154,9 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
.setMaxRetryTimeoutMillis(retryTimeout); .setMaxRetryTimeoutMillis(retryTimeout);
this.client = builder.build(); this.client = builder.build();
this.highLevelClient = new RestHighLevelClient(client);
} }
private Response runQuery(String endpoint, String query, String index, String type) throws IOException { private Response runQuery(String endpoint, String query, String index, String type) {
StringBuilder sb = new StringBuilder() StringBuilder sb = new StringBuilder()
.append("/") .append("/")
.append(index); .append(index);
@ -174,12 +169,17 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
HttpEntity queryEntity = new NStringEntity(query, ContentType.APPLICATION_JSON); HttpEntity queryEntity = new NStringEntity(query, ContentType.APPLICATION_JSON);
try {
return client.performRequest("POST", sb.toString(), Collections.emptyMap(), queryEntity); return client.performRequest("POST", sb.toString(), Collections.emptyMap(), queryEntity);
} catch (Exception e) {
throw new ElasticsearchError(e);
}
} }
private Map<String, Object> parseResponse(Response response) throws IOException { private Map<String, Object> parseResponse(Response response) {
final int code = response.getStatusLine().getStatusCode(); final int code = response.getStatusLine().getStatusCode();
try {
if (code >= 200 & code < 300) { if (code >= 200 & code < 300) {
InputStream inputStream = response.getEntity().getContent(); InputStream inputStream = response.getEntity().getContent();
byte[] result = IOUtils.toByteArray(inputStream); byte[] result = IOUtils.toByteArray(inputStream);
@ -190,52 +190,132 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
response.getStatusLine().getReasonPhrase()); response.getStatusLine().getReasonPhrase());
throw new IOException(errorMessage); throw new IOException(errorMessage);
} }
} catch (Exception ex) {
throw new ElasticsearchError(ex);
}
} }
@Override @Override
public IndexOperationResponse add(IndexOperationRequest operation) throws IOException { public IndexOperationResponse add(IndexOperationRequest operation) {
return add(Arrays.asList(operation)); return bulk(Arrays.asList(operation));
}
/**
 * Replaces every line-feed or carriage-return character in the text with the
 * two-character sequence backslash + 'n', so the result occupies a single line.
 *
 * @param str the text to flatten
 * @return the text with each line-break character replaced
 */
private String flatten(String str) {
    final String lineBreakPattern = "[\\n\\r]";
    // The replacement is regex-escaped: it produces a literal backslash followed by 'n'.
    final String escapedNewline = "\\\\n";
    return str.replaceAll(lineBreakPattern, escapedNewline);
}
/**
 * Builds the bulk header line for a request. An Upsert is written with the
 * "update" action name; every other operation uses its own string value.
 *
 * @param request the request to describe
 * @return the serialized header
 * @throws JsonProcessingException if the header cannot be serialized
 */
private String buildBulkHeader(IndexOperationRequest request) throws JsonProcessingException {
    final IndexOperationRequest.Operation requested = request.getOperation();

    final String action;
    if (requested.equals(IndexOperationRequest.Operation.Upsert)) {
        action = "update";
    } else {
        action = requested.getValue();
    }

    return buildBulkHeader(action, request.getIndex(), request.getType(), request.getId());
}
/**
 * Serializes a bulk header of the form {@code {"<operation>": {"_index": ..., "_id": ..., "_type": ...}}}
 * and passes the result through {@code flatten}.
 *
 * @param operation the action name used as the single top-level key
 * @param index the value written as "_index"
 * @param type the value written as "_type"
 * @param id the value written as "_id"
 * @return the serialized, flattened header
 * @throws JsonProcessingException if the header cannot be serialized
 */
private String buildBulkHeader(String operation, String index, String type, String id) throws JsonProcessingException {
    // Plain maps instead of double-brace initialization: no anonymous HashMap
    // subclasses, and no hidden reference to the enclosing service instance.
    Map<String, Object> metadata = new HashMap<>();
    metadata.put("_index", index);
    metadata.put("_id", id);
    metadata.put("_type", type);

    Map<String, Object> header = new HashMap<>();
    header.put(operation, metadata);

    return flatten(mapper.writeValueAsString(header));
}
protected void buildRequest(IndexOperationRequest request, StringBuilder builder) throws JsonProcessingException {
String header = buildBulkHeader(request);
builder.append(header).append("\n");
if (request.getOperation().equals(IndexOperationRequest.Operation.Index)) {
String indexDocument = mapper.writeValueAsString(request.getFields());
builder.append(indexDocument).append("\n");
} else if (request.getOperation().equals(IndexOperationRequest.Operation.Update)
|| request.getOperation().equals(IndexOperationRequest.Operation.Upsert)) {
Map<String, Object> doc = new HashMap<String, Object>() {{
put("doc", request.getFields());
if (request.getOperation().equals(IndexOperationRequest.Operation.Upsert)) {
put("doc_as_upsert", true);
}
}};
String update = flatten(mapper.writeValueAsString(doc)).trim();
builder.append(String.format("%s\n", update));
}
} }
@Override @Override
public IndexOperationResponse add(List<IndexOperationRequest> operations) throws IOException { public IndexOperationResponse bulk(List<IndexOperationRequest> operations) {
BulkRequest bulkRequest = new BulkRequest(); try {
StringBuilder payload = new StringBuilder();
for (int index = 0; index < operations.size(); index++) { for (int index = 0; index < operations.size(); index++) {
IndexOperationRequest or = operations.get(index); IndexOperationRequest or = operations.get(index);
IndexRequest indexRequest = new IndexRequest(or.getIndex(), or.getType(), or.getId()) buildRequest(or, payload);
.source(or.getFields());
bulkRequest.add(indexRequest);
} }
bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); if (getLogger().isDebugEnabled()) {
getLogger().debug(payload.toString());
}
HttpEntity entity = new NStringEntity(payload.toString(), ContentType.APPLICATION_JSON);
StopWatch watch = new StopWatch();
watch.start();
Response response = client.performRequest("POST", "/_bulk", Collections.emptyMap(), entity);
watch.stop();
BulkResponse response = highLevelClient.bulk(bulkRequest); String rawResponse = IOUtils.toString(response.getEntity().getContent(), "UTF-8");
IndexOperationResponse retVal = new IndexOperationResponse(response.getTookInMillis(), response.getIngestTookInMillis());
if (getLogger().isDebugEnabled()) {
getLogger().debug(String.format("Response was: %s", rawResponse));
}
IndexOperationResponse retVal = IndexOperationResponse.fromJsonResponse(rawResponse);
return retVal; return retVal;
} catch (Exception ex) {
throw new ElasticsearchError(ex);
}
} }
@Override @Override
public DeleteOperationResponse deleteById(String index, String type, String id) throws IOException { public Long count(String query, String index, String type) {
Response response = runQuery("_count", query, index, type);
Map<String, Object> parsed = parseResponse(response);
return ((Integer)parsed.get("count")).longValue();
}
@Override
public DeleteOperationResponse deleteById(String index, String type, String id) {
return deleteById(index, type, Arrays.asList(id)); return deleteById(index, type, Arrays.asList(id));
} }
@Override @Override
public DeleteOperationResponse deleteById(String index, String type, List<String> ids) throws IOException { public DeleteOperationResponse deleteById(String index, String type, List<String> ids) {
BulkRequest bulk = new BulkRequest(); try {
StringBuilder sb = new StringBuilder();
for (int idx = 0; idx < ids.size(); idx++) { for (int idx = 0; idx < ids.size(); idx++) {
DeleteRequest request = new DeleteRequest(index, type, ids.get(idx)); String header = buildBulkHeader("delete", index, type, ids.get(idx));
bulk.add(request); sb.append(header).append("\n");
} }
BulkResponse response = highLevelClient.bulk(bulk); HttpEntity entity = new NStringEntity(sb.toString(), ContentType.APPLICATION_JSON);
StopWatch watch = new StopWatch();
watch.start();
Response response = client.performRequest("POST", "/_bulk", Collections.emptyMap(), entity);
watch.stop();
DeleteOperationResponse dor = new DeleteOperationResponse(response.getTookInMillis()); if (getLogger().isDebugEnabled()) {
getLogger().debug(String.format("Response for bulk delete: %s",
IOUtils.toString(response.getEntity().getContent(), "UTF-8")));
}
DeleteOperationResponse dor = new DeleteOperationResponse(watch.getDuration(TimeUnit.MILLISECONDS));
return dor; return dor;
} catch (Exception ex) {
throw new RuntimeException(ex);
}
} }
@Override @Override
public DeleteOperationResponse deleteByQuery(String query, String index, String type) throws IOException { public DeleteOperationResponse deleteByQuery(String query, String index, String type) {
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
Response response = runQuery("_delete_by_query", query, index, type); Response response = runQuery("_delete_by_query", query, index, type);
long end = System.currentTimeMillis(); long end = System.currentTimeMillis();
@ -245,14 +325,40 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
} }
@Override @Override
public Map<String, Object> get(String index, String type, String id) throws IOException { public Map<String, Object> get(String index, String type, String id) {
GetRequest get = new GetRequest(index, type, id); try {
GetResponse resp = highLevelClient.get(get, new Header[]{}); StringBuilder endpoint = new StringBuilder();
return resp.getSource(); endpoint.append(index);
if (!StringUtils.isEmpty(type)) {
endpoint.append("/").append(type);
}
endpoint.append("/").append(id);
Response response = client.performRequest("GET", endpoint.toString(), new BasicHeader("Content-Type", "application/json"));
String body = IOUtils.toString(response.getEntity().getContent(), "UTF-8");
return (Map<String, Object>) mapper.readValue(body, Map.class).get("_source");
} catch (Exception ex) {
getLogger().error("", ex);
return null;
}
}
/*
* In pre-7.X ElasticSearch, it should return just a number. 7.X and after they are returning a map.
*/
private int handleSearchCount(Object raw) {
if (raw instanceof Number) {
return Integer.valueOf(raw.toString());
} else if (raw instanceof Map) {
return (Integer)((Map)raw).get("value");
} else {
throw new ProcessException("Unknown type for hit count.");
}
} }
@Override @Override
public SearchResponse search(String query, String index, String type) throws IOException { public SearchResponse search(String query, String index, String type) {
Response response = runQuery("_search", query, index, type); Response response = runQuery("_search", query, index, type);
Map<String, Object> parsed = parseResponse(response); Map<String, Object> parsed = parseResponse(response);
@ -261,7 +367,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
Map<String, Object> aggregations = parsed.get("aggregations") != null Map<String, Object> aggregations = parsed.get("aggregations") != null
? (Map<String, Object>)parsed.get("aggregations") : new HashMap<>(); ? (Map<String, Object>)parsed.get("aggregations") : new HashMap<>();
Map<String, Object> hitsParent = (Map<String, Object>)parsed.get("hits"); Map<String, Object> hitsParent = (Map<String, Object>)parsed.get("hits");
int count = (Integer)hitsParent.get("total"); int count = handleSearchCount(hitsParent.get("total"));
List<Map<String, Object>> hits = (List<Map<String, Object>>)hitsParent.get("hits"); List<Map<String, Object>> hits = (List<Map<String, Object>>)hitsParent.get("hits");
SearchResponse esr = new SearchResponse(hits, aggregations, count, took, timedOut); SearchResponse esr = new SearchResponse(hits, aggregations, count, took, timedOut);

View File

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.elasticsearch
import org.junit.Assert
import org.junit.Test
/**
 * Unit test for the SearchResponse value object.
 */
class SearchResponseTest {
    /**
     * Builds a SearchResponse from known values, then verifies that each property
     * returns what was passed in and that toString() mentions the main fields.
     */
    @Test
    void test() {
        def expectedHits = []
        def expectedAggregations = [:]
        def expectedCount = 10
        def expectedTook = 100
        def expectedTimedOut = false

        def response = new SearchResponse(expectedHits, expectedAggregations, expectedCount, expectedTook, expectedTimedOut)
        def rendered = response.toString()

        // Property round-trip checks.
        Assert.assertEquals(expectedHits, response.hits)
        Assert.assertEquals(expectedAggregations, response.aggregations)
        Assert.assertEquals(expectedCount, response.numberOfHits)
        Assert.assertEquals(expectedTook, response.took)
        Assert.assertEquals(expectedTimedOut, response.timedOut)

        // The string form must name these fields.
        ["aggregations", "hits", "numberOfHits"].each { label ->
            Assert.assertTrue(rendered.contains(label))
        }
    }
}

View File

@ -20,6 +20,8 @@ package org.apache.nifi.elasticsearch.integration
import org.apache.nifi.elasticsearch.DeleteOperationResponse import org.apache.nifi.elasticsearch.DeleteOperationResponse
import org.apache.nifi.elasticsearch.ElasticSearchClientService import org.apache.nifi.elasticsearch.ElasticSearchClientService
import org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl import org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl
import org.apache.nifi.elasticsearch.IndexOperationRequest
import org.apache.nifi.elasticsearch.IndexOperationResponse
import org.apache.nifi.elasticsearch.SearchResponse import org.apache.nifi.elasticsearch.SearchResponse
import org.apache.nifi.ssl.StandardSSLContextService import org.apache.nifi.ssl.StandardSSLContextService
import org.apache.nifi.util.TestRunner import org.apache.nifi.util.TestRunner
@ -37,8 +39,8 @@ class ElasticSearch5ClientService_IT {
private TestRunner runner private TestRunner runner
private ElasticSearchClientServiceImpl service private ElasticSearchClientServiceImpl service
static final String INDEX = "messages" static String INDEX = "messages"
static final String TYPE = "message" static String TYPE = System.getProperty("type_name")
@Before @Before
void before() throws Exception { void before() throws Exception {
@ -80,7 +82,7 @@ class ElasticSearch5ClientService_IT {
])) ]))
SearchResponse response = service.search(query, "messages", "message") SearchResponse response = service.search(query, "messages", TYPE)
Assert.assertNotNull("Response was null", response) Assert.assertNotNull("Response was null", response)
Assert.assertEquals("Wrong count", 15, response.numberOfHits) Assert.assertEquals("Wrong count", 15, response.numberOfHits)
@ -138,6 +140,7 @@ class ElasticSearch5ClientService_IT {
@Test @Test
void testGet() throws IOException { void testGet() throws IOException {
Map old Map old
System.out.println("\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n" + "TYPE: " + TYPE + "\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n")
1.upto(15) { index -> 1.upto(15) { index ->
String id = String.valueOf(index) String id = String.valueOf(index)
def doc = service.get(INDEX, TYPE, id) def doc = service.get(INDEX, TYPE, id)
@ -171,4 +174,99 @@ class ElasticSearch5ClientService_IT {
runner.assertValid() runner.assertValid()
} }
/**
 * Sends one bulk request that indexes documents into the bulk_a, bulk_b and
 * bulk_c indexes, then verifies the per-index counts and the wildcard total.
 */
@Test
void testBulkAddTwoIndexes() throws Exception {
    List<IndexOperationRequest> payload = new ArrayList<>()
    // Ids 0-19 alternate between bulk_a (even ids) and bulk_b (odd ids): 10 documents each.
    for (int x = 0; x < 20; x++) {
        String index = x % 2 == 0 ? "bulk_a" : "bulk_b"
        // A plain map literal replaces the double-brace HashMap initialiser, which
        // created an anonymous subclass for every call site.
        payload.add(new IndexOperationRequest(index, TYPE, String.valueOf(x), [msg: "test"], IndexOperationRequest.Operation.Index))
    }
    // Five more documents (ids 0-4) go into bulk_c.
    for (int x = 0; x < 5; x++) {
        payload.add(new IndexOperationRequest("bulk_c", TYPE, String.valueOf(x), [msg: "test"], IndexOperationRequest.Operation.Index))
    }

    IndexOperationResponse response = service.bulk(payload)
    Assert.assertNotNull(response)
    Assert.assertTrue(response.getTook() > 0)
    // NOTE(review): presumably gives Elasticsearch time to make the new documents
    // visible to the count queries below - confirm.
    Thread.sleep(2000)

    /*
     * Now, check to ensure that all three indexes got populated appropriately.
     */
    String query = "{ \"query\": { \"match_all\": {}}}"
    Long indexA = service.count(query, "bulk_a", TYPE)
    Long indexB = service.count(query, "bulk_b", TYPE)
    Long indexC = service.count(query, "bulk_c", TYPE)
    Assert.assertNotNull(indexA)
    Assert.assertNotNull(indexB)
    Assert.assertNotNull(indexC)
    Assert.assertEquals(indexA, indexB)
    Assert.assertEquals(10, indexA.intValue())
    Assert.assertEquals(10, indexB.intValue())
    Assert.assertEquals(5, indexC.intValue())

    // The wildcard pattern must cover all three indexes: 10 + 10 + 5.
    Long total = service.count(query, "bulk_*", TYPE)
    Assert.assertNotNull(total)
    Assert.assertEquals(25, total.intValue())
}
/**
 * Exercises the Index, Update, Upsert and Delete operations against the shared
 * INDEX and checks the stored document after each step.
 *
 * The two documents written here are always removed again, even when an
 * assertion fails, so that other tests counting documents in INDEX are not
 * affected by leftovers.
 */
@Test
void testUpdateAndUpsert() {
    final String TEST_ID = "update-test"
    final String UPSERTED_ID = "upsert-ftw"

    // Prepared up front so the cleanup in the finally block can run regardless
    // of where the test body stops.
    List<IndexOperationRequest> deletes = new ArrayList<>()
    deletes.add(new IndexOperationRequest(INDEX, TYPE, TEST_ID, null, IndexOperationRequest.Operation.Delete))
    deletes.add(new IndexOperationRequest(INDEX, TYPE, UPSERTED_ID, null, IndexOperationRequest.Operation.Delete))

    try {
        // Index: the stored document must equal what was sent.
        Map<String, Object> doc = new HashMap<>()
        doc.put("msg", "Buongiorno, mondo")
        service.add(new IndexOperationRequest(INDEX, TYPE, TEST_ID, doc, IndexOperationRequest.Operation.Index))
        Map<String, Object> result = service.get(INDEX, TYPE, TEST_ID)
        Assert.assertEquals("Not the same", doc, result)

        // Update: the new field must be merged into the existing document.
        Map<String, Object> updates = new HashMap<>()
        updates.put("from", "john.smith")
        Map<String, Object> merged = new HashMap<>()
        merged.putAll(updates)
        merged.putAll(doc)
        IndexOperationRequest request = new IndexOperationRequest(INDEX, TYPE, TEST_ID, updates, IndexOperationRequest.Operation.Update)
        service.add(request)
        result = service.get(INDEX, TYPE, TEST_ID)
        Assert.assertTrue(result.containsKey("from"))
        Assert.assertTrue(result.containsKey("msg"))
        Assert.assertEquals("Not the same after update.", merged, result)

        // Upsert: the id does not exist yet, so the document must be created as sent.
        Map<String, Object> upsertItems = new HashMap<>()
        upsertItems.put("upsert_1", "hello")
        upsertItems.put("upsert_2", 1)
        upsertItems.put("upsert_3", true)
        request = new IndexOperationRequest(INDEX, TYPE, UPSERTED_ID, upsertItems, IndexOperationRequest.Operation.Upsert)
        service.add(request)
        result = service.get(INDEX, TYPE, UPSERTED_ID)
        Assert.assertEquals(upsertItems, result)
    } finally {
        // Delete: runs on success and on failure alike. Note that an exception
        // thrown here would replace one raised by the test body.
        service.bulk(deletes)
    }

    // After the bulk delete neither document may be retrievable.
    Assert.assertNull(service.get(INDEX, TYPE, TEST_ID))
    Assert.assertNull(service.get(INDEX, TYPE, UPSERTED_ID))
}
/**
 * Sends a bulk request in which exactly two of four operations are expected to
 * fail, and verifies that the response flags errors and carries two error items.
 *
 * The operations target the "error_handler" index, which the integration-test
 * setup scripts create empty with "intField" mapped as an integer. The shared
 * INDEX is not used: its setup data already contains id "2", so both Create
 * operations would be rejected there, and the Index operation on id "1" would
 * overwrite a document other tests read.
 *
 * NOTE(review): assumes "error_handler" is empty when this test starts - confirm
 * that the setup scripts are re-applied for each integration-test run.
 */
@Test
void testGetBulkResponsesWithErrors() {
    final String ERROR_INDEX = "error_handler"
    def ops = [
        // Succeeds: plain index of a new document.
        new IndexOperationRequest(ERROR_INDEX, TYPE, "1", [ "msg": "Hi", intField: 1], IndexOperationRequest.Operation.Index),
        // Succeeds: id "2" does not exist yet.
        new IndexOperationRequest(ERROR_INDEX, TYPE, "2", [ "msg": "Hi", intField: 1], IndexOperationRequest.Operation.Create),
        // Expected error 1: id "2" was created by the previous operation.
        new IndexOperationRequest(ERROR_INDEX, TYPE, "2", [ "msg": "Hi", intField: 1], IndexOperationRequest.Operation.Create),
        // Expected error 2: "notaninteger" does not fit the integer mapping of intField.
        new IndexOperationRequest(ERROR_INDEX, TYPE, "1", [ "msg": "Hi", intField: "notaninteger"], IndexOperationRequest.Operation.Index)
    ]
    def response = service.bulk(ops)
    assert response.hasErrors()
    // Each item is a single-entry map keyed by the operation name; a failed
    // operation carries an "error" entry in the nested map.
    assert response.items.findAll {
        def key = it.keySet().stream().findFirst().get()
        it[key].containsKey("error")
    }.size() == 2
}
} }

View File

@ -39,6 +39,8 @@ class ElasticSearchLookupService_IT {
private ElasticSearchClientService service private ElasticSearchClientService service
private ElasticSearchLookupService lookupService private ElasticSearchLookupService lookupService
static String TYPE = System.getProperty("type_name")
@Before @Before
void before() throws Exception { void before() throws Exception {
runner = TestRunners.newTestRunner(TestControllerServiceProcessor.class) runner = TestRunners.newTestRunner(TestControllerServiceProcessor.class)
@ -54,7 +56,7 @@ class ElasticSearchLookupService_IT {
runner.setProperty(TestControllerServiceProcessor.LOOKUP_SERVICE, "Lookup Service") runner.setProperty(TestControllerServiceProcessor.LOOKUP_SERVICE, "Lookup Service")
runner.setProperty(lookupService, ElasticSearchLookupService.CLIENT_SERVICE, "Client Service") runner.setProperty(lookupService, ElasticSearchLookupService.CLIENT_SERVICE, "Client Service")
runner.setProperty(lookupService, ElasticSearchLookupService.INDEX, "user_details") runner.setProperty(lookupService, ElasticSearchLookupService.INDEX, "user_details")
runner.setProperty(lookupService, ElasticSearchLookupService.TYPE, "details") runner.setProperty(lookupService, ElasticSearchLookupService.TYPE, TYPE)
try { try {
runner.enableControllerService(service) runner.enableControllerService(service)
@ -141,7 +143,7 @@ class ElasticSearchLookupService_IT {
runner.disableControllerService(lookupService) runner.disableControllerService(lookupService)
runner.setProperty(lookupService, ElasticSearchLookupService.INDEX, "nested") runner.setProperty(lookupService, ElasticSearchLookupService.INDEX, "nested")
runner.setProperty(lookupService, ElasticSearchLookupService.TYPE, "nested_complex") runner.setProperty(lookupService, ElasticSearchLookupService.TYPE, TYPE)
runner.enableControllerService(lookupService) runner.enableControllerService(lookupService)
Optional<Record> response = lookupService.lookup(coordinates) Optional<Record> response = lookupService.lookup(coordinates)
@ -162,7 +164,7 @@ class ElasticSearchLookupService_IT {
void testDetectedSchema() throws LookupFailureException { void testDetectedSchema() throws LookupFailureException {
runner.disableControllerService(lookupService) runner.disableControllerService(lookupService)
runner.setProperty(lookupService, ElasticSearchLookupService.INDEX, "complex") runner.setProperty(lookupService, ElasticSearchLookupService.INDEX, "complex")
runner.setProperty(lookupService, ElasticSearchLookupService.TYPE, "complex") runner.setProperty(lookupService, ElasticSearchLookupService.TYPE, TYPE)
runner.enableControllerService(lookupService) runner.enableControllerService(lookupService)
def coordinates = ["_id": "1" ] def coordinates = ["_id": "1" ]
@ -196,7 +198,7 @@ class ElasticSearchLookupService_IT {
runner.setProperty(lookupService, "\$.subField.longField", "/longField2") runner.setProperty(lookupService, "\$.subField.longField", "/longField2")
runner.setProperty(lookupService, '$.subField.dateField', '/dateField2') runner.setProperty(lookupService, '$.subField.dateField', '/dateField2')
runner.setProperty(lookupService, ElasticSearchLookupService.INDEX, "nested") runner.setProperty(lookupService, ElasticSearchLookupService.INDEX, "nested")
runner.setProperty(lookupService, ElasticSearchLookupService.TYPE, "nested_complex") runner.setProperty(lookupService, ElasticSearchLookupService.TYPE, TYPE)
runner.enableControllerService(lookupService) runner.enableControllerService(lookupService)
def coordinates = ["msg": "Hello, world"] def coordinates = ["msg": "Hello, world"]

View File

@ -33,37 +33,42 @@ class TestElasticSearchClientService extends AbstractControllerService implement
] ]
@Override @Override
IndexOperationResponse add(IndexOperationRequest operation) throws IOException { IndexOperationResponse add(IndexOperationRequest operation) {
return null return null
} }
@Override @Override
IndexOperationResponse add(List<IndexOperationRequest> operations) throws IOException { IndexOperationResponse bulk(List<IndexOperationRequest> operations) {
return null return null
} }
@Override @Override
DeleteOperationResponse deleteById(String index, String type, String id) throws IOException { Long count(String query, String index, String type) {
return null return null
} }
@Override @Override
DeleteOperationResponse deleteById(String index, String type, List<String> ids) throws IOException { DeleteOperationResponse deleteById(String index, String type, String id) {
return null return null
} }
@Override @Override
DeleteOperationResponse deleteByQuery(String query, String index, String type) throws IOException { DeleteOperationResponse deleteById(String index, String type, List<String> ids) {
return null return null
} }
@Override @Override
Map<String, Object> get(String index, String type, String id) throws IOException { DeleteOperationResponse deleteByQuery(String query, String index, String type) {
return null
}
@Override
Map<String, Object> get(String index, String type, String id) {
return data return data
} }
@Override @Override
SearchResponse search(String query, String index, String type) throws IOException { SearchResponse search(String query, String index, String type) {
List hits = [[ List hits = [[
"_source": data "_source": data
]] ]]

View File

@ -0,0 +1,45 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#create mapping
PUT:user_details/:{ "mappings":{"faketype":{ "properties":{ "email":{"type":"keyword"},"phone":{"type": "keyword"},"accessKey":{"type": "keyword"}}}}}
PUT:messages/:{ "mappings":{"faketype":{ "properties":{ "msg":{"type":"keyword"}}}}}
PUT:complex/:{"mappings":{"faketype":{"properties":{"msg":{"type":"keyword"},"subField":{"type":"nested","properties":{"longField":{"type":"long"},"dateField":{"type":"date"}}}}}}}
PUT:nested/:{"mappings":{"faketype":{"properties":{"msg":{"type":"keyword"},"subField":{"type":"nested","properties":{"longField":{"type":"long"},"dateField":{"type":"date"},"deeper":{"type":"nested","properties":{"secretz":{"type":"keyword"},"deepest":{"type":"nested","properties":{"super_secret":{"type":"keyword"}}}}}}}}}}}
PUT:bulk_a/:{ "mappings":{"faketype":{ "properties":{ "msg":{"type":"keyword"}}}}}
PUT:bulk_b/:{ "mappings":{"faketype":{ "properties":{ "msg":{"type":"keyword"}}}}}
PUT:bulk_c/:{ "mappings":{"faketype":{ "properties":{ "msg":{"type":"keyword"}}}}}
PUT:error_handler:{ "mappings": { "faketype": { "properties": { "msg": { "type": "keyword" }, "intField": { "type": "integer" }}}}}
#add document
PUT:messages/faketype/1:{ "msg":"one" }
PUT:messages/faketype/2:{ "msg":"two" }
PUT:messages/faketype/3:{ "msg":"two" }
PUT:messages/faketype/4:{ "msg":"three" }
PUT:messages/faketype/5:{ "msg":"three" }
PUT:messages/faketype/6:{ "msg":"three" }
PUT:messages/faketype/7:{ "msg":"four" }
PUT:messages/faketype/8:{ "msg":"four" }
PUT:messages/faketype/9:{ "msg":"four" }
PUT:messages/faketype/10:{ "msg":"four" }
PUT:messages/faketype/11:{ "msg":"five" }
PUT:messages/faketype/12:{ "msg":"five" }
PUT:messages/faketype/13:{ "msg":"five" }
PUT:messages/faketype/14:{ "msg":"five" }
PUT:messages/faketype/15:{ "msg":"five" }
PUT:complex/faketype/1:{"msg":"Hello, world","subField":{"longField":100000,"dateField":"2018-04-10T12:18:05Z"}}
PUT:user_details/faketype/1:{ "email": "john.smith@company.com", "phone": "123-456-7890", "accessKey": "ABCDE"}
PUT:user_details/faketype/2:{ "email": "jane.doe@company.com", "phone": "098-765-4321", "accessKey": "GHIJK"}
PUT:nested/faketype/1:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"No one should see this!","deepest":{"super_secret":"Got nothin to hide"}}}}
PUT:nested/faketype/2:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"Hello, world!","deepest":{"super_secret":"I could tell, but then I would have to kill you"}}}}
PUT:nested/faketype/3:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"Buongiorno, mondo!!","deepest":{"super_secret":"The sky is blue"}}}}

View File

@ -0,0 +1,46 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#create mapping
PUT:user_details/:{ "mappings":{"_doc":{ "properties":{ "email":{"type":"keyword"},"phone":{"type": "keyword"},"accessKey":{"type": "keyword"}}}}}
PUT:messages/:{ "mappings":{"_doc":{ "properties":{ "msg":{"type":"keyword"}}}}}
PUT:complex/:{"mappings":{"_doc":{"properties":{"msg":{"type":"keyword"},"subField":{"type":"nested","properties":{"longField":{"type":"long"},"dateField":{"type":"date"}}}}}}}
PUT:nested/:{"mappings":{"_doc":{"properties":{"msg":{"type":"keyword"},"subField":{"type":"nested","properties":{"longField":{"type":"long"},"dateField":{"type":"date"},"deeper":{"type":"nested","properties":{"secretz":{"type":"keyword"},"deepest":{"type":"nested","properties":{"super_secret":{"type":"keyword"}}}}}}}}}}}
PUT:bulk_a/:{ "mappings":{"_doc":{ "properties":{ "msg":{"type":"keyword"}}}}}
PUT:bulk_b/:{ "mappings":{"_doc":{ "properties":{ "msg":{"type":"keyword"}}}}}
PUT:bulk_c/:{ "mappings":{"_doc":{ "properties":{ "msg":{"type":"keyword"}}}}}
PUT:error_handler:{ "mappings": { "_doc": { "properties": { "msg": { "type": "keyword" }, "intField": { "type": "integer" }}}}}
#add document
PUT:messages/_doc/1:{ "msg":"one" }
PUT:messages/_doc/2:{ "msg":"two" }
PUT:messages/_doc/3:{ "msg":"two" }
PUT:messages/_doc/4:{ "msg":"three" }
PUT:messages/_doc/5:{ "msg":"three" }
PUT:messages/_doc/6:{ "msg":"three" }
PUT:messages/_doc/7:{ "msg":"four" }
PUT:messages/_doc/8:{ "msg":"four" }
PUT:messages/_doc/9:{ "msg":"four" }
PUT:messages/_doc/10:{ "msg":"four" }
PUT:messages/_doc/11:{ "msg":"five" }
PUT:messages/_doc/12:{ "msg":"five" }
PUT:messages/_doc/13:{ "msg":"five" }
PUT:messages/_doc/14:{ "msg":"five" }
PUT:messages/_doc/15:{ "msg":"five" }
PUT:complex/_doc/1:{"msg":"Hello, world","subField":{"longField":100000,"dateField":"2018-04-10T12:18:05Z"}}
PUT:user_details/_doc/1:{ "email": "john.smith@company.com", "phone": "123-456-7890", "accessKey": "ABCDE"}
PUT:user_details/_doc/2:{ "email": "jane.doe@company.com", "phone": "098-765-4321", "accessKey": "GHIJK"}
PUT:nested/_doc/1:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"No one should see this!","deepest":{"super_secret":"Got nothin to hide"}}}}
PUT:nested/_doc/2:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"Hello, world!","deepest":{"super_secret":"I could tell, but then I would have to kill you"}}}}
PUT:nested/_doc/3:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"Buongiorno, mondo!!","deepest":{"super_secret":"The sky is blue"}}}}

View File

@ -0,0 +1,45 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#create mapping
PUT:user_details/:{ "mappings":{ "properties":{ "email":{"type":"keyword"},"phone":{"type": "keyword"},"accessKey":{"type": "keyword"}}}}
PUT:messages/:{ "mappings":{ "properties":{ "msg":{"type":"keyword"}}}}
PUT:complex/:{"mappings":{ "properties":{"msg":{"type":"keyword"},"subField":{"type":"nested","properties":{"longField":{"type":"long"},"dateField":{"type":"date"}}}}}}
PUT:nested/:{"mappings":{ "properties":{"msg":{"type":"keyword"},"subField":{"type":"nested","properties":{"longField":{"type":"long"},"dateField":{"type":"date"},"deeper":{"type":"nested","properties":{"secretz":{"type":"keyword"},"deepest":{"type":"nested","properties":{"super_secret":{"type":"keyword"}}}}}}}}}}
PUT:bulk_a/:{ "mappings":{ "properties":{ "msg":{"type":"keyword"}}}}
PUT:bulk_b/:{ "mappings":{ "properties":{ "msg":{"type":"keyword"}}}}
PUT:bulk_c/:{ "mappings":{ "properties":{ "msg":{"type":"keyword"}}}}
PUT:error_handler:{ "mappings": { "properties": { "msg": { "type": "keyword" }, "intField": { "type": "integer" }}}}
#add document
POST:messages/_doc/1:{ "msg":"one" }
POST:messages/_doc/2:{ "msg":"two" }
POST:messages/_doc/3:{ "msg":"two" }
POST:messages/_doc/4:{ "msg":"three" }
POST:messages/_doc/5:{ "msg":"three" }
POST:messages/_doc/6:{ "msg":"three" }
POST:messages/_doc/7:{ "msg":"four" }
POST:messages/_doc/8:{ "msg":"four" }
POST:messages/_doc/9:{ "msg":"four" }
POST:messages/_doc/10:{ "msg":"four" }
POST:messages/_doc/11:{ "msg":"five" }
POST:messages/_doc/12:{ "msg":"five" }
POST:messages/_doc/13:{ "msg":"five" }
POST:messages/_doc/14:{ "msg":"five" }
POST:messages/_doc/15:{ "msg":"five" }
POST:complex/_doc/1:{"msg":"Hello, world","subField":{"longField":100000,"dateField":"2018-04-10T12:18:05Z"}}
POST:user_details/_doc/1:{ "email": "john.smith@company.com", "phone": "123-456-7890", "accessKey": "ABCDE"}
POST:user_details/_doc/2:{ "email": "jane.doe@company.com", "phone": "098-765-4321", "accessKey": "GHIJK"}
POST:nested/_doc/1:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"No one should see this!","deepest":{"super_secret":"Got nothin to hide"}}}}
POST:nested/_doc/2:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"Hello, world!","deepest":{"super_secret":"I could tell, but then I would have to kill you"}}}}
POST:nested/_doc/3:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"Buongiorno, mondo!!","deepest":{"super_secret":"The sky is blue"}}}}

View File

@ -1,41 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#create mapping
PUT:user_details/:{ "mappings":{"details":{ "properties":{ "email":{"type":"keyword"},"phone":{"type": "keyword"},"accessKey":{"type": "keyword"}}}}}
PUT:messages/:{ "mappings":{"message":{ "properties":{ "msg":{"type":"keyword"}}}}}
PUT:complex/:{"mappings":{"complex":{"properties":{"msg":{"type":"keyword"},"subField":{"type":"nested","properties":{"longField":{"type":"long"},"dateField":{"type":"date"}}}}}}}
PUT:nested/:{"mappings":{"nested_complex":{"properties":{"msg":{"type":"keyword"},"subField":{"type":"nested","properties":{"longField":{"type":"long"},"dateField":{"type":"date"},"deeper":{"type":"nested","properties":{"secretz":{"type":"keyword"},"deepest":{"type":"nested","properties":{"super_secret":{"type":"keyword"}}}}}}}}}}}
#add document
PUT:messages/message/1:{ "msg":"one" }
PUT:messages/message/2:{ "msg":"two" }
PUT:messages/message/3:{ "msg":"two" }
PUT:messages/message/4:{ "msg":"three" }
PUT:messages/message/5:{ "msg":"three" }
PUT:messages/message/6:{ "msg":"three" }
PUT:messages/message/7:{ "msg":"four" }
PUT:messages/message/8:{ "msg":"four" }
PUT:messages/message/9:{ "msg":"four" }
PUT:messages/message/10:{ "msg":"four" }
PUT:messages/message/11:{ "msg":"five" }
PUT:messages/message/12:{ "msg":"five" }
PUT:messages/message/13:{ "msg":"five" }
PUT:messages/message/14:{ "msg":"five" }
PUT:messages/message/15:{ "msg":"five" }
PUT:complex/complex/1:{"msg":"Hello, world","subField":{"longField":100000,"dateField":"2018-04-10T12:18:05Z"}}
PUT:user_details/details/1:{ "email": "john.smith@company.com", "phone": "123-456-7890", "accessKey": "ABCDE"}
PUT:user_details/details/2:{ "email": "jane.doe@company.com", "phone": "098-765-4321", "accessKey": "GHIJK"}
PUT:nested/nested_complex/1:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"No one should see this!","deepest":{"super_secret":"Got nothin to hide"}}}}
PUT:nested/nested_complex/2:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"Hello, world!","deepest":{"super_secret":"I could tell, but then I would have to kill you"}}}}
PUT:nested/nested_complex/3:{"msg":"Hello, world","subField":{"longField":150000,"dateField":"2018-08-14T10:08:00Z","deeper":{"secretz":"Buongiorno, mondo!!","deepest":{"super_secret":"The sky is blue"}}}}

View File

@ -56,6 +56,11 @@ language governing permissions and limitations under the License. -->
<artifactId>nifi-ssl-context-service-api</artifactId> <artifactId>nifi-ssl-context-service-api</artifactId>
<version>1.11.0-SNAPSHOT</version> <version>1.11.0-SNAPSHOT</version>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-schema-registry-service-api</artifactId>
<version>1.11.0-SNAPSHOT</version>
</dependency>
<dependency> <dependency>
<groupId>commons-io</groupId> <groupId>commons-io</groupId>
<artifactId>commons-io</artifactId> <artifactId>commons-io</artifactId>
@ -94,6 +99,46 @@ language governing permissions and limitations under the License. -->
<version>1.11.0-SNAPSHOT</version> <version>1.11.0-SNAPSHOT</version>
<scope>compile</scope> <scope>compile</scope>
</dependency> </dependency>
<!-- Compile-scope record handling dependencies. The first two declare no version here. -->
<!-- NOTE(review): presumably their versions come from dependency management in a parent POM - confirm. -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-serialization-service-api</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record</artifactId>
<scope>compile</scope>
</dependency>
<!-- JsonPath library, pinned to an explicit version. -->
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
<version>2.4.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-path</artifactId>
<version>1.11.0-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<!-- Test-scope only: record mocks, record reader/writer services and Groovy JSON support. -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock-record-utils</artifactId>
<version>1.11.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-serialization-services</artifactId>
<version>1.11.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy-json</artifactId>
<version>${nifi.groovy.version}</version>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
<build> <build>

View File

@ -43,14 +43,14 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED) @InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
@CapabilityDescription("Delete from an ElasticSearch index using a query. The query can be loaded from a flowfile body " + @CapabilityDescription("Delete from an Elasticsearch index using a query. The query can be loaded from a flowfile body " +
"or from the Query parameter.") "or from the Query parameter.")
@Tags({ "elastic", "elasticsearch", "delete", "query"}) @Tags({ "elastic", "elasticsearch", "delete", "query"})
@WritesAttributes({ @WritesAttributes({
@WritesAttribute(attribute = "elasticsearch.delete.took", description = "The amount of time that it took to complete the delete operation in ms."), @WritesAttribute(attribute = "elasticsearch.delete.took", description = "The amount of time that it took to complete the delete operation in ms."),
@WritesAttribute(attribute = "elasticsearch.delete.error", description = "The error message provided by ElasticSearch if there is an error running the delete.") @WritesAttribute(attribute = "elasticsearch.delete.error", description = "The error message provided by Elasticsearch if there is an error running the delete.")
}) })
public class DeleteByQueryElasticsearch extends AbstractProcessor implements ElasticSearchRestProcessor { public class DeleteByQueryElasticsearch extends AbstractProcessor implements ElasticsearchRestProcessor {
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
.description("If the delete by query fails, and a flowfile was read, it will be sent to this relationship.").build(); .description("If the delete by query fails, and a flowfile was read, it will be sent to this relationship.").build();

View File

@ -25,12 +25,15 @@ import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.util.JsonValidator; import org.apache.nifi.processor.util.JsonValidator;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
public interface ElasticSearchRestProcessor { public interface ElasticsearchRestProcessor {
String ATTR_RECORD_COUNT = "record.count";
PropertyDescriptor INDEX = new PropertyDescriptor.Builder() PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
.name("el-rest-fetch-index") .name("el-rest-fetch-index")
.displayName("Index") .displayName("Index")
@ -43,7 +46,7 @@ public interface ElasticSearchRestProcessor {
PropertyDescriptor TYPE = new PropertyDescriptor.Builder() PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
.name("el-rest-type") .name("el-rest-type")
.displayName("Type") .displayName("Type")
.description("The type of this document (used by Elasticsearch for indexing and searching)") .description("The type of this document (used by Elasticsearch for indexing and searching).")
.required(false) .required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
@ -70,11 +73,39 @@ public interface ElasticSearchRestProcessor {
PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder() PropertyDescriptor CLIENT_SERVICE = new PropertyDescriptor.Builder()
.name("el-rest-client-service") .name("el-rest-client-service")
.displayName("Client Service") .displayName("Client Service")
.description("An ElasticSearch client service to use for running queries.") .description("An Elasticsearch client service to use for running queries.")
.identifiesControllerService(ElasticSearchClientService.class) .identifiesControllerService(ElasticSearchClientService.class)
.required(true) .required(true)
.build(); .build();
/**
 * Optional true/false property, defaulting to "false". Per its description, enabling it
 * logs errors at the error level instead of only when debug logging is on.
 */
PropertyDescriptor LOG_ERROR_RESPONSES = new PropertyDescriptor.Builder()
.name("put-es-record-log-error-responses")
.displayName("Log Error Responses")
.description("If this is enabled, errors will be logged to the NiFi logs at the error log level. Otherwise, they will " +
"only be logged if debug logging is enabled on NiFi as a whole. The purpose of this option is to give the user " +
"the ability to debug failed operations without having to turn on debug logging.")
.allowableValues("true", "false")
.defaultValue("false")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();
/** The "failure" relationship: flowfiles that fail for reasons unrelated to server availability. */
Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("All flowfiles that fail for reasons unrelated to server availability go to this relationship.")
.build();
/** The "retry" relationship: flowfiles that fail due to server/cluster availability. */
Relationship REL_RETRY = new Relationship.Builder()
.name("retry")
.description("All flowfiles that fail due to server/cluster availability go to this relationship.")
.build();
/** The "success" relationship: flowfiles that were transferred into Elasticsearch. */
Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("All flowfiles that succeed in being transferred into Elasticsearch go here.")
.build();
/**
 * The "errors" relationship, auto-terminated by default, for records that failed to process.
 * NOTE(review): "record write" in the description below looks like it should read
 * "record writer" - confirm against the property it refers to before changing the text.
 */
Relationship REL_FAILED_RECORDS = new Relationship.Builder()
.name("errors").description("If an output record write is set, any record that failed to process the way it was " +
"configured will be sent to this relationship as part of a failed record record set.")
.autoTerminateDefault(true).build();
default String getQuery(FlowFile input, ProcessContext context, ProcessSession session) throws IOException { default String getQuery(FlowFile input, ProcessContext context, ProcessSession session) throws IOException {
String retVal = null; String retVal = null;
if (context.getProperty(QUERY).isSet()) { if (context.getProperty(QUERY).isSet()) {

View File

@ -56,18 +56,15 @@ import java.util.Set;
@EventDriven @EventDriven
@Tags({"elasticsearch", "elasticsearch 5", "query", "read", "get", "json"}) @Tags({"elasticsearch", "elasticsearch 5", "query", "read", "get", "json"})
@CapabilityDescription("A processor that allows the user to run a query (with aggregations) written with the " + @CapabilityDescription("A processor that allows the user to run a query (with aggregations) written with the " +
"ElasticSearch JSON DSL. It does not automatically paginate queries for the user. If an incoming relationship is added to this " + "Elasticsearch JSON DSL. It does not automatically paginate queries for the user. If an incoming relationship is added to this " +
"processor, it will use the flowfile's content for the query. Care should be taken on the size of the query because the entire response " + "processor, it will use the flowfile's content for the query. Care should be taken on the size of the query because the entire response " +
"from ElasticSearch will be loaded into memory all at once and converted into the resulting flowfiles.") "from Elasticsearch will be loaded into memory all at once and converted into the resulting flowfiles.")
public class JsonQueryElasticsearch extends AbstractProcessor implements ElasticSearchRestProcessor { public class JsonQueryElasticsearch extends AbstractProcessor implements ElasticsearchRestProcessor {
public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original") public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original")
.description("All original flowfiles that don't cause an error to occur go to this relationship. " + .description("All original flowfiles that don't cause an error to occur go to this relationship. " +
"This applies even if you select the \"split up hits\" option to send individual hits to the " + "This applies even if you select the \"split up hits\" option to send individual hits to the " +
"\"hits\" relationship.").build(); "\"hits\" relationship.").build();
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
.description("All FlowFiles that cannot be read from Elasticsearch are routed to this relationship").build();
public static final Relationship REL_HITS = new Relationship.Builder().name("hits") public static final Relationship REL_HITS = new Relationship.Builder().name("hits")
.description("Search hits are routed to this relationship.") .description("Search hits are routed to this relationship.")
.build(); .build();

View File

@ -0,0 +1,355 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.elasticsearch;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.elasticsearch.ElasticSearchClientService;
import org.apache.nifi.elasticsearch.ElasticsearchError;
import org.apache.nifi.elasticsearch.IndexOperationRequest;
import org.apache.nifi.elasticsearch.IndexOperationResponse;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.elasticsearch.api.BulkOperation;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPath;
import org.apache.nifi.record.path.RecordPathResult;
import org.apache.nifi.record.path.util.RecordPathCache;
import org.apache.nifi.record.path.validation.RecordPathValidator;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.apache.nifi.util.StringUtils;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"json", "elasticsearch", "elasticsearch5", "elasticsearch6", "put", "index", "record"})
@CapabilityDescription("A record-aware Elasticsearch put processor that uses the official Elastic REST client libraries.")
public class PutElasticsearchRecord extends AbstractProcessor implements ElasticsearchRestProcessor {
/** Required controller service that creates the record reader for incoming flowfiles. */
static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
.name("put-es-record-reader")
.displayName("Record Reader")
.description("The record reader to use for reading incoming records from flowfiles.")
.identifiesControllerService(RecordReaderFactory.class)
.required(true)
.build();
/** Required positive integer (default 100); supports Expression Language against flowfile attributes. */
static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("put-es-record-batch-size")
.displayName("Batch Size")
.description("The number of records to send over in a single batch.")
.defaultValue("100")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(true)
.build();
/** Record path locating the document ID inside each record. */
static final PropertyDescriptor ID_RECORD_PATH = new PropertyDescriptor.Builder()
.name("put-es-record-id-path")
.displayName("ID Record Path")
.description("A record path expression to retrieve the ID field for use with Elasticsearch. If left blank " +
"the ID will be automatically generated by Elasticsearch.")
.addValidator(new RecordPathValidator())
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
/** Optional record path locating a per-record index name. */
static final PropertyDescriptor INDEX_RECORD_PATH = new PropertyDescriptor.Builder()
.name("put-es-record-index-record-path")
.displayName("Index Record Path")
.description("A record path expression to retrieve the index field for use with Elasticsearch. If left blank " +
"the index will be determined using the main index property.")
.addValidator(new RecordPathValidator())
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
/** Optional record path locating a per-record type name. */
static final PropertyDescriptor TYPE_RECORD_PATH = new PropertyDescriptor.Builder()
.name("put-es-record-type-record-path")
.displayName("Type Record Path")
.description("A record path expression to retrieve the type field for use with Elasticsearch. If left blank " +
"the type will be determined using the main type property.")
.addValidator(new RecordPathValidator())
.required(false)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
/** Optional controller service that creates the writer used for failed records. */
static final PropertyDescriptor ERROR_RECORD_WRITER = new PropertyDescriptor.Builder()
.name("put-es-record-error-writer")
.displayName("Error Record Writer")
.description("If this configuration property is set, the response from Elasticsearch will be examined for failed records " +
"and the failed records will be written to a record set with this record writer service and sent to the \"errors\" " +
"relationship.")
.identifiesControllerService(RecordSetWriterFactory.class)
.addValidator(Validator.VALID)
.required(false)
.build();
/** Unmodifiable list of the properties returned by getSupportedPropertyDescriptors(). */
static final List<PropertyDescriptor> DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(
INDEX, TYPE, CLIENT_SERVICE, RECORD_READER, BATCH_SIZE, ID_RECORD_PATH, INDEX_RECORD_PATH, TYPE_RECORD_PATH,
LOG_ERROR_RESPONSES, ERROR_RECORD_WRITER
));
/** Unmodifiable set of the relationships returned by getRelationships(). */
static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
REL_SUCCESS, REL_FAILURE, REL_RETRY, REL_FAILED_RECORDS
)));
/**
 * @return the fixed, unmodifiable {@code RELATIONSHIPS} set
 */
@Override
public Set<Relationship> getRelationships() {
    return RELATIONSHIPS;
}
/**
 * @return the fixed, unmodifiable {@code DESCRIPTORS} list
 */
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    return DESCRIPTORS;
}
// Per-schedule state: everything below is (re)assigned in onScheduled().
private RecordReaderFactory readerFactory;
private RecordPathCache recordPathCache;
private ElasticSearchClientService clientService;
// Stays null when no Error Record Writer is configured.
private RecordSetWriterFactory writerFactory;
private boolean logErrors;

/**
 * Resolves the configured controller services and the error-logging flag from the
 * context, and creates a fresh record path cache with room for 16 compiled paths.
 *
 * @param context the context the configured property values are read from
 */
@OnScheduled
public void onScheduled(ProcessContext context) {
    readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
    clientService = context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class);
    recordPathCache = new RecordPathCache(16);
    writerFactory = context.getProperty(ERROR_RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
    logErrors = context.getProperty(LOG_ERROR_RESPONSES).asBoolean();
}
/**
 * Reads the records of one incoming flowfile, converts each record into an index
 * operation and sends the operations to Elasticsearch in batches of the configured size.
 *
 * <p>Routing of the input flowfile:</p>
 * <ul>
 *   <li>success - every batch was sent without an exception being thrown;</li>
 *   <li>retry / failure - an {@link ElasticsearchError} was thrown; the choice depends on
 *       {@code isElastic()} and the flowfile is penalized;</li>
 *   <li>failure - any other exception.</li>
 * </ul>
 * On either error path, the error flowfiles already produced for earlier batches are removed.
 *
 * @param context provides the configured property values
 * @param session the session used to read and route flowfiles
 * @throws ProcessException declared by the processor contract
 */
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    FlowFile input = session.get();
    if (input == null) {
        return;
    }

    final String index = context.getProperty(INDEX).evaluateAttributeExpressions(input).getValue();
    final String type = context.getProperty(TYPE).evaluateAttributeExpressions(input).getValue();

    final String idPath = context.getProperty(ID_RECORD_PATH).isSet()
            ? context.getProperty(ID_RECORD_PATH).evaluateAttributeExpressions(input).getValue()
            : null;
    final String indexPath = context.getProperty(INDEX_RECORD_PATH).isSet()
            ? context.getProperty(INDEX_RECORD_PATH).evaluateAttributeExpressions(input).getValue()
            : null;
    final String typePath = context.getProperty(TYPE_RECORD_PATH).isSet()
            ? context.getProperty(TYPE_RECORD_PATH).evaluateAttributeExpressions(input).getValue()
            : null;

    final RecordPath path = idPath != null ? recordPathCache.getCompiled(idPath) : null;
    final RecordPath iPath = indexPath != null ? recordPathCache.getCompiled(indexPath) : null;
    final RecordPath tPath = typePath != null ? recordPathCache.getCompiled(typePath) : null;

    final int batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions(input).asInteger();

    // Error flowfiles created so far; they are removed again if a later batch fails.
    final List<FlowFile> badRecords = new ArrayList<>();

    try (final InputStream inStream = session.read(input);
         final RecordReader reader = readerFactory.createRecordReader(input, inStream, getLogger())) {
        Record record;
        final List<IndexOperationRequest> operationList = new ArrayList<>();
        final List<Record> originals = new ArrayList<>();

        while ((record = reader.nextRecord()) != null) {
            // The record path lookups run before the record is converted to a map because
            // getFromRecordPath also nulls the selected field in the record.
            final String idx = getFromRecordPath(record, iPath, index);
            final String t = getFromRecordPath(record, tPath, type);
            final IndexOperationRequest.Operation o = IndexOperationRequest.Operation.Index;
            final String id = path != null ? getFromRecordPath(record, path, null) : null;

            Map<String, Object> contentMap = (Map<String, Object>) DataTypeUtils.convertRecordFieldtoObject(record, RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
            removeEmpty(contentMap);

            operationList.add(new IndexOperationRequest(idx, t, id, contentMap, o));
            originals.add(record);

            if (operationList.size() == batchSize) {
                BulkOperation bundle = new BulkOperation(operationList, originals, reader.getSchema());
                FlowFile bad = indexDocuments(bundle, session, input);
                if (bad != null) {
                    badRecords.add(bad);
                }

                operationList.clear();
                originals.clear();
            }
        }

        // Send whatever is left over from the last, partially filled batch.
        if (!operationList.isEmpty()) {
            BulkOperation bundle = new BulkOperation(operationList, originals, reader.getSchema());
            FlowFile bad = indexDocuments(bundle, session, input);
            if (bad != null) {
                badRecords.add(bad);
            }
        }
    } catch (ElasticsearchError ese) {
        String msg = String.format("Encountered a server-side problem with Elasticsearch. %s",
                ese.isElastic() ? "Moving to retry." : "Moving to failure");
        getLogger().error(msg, ese);
        Relationship rel = ese.isElastic() ? REL_RETRY : REL_FAILURE;
        session.penalize(input);
        session.transfer(input, rel);
        removeBadRecordFlowFiles(badRecords, session);
        return;
    } catch (Exception ex) {
        getLogger().error("Could not index documents.", ex);
        session.transfer(input, REL_FAILURE);
        removeBadRecordFlowFiles(badRecords, session);
        return;
    }

    // Transfer only after the try-with-resources has closed the stream obtained from
    // session.read(input), so the flowfile is never routed while its content is still open.
    session.transfer(input, REL_SUCCESS);
}
/**
 * Removes every given error flowfile from the session and then empties the list.
 *
 * @param bad     error flowfiles to discard; cleared by this call
 * @param session the session the flowfiles are removed from
 */
private void removeBadRecordFlowFiles(List<FlowFile> bad, ProcessSession session) {
    bad.forEach(flowFile -> session.remove(flowFile));
    bad.clear();
}
/**
 * Sends one batch to Elasticsearch through the client service's bulk call and handles
 * a response that reports errors.
 *
 * <p>When the response has errors, the response items are logged (at error level if
 * error logging is enabled, otherwise at debug level if debug is enabled). If an error
 * record writer is configured, the original records whose response item contains an
 * "error" entry are written to a new child flowfile, which is routed to the failed
 * records relationship.</p>
 *
 * @param bundle  the operations to send, with the original records and their schema
 * @param session the session used to create and route the error flowfile
 * @param input   the parent flowfile the error flowfile is derived from
 * @return the error flowfile that was transferred, or {@code null} when the response had
 *         no errors or no error record writer is configured
 * @throws Exception if the bulk call fails or the error flowfile cannot be written; in
 *         the latter case the partially written error flowfile is removed first
 */
private FlowFile indexDocuments(BulkOperation bundle, ProcessSession session, FlowFile input) throws Exception {
    IndexOperationResponse response = clientService.bulk(bundle.getOperationList());
    if (!response.hasErrors()) {
        return null;
    }

    if (logErrors || getLogger().isDebugEnabled()) {
        List<Map<String, Object>> errors = response.getItems();
        ObjectMapper mapper = new ObjectMapper();
        mapper.enable(SerializationFeature.INDENT_OUTPUT);
        String output = String.format("An error was encountered while processing bulk operations. Server response below:\n\n%s", mapper.writeValueAsString(errors));

        if (logErrors) {
            getLogger().error(output);
        } else {
            getLogger().debug(output);
        }
    }

    if (writerFactory == null) {
        return null;
    }

    FlowFile errorFF = session.create(input);
    try {
        int added = 0;
        // try-with-resources closes the writer and then the stream exactly once, before the
        // flowfile is touched again below.
        try (OutputStream os = session.write(errorFF);
             RecordSetWriter writer = writerFactory.createWriter(getLogger(), bundle.getSchema(), os)) {
            writer.beginRecordSet();
            List<Map<String, Object>> items = response.getItems();
            for (int index = 0; index < items.size(); index++) {
                Map<String, Object> current = items.get(index);
                // Each item is inspected through its first entry; an empty item or a
                // non-map value is treated as "no error".
                Object result = current.isEmpty() ? null : current.values().iterator().next();
                if (result instanceof Map && ((Map<?, ?>) result).containsKey("error")) {
                    // Response items and original records are matched by position.
                    writer.write(bundle.getOriginalRecords().get(index));
                    added++;
                }
            }
            writer.finishRecordSet();
        }

        errorFF = session.putAttribute(errorFF, ATTR_RECORD_COUNT, String.valueOf(added));
        session.transfer(errorFF, REL_FAILED_RECORDS);
        return errorFF;
    } catch (Exception ex) {
        getLogger().error("Could not write the failed records to the error flowfile.", ex);
        session.remove(errorFF);
        throw ex;
    }
}
/**
 * Strips entries whose value is {@code null} or whose string form is blank from the
 * given map, in place. Nested maps, and maps found directly inside list values, are
 * cleaned the same way.
 *
 * @param input the map to clean; it is cleared and refilled with the retained entries
 */
private void removeEmpty(Map<String, Object> input) {
    final Map<String, Object> retained = new HashMap<>(input);

    for (final Map.Entry<String, Object> field : input.entrySet()) {
        final Object fieldValue = field.getValue();

        if (fieldValue == null || StringUtils.isBlank(fieldValue.toString())) {
            retained.remove(field.getKey());
            continue;
        }

        if (fieldValue instanceof Map) {
            removeEmpty((Map<String, Object>) fieldValue);
        } else if (fieldValue instanceof List) {
            for (final Object element : (List) fieldValue) {
                if (element instanceof Map) {
                    removeEmpty((Map<String, Object>) element);
                }
            }
        }
    }

    input.clear();
    input.putAll(retained);
}
/**
 * Evaluates a record path against a record and returns the first selected value as a
 * string. The selected field is also set to {@code null} in the record, so the value
 * is not carried along with the record's remaining content.
 *
 * @param record   the record to evaluate the path against
 * @param path     the compiled record path; may be {@code null}
 * @param fallback value returned when {@code path} is {@code null}, nothing is selected
 *                 or the selected value is {@code null}
 * @return the selected string value, or {@code fallback}
 * @throws ProcessException if the selected field is not of the string field type
 */
private String getFromRecordPath(Record record, RecordPath path, final String fallback) {
    if (path == null) {
        return fallback;
    }

    final RecordPathResult result = path.evaluate(record);
    final Optional<FieldValue> value = result.getSelectedFields().findFirst();
    if (!value.isPresent() || value.get().getValue() == null) {
        return fallback;
    }

    final FieldValue fieldValue = value.get();
    if (!fieldValue.getField().getDataType().getFieldType().equals(RecordFieldType.STRING)) {
        throw new ProcessException(
            String.format("Field referenced by %s must be a string.", path.getPath())
        );
    }

    // Capture the value BEFORE clearing the field, so the result does not depend on
    // getValue() still returning the old value after updateValue(null).
    final String retVal = fieldValue.getValue().toString();
    fieldValue.updateValue(null);

    return retVal;
}
}

View File

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.elasticsearch.api;
import org.apache.nifi.elasticsearch.IndexOperationRequest;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import java.util.List;
/**
 * Holder for one batch of bulk operations together with the records the operations
 * were built from and the schema of those records.
 *
 * <p>The fields are assigned once in the constructor. The lists are stored by
 * reference, not copied, so later changes made by the caller to the passed-in lists
 * are visible through this object.</p>
 */
public class BulkOperation {
    private final List<IndexOperationRequest> operationList;
    private final List<Record> originalRecords;
    private final RecordSchema schema;

    /**
     * @param operationList   the operations of this batch
     * @param originalRecords the records the operations were created from
     * @param schema          the schema of the original records
     */
    public BulkOperation(List<IndexOperationRequest> operationList, List<Record> originalRecords, RecordSchema schema) {
        this.operationList = operationList;
        this.originalRecords = originalRecords;
        this.schema = schema;
    }

    /** @return the operations of this batch */
    public List<IndexOperationRequest> getOperationList() {
        return operationList;
    }

    /** @return the records the operations were created from */
    public List<Record> getOriginalRecords() {
        return originalRecords;
    }

    /** @return the schema of the original records */
    public RecordSchema getSchema() {
        return schema;
    }
}

View File

@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.elasticsearch.put;
import com.jayway.jsonpath.JsonPath;
/**
 * Read-only value holder bundling an index, type, id and content string with the
 * JsonPath objects for id, type and index. All fields are assigned once in the
 * constructor and exposed through getters only.
 */
public class FlowFileJsonDescription {
    private final String index;
    private final String type;
    private final String id;
    private final String content;
    private final JsonPath idJsonPath;
    private final JsonPath typeJsonPath;
    private final JsonPath indexJsonPath;

    /**
     * @param index         the index value
     * @param type          the type value
     * @param id            the id value
     * @param content       the content string
     * @param idJsonPath    the JsonPath associated with the id
     * @param typeJsonPath  the JsonPath associated with the type
     * @param indexJsonPath the JsonPath associated with the index
     */
    public FlowFileJsonDescription(String index, String type, String id, String content, JsonPath idJsonPath, JsonPath typeJsonPath, JsonPath indexJsonPath) {
        this.index = index;
        this.type = type;
        this.id = id;
        this.content = content;
        this.idJsonPath = idJsonPath;
        this.typeJsonPath = typeJsonPath;
        this.indexJsonPath = indexJsonPath;
    }

    /** @return the index value */
    public String getIndex() {
        return index;
    }

    /** @return the type value */
    public String getType() {
        return type;
    }

    /** @return the id value */
    public String getId() {
        return id;
    }

    /** @return the content string */
    public String getContent() {
        return content;
    }

    /** @return the JsonPath associated with the id */
    public JsonPath getIdJsonPath() {
        return idJsonPath;
    }

    /** @return the JsonPath associated with the type */
    public JsonPath getTypeJsonPath() {
        return typeJsonPath;
    }

    /** @return the JsonPath associated with the index */
    public JsonPath getIndexJsonPath() {
        return indexJsonPath;
    }
}

View File

@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.elasticsearch.put;
/**
 * Checked exception that carries only a detail message.
 * NOTE(review): presumably raised when JSON content cannot be processed - confirm
 * against the callers, which are not part of this file.
 */
public class JsonProcessingError extends Exception {

    /**
     * @param message the detail message, later available through {@link #getMessage()}
     */
    public JsonProcessingError(final String message) {
        super(message);
    }
}

View File

@ -0,0 +1,48 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8" />
<title>PutElasticsearchJson</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
</head>
<body>
<p>This processor is designed to take any valid JSON and index it into Elasticsearch 5.x or newer. Unlike the record-aware
alternative, it only does index operations, not updates, upserts or deletes.</p>
<p>The ID for each document can be set in one of three ways:</p>
<ul>
<li>FlowFile attribute - only possible if the flowfile contains only a single document. Arrays of documents
will cause an error to be raised.</li>
<li>JsonPath operation - searches within the document for a particular field.</li>
<li>Default - Elasticsearch will generate one. This will be used when neither the attribute nor JsonPath configuration
fields are used.</li>
</ul>
<p>Example document:</p>
<pre>
{
"index": "messages",
"type": "message",
"message": "Hello, world",
"from": "john.smith"
}
</pre>
<p>The following JsonPath operations will extract the index and type:</p>
<ul>
<li>$.index</li>
<li>$.type</li>
</ul>
</body>
</html>

View File

@ -0,0 +1,64 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8" />
<title>PutElasticsearchRecord</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
</head>
<body>
<p>This is a record-aware processor built for Elasticsearch 5 and later. The index and type fields are configured
with default values that can be overridden using record path operations that find an index or type value in the
record set. The ID and operation type (index, update, upsert or delete) can also be extracted in a similar fashion from
the record set. The following is an example of a document exercising all of these features:</p>
<pre>
{
"metadata": {
"id": "12345",
"index": "test",
"type": "message",
"operation": "index"
},
"message": "Hello, world",
"from": "john.smith"
}
</pre>
<pre>
{
"metadata": {
"id": "12345",
"index": "test",
"type": "message",
"operation": "delete"
}
}
</pre>
<p>The record path operations below would extract the relevant data:</p>
<ul>
<li>/metadata/id</li>
<li>/metadata/index</li>
<li>/metadata/type</li>
<li>/metadata/operation</li>
</ul>
<p>Valid values for "operation" are:</p>
<ul>
<li>delete</li>
<li>index</li>
<li>update</li>
<li>upsert</li>
</ul>
</body>
</html>

View File

@ -15,3 +15,4 @@
org.apache.nifi.processors.elasticsearch.DeleteByQueryElasticsearch org.apache.nifi.processors.elasticsearch.DeleteByQueryElasticsearch
org.apache.nifi.processors.elasticsearch.JsonQueryElasticsearch org.apache.nifi.processors.elasticsearch.JsonQueryElasticsearch
org.apache.nifi.processors.elasticsearch.PutElasticsearchRecord

View File

@ -21,7 +21,7 @@
</head> </head>
<body> <body>
<p>This processor executes a delete operation against one or more indices using the _delete_by_query handler. The <p>This processor executes a delete operation against one or more indices using the _delete_by_query handler. The
query should be a valid ElasticSearch JSON DSL query (Lucene syntax is not supported). An example query:</p> query should be a valid Elasticsearch JSON DSL query (Lucene syntax is not supported). An example query:</p>
<pre> <pre>
{ {
"query": { "query": {

View File

@ -20,8 +20,8 @@
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" /> <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
</head> </head>
<body> <body>
<p>This processor is intended for use with the ElasticSearch JSON DSL and ElasticSearch 5.X and newer. It is designed <p>This processor is intended for use with the Elasticsearch JSON DSL and Elasticsearch 5.X and newer. It is designed
to be able to take a query from Kibana and execute it as-is against an ElasticSearch cluster. Like all processors in the to be able to take a query from Kibana and execute it as-is against an Elasticsearch cluster. Like all processors in the
"restapi" bundle, it uses the official Elastic client APIs, so it supports leader detection.</p> "restapi" bundle, it uses the official Elastic client APIs, so it supports leader detection.</p>
<p>The query to execute can be provided either in the Query configuration property or in an attribute on a flowfile. In <p>The query to execute can be provided either in the Query configuration property or in an attribute on a flowfile. In
the latter case, the name of the attribute (Expression Language is supported here) must be provided in the Query Attribute the latter case, the name of the attribute (Expression Language is supported here) must be provided in the Query Attribute

View File

@ -0,0 +1,41 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8" />
<title>PutElasticsearchRecord</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
</head>
<body>
<p>
This processor is for accessing the Elasticsearch Bulk API. It provides the ability to configure bulk operations on
a per-record basis which is what separates it from PutElasticsearchHttpRecord. For example, it is possible to define
multiple commands to index documents, followed by deletes, creates and update operations against the same index or
other indices as desired.
</p>
<p>
As part of the Elasticsearch REST API bundle, it uses a controller service to manage connection information and
that controller service is built on top of the official Elasticsearch client APIs. That provides features such as
automatic master detection against the cluster which is missing in the other bundles.
</p>
<p>
This processor builds one Elasticsearch Bulk API body per record set. Care should be taken to split up record sets
into appropriately-sized chunks so that NiFi does not run out of memory and the requests sent to Elasticsearch are
not too large for it to handle. When failures do occur, this processor is capable of attempting to write the records
that failed to an output record writer so that only failed records can be processed downstream or replayed.
</p>
</body>
</html>

View File

@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.elasticsearch
import org.apache.nifi.util.MockFlowFile
import org.apache.nifi.util.TestRunner
import org.apache.nifi.util.TestRunners
import org.junit.Assert
import org.junit.Test
class DeleteByQueryElasticsearchTest {
private static final String INDEX = "test_idx"
private static final String TYPE = "test_type"
private static final String QUERY_ATTR = "es.delete.query"
private static final String CLIENT_NAME = "clientService"
private TestElasticsearchClientService client
/**
 * Creates the test client service, registers and enables it on the runner, and points
 * the processor's client service property at it.
 */
private void initClient(TestRunner runner) throws Exception {
    client = new TestElasticsearchClientService(true)
    final TestElasticsearchClientService service = client
    runner.addControllerService(CLIENT_NAME, service)
    runner.enableControllerService(service)
    runner.setProperty(DeleteByQueryElasticsearch.CLIENT_SERVICE, CLIENT_NAME)
}
/**
 * Shared assertions for a successful run: nothing on failure, exactly one flowfile on
 * success, and that flowfile carries the "took" attribute with value "100" plus the
 * executed query in the query attribute.
 *
 * @param queryParam the query text expected in the query attribute
 */
private void postTest(TestRunner runner, String queryParam) {
    runner.assertTransferCount(DeleteByQueryElasticsearch.REL_FAILURE, 0)
    runner.assertTransferCount(DeleteByQueryElasticsearch.REL_SUCCESS, 1)

    List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(DeleteByQueryElasticsearch.REL_SUCCESS)
    MockFlowFile result = flowFiles.get(0)
    String attr = result.getAttribute(DeleteByQueryElasticsearch.TOOK_ATTRIBUTE)
    String query = result.getAttribute(QUERY_ATTR)
    Assert.assertNotNull(attr)
    // JUnit's signature is assertEquals(expected, actual); the original had them swapped,
    // which produces a misleading failure message.
    Assert.assertEquals("100", attr)
    Assert.assertNotNull(query)
    Assert.assertEquals(queryParam, query)
}
/** Runs the processor with the query supplied as the content of the enqueued flowfile. */
@Test
void testWithFlowfileInput() throws Exception {
    final String matchAll = "{ \"query\": { \"match_all\": {} }}"
    final TestRunner testRunner = TestRunners.newTestRunner(DeleteByQueryElasticsearch.class)
    testRunner.setProperty(DeleteByQueryElasticsearch.INDEX, INDEX)
    testRunner.setProperty(DeleteByQueryElasticsearch.TYPE, TYPE)
    testRunner.setProperty(DeleteByQueryElasticsearch.QUERY_ATTRIBUTE, QUERY_ATTR)
    initClient(testRunner)

    testRunner.assertValid()
    testRunner.enqueue(matchAll)
    testRunner.run()

    postTest(testRunner, matchAll)
}
// Runs the processor twice with the query supplied through the QUERY property:
// first with an Expression Language reference resolved from a flowfile attribute,
// then with a literal query and no incoming connection.
@Test
void testWithQuery() throws Exception {
// The escaped \${field.name} keeps the text literal in this Groovy string.
String query = "{\n" +
"\t\"query\": {\n" +
"\t\t\"match\": {\n" +
"\t\t\t\"\${field.name}.keyword\": \"test\"\n" +
"\t\t}\n" +
"\t}\n" +
"}"
// Attribute that supplies the value for the field.name reference in the query.
Map<String, String> attrs = new HashMap<String, String>(){{
put("field.name", "test_field")
}}
TestRunner runner = TestRunners.newTestRunner(DeleteByQueryElasticsearch.class)
runner.setProperty(DeleteByQueryElasticsearch.QUERY, query)
runner.setProperty(DeleteByQueryElasticsearch.INDEX, INDEX)
runner.setProperty(DeleteByQueryElasticsearch.TYPE, TYPE)
runner.setProperty(DeleteByQueryElasticsearch.QUERY_ATTRIBUTE, QUERY_ATTR)
initClient(runner)
runner.assertValid()
runner.enqueue("", attrs)
runner.run()
// The query attribute is expected to hold the query with the reference replaced.
postTest(runner, query.replace('${field.name}', "test_field"))
// Second run: reset the transfer state and use a query without any reference.
runner.clearTransferState()
query = "{\n" +
"\t\"query\": {\n" +
"\t\t\"match\": {\n" +
"\t\t\t\"test_field.keyword\": \"test\"\n" +
"\t\t}\n" +
"\t}\n" +
"}"
runner.setProperty(DeleteByQueryElasticsearch.QUERY, query)
// No flowfile is enqueued for this run; the processor is triggered without input.
runner.setIncomingConnection(false)
runner.assertValid()
runner.run()
postTest(runner, query)
}
@Test
void testErrorAttribute() throws Exception {
String query = "{ \"query\": { \"match_all\": {} }}"
TestRunner runner = TestRunners.newTestRunner(DeleteByQueryElasticsearch.class)
runner.setProperty(DeleteByQueryElasticsearch.QUERY, query)
runner.setProperty(DeleteByQueryElasticsearch.INDEX, INDEX)
runner.setProperty(DeleteByQueryElasticsearch.TYPE, TYPE)
runner.setProperty(DeleteByQueryElasticsearch.QUERY_ATTRIBUTE, QUERY_ATTR)
initClient(runner)
client.setThrowErrorInDelete(true)
runner.assertValid()
runner.enqueue("")
runner.run()
runner.assertTransferCount(DeleteByQueryElasticsearch.REL_SUCCESS, 0)
runner.assertTransferCount(DeleteByQueryElasticsearch.REL_FAILURE, 1)
MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(DeleteByQueryElasticsearch.REL_FAILURE).get(0)
String attr = mockFlowFile.getAttribute(DeleteByQueryElasticsearch.ERROR_ATTRIBUTE)
Assert.assertNotNull(attr)
}
@Test
void testInputHandling() {
String query = "{ \"query\": { \"match_all\": {} }}"
TestRunner runner = TestRunners.newTestRunner(DeleteByQueryElasticsearch.class)
runner.setProperty(DeleteByQueryElasticsearch.QUERY, query)
runner.setProperty(DeleteByQueryElasticsearch.INDEX, INDEX)
runner.setProperty(DeleteByQueryElasticsearch.TYPE, TYPE)
runner.setProperty(DeleteByQueryElasticsearch.QUERY_ATTRIBUTE, QUERY_ATTR)
initClient(runner)
runner.assertValid()
runner.run()
}
}

View File

@ -3,7 +3,7 @@
* contributor license agreements. See the NOTICE file distributed with * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. * this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0 * The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with * (the "License") you may not use this file except in compliance with
* the License. You may obtain a copy of the License at * the License. You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
@ -15,54 +15,54 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.nifi.processors.elasticsearch; package org.apache.nifi.processors.elasticsearch
import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockFlowFile
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner
import org.apache.nifi.util.TestRunners; import org.apache.nifi.util.TestRunners
import org.junit.Assert; import org.junit.Assert
import org.junit.Test; import org.junit.Test
import java.util.List; import java.util.List
public class JsonQueryElasticsearchTest { class JsonQueryElasticsearchTest {
private static final String INDEX_NAME = "messages"; private static final String INDEX_NAME = "messages"
public void testCounts(TestRunner runner, int success, int hits, int failure, int aggregations) { void testCounts(TestRunner runner, int success, int hits, int failure, int aggregations) {
runner.assertTransferCount(JsonQueryElasticsearch.REL_ORIGINAL, success); runner.assertTransferCount(JsonQueryElasticsearch.REL_ORIGINAL, success)
runner.assertTransferCount(JsonQueryElasticsearch.REL_HITS, hits); runner.assertTransferCount(JsonQueryElasticsearch.REL_HITS, hits)
runner.assertTransferCount(JsonQueryElasticsearch.REL_FAILURE, failure); runner.assertTransferCount(JsonQueryElasticsearch.REL_FAILURE, failure)
runner.assertTransferCount(JsonQueryElasticsearch.REL_AGGREGATIONS, aggregations); runner.assertTransferCount(JsonQueryElasticsearch.REL_AGGREGATIONS, aggregations)
} }
@Test @Test
public void testBasicQuery() throws Exception { void testBasicQuery() throws Exception {
JsonQueryElasticsearch processor = new JsonQueryElasticsearch(); JsonQueryElasticsearch processor = new JsonQueryElasticsearch()
TestRunner runner = TestRunners.newTestRunner(processor); TestRunner runner = TestRunners.newTestRunner(processor)
TestElasticSearchClientService service = new TestElasticSearchClientService(false); TestElasticsearchClientService service = new TestElasticsearchClientService(false)
runner.addControllerService("esService", service); runner.addControllerService("esService", service)
runner.enableControllerService(service); runner.enableControllerService(service)
runner.setProperty(JsonQueryElasticsearch.CLIENT_SERVICE, "esService"); runner.setProperty(JsonQueryElasticsearch.CLIENT_SERVICE, "esService")
runner.setProperty(JsonQueryElasticsearch.INDEX, INDEX_NAME); runner.setProperty(JsonQueryElasticsearch.INDEX, INDEX_NAME)
runner.setProperty(JsonQueryElasticsearch.TYPE, "message"); runner.setProperty(JsonQueryElasticsearch.TYPE, "message")
runner.setValidateExpressionUsage(true); runner.setValidateExpressionUsage(true)
runner.setProperty(JsonQueryElasticsearch.QUERY, "{ \"query\": { \"match_all\": {} }}"); runner.setProperty(JsonQueryElasticsearch.QUERY, "{ \"query\": { \"match_all\": {} }}")
runner.enqueue("test"); runner.enqueue("test")
runner.run(1, true, true); runner.run(1, true, true)
testCounts(runner, 1, 1, 0, 0); testCounts(runner, 1, 1, 0, 0)
runner.setProperty(JsonQueryElasticsearch.SPLIT_UP_HITS, JsonQueryElasticsearch.SPLIT_UP_YES); runner.setProperty(JsonQueryElasticsearch.SPLIT_UP_HITS, JsonQueryElasticsearch.SPLIT_UP_YES)
runner.clearProvenanceEvents(); runner.clearProvenanceEvents()
runner.clearTransferState(); runner.clearTransferState()
runner.enqueue("test"); runner.enqueue("test")
runner.run(1, true, true); runner.run(1, true, true)
testCounts(runner, 1, 10, 0, 0); testCounts(runner, 1, 10, 0, 0)
} }
@Test @Test
public void testAggregations() throws Exception { void testAggregations() throws Exception {
String query = "{\n" + String query = "{\n" +
"\t\"query\": {\n" + "\t\"query\": {\n" +
"\t\t\"match_all\": {}\n" + "\t\t\"match_all\": {}\n" +
@ -79,42 +79,42 @@ public class JsonQueryElasticsearchTest {
"\t\t\t}\n" + "\t\t\t}\n" +
"\t\t}\n" + "\t\t}\n" +
"\t}\n" + "\t}\n" +
"}"; "}"
JsonQueryElasticsearch processor = new JsonQueryElasticsearch(); JsonQueryElasticsearch processor = new JsonQueryElasticsearch()
TestRunner runner = TestRunners.newTestRunner(processor); TestRunner runner = TestRunners.newTestRunner(processor)
TestElasticSearchClientService service = new TestElasticSearchClientService(true); TestElasticsearchClientService service = new TestElasticsearchClientService(true)
runner.addControllerService("esService", service); runner.addControllerService("esService", service)
runner.enableControllerService(service); runner.enableControllerService(service)
runner.setProperty(JsonQueryElasticsearch.CLIENT_SERVICE, "esService"); runner.setProperty(JsonQueryElasticsearch.CLIENT_SERVICE, "esService")
runner.setProperty(JsonQueryElasticsearch.INDEX, INDEX_NAME); runner.setProperty(JsonQueryElasticsearch.INDEX, INDEX_NAME)
runner.setProperty(JsonQueryElasticsearch.TYPE, "message"); runner.setProperty(JsonQueryElasticsearch.TYPE, "message")
runner.setValidateExpressionUsage(true); runner.setValidateExpressionUsage(true)
runner.setProperty(JsonQueryElasticsearch.QUERY, query); runner.setProperty(JsonQueryElasticsearch.QUERY, query)
runner.enqueue("test"); runner.enqueue("test")
runner.run(1, true, true); runner.run(1, true, true)
testCounts(runner, 1, 1, 0, 1); testCounts(runner, 1, 1, 0, 1)
runner.clearTransferState(); runner.clearTransferState()
//Test with the query parameter and no incoming connection //Test with the query parameter and no incoming connection
runner.setIncomingConnection(false); runner.setIncomingConnection(false)
runner.run(1, true, true); runner.run(1, true, true)
testCounts(runner, 0, 1, 0, 1); testCounts(runner, 0, 1, 0, 1)
runner.setIncomingConnection(true); runner.setIncomingConnection(true)
runner.clearTransferState(); runner.clearTransferState()
runner.clearProvenanceEvents(); runner.clearProvenanceEvents()
runner.setProperty(JsonQueryElasticsearch.SPLIT_UP_AGGREGATIONS, JsonQueryElasticsearch.SPLIT_UP_YES); runner.setProperty(JsonQueryElasticsearch.SPLIT_UP_AGGREGATIONS, JsonQueryElasticsearch.SPLIT_UP_YES)
runner.enqueue("test"); runner.enqueue("test")
runner.run(1, true, true); runner.run(1, true, true)
testCounts(runner, 1, 1, 0, 2); testCounts(runner, 1, 1, 0, 2)
runner.clearProvenanceEvents(); runner.clearProvenanceEvents()
runner.clearTransferState(); runner.clearTransferState()
query = "{\n" + query = "{\n" +
"\t\"query\": {\n" + "\t\"query\": {\n" +
@ -123,30 +123,30 @@ public class JsonQueryElasticsearchTest {
"\t\"aggs\": {\n" + "\t\"aggs\": {\n" +
"\t\t\"test_agg\": {\n" + "\t\t\"test_agg\": {\n" +
"\t\t\t\"terms\": {\n" + "\t\t\t\"terms\": {\n" +
"\t\t\t\t\"field\": \"${fieldValue}\"\n" + "\t\t\t\t\"field\": \"\${fieldValue}\"\n" +
"\t\t\t}\n" + "\t\t\t}\n" +
"\t\t},\n" + "\t\t},\n" +
"\t\t\"test_agg2\": {\n" + "\t\t\"test_agg2\": {\n" +
"\t\t\t\"terms\": {\n" + "\t\t\t\"terms\": {\n" +
"\t\t\t\t\"field\": \"${fieldValue}\"\n" + "\t\t\t\t\"field\": \"\${fieldValue}\"\n" +
"\t\t\t}\n" + "\t\t\t}\n" +
"\t\t}\n" + "\t\t}\n" +
"\t}\n" + "\t}\n" +
"}"; "}"
runner.setVariable("fieldValue", "msg"); runner.setVariable("fieldValue", "msg")
runner.setVariable("es.index", INDEX_NAME); runner.setVariable("es.index", INDEX_NAME)
runner.setVariable("es.type", "msg"); runner.setVariable("es.type", "msg")
runner.setProperty(JsonQueryElasticsearch.QUERY, query); runner.setProperty(JsonQueryElasticsearch.QUERY, query)
runner.setProperty(JsonQueryElasticsearch.INDEX, "${es.index}"); runner.setProperty(JsonQueryElasticsearch.INDEX, "\${es.index}")
runner.setProperty(JsonQueryElasticsearch.TYPE, "${es.type}"); runner.setProperty(JsonQueryElasticsearch.TYPE, "\${es.type}")
runner.setValidateExpressionUsage(true); runner.setValidateExpressionUsage(true)
runner.enqueue("test"); runner.enqueue("test")
runner.run(1, true, true); runner.run(1, true, true)
testCounts(runner, 1, 1, 0, 2); testCounts(runner, 1, 1, 0, 2)
} }
@Test @Test
public void testErrorDuringSearch() throws Exception { void testErrorDuringSearch() throws Exception {
String query = "{\n" + String query = "{\n" +
"\t\"query\": {\n" + "\t\"query\": {\n" +
"\t\t\"match_all\": {}\n" + "\t\t\"match_all\": {}\n" +
@ -163,28 +163,28 @@ public class JsonQueryElasticsearchTest {
"\t\t\t}\n" + "\t\t\t}\n" +
"\t\t}\n" + "\t\t}\n" +
"\t}\n" + "\t}\n" +
"}"; "}"
JsonQueryElasticsearch processor = new JsonQueryElasticsearch(); JsonQueryElasticsearch processor = new JsonQueryElasticsearch()
TestRunner runner = TestRunners.newTestRunner(processor); TestRunner runner = TestRunners.newTestRunner(processor)
TestElasticSearchClientService service = new TestElasticSearchClientService(true); TestElasticsearchClientService service = new TestElasticsearchClientService(true)
service.setThrowErrorInSearch(true); service.setThrowErrorInSearch(true)
runner.addControllerService("esService", service); runner.addControllerService("esService", service)
runner.enableControllerService(service); runner.enableControllerService(service)
runner.setProperty(JsonQueryElasticsearch.CLIENT_SERVICE, "esService"); runner.setProperty(JsonQueryElasticsearch.CLIENT_SERVICE, "esService")
runner.setProperty(JsonQueryElasticsearch.INDEX, INDEX_NAME); runner.setProperty(JsonQueryElasticsearch.INDEX, INDEX_NAME)
runner.setProperty(JsonQueryElasticsearch.TYPE, "message"); runner.setProperty(JsonQueryElasticsearch.TYPE, "message")
runner.setValidateExpressionUsage(true); runner.setValidateExpressionUsage(true)
runner.setProperty(JsonQueryElasticsearch.QUERY, query); runner.setProperty(JsonQueryElasticsearch.QUERY, query)
runner.enqueue("test"); runner.enqueue("test")
runner.run(1, true, true); runner.run(1, true, true)
testCounts(runner, 0, 0, 1, 0); testCounts(runner, 0, 0, 1, 0)
} }
@Test @Test
public void testQueryAttribute() throws Exception { void testQueryAttribute() throws Exception {
final String query = "{\n" + final String query = "{\n" +
"\t\"query\": {\n" + "\t\"query\": {\n" +
"\t\t\"match_all\": {}\n" + "\t\t\"match_all\": {}\n" +
@ -201,32 +201,52 @@ public class JsonQueryElasticsearchTest {
"\t\t\t}\n" + "\t\t\t}\n" +
"\t\t}\n" + "\t\t}\n" +
"\t}\n" + "\t}\n" +
"}"; "}"
final String queryAttr = "es.query"; final String queryAttr = "es.query"
JsonQueryElasticsearch processor = new JsonQueryElasticsearch(); JsonQueryElasticsearch processor = new JsonQueryElasticsearch()
TestRunner runner = TestRunners.newTestRunner(processor); TestRunner runner = TestRunners.newTestRunner(processor)
TestElasticSearchClientService service = new TestElasticSearchClientService(true); TestElasticsearchClientService service = new TestElasticsearchClientService(true)
runner.addControllerService("esService", service); runner.addControllerService("esService", service)
runner.enableControllerService(service); runner.enableControllerService(service)
runner.setProperty(JsonQueryElasticsearch.CLIENT_SERVICE, "esService"); runner.setProperty(JsonQueryElasticsearch.CLIENT_SERVICE, "esService")
runner.setProperty(JsonQueryElasticsearch.INDEX, INDEX_NAME); runner.setProperty(JsonQueryElasticsearch.INDEX, INDEX_NAME)
runner.setProperty(JsonQueryElasticsearch.TYPE, "message"); runner.setProperty(JsonQueryElasticsearch.TYPE, "message")
runner.setValidateExpressionUsage(true); runner.setValidateExpressionUsage(true)
runner.setProperty(JsonQueryElasticsearch.QUERY, query); runner.setProperty(JsonQueryElasticsearch.QUERY, query)
runner.setProperty(JsonQueryElasticsearch.QUERY_ATTRIBUTE, queryAttr); runner.setProperty(JsonQueryElasticsearch.QUERY_ATTRIBUTE, queryAttr)
runner.enqueue("test"); runner.enqueue("test")
runner.run(1, true, true); runner.run(1, true, true)
testCounts(runner, 1, 1, 0, 1); testCounts(runner, 1, 1, 0, 1)
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(JsonQueryElasticsearch.REL_AGGREGATIONS); List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(JsonQueryElasticsearch.REL_AGGREGATIONS)
flowFiles.addAll(runner.getFlowFilesForRelationship(JsonQueryElasticsearch.REL_HITS)); flowFiles.addAll(runner.getFlowFilesForRelationship(JsonQueryElasticsearch.REL_HITS))
for (MockFlowFile mockFlowFile : flowFiles) { for (MockFlowFile mockFlowFile : flowFiles) {
String attr = mockFlowFile.getAttribute(queryAttr); String attr = mockFlowFile.getAttribute(queryAttr)
Assert.assertNotNull("Missing query attribute", attr); Assert.assertNotNull("Missing query attribute", attr)
Assert.assertEquals("Query had wrong value.", query, attr); Assert.assertEquals("Query had wrong value.", query, attr)
} }
} }
// Runs the processor with a query configured but no flowfile enqueued, then
// checks that nothing was transferred to the success, failure or retry
// relationships.
@Test
void testInputHandling() {
JsonQueryElasticsearch processor = new JsonQueryElasticsearch()
TestRunner runner = TestRunners.newTestRunner(processor)
TestElasticsearchClientService service = new TestElasticsearchClientService(false)
runner.addControllerService("esService", service)
runner.enableControllerService(service)
runner.setProperty(JsonQueryElasticsearch.CLIENT_SERVICE, "esService")
runner.setProperty(JsonQueryElasticsearch.INDEX, INDEX_NAME)
runner.setProperty(JsonQueryElasticsearch.TYPE, "message")
runner.setValidateExpressionUsage(true)
runner.setProperty(JsonQueryElasticsearch.QUERY, "{ \"query\": { \"match_all\": {} }}")
// No enqueue() before run(): the processor is triggered with an empty queue.
runner.run()
runner.assertTransferCount(JsonQueryElasticsearch.REL_SUCCESS, 0)
runner.assertTransferCount(JsonQueryElasticsearch.REL_FAILURE, 0)
runner.assertTransferCount(JsonQueryElasticsearch.REL_RETRY, 0)
}
} }

View File

@ -0,0 +1,261 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.elasticsearch
import org.apache.avro.Schema
import org.apache.nifi.avro.AvroTypeUtil
import org.apache.nifi.elasticsearch.IndexOperationRequest
import org.apache.nifi.elasticsearch.IndexOperationResponse
import org.apache.nifi.json.JsonRecordSetWriter
import org.apache.nifi.json.JsonTreeReader
import org.apache.nifi.processors.elasticsearch.mock.MockBulkLoadClientService
import org.apache.nifi.schema.access.SchemaAccessUtils
import org.apache.nifi.serialization.record.MockSchemaRegistry
import org.apache.nifi.util.TestRunner
import org.apache.nifi.util.TestRunners
import org.junit.Assert
import org.junit.Before
import org.junit.Test
import static groovy.json.JsonOutput.prettyPrint
import static groovy.json.JsonOutput.toJson
/**
 * Unit tests for PutElasticsearchRecord. The processor is wired to a
 * MockBulkLoadClientService, a MockSchemaRegistry and a JsonTreeReader, so
 * records are parsed for real but no cluster is contacted.
 */
class PutElasticsearchRecordTest {
    MockBulkLoadClientService clientService
    MockSchemaRegistry registry
    JsonTreeReader reader
    TestRunner runner

    /** Avro schema (as JSON text) registered under the name "simple". */
    static final String SCHEMA = prettyPrint(toJson([
        name: "TestSchema",
        type: "record",
        fields: [
            [ name: "msg", type: "string" ],
            [ name: "from", type: "string" ]
        ]
    ]))

    /** Two-record JSON payload matching {@link #SCHEMA}; used by basicTest. */
    static final String flowFileContents = prettyPrint(toJson([
        [ msg: "Hello, world", from: "john.smith" ],
        [ msg: "Hi, back at ya!", from: "jane.doe" ]
    ]))

    /**
     * Builds a fresh runner and controller services for every test, registers
     * the "simple" schema, sets the processor's reader, index, type and client
     * properties, and checks the resulting configuration is valid.
     */
    @Before
    void setup() {
        clientService = new MockBulkLoadClientService()
        registry = new MockSchemaRegistry()
        reader = new JsonTreeReader()
        runner = TestRunners.newTestRunner(PutElasticsearchRecord.class)

        registry.addSchema("simple", AvroTypeUtil.createSchema(new Schema.Parser().parse(SCHEMA)))
        clientService.response = new IndexOperationResponse(1500)

        runner.addControllerService("registry", registry)
        runner.addControllerService("reader", reader)
        runner.addControllerService("clientService", clientService)
        runner.setProperty(reader, SchemaAccessUtils.SCHEMA_REGISTRY, "registry")
        runner.setProperty(PutElasticsearchRecord.RECORD_READER, "reader")
        runner.setProperty(PutElasticsearchRecord.INDEX, "test_index")
        runner.setProperty(PutElasticsearchRecord.TYPE, "test_type")
        runner.setProperty(PutElasticsearchRecord.CLIENT_SERVICE, "clientService")
        runner.enableControllerService(registry)
        runner.enableControllerService(reader)
        runner.enableControllerService(clientService)

        runner.assertValid()
    }

    /**
     * Enqueues the default payload against the "simple" schema, runs the
     * processor once and asserts how many flowfiles reached each relationship.
     *
     * @param failure expected count on REL_FAILURE
     * @param retry   expected count on REL_RETRY
     * @param success expected count on REL_SUCCESS
     */
    void basicTest(int failure, int retry, int success) {
        runner.enqueue(flowFileContents, [ "schema.name": "simple" ])
        runner.run()

        runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, failure)
        runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, retry)
        runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, success)
    }

    /** Happy path: the flowfile is routed to success. */
    @Test
    void simpleTest() {
        basicTest(0, 0, 1)
    }

    /** With throwFatalError set on the mock client, the flowfile goes to failure. */
    @Test
    void testFatalError() {
        clientService.throwFatalError = true
        basicTest(1, 0, 0)
    }

    /** With throwRetriableError set on the mock client, the flowfile goes to retry. */
    @Test
    void testRetriable() {
        clientService.throwRetriableError = true
        basicTest(0, 1, 0)
    }

    /**
     * Exercises the ID / index / type record-path properties in three passes:
     * every record supplies its own values; most records supply null for index
     * and type (the closure expects the processor-level "test_index" and
     * "test_type" for those); and a final pass that only checks every request
     * is an Index operation.
     */
    @Test
    void testRecordPathFeatures() {
        def newSchema = prettyPrint(toJson([
            type: "record",
            name: "RecordPathTestType",
            fields: [
                [ name: "id", type: "string" ],
                [ name: "index", type: "string" ],
                [ name: "type", type: "string" ],
                [ name: "msg", type: "string"]
            ]
        ]))

        // Named differently from the static flowFileContents fixture so the
        // class-level payload is not shadowed inside this method.
        def recordPathContents = prettyPrint(toJson([
            [ id: "rec-1", index: "bulk_a", type: "message", msg: "Hello" ],
            [ id: "rec-2", index: "bulk_b", type: "message", msg: "Hello" ],
            [ id: "rec-3", index: "bulk_a", type: "message", msg: "Hello" ],
            [ id: "rec-4", index: "bulk_b", type: "message", msg: "Hello" ],
            [ id: "rec-5", index: "bulk_a", type: "message", msg: "Hello" ],
            [ id: "rec-6", index: "bulk_b", type: "message", msg: "Hello" ]
        ]))

        // Pass 1: each record carries its own id, index and type.
        def evalClosure = { List<IndexOperationRequest> items ->
            def a = items.findAll { it.index == "bulk_a" }.size()
            def b = items.findAll { it.index == "bulk_b" }.size()
            items.each {
                Assert.assertNotNull(it.id)
                Assert.assertTrue(it.id.startsWith("rec-"))
                Assert.assertEquals("message", it.type)
            }
            Assert.assertEquals(3, a)
            Assert.assertEquals(3, b)
        }

        clientService.evalClosure = evalClosure

        registry.addSchema("recordPathTest", AvroTypeUtil.createSchema(new Schema.Parser().parse(newSchema)))
        runner.setProperty(PutElasticsearchRecord.ID_RECORD_PATH, "/id")
        runner.setProperty(PutElasticsearchRecord.INDEX_RECORD_PATH, "/index")
        runner.setProperty(PutElasticsearchRecord.TYPE_RECORD_PATH, "/type")
        runner.enqueue(recordPathContents, [
            "schema.name": "recordPathTest"
        ])
        runner.run()
        runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1)
        runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0)
        runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)

        runner.clearTransferState()

        // Pass 2: five records leave index/type null; only the last one sets them.
        recordPathContents = prettyPrint(toJson([
            [ id: "rec-1", index: null, type: null, msg: "Hello" ],
            [ id: "rec-2", index: null, type: null, msg: "Hello" ],
            [ id: "rec-3", index: null, type: null, msg: "Hello" ],
            [ id: "rec-4", index: null, type: null, msg: "Hello" ],
            [ id: "rec-5", index: null, type: null, msg: "Hello" ],
            [ id: "rec-6", index: "bulk_b", type: "message", msg: "Hello" ]
        ]))

        evalClosure = { List<IndexOperationRequest> items ->
            def testTypeCount = items.findAll { it.type == "test_type" }.size()
            def messageTypeCount = items.findAll { it.type == "message" }.size()
            def testIndexCount = items.findAll { it.index == "test_index" }.size()
            def bulkIndexCount = items.findAll { it.index.startsWith("bulk_") }.size()
            def indexOperationCount = items.findAll { it.operation == IndexOperationRequest.Operation.Index }.size()
            Assert.assertEquals(5, testTypeCount)
            Assert.assertEquals(1, messageTypeCount)
            Assert.assertEquals(5, testIndexCount)
            Assert.assertEquals(1, bulkIndexCount)
            Assert.assertEquals(6, indexOperationCount)
        }

        clientService.evalClosure = evalClosure

        runner.enqueue(recordPathContents, [
            "schema.name": "recordPathTest"
        ])
        runner.run()
        // Previously this pass cleared the transfer state without checking the
        // routing, so a flowfile sent to failure or retry would go unnoticed.
        runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1)
        runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0)
        runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)

        runner.clearTransferState()

        // Pass 3: fully populated records again; only the operation type is checked.
        recordPathContents = prettyPrint(toJson([
            [ id: "rec-1", index: "bulk_a", type: "message", msg: "Hello" ],
            [ id: "rec-2", index: "bulk_b", type: "message", msg: "Hello" ],
            [ id: "rec-3", index: "bulk_a", type: "message", msg: "Hello" ],
            [ id: "rec-4", index: "bulk_b", type: "message", msg: "Hello" ],
            [ id: "rec-5", index: "bulk_a", type: "message", msg: "Hello" ],
            [ id: "rec-6", index: "bulk_b", type: "message", msg: "Hello" ]
        ]))

        clientService.evalClosure = { List<IndexOperationRequest> items ->
            int index = items.findAll { it.operation == IndexOperationRequest.Operation.Index }.size()
            Assert.assertEquals(6, index)
        }

        runner.enqueue(recordPathContents, [
            "schema.name": "recordPathTest"
        ])
        runner.run()
        runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1)
        runner.assertTransferCount(PutElasticsearchRecord.REL_FAILURE, 0)
        runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
    }

    /** Running with nothing enqueued must not produce a success flowfile. */
    @Test
    void testInputRequired() {
        runner.run()
        runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 0)
    }

    /**
     * Configures an error record writer, makes the mock client answer with
     * MockBulkLoadClientService.SAMPLE_ERROR_RESPONSE, and expects one flowfile
     * on success plus one on the failed-records relationship whose record
     * count attribute is "1".
     */
    @Test
    void testErrorRelationship() {
        def writer = new JsonRecordSetWriter()
        runner.addControllerService("writer", writer)
        runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_NAME_PROPERTY)
        runner.setProperty(writer, SchemaAccessUtils.SCHEMA_REGISTRY, "registry")
        runner.enableControllerService(writer)
        runner.setProperty(PutElasticsearchRecord.ERROR_RECORD_WRITER, "writer")

        def newSchema = prettyPrint(toJson([
            type: "record",
            name: "RecordPathTestType",
            fields: [
                [ name: "id", type: "string" ],
                [ name: "field1", type: ["null", "string"]],
                [ name: "field2", type: "string"]
            ]
        ]))

        def values = [
            [ id: "1", field1: 'value1', field2: '20' ],
            [ id: "2", field1: 'value1', field2: '20' ],
            [ id: "2", field1: 'value1', field2: '20' ],
            [ id: "3", field1: 'value1', field2: '20abcd' ]
        ]

        clientService.response = IndexOperationResponse.fromJsonResponse(MockBulkLoadClientService.SAMPLE_ERROR_RESPONSE)
        registry.addSchema("errorTest", AvroTypeUtil.createSchema(new Schema.Parser().parse(newSchema)))
        runner.enqueue(prettyPrint(toJson(values)), [ 'schema.name': 'errorTest' ])
        runner.setProperty(PutElasticsearchRecord.LOG_ERROR_RESPONSES, "true")
        runner.assertValid()
        runner.run()

        runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, 1)
        runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 1)

        def errorFF = runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_FAILED_RECORDS)[0]
        assert errorFF.getAttribute(PutElasticsearchRecord.ATTR_RECORD_COUNT) == "1"
    }
}

View File

@ -15,82 +15,79 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.nifi.processors.elasticsearch; package org.apache.nifi.processors.elasticsearch
import com.fasterxml.jackson.databind.ObjectMapper; import groovy.json.JsonSlurper
import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.AbstractControllerService
import org.apache.nifi.elasticsearch.DeleteOperationResponse; import org.apache.nifi.elasticsearch.DeleteOperationResponse
import org.apache.nifi.elasticsearch.ElasticSearchClientService; import org.apache.nifi.elasticsearch.ElasticSearchClientService
import org.apache.nifi.elasticsearch.IndexOperationRequest; import org.apache.nifi.elasticsearch.IndexOperationRequest
import org.apache.nifi.elasticsearch.IndexOperationResponse; import org.apache.nifi.elasticsearch.IndexOperationResponse
import org.apache.nifi.elasticsearch.SearchResponse; import org.apache.nifi.elasticsearch.SearchResponse
import java.io.IOException; class TestElasticsearchClientService extends AbstractControllerService implements ElasticSearchClientService {
import java.util.Arrays; private boolean returnAggs
import java.util.HashMap; private boolean throwErrorInSearch
import java.util.List; private boolean throwErrorInDelete
import java.util.Map;
public class TestElasticSearchClientService extends AbstractControllerService implements ElasticSearchClientService { TestElasticsearchClientService(boolean returnAggs) {
private boolean returnAggs; this.returnAggs = returnAggs
private boolean throwErrorInSearch;
private boolean throwErrorInDelete;
public TestElasticSearchClientService(boolean returnAggs) {
this.returnAggs = returnAggs;
} }
@Override @Override
public IndexOperationResponse add(IndexOperationRequest operation) throws IOException { IndexOperationResponse add(IndexOperationRequest operation) {
return add(Arrays.asList(operation)); return add(Arrays.asList(operation))
} }
@Override @Override
public IndexOperationResponse add(List<IndexOperationRequest> operations) throws IOException { IndexOperationResponse bulk(List<IndexOperationRequest> operations) {
return new IndexOperationResponse(100L, 100L); return new IndexOperationResponse(100L, 100L)
} }
@Override @Override
public DeleteOperationResponse deleteById(String index, String type, String id) throws IOException { Long count(String query, String index, String type) {
return deleteById(index, type, Arrays.asList(id)); return null
} }
@Override @Override
public DeleteOperationResponse deleteById(String index, String type, List<String> ids) throws IOException { DeleteOperationResponse deleteById(String index, String type, String id) {
return deleteById(index, type, Arrays.asList(id))
}
@Override
DeleteOperationResponse deleteById(String index, String type, List<String> ids) {
if (throwErrorInDelete) { if (throwErrorInDelete) {
throw new IOException("Simulated IOException"); throw new IOException("Simulated IOException")
} }
return new DeleteOperationResponse(100L); return new DeleteOperationResponse(100L)
} }
@Override @Override
public DeleteOperationResponse deleteByQuery(String query, String index, String type) throws IOException { DeleteOperationResponse deleteByQuery(String query, String index, String type) {
return deleteById(index, type, Arrays.asList("1")); return deleteById(index, type, Arrays.asList("1"))
} }
@Override @Override
public Map<String, Object> get(String index, String type, String id) throws IOException { Map<String, Object> get(String index, String type, String id) {
return new HashMap<String, Object>(){{ return [ "msg": "one" ]
put("msg", "one");
}};
} }
@Override @Override
public SearchResponse search(String query, String index, String type) throws IOException { SearchResponse search(String query, String index, String type) {
if (throwErrorInSearch) { if (throwErrorInSearch) {
throw new IOException("Simulated IOException"); throw new IOException("Simulated IOException")
} }
ObjectMapper mapper = new ObjectMapper(); def mapper = new JsonSlurper()
List<Map<String, Object>> hits = (List<Map<String, Object>>)mapper.readValue(HITS_RESULT, List.class); def hits = mapper.parseText(HITS_RESULT)
Map<String, Object> aggs = returnAggs ? (Map<String, Object>)mapper.readValue(AGGS_RESULT, Map.class) : null; def aggs = returnAggs ? mapper.parseText(AGGS_RESULT) : null
SearchResponse response = new SearchResponse(hits, aggs, 15, 5, false); SearchResponse response = new SearchResponse(hits, aggs, 15, 5, false)
return response; return response
} }
@Override @Override
public String getTransitUrl(String index, String type) { String getTransitUrl(String index, String type) {
return String.format("http://localhost:9400/%s/%s", index, type); "http://localhost:9400/${index}/${type}"
} }
private static final String AGGS_RESULT = "{\n" + private static final String AGGS_RESULT = "{\n" +
@ -146,7 +143,7 @@ public class TestElasticSearchClientService extends AbstractControllerService im
" }\n" + " }\n" +
" ]\n" + " ]\n" +
" }\n" + " }\n" +
" }"; " }"
private static final String HITS_RESULT = "[\n" + private static final String HITS_RESULT = "[\n" +
" {\n" + " {\n" +
@ -239,13 +236,13 @@ public class TestElasticSearchClientService extends AbstractControllerService im
" \"msg\": \"five\"\n" + " \"msg\": \"five\"\n" +
" }\n" + " }\n" +
" }\n" + " }\n" +
" ]"; " ]"
public void setThrowErrorInSearch(boolean throwErrorInSearch) { void setThrowErrorInSearch(boolean throwErrorInSearch) {
this.throwErrorInSearch = throwErrorInSearch; this.throwErrorInSearch = throwErrorInSearch
} }
public void setThrowErrorInDelete(boolean throwErrorInDelete) { void setThrowErrorInDelete(boolean throwErrorInDelete) {
this.throwErrorInDelete = throwErrorInDelete; this.throwErrorInDelete = throwErrorInDelete
} }
} }

View File

@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.elasticsearch.mock
import org.apache.nifi.controller.AbstractControllerService
import org.apache.nifi.elasticsearch.*
/**
 * Do-nothing {@link ElasticSearchClientService} used as a base for test mocks.
 *
 * Every service operation declared here returns {@code null}. A concrete mock
 * overrides only the operations its test exercises and inherits the rest.
 */
class AbstractMockElasticsearchClient extends AbstractControllerService implements ElasticSearchClientService {
    /*
     * Failure-simulation switches. Nothing in this class reads them; subclasses
     * (for example MockBulkLoadClientService) decide what to throw when they are set.
     */
    boolean throwRetriableError
    boolean throwFatalError

    /** Not implemented by the base mock; always yields {@code null}. */
    @Override
    IndexOperationResponse add(IndexOperationRequest operation) { null }

    /** Not implemented by the base mock; always yields {@code null}. */
    @Override
    IndexOperationResponse bulk(List<IndexOperationRequest> operations) { null }

    /** Not implemented by the base mock; always yields {@code null}. */
    @Override
    Long count(String query, String index, String type) { null }

    /** Single-id variant. Not implemented by the base mock; always yields {@code null}. */
    @Override
    DeleteOperationResponse deleteById(String index, String type, String id) { null }

    /** Multi-id variant. Not implemented by the base mock; always yields {@code null}. */
    @Override
    DeleteOperationResponse deleteById(String index, String type, List<String> ids) { null }

    /** Not implemented by the base mock; always yields {@code null}. */
    @Override
    DeleteOperationResponse deleteByQuery(String query, String index, String type) { null }

    /** Not implemented by the base mock; always yields {@code null}. */
    @Override
    Map<String, Object> get(String index, String type, String id) { null }

    /** Not implemented by the base mock; always yields {@code null}. */
    @Override
    SearchResponse search(String query, String index, String type) { null }

    /** Not implemented by the base mock; always yields {@code null}. */
    @Override
    String getTransitUrl(String index, String type) { null }
}

View File

@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.elasticsearch.mock
import org.apache.nifi.elasticsearch.IndexOperationRequest
import org.apache.nifi.elasticsearch.IndexOperationResponse
/**
 * Mock client service whose {@code bulk} call is scripted by the test.
 *
 * A test can set {@code response} (the value handed back by {@code bulk}),
 * {@code evalClosure} (a callback that receives the submitted operations), and
 * the inherited {@code throwRetriableError} / {@code throwFatalError} switches.
 */
class MockBulkLoadClientService extends AbstractMockElasticsearchClient {
    /** Value returned by {@link #bulk}; stays {@code null} unless the test assigns it. */
    IndexOperationResponse response
    /** Optional callback invoked with the operations passed to {@link #bulk}. */
    Closure evalClosure

    /**
     * Simulates a bulk request.
     *
     * The error switches are checked first, with the retriable switch taking
     * precedence over the fatal one. If neither is set, the optional callback
     * is invoked with {@code items} and the configured response is returned.
     *
     * @param items operations submitted by the processor under test
     * @return the configured {@code response}
     * @throws MockElasticsearchError constructed with {@code true} when
     *         {@code throwRetriableError} is set, or with {@code false} when
     *         {@code throwFatalError} is set
     */
    @Override
    IndexOperationResponse bulk(List<IndexOperationRequest> items) {
        if (throwRetriableError) {
            throw new MockElasticsearchError(true)
        }
        if (throwFatalError) {
            throw new MockElasticsearchError(false)
        }

        // Safe navigation: the callback is skipped when no closure was supplied.
        evalClosure?.call(items)
        return response
    }

    /**
     * Canned bulk response body: "errors" is true and the items array holds
     * four entries - ids 1 to 3 with status 200/201 and id 4 with status 400
     * carrying a mapper_parsing_exception.
     */
    static final SAMPLE_ERROR_RESPONSE = """
{
"took" : 18,
"errors" : true,
"items" : [
{
"index" : {
"_index" : "test",
"_type" : "_doc",
"_id" : "1",
"_version" : 4,
"result" : "updated",
"_shards" : {
"total" : 2,
"successful" : 1,
"failed" : 0
},
"_seq_no" : 4,
"_primary_term" : 1,
"status" : 200
}
},
{
"create" : {
"_index" : "test",
"_type" : "_doc",
"_id" : "2",
"_version" : 1,
"result" : "created",
"_shards" : {
"total" : 2,
"successful" : 1,
"failed" : 0
},
"_seq_no" : 1,
"_primary_term" : 1,
"status" : 201
}
},
{
"create" : {
"_index" : "test",
"_type" : "_doc",
"_id" : "3",
"_version" : 1,
"result" : "created",
"_shards" : {
"total" : 2,
"successful" : 1,
"failed" : 0
},
"_seq_no" : 3,
"_primary_term" : 1,
"status" : 201
}
},
{
"index" : {
"_index" : "test",
"_type" : "_doc",
"_id" : "4",
"status" : 400,
"error" : {
"type" : "mapper_parsing_exception",
"reason" : "failed to parse field [field2] of type [integer] in document with id '4'",
"caused_by" : {
"type" : "number_format_exception",
"reason" : "For input string: 20abc"
}
}
}
}
]
}
"""
}

View File

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License") you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.elasticsearch.mock
import org.apache.nifi.elasticsearch.ElasticsearchError
/**
 * {@link ElasticsearchError} variant that lets a test choose the value of the
 * inherited {@code isElastic} flag directly.
 */
class MockElasticsearchError extends ElasticsearchError {
    /**
     * Wraps an arbitrary exception, delegating straight to the parent constructor.
     *
     * @param ex the exception to wrap
     */
    MockElasticsearchError(Exception ex) {
        super(ex)
    }

    /**
     * Builds an error around a placeholder {@link Exception} and then overwrites
     * {@code isElastic} with the requested value.
     *
     * @param isElastic value to store in the inherited flag; the bulk mock passes
     *        {@code true} for its retriable case and {@code false} for its fatal case
     */
    MockElasticsearchError(boolean isElastic) {
        this(new Exception())
        // Assigned after delegation so it replaces whatever the parent constructor set.
        this.isElastic = isElastic
    }
}

View File

@ -0,0 +1 @@
# Placeholder to force compilation

View File

@ -1,134 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.elasticsearch;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Test;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class DeleteByQueryElasticsearchTest {
private static final String INDEX = "test_idx";
private static final String TYPE = "test_type";
private static final String QUERY_ATTR = "es.delete.query";
private static final String CLIENT_NAME = "clientService";
private TestElasticSearchClientService client;
private void initClient(TestRunner runner) throws Exception {
client = new TestElasticSearchClientService(true);
runner.addControllerService(CLIENT_NAME, client);
runner.enableControllerService(client);
runner.setProperty(DeleteByQueryElasticsearch.CLIENT_SERVICE, CLIENT_NAME);
}
private void postTest(TestRunner runner, String queryParam) {
runner.assertTransferCount(DeleteByQueryElasticsearch.REL_FAILURE, 0);
runner.assertTransferCount(DeleteByQueryElasticsearch.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(DeleteByQueryElasticsearch.REL_SUCCESS);
String attr = flowFiles.get(0).getAttribute(DeleteByQueryElasticsearch.TOOK_ATTRIBUTE);
String query = flowFiles.get(0).getAttribute(QUERY_ATTR);
Assert.assertNotNull(attr);
Assert.assertEquals(attr, "100");
Assert.assertNotNull(query);
Assert.assertEquals(queryParam, query);
}
@Test
public void testWithFlowfileInput() throws Exception {
String query = "{ \"query\": { \"match_all\": {} }}";
TestRunner runner = TestRunners.newTestRunner(DeleteByQueryElasticsearch.class);
runner.setProperty(DeleteByQueryElasticsearch.INDEX, INDEX);
runner.setProperty(DeleteByQueryElasticsearch.TYPE, TYPE);
runner.setProperty(DeleteByQueryElasticsearch.QUERY_ATTRIBUTE, QUERY_ATTR);
initClient(runner);
runner.assertValid();
runner.enqueue(query);
runner.run();
postTest(runner, query);
}
@Test
public void testWithQuery() throws Exception {
String query = "{\n" +
"\t\"query\": {\n" +
"\t\t\"match\": {\n" +
"\t\t\t\"${field.name}.keyword\": \"test\"\n" +
"\t\t}\n" +
"\t}\n" +
"}";
Map<String, String> attrs = new HashMap<String, String>(){{
put("field.name", "test_field");
}};
TestRunner runner = TestRunners.newTestRunner(DeleteByQueryElasticsearch.class);
runner.setProperty(DeleteByQueryElasticsearch.QUERY, query);
runner.setProperty(DeleteByQueryElasticsearch.INDEX, INDEX);
runner.setProperty(DeleteByQueryElasticsearch.TYPE, TYPE);
runner.setProperty(DeleteByQueryElasticsearch.QUERY_ATTRIBUTE, QUERY_ATTR);
initClient(runner);
runner.assertValid();
runner.enqueue("", attrs);
runner.run();
postTest(runner, query.replace("${field.name}", "test_field"));
runner.clearTransferState();
query = "{\n" +
"\t\"query\": {\n" +
"\t\t\"match\": {\n" +
"\t\t\t\"test_field.keyword\": \"test\"\n" +
"\t\t}\n" +
"\t}\n" +
"}";
runner.setProperty(DeleteByQueryElasticsearch.QUERY, query);
runner.setIncomingConnection(false);
runner.assertValid();
runner.run();
postTest(runner, query);
}
@Test
public void testErrorAttribute() throws Exception {
String query = "{ \"query\": { \"match_all\": {} }}";
TestRunner runner = TestRunners.newTestRunner(DeleteByQueryElasticsearch.class);
runner.setProperty(DeleteByQueryElasticsearch.QUERY, query);
runner.setProperty(DeleteByQueryElasticsearch.INDEX, INDEX);
runner.setProperty(DeleteByQueryElasticsearch.TYPE, TYPE);
runner.setProperty(DeleteByQueryElasticsearch.QUERY_ATTRIBUTE, QUERY_ATTR);
initClient(runner);
client.setThrowErrorInDelete(true);
runner.assertValid();
runner.enqueue("");
runner.run();
runner.assertTransferCount(DeleteByQueryElasticsearch.REL_SUCCESS, 0);
runner.assertTransferCount(DeleteByQueryElasticsearch.REL_FAILURE, 1);
MockFlowFile mockFlowFile = runner.getFlowFilesForRelationship(DeleteByQueryElasticsearch.REL_FAILURE).get(0);
String attr = mockFlowFile.getAttribute(DeleteByQueryElasticsearch.ERROR_ATTRIBUTE);
Assert.assertNotNull(attr);
}
}