NIFI-4250 - Elasticsearch 5 delete processor

Signed-off-by: Matt Burgess <mattyb149@apache.org>

This closes #2045
This commit is contained in:
mans2singh 2017-08-01 06:07:38 -07:00 committed by Matt Burgess
parent 41984bed13
commit 8cb501443b
6 changed files with 752 additions and 0 deletions

View File

@ -0,0 +1,246 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.elasticsearch;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.transport.ReceiveTimeoutTransportException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@EventDriven
@Tags({"elasticsearch", "elasticsearch 5", "delete", "remove"})
@CapabilityDescription("Delete a document from Elasticsearch 5.0 by document id. If the cluster has been configured for authorization and/or secure "
+ "transport (SSL/TLS), and the X-Pack plugin is available, secure connections can be made.")
@WritesAttributes({
@WritesAttribute(attribute = DeleteElasticsearch5.ES_ERROR_MESSAGE, description = "The message attribute in case of error"),
@WritesAttribute(attribute = DeleteElasticsearch5.ES_FILENAME, description = "The filename attribute which is set to the document identifier"),
@WritesAttribute(attribute = DeleteElasticsearch5.ES_INDEX, description = "The Elasticsearch index containing the document"),
@WritesAttribute(attribute = DeleteElasticsearch5.ES_TYPE, description = "The Elasticsearch document type"),
@WritesAttribute(attribute = DeleteElasticsearch5.ES_REST_STATUS, description = "The filename attribute with rest status")
})
@SeeAlso({FetchElasticsearch5.class,PutElasticsearch5.class})
public class DeleteElasticsearch5 extends AbstractElasticsearch5TransportClientProcessor {
public static final String UNABLE_TO_DELETE_DOCUMENT_MESSAGE = "Unable to delete document";
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
.description("All FlowFile corresponding to the deleted document from Elasticsearch are routed to this relationship").build();
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
.description("All FlowFile corresponding to delete document that failed from Elasticsearch are routed to this relationship").build();
public static final Relationship REL_RETRY = new Relationship.Builder().name("retry")
.description("A FlowFile is routed to this relationship if the document cannot be deleted because or retryable exception like timeout or node not available")
.build();
public static final Relationship REL_NOT_FOUND = new Relationship.Builder().name("not found")
.description("A FlowFile is routed to this relationship if the specified document was not found in elasticsearch")
.build();
public static final PropertyDescriptor DOCUMENT_ID = new PropertyDescriptor.Builder()
.name("el5-delete-document-id")
.displayName("Document Identifier")
.description("The identifier for the document to be deleted")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder()
.name("el5-delete-index")
.displayName("Index")
.description("The name of the index to delete the document from")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder()
.name("el5-delete-type")
.displayName("Type")
.description("The type of this document to be deleted")
.required(true)
.expressionLanguageSupported(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
private static final Set<Relationship> relationships;
private static final List<PropertyDescriptor> propertyDescriptors;
public static final String ES_ERROR_MESSAGE = "es.error.message";
public static final String ES_FILENAME = "filename";
public static final String ES_INDEX = "es.index";
public static final String ES_TYPE = "es.type";
public static final String ES_REST_STATUS = "es.rest.status";
static {
final Set<Relationship> relations = new HashSet<>();
relations.add(REL_SUCCESS);
relations.add(REL_FAILURE);
relations.add(REL_RETRY);
relations.add(REL_NOT_FOUND);
relationships = Collections.unmodifiableSet(relations);
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(CLUSTER_NAME);
descriptors.add(HOSTS);
descriptors.add(PROP_SSL_CONTEXT_SERVICE);
descriptors.add(PROP_XPACK_LOCATION);
descriptors.add(USERNAME);
descriptors.add(PASSWORD);
descriptors.add(PING_TIMEOUT);
descriptors.add(SAMPLER_INTERVAL);
descriptors.add(DOCUMENT_ID);
descriptors.add(INDEX);
descriptors.add(TYPE);
propertyDescriptors = Collections.unmodifiableList(descriptors);
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return propertyDescriptors;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
synchronized (esClient) {
if(esClient.get() == null) {
setup(context);
}
}
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final String index = context.getProperty(INDEX).evaluateAttributeExpressions(flowFile).getValue();
final String documentId = context.getProperty(DOCUMENT_ID).evaluateAttributeExpressions(flowFile).getValue();
final String documentType = context.getProperty(TYPE).evaluateAttributeExpressions(flowFile).getValue();
final ComponentLog logger = getLogger();
if ( StringUtils.isBlank(index) ) {
logger.debug("Index is required but was empty {}", new Object [] { index });
flowFile = session.putAttribute(flowFile, ES_ERROR_MESSAGE, "Index is required but was empty");
session.transfer(flowFile,REL_FAILURE);
return;
}
if ( StringUtils.isBlank(documentType) ) {
logger.debug("Document type is required but was empty {}", new Object [] { documentType });
flowFile = session.putAttribute(flowFile, ES_ERROR_MESSAGE, "Document type is required but was empty");
session.transfer(flowFile,REL_FAILURE);
return;
}
if ( StringUtils.isBlank(documentId) ) {
logger.debug("Document id is required but was empty {}", new Object [] { documentId });
flowFile = session.putAttribute(flowFile, ES_ERROR_MESSAGE, "Document id is required but was empty");
session.transfer(flowFile,REL_FAILURE);
return;
}
flowFile = session.putAllAttributes(flowFile, new HashMap<String, String>() {{
put(ES_FILENAME, documentId);
put(ES_INDEX, index);
put(ES_TYPE, documentType);
}});
try {
logger.debug("Deleting document {}/{}/{} from Elasticsearch", new Object[]{index, documentType, documentId});
DeleteRequestBuilder requestBuilder = prepareDeleteRequest(index, documentId, documentType);
final DeleteResponse response = doDelete(requestBuilder);
if (response.status() != RestStatus.OK) {
logger.warn("Failed to delete document {}/{}/{} from Elasticsearch: Status {}",
new Object[]{index, documentType, documentId, response.status()});
flowFile = session.putAttribute(flowFile, ES_ERROR_MESSAGE, UNABLE_TO_DELETE_DOCUMENT_MESSAGE);
flowFile = session.putAttribute(flowFile, ES_REST_STATUS, response.status().toString());
context.yield();
if ( response.status() == RestStatus.NOT_FOUND ) {
session.transfer(flowFile, REL_NOT_FOUND);
} else {
session.transfer(flowFile, REL_FAILURE);
}
} else {
logger.debug("Elasticsearch document " + documentId + " deleted");
session.transfer(flowFile, REL_SUCCESS);
}
} catch ( ElasticsearchTimeoutException
| ReceiveTimeoutTransportException exception) {
logger.error("Failed to delete document {} from Elasticsearch due to {}",
new Object[]{documentId, exception.getLocalizedMessage()}, exception);
flowFile = session.putAttribute(flowFile, ES_ERROR_MESSAGE, exception.getLocalizedMessage());
session.transfer(flowFile, REL_RETRY);
context.yield();
} catch (Exception e) {
logger.error("Failed to delete document {} from Elasticsearch due to {}", new Object[]{documentId, e.getLocalizedMessage()}, e);
flowFile = session.putAttribute(flowFile, ES_ERROR_MESSAGE, e.getLocalizedMessage());
session.transfer(flowFile, REL_FAILURE);
context.yield();
}
}
protected DeleteRequestBuilder prepareDeleteRequest(final String index, final String documentId, final String documentType) {
return esClient.get().prepareDelete(index, documentType, documentId);
}
protected DeleteResponse doDelete(DeleteRequestBuilder requestBuilder)
throws InterruptedException, ExecutionException {
return requestBuilder.execute().get();
}
@OnStopped
public void closeClient() {
super.closeClient();
}
}

View File

@ -22,6 +22,7 @@ import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
@ -65,6 +66,7 @@ import java.util.Set;
@WritesAttribute(attribute = "es.index", description = "The Elasticsearch index containing the document"),
@WritesAttribute(attribute = "es.type", description = "The Elasticsearch document type")
})
@SeeAlso({DeleteElasticsearch5.class,PutElasticsearch5.class})
public class FetchElasticsearch5 extends AbstractElasticsearch5TransportClientProcessor {
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")

View File

@ -21,6 +21,7 @@ import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
@ -62,6 +63,7 @@ import java.util.Set;
+ "the index to insert into and the type of the document. If the cluster has been configured for authorization "
+ "and/or secure transport (SSL/TLS), and the X-Pack plugin is available, secure connections can be made. This processor "
+ "supports Elasticsearch 5.x clusters.")
@SeeAlso({FetchElasticsearch5.class,PutElasticsearch5.class})
public class PutElasticsearch5 extends AbstractElasticsearch5TransportClientProcessor {
private static final Validator NON_EMPTY_EL_VALIDATOR = (subject, value, context) -> {

View File

@ -14,3 +14,4 @@
# limitations under the License.
org.apache.nifi.processors.elasticsearch.FetchElasticsearch5
org.apache.nifi.processors.elasticsearch.PutElasticsearch5
org.apache.nifi.processors.elasticsearch.DeleteElasticsearch5

View File

@ -0,0 +1,197 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.elasticsearch;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.rest.RestStatus;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
/**
* Integration test for delete processor. Please set the hosts, cluster name, index and type etc before running the integrations.
*/
@Ignore("Comment this out for es delete integration testing and set the appropriate cluster name, hosts, etc")
public class ITDeleteElasticsearch5Test {
private static final String TYPE1 = "type1";
private static final String INDEX1 = "index1";
protected DeleteResponse deleteResponse;
protected RestStatus restStatus;
private InputStream inputDocument;
protected String clusterName = "elasticsearch";
private String documentId;
@Before
public void setUp() throws IOException {
ClassLoader classloader = Thread.currentThread().getContextClassLoader();
inputDocument = classloader.getResourceAsStream("DocumentExample.json");
long currentTimeMillis = System.currentTimeMillis();
documentId = String.valueOf(currentTimeMillis);
}
@After
public void teardown() {
}
@Test
public void testPutAndDeleteIntegrationTestSuccess() {
final TestRunner runnerPut = TestRunners.newTestRunner(new PutElasticsearch5());
runnerPut.setValidateExpressionUsage(false);
runnerPut.setProperty(AbstractElasticsearch5TransportClientProcessor.CLUSTER_NAME, clusterName);
runnerPut.setProperty(AbstractElasticsearch5TransportClientProcessor.HOSTS, "127.0.0.1:9300");
runnerPut.setProperty(AbstractElasticsearch5TransportClientProcessor.PING_TIMEOUT, "5s");
runnerPut.setProperty(AbstractElasticsearch5TransportClientProcessor.SAMPLER_INTERVAL, "5s");
runnerPut.setProperty(PutElasticsearch5.INDEX, INDEX1);
runnerPut.setProperty(PutElasticsearch5.BATCH_SIZE, "1");
runnerPut.setProperty(PutElasticsearch5.TYPE, TYPE1);
runnerPut.setProperty(PutElasticsearch5.ID_ATTRIBUTE, "id");
runnerPut.assertValid();
runnerPut.enqueue(inputDocument, new HashMap<String, String>() {{
put("id", documentId);
}});
runnerPut.enqueue(inputDocument);
runnerPut.run(1, true, true);
runnerPut.assertAllFlowFilesTransferred(PutElasticsearch5.REL_SUCCESS, 1);
final TestRunner runnerDelete = TestRunners.newTestRunner(new DeleteElasticsearch5());
runnerDelete.setValidateExpressionUsage(false);
runnerDelete.setProperty(AbstractElasticsearch5TransportClientProcessor.CLUSTER_NAME, clusterName);
runnerDelete.setProperty(AbstractElasticsearch5TransportClientProcessor.HOSTS, "127.0.0.1:9300");
runnerDelete.setProperty(AbstractElasticsearch5TransportClientProcessor.PING_TIMEOUT, "5s");
runnerDelete.setProperty(AbstractElasticsearch5TransportClientProcessor.SAMPLER_INTERVAL, "5s");
runnerDelete.setProperty(DeleteElasticsearch5.INDEX, INDEX1);
runnerDelete.setProperty(DeleteElasticsearch5.TYPE, TYPE1);
runnerDelete.setProperty(DeleteElasticsearch5.DOCUMENT_ID, "${documentId}");
runnerDelete.assertValid();
runnerDelete.enqueue(new byte[] {}, new HashMap<String, String>() {{
put("documentId", documentId);
}});
runnerDelete.enqueue(new byte [] {});
runnerDelete.run(1, true, true);
runnerDelete.assertAllFlowFilesTransferred(PutElasticsearch5.REL_SUCCESS, 1);
}
@Test
public void testDeleteIntegrationTestDocumentNotFound() {
final TestRunner runnerDelete = TestRunners.newTestRunner(new DeleteElasticsearch5());
runnerDelete.setValidateExpressionUsage(false);
runnerDelete.setProperty(AbstractElasticsearch5TransportClientProcessor.CLUSTER_NAME, clusterName);
runnerDelete.setProperty(AbstractElasticsearch5TransportClientProcessor.HOSTS, "127.0.0.1:9300");
runnerDelete.setProperty(AbstractElasticsearch5TransportClientProcessor.PING_TIMEOUT, "5s");
runnerDelete.setProperty(AbstractElasticsearch5TransportClientProcessor.SAMPLER_INTERVAL, "5s");
runnerDelete.setProperty(DeleteElasticsearch5.INDEX, INDEX1);
runnerDelete.setProperty(DeleteElasticsearch5.TYPE, TYPE1);
runnerDelete.setProperty(DeleteElasticsearch5.DOCUMENT_ID, "${documentId}");
runnerDelete.assertValid();
runnerDelete.enqueue(new byte[] {}, new HashMap<String, String>() {{
put("documentId", documentId);
}});
runnerDelete.enqueue(new byte [] {});
runnerDelete.run(1, true, true);
runnerDelete.assertAllFlowFilesTransferred(DeleteElasticsearch5.REL_NOT_FOUND, 1);
final MockFlowFile out = runnerDelete.getFlowFilesForRelationship(DeleteElasticsearch5.REL_NOT_FOUND).get(0);
out.assertAttributeEquals(DeleteElasticsearch5.ES_FILENAME, documentId);
out.assertAttributeEquals(DeleteElasticsearch5.ES_INDEX, INDEX1);
out.assertAttributeEquals(DeleteElasticsearch5.ES_TYPE, TYPE1);
}
@Test
public void testDeleteIntegrationTestBadIndex() {
final TestRunner runnerDelete = TestRunners.newTestRunner(new DeleteElasticsearch5());
runnerDelete.setValidateExpressionUsage(false);
runnerDelete.setProperty(AbstractElasticsearch5TransportClientProcessor.CLUSTER_NAME, clusterName);
runnerDelete.setProperty(AbstractElasticsearch5TransportClientProcessor.HOSTS, "127.0.0.1:9300");
runnerDelete.setProperty(AbstractElasticsearch5TransportClientProcessor.PING_TIMEOUT, "5s");
runnerDelete.setProperty(AbstractElasticsearch5TransportClientProcessor.SAMPLER_INTERVAL, "5s");
String index = String.valueOf(System.currentTimeMillis());
runnerDelete.setProperty(DeleteElasticsearch5.INDEX, index);
runnerDelete.setProperty(DeleteElasticsearch5.TYPE, TYPE1);
runnerDelete.setProperty(DeleteElasticsearch5.DOCUMENT_ID, "${documentId}");
runnerDelete.assertValid();
runnerDelete.enqueue(new byte[] {}, new HashMap<String, String>() {{
put("documentId", documentId);
}});
runnerDelete.enqueue(new byte [] {});
runnerDelete.run(1, true, true);
runnerDelete.assertAllFlowFilesTransferred(DeleteElasticsearch5.REL_NOT_FOUND, 1);
final MockFlowFile out = runnerDelete.getFlowFilesForRelationship(DeleteElasticsearch5.REL_NOT_FOUND).get(0);
out.assertAttributeEquals(DeleteElasticsearch5.ES_FILENAME, documentId);
out.assertAttributeEquals(DeleteElasticsearch5.ES_INDEX, index);
out.assertAttributeEquals(DeleteElasticsearch5.ES_TYPE, TYPE1);
}
@Test
public void testDeleteIntegrationTestBadType() {
final TestRunner runnerDelete = TestRunners.newTestRunner(new DeleteElasticsearch5());
runnerDelete.setValidateExpressionUsage(false);
runnerDelete.setProperty(AbstractElasticsearch5TransportClientProcessor.CLUSTER_NAME, clusterName);
runnerDelete.setProperty(AbstractElasticsearch5TransportClientProcessor.HOSTS, "127.0.0.1:9300");
runnerDelete.setProperty(AbstractElasticsearch5TransportClientProcessor.PING_TIMEOUT, "5s");
runnerDelete.setProperty(AbstractElasticsearch5TransportClientProcessor.SAMPLER_INTERVAL, "5s");
runnerDelete.setProperty(DeleteElasticsearch5.INDEX, INDEX1);
String type = String.valueOf(System.currentTimeMillis());
runnerDelete.setProperty(DeleteElasticsearch5.TYPE, type);
runnerDelete.setProperty(DeleteElasticsearch5.DOCUMENT_ID, "${documentId}");
runnerDelete.assertValid();
runnerDelete.enqueue(new byte[] {}, new HashMap<String, String>() {{
put("documentId", documentId);
}});
runnerDelete.enqueue(new byte [] {});
runnerDelete.run(1, true, true);
runnerDelete.assertAllFlowFilesTransferred(DeleteElasticsearch5.REL_NOT_FOUND, 1);
final MockFlowFile out = runnerDelete.getFlowFilesForRelationship(DeleteElasticsearch5.REL_NOT_FOUND).get(0);
out.assertAttributeEquals(DeleteElasticsearch5.ES_FILENAME, documentId);
out.assertAttributeEquals(DeleteElasticsearch5.ES_INDEX, INDEX1);
out.assertAttributeEquals(DeleteElasticsearch5.ES_TYPE, type);
}
}

View File

@ -0,0 +1,304 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.elasticsearch;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.rest.RestStatus;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.ExecutionException;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
public class TestDeleteElasticsearch5 {
private String documentId;
private static final String TYPE1 = "type1";
private static final String INDEX1 = "index1";
private TestRunner runner;
protected DeleteResponse deleteResponse;
protected RestStatus restStatus;
private DeleteElasticsearch5 mockDeleteProcessor;
long currentTimeMillis;
@Before
public void setUp() throws IOException {
currentTimeMillis = System.currentTimeMillis();
documentId = String.valueOf(currentTimeMillis);
mockDeleteProcessor = new DeleteElasticsearch5() {
@Override
protected DeleteRequestBuilder prepareDeleteRequest(String index, String docId, String docType) {
return null;
}
@Override
protected DeleteResponse doDelete(DeleteRequestBuilder requestBuilder)
throws InterruptedException, ExecutionException {
return deleteResponse;
}
@Override
public void setup(ProcessContext context) {
}
};
runner = TestRunners.newTestRunner(mockDeleteProcessor);
runner.setValidateExpressionUsage(true);
runner.setProperty(AbstractElasticsearch5TransportClientProcessor.CLUSTER_NAME, "elasticsearch");
runner.setProperty(AbstractElasticsearch5TransportClientProcessor.HOSTS, "127.0.0.1:9300");
runner.setProperty(AbstractElasticsearch5TransportClientProcessor.PING_TIMEOUT, "5s");
runner.setProperty(AbstractElasticsearch5TransportClientProcessor.SAMPLER_INTERVAL, "5s");
runner.setProperty(DeleteElasticsearch5.INDEX, INDEX1);
runner.assertNotValid();
runner.setProperty(DeleteElasticsearch5.TYPE, TYPE1);
runner.assertNotValid();
runner.setProperty(DeleteElasticsearch5.DOCUMENT_ID, "${documentId}");
runner.assertValid();
}
@After
public void teardown() {
runner = null;
}
@Test
public void testDeleteWithNoDocumentId() throws IOException {
runner.enqueue(new byte [] {});
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(DeleteElasticsearch5.REL_FAILURE, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(DeleteElasticsearch5.REL_FAILURE).get(0);
assertNotNull(out);
assertEquals("Document id is required but was empty",out.getAttribute(DeleteElasticsearch5.ES_ERROR_MESSAGE));
}
@Test
public void testDeleteWithNoIndex() throws IOException {
runner.setProperty(DeleteElasticsearch5.INDEX, "${index}");
runner.enqueue(new byte [] {});
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(DeleteElasticsearch5.REL_FAILURE, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(DeleteElasticsearch5.REL_FAILURE).get(0);
assertNotNull(out);
assertEquals("Index is required but was empty",out.getAttribute(DeleteElasticsearch5.ES_ERROR_MESSAGE));
}
@Test
public void testDeleteWithNoType() throws IOException {
runner.setProperty(DeleteElasticsearch5.TYPE, "${type}");
runner.enqueue(new byte [] {});
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(DeleteElasticsearch5.REL_FAILURE, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(DeleteElasticsearch5.REL_FAILURE).get(0);
assertNotNull(out);
assertEquals("Document type is required but was empty",out.getAttribute(DeleteElasticsearch5.ES_ERROR_MESSAGE));
}
@Test
public void testDeleteSuccessful() throws IOException {
restStatus = RestStatus.OK;
deleteResponse = new DeleteResponse(null, TYPE1, documentId, 1, true) {
@Override
public RestStatus status() {
return restStatus;
}
};
runner.enqueue(new byte [] {}, new HashMap<String, String>() {{
put("documentId", documentId);
}});
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(DeleteElasticsearch5.REL_SUCCESS, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(DeleteElasticsearch5.REL_SUCCESS).get(0);
assertNotNull(out);
assertEquals(null,out.getAttribute(DeleteElasticsearch5.ES_ERROR_MESSAGE));
out.assertAttributeEquals(DeleteElasticsearch5.ES_FILENAME, documentId);
out.assertAttributeEquals(DeleteElasticsearch5.ES_INDEX, INDEX1);
out.assertAttributeEquals(DeleteElasticsearch5.ES_TYPE, TYPE1);
}
@Test
public void testDeleteNotFound() throws IOException {
restStatus = RestStatus.NOT_FOUND;
deleteResponse = new DeleteResponse(null, TYPE1, documentId, 1, true) {
@Override
public RestStatus status() {
return restStatus;
}
};
runner.enqueue(new byte [] {}, new HashMap<String, String>() {{
put("documentId", documentId);
}});
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(DeleteElasticsearch5.REL_NOT_FOUND, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(DeleteElasticsearch5.REL_NOT_FOUND).get(0);
assertNotNull(out);
assertEquals(DeleteElasticsearch5.UNABLE_TO_DELETE_DOCUMENT_MESSAGE,out.getAttribute(DeleteElasticsearch5.ES_ERROR_MESSAGE));
out.assertAttributeEquals(DeleteElasticsearch5.ES_REST_STATUS, restStatus.toString());
out.assertAttributeEquals(DeleteElasticsearch5.ES_FILENAME, documentId);
out.assertAttributeEquals(DeleteElasticsearch5.ES_INDEX, INDEX1);
out.assertAttributeEquals(DeleteElasticsearch5.ES_TYPE, TYPE1);
}
@Test
public void testDeleteServerFailure() throws IOException {
restStatus = RestStatus.SERVICE_UNAVAILABLE;
deleteResponse = new DeleteResponse(null, TYPE1, documentId, 1, true) {
@Override
public RestStatus status() {
return restStatus;
}
};
runner.enqueue(new byte [] {}, new HashMap<String, String>() {{
put("documentId", documentId);
}});
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(DeleteElasticsearch5.REL_FAILURE, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(DeleteElasticsearch5.REL_FAILURE).get(0);
assertNotNull(out);
assertEquals(DeleteElasticsearch5.UNABLE_TO_DELETE_DOCUMENT_MESSAGE,out.getAttribute(DeleteElasticsearch5.ES_ERROR_MESSAGE));
out.assertAttributeEquals(DeleteElasticsearch5.ES_REST_STATUS, restStatus.toString());
out.assertAttributeEquals(DeleteElasticsearch5.ES_FILENAME, documentId);
out.assertAttributeEquals(DeleteElasticsearch5.ES_INDEX, INDEX1);
out.assertAttributeEquals(DeleteElasticsearch5.ES_TYPE, TYPE1);
}
@Test
public void testDeleteRetryableException() throws IOException {
mockDeleteProcessor = new DeleteElasticsearch5() {
@Override
protected DeleteRequestBuilder prepareDeleteRequest(String index, String docId, String docType) {
return null;
}
@Override
protected DeleteResponse doDelete(DeleteRequestBuilder requestBuilder)
throws InterruptedException, ExecutionException {
throw new ElasticsearchTimeoutException("timeout");
}
@Override
public void setup(ProcessContext context) {
}
};
runner = TestRunners.newTestRunner(mockDeleteProcessor);
runner.setValidateExpressionUsage(true);
runner.setProperty(AbstractElasticsearch5TransportClientProcessor.CLUSTER_NAME, "elasticsearch");
runner.setProperty(AbstractElasticsearch5TransportClientProcessor.HOSTS, "127.0.0.1:9300");
runner.setProperty(AbstractElasticsearch5TransportClientProcessor.PING_TIMEOUT, "5s");
runner.setProperty(AbstractElasticsearch5TransportClientProcessor.SAMPLER_INTERVAL, "5s");
runner.setProperty(DeleteElasticsearch5.INDEX, INDEX1);
runner.setProperty(DeleteElasticsearch5.TYPE, TYPE1);
runner.setProperty(DeleteElasticsearch5.DOCUMENT_ID, "${documentId}");
runner.assertValid();
runner.enqueue(new byte [] {}, new HashMap<String, String>() {{
put("documentId", documentId);
}});
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(DeleteElasticsearch5.REL_RETRY, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(DeleteElasticsearch5.REL_RETRY).get(0);
assertNotNull(out);
assertEquals("timeout",out.getAttribute(DeleteElasticsearch5.ES_ERROR_MESSAGE));
out.assertAttributeEquals(DeleteElasticsearch5.ES_REST_STATUS, null);
out.assertAttributeEquals(DeleteElasticsearch5.ES_FILENAME, documentId);
out.assertAttributeEquals(DeleteElasticsearch5.ES_INDEX, INDEX1);
out.assertAttributeEquals(DeleteElasticsearch5.ES_TYPE, TYPE1);
}
@Test
public void testDeleteNonRetryableException() throws IOException {
mockDeleteProcessor = new DeleteElasticsearch5() {
@Override
protected DeleteRequestBuilder prepareDeleteRequest(String index, String docId, String docType) {
return null;
}
@Override
protected DeleteResponse doDelete(DeleteRequestBuilder requestBuilder)
throws InterruptedException, ExecutionException {
throw new InterruptedException("exception");
}
@Override
public void setup(ProcessContext context) {
}
};
runner = TestRunners.newTestRunner(mockDeleteProcessor);
runner.setValidateExpressionUsage(true);
runner.setProperty(AbstractElasticsearch5TransportClientProcessor.CLUSTER_NAME, "elasticsearch");
runner.setProperty(AbstractElasticsearch5TransportClientProcessor.HOSTS, "127.0.0.1:9300");
runner.setProperty(AbstractElasticsearch5TransportClientProcessor.PING_TIMEOUT, "5s");
runner.setProperty(AbstractElasticsearch5TransportClientProcessor.SAMPLER_INTERVAL, "5s");
runner.setProperty(DeleteElasticsearch5.INDEX, INDEX1);
runner.setProperty(DeleteElasticsearch5.TYPE, TYPE1);
runner.setProperty(DeleteElasticsearch5.DOCUMENT_ID, "${documentId}");
runner.assertValid();
runner.enqueue(new byte [] {}, new HashMap<String, String>() {{
put("documentId", documentId);
}});
runner.run(1, true, true);
runner.assertAllFlowFilesTransferred(DeleteElasticsearch5.REL_FAILURE, 1);
final MockFlowFile out = runner.getFlowFilesForRelationship(DeleteElasticsearch5.REL_FAILURE).get(0);
assertNotNull(out);
assertEquals("exception",out.getAttribute(DeleteElasticsearch5.ES_ERROR_MESSAGE));
out.assertAttributeEquals(DeleteElasticsearch5.ES_REST_STATUS, null);
out.assertAttributeEquals(DeleteElasticsearch5.ES_FILENAME, documentId);
out.assertAttributeEquals(DeleteElasticsearch5.ES_INDEX, INDEX1);
out.assertAttributeEquals(DeleteElasticsearch5.ES_TYPE, TYPE1);
}
}