diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/DeleteElasticsearch5.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/DeleteElasticsearch5.java new file mode 100644 index 0000000000..9e77775246 --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/DeleteElasticsearch5.java @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.elasticsearch; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.elasticsearch.ElasticsearchTimeoutException; +import org.elasticsearch.action.delete.DeleteRequestBuilder; +import org.elasticsearch.action.delete.DeleteResponse; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.transport.ReceiveTimeoutTransportException; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ExecutionException; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@EventDriven +@Tags({"elasticsearch", "elasticsearch 5", "delete", "remove"}) +@CapabilityDescription("Delete a document from Elasticsearch 5.0 by document id. If the cluster has been configured for authorization and/or secure " + + "transport (SSL/TLS), and the X-Pack plugin is available, secure connections can be made.") +@WritesAttributes({ + @WritesAttribute(attribute = DeleteElasticsearch5.ES_ERROR_MESSAGE, description = "The message attribute in case of error"), + @WritesAttribute(attribute = DeleteElasticsearch5.ES_FILENAME, description = "The filename attribute which is set to the document identifier"), + @WritesAttribute(attribute = DeleteElasticsearch5.ES_INDEX, description = "The Elasticsearch index containing the document"), + @WritesAttribute(attribute = DeleteElasticsearch5.ES_TYPE, description = "The Elasticsearch document type"), + @WritesAttribute(attribute = DeleteElasticsearch5.ES_REST_STATUS, description = "The filename attribute with rest status") +}) +@SeeAlso({FetchElasticsearch5.class,PutElasticsearch5.class}) +public class DeleteElasticsearch5 extends AbstractElasticsearch5TransportClientProcessor { + + public static final String UNABLE_TO_DELETE_DOCUMENT_MESSAGE = "Unable to delete document"; + + public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") + .description("All FlowFile corresponding to the deleted document from Elasticsearch are routed to this relationship").build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") + .description("All FlowFile corresponding to delete document that failed from Elasticsearch are routed to this relationship").build(); + + public static final Relationship REL_RETRY = new Relationship.Builder().name("retry") + .description("A FlowFile is routed to this relationship if the document cannot be deleted because or retryable exception like timeout or node not available") + .build(); + + public static final Relationship REL_NOT_FOUND = new Relationship.Builder().name("not found") + .description("A FlowFile is routed to this relationship if the specified document was not found in elasticsearch") + .build(); + + public static final PropertyDescriptor DOCUMENT_ID = new PropertyDescriptor.Builder() + .name("el5-delete-document-id") + .displayName("Document Identifier") + .description("The identifier for the document to be deleted") + .required(true) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder() + .name("el5-delete-index") + .displayName("Index") + .description("The name of the index to delete the document from") + .required(true) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder() + .name("el5-delete-type") + .displayName("Type") + .description("The type of this document to be deleted") + .required(true) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + private static final Set relationships; + private static final List propertyDescriptors; + + public static final String ES_ERROR_MESSAGE = "es.error.message"; + public static final String ES_FILENAME = "filename"; + public static final String ES_INDEX = "es.index"; + public static final String ES_TYPE = "es.type"; + public static final String ES_REST_STATUS = "es.rest.status"; + + static { + final Set relations = new HashSet<>(); + relations.add(REL_SUCCESS); + relations.add(REL_FAILURE); + relations.add(REL_RETRY); + relations.add(REL_NOT_FOUND); + relationships = Collections.unmodifiableSet(relations); + + final List descriptors = new ArrayList<>(); + descriptors.add(CLUSTER_NAME); + descriptors.add(HOSTS); + descriptors.add(PROP_SSL_CONTEXT_SERVICE); + descriptors.add(PROP_XPACK_LOCATION); + descriptors.add(USERNAME); + descriptors.add(PASSWORD); + descriptors.add(PING_TIMEOUT); + descriptors.add(SAMPLER_INTERVAL); + descriptors.add(DOCUMENT_ID); + descriptors.add(INDEX); + descriptors.add(TYPE); + + propertyDescriptors = Collections.unmodifiableList(descriptors); + } + + @Override + public Set getRelationships() { + return relationships; + } + + @Override + public final List getSupportedPropertyDescriptors() { + return propertyDescriptors; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + + synchronized (esClient) { + if(esClient.get() == null) { + setup(context); + } + } + + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final String index = context.getProperty(INDEX).evaluateAttributeExpressions(flowFile).getValue(); + final String documentId = context.getProperty(DOCUMENT_ID).evaluateAttributeExpressions(flowFile).getValue(); + final String documentType = context.getProperty(TYPE).evaluateAttributeExpressions(flowFile).getValue(); + + final ComponentLog logger = getLogger(); + + if ( StringUtils.isBlank(index) ) { + logger.debug("Index is required but was empty {}", new Object [] { index }); + flowFile = session.putAttribute(flowFile, ES_ERROR_MESSAGE, "Index is required but was empty"); + session.transfer(flowFile,REL_FAILURE); + return; + } + if ( StringUtils.isBlank(documentType) ) { + logger.debug("Document type is required but was empty {}", new Object [] { documentType }); + flowFile = session.putAttribute(flowFile, ES_ERROR_MESSAGE, "Document type is required but was empty"); + session.transfer(flowFile,REL_FAILURE); + return; + } + if ( StringUtils.isBlank(documentId) ) { + logger.debug("Document id is required but was empty {}", new Object [] { documentId }); + flowFile = session.putAttribute(flowFile, ES_ERROR_MESSAGE, "Document id is required but was empty"); + session.transfer(flowFile,REL_FAILURE); + return; + } + + flowFile = session.putAllAttributes(flowFile, new HashMap() {{ + put(ES_FILENAME, documentId); + put(ES_INDEX, index); + put(ES_TYPE, documentType); + }}); + + try { + + logger.debug("Deleting document {}/{}/{} from Elasticsearch", new Object[]{index, documentType, documentId}); + DeleteRequestBuilder requestBuilder = prepareDeleteRequest(index, documentId, documentType); + final DeleteResponse response = doDelete(requestBuilder); + + if (response.status() != RestStatus.OK) { + logger.warn("Failed to delete document {}/{}/{} from Elasticsearch: Status {}", + new Object[]{index, documentType, documentId, response.status()}); + flowFile = session.putAttribute(flowFile, ES_ERROR_MESSAGE, UNABLE_TO_DELETE_DOCUMENT_MESSAGE); + flowFile = session.putAttribute(flowFile, ES_REST_STATUS, response.status().toString()); + context.yield(); + if ( response.status() == RestStatus.NOT_FOUND ) { + session.transfer(flowFile, REL_NOT_FOUND); + } else { + session.transfer(flowFile, REL_FAILURE); + } + } else { + logger.debug("Elasticsearch document " + documentId + " deleted"); + session.transfer(flowFile, REL_SUCCESS); + } + } catch ( ElasticsearchTimeoutException + | ReceiveTimeoutTransportException exception) { + logger.error("Failed to delete document {} from Elasticsearch due to {}", + new Object[]{documentId, exception.getLocalizedMessage()}, exception); + flowFile = session.putAttribute(flowFile, ES_ERROR_MESSAGE, exception.getLocalizedMessage()); + session.transfer(flowFile, REL_RETRY); + context.yield(); + + } catch (Exception e) { + logger.error("Failed to delete document {} from Elasticsearch due to {}", new Object[]{documentId, e.getLocalizedMessage()}, e); + flowFile = session.putAttribute(flowFile, ES_ERROR_MESSAGE, e.getLocalizedMessage()); + session.transfer(flowFile, REL_FAILURE); + context.yield(); + } + } + + protected DeleteRequestBuilder prepareDeleteRequest(final String index, final String documentId, final String documentType) { + return esClient.get().prepareDelete(index, documentType, documentId); + } + + protected DeleteResponse doDelete(DeleteRequestBuilder requestBuilder) + throws InterruptedException, ExecutionException { + return requestBuilder.execute().get(); + } + + @OnStopped + public void closeClient() { + super.closeClient(); + } +} diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearch5.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearch5.java index dcae6153bb..ee8674e397 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearch5.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/FetchElasticsearch5.java @@ -22,6 +22,7 @@ import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.PropertyDescriptor; @@ -65,6 +66,7 @@ import java.util.Set; @WritesAttribute(attribute = "es.index", description = "The Elasticsearch index containing the document"), @WritesAttribute(attribute = "es.type", description = "The Elasticsearch document type") }) +@SeeAlso({DeleteElasticsearch5.class,PutElasticsearch5.class}) public class FetchElasticsearch5 extends AbstractElasticsearch5TransportClientProcessor { public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch5.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch5.java index ef70bf0ac3..ded35cf8a7 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch5.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch5.java @@ -21,6 +21,7 @@ import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.PropertyDescriptor; @@ -62,6 +63,7 @@ import java.util.Set; + "the index to insert into and the type of the document. If the cluster has been configured for authorization " + "and/or secure transport (SSL/TLS), and the X-Pack plugin is available, secure connections can be made. This processor " + "supports Elasticsearch 5.x clusters.") +@SeeAlso({FetchElasticsearch5.class,PutElasticsearch5.class}) public class PutElasticsearch5 extends AbstractElasticsearch5TransportClientProcessor { private static final Validator NON_EMPTY_EL_VALIDATOR = (subject, value, context) -> { diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 156f7c0c94..8db7223075 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -14,3 +14,4 @@ # limitations under the License. org.apache.nifi.processors.elasticsearch.FetchElasticsearch5 org.apache.nifi.processors.elasticsearch.PutElasticsearch5 +org.apache.nifi.processors.elasticsearch.DeleteElasticsearch5 diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITDeleteElasticsearch5Test.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITDeleteElasticsearch5Test.java new file mode 100644 index 0000000000..f9dfe589b2 --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/test/java/org/apache/nifi/processors/elasticsearch/ITDeleteElasticsearch5Test.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.elasticsearch; + +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.elasticsearch.action.delete.DeleteResponse; +import org.elasticsearch.rest.RestStatus; +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; + +import java.io.IOException; +import java.io.InputStream; +import java.util.HashMap; + +/** + * Integration test for delete processor. Please set the hosts, cluster name, index and type etc before running the integrations. + */ +@Ignore("Comment this out for es delete integration testing and set the appropriate cluster name, hosts, etc") +public class ITDeleteElasticsearch5Test { + + private static final String TYPE1 = "type1"; + private static final String INDEX1 = "index1"; + protected DeleteResponse deleteResponse; + protected RestStatus restStatus; + private InputStream inputDocument; + protected String clusterName = "elasticsearch"; + private String documentId; + + @Before + public void setUp() throws IOException { + ClassLoader classloader = Thread.currentThread().getContextClassLoader(); + inputDocument = classloader.getResourceAsStream("DocumentExample.json"); + long currentTimeMillis = System.currentTimeMillis(); + documentId = String.valueOf(currentTimeMillis); + } + + @After + public void teardown() { + } + + @Test + public void testPutAndDeleteIntegrationTestSuccess() { + final TestRunner runnerPut = TestRunners.newTestRunner(new PutElasticsearch5()); + runnerPut.setValidateExpressionUsage(false); + runnerPut.setProperty(AbstractElasticsearch5TransportClientProcessor.CLUSTER_NAME, clusterName); + runnerPut.setProperty(AbstractElasticsearch5TransportClientProcessor.HOSTS, "127.0.0.1:9300"); + runnerPut.setProperty(AbstractElasticsearch5TransportClientProcessor.PING_TIMEOUT, "5s"); + runnerPut.setProperty(AbstractElasticsearch5TransportClientProcessor.SAMPLER_INTERVAL, "5s"); + + runnerPut.setProperty(PutElasticsearch5.INDEX, INDEX1); + runnerPut.setProperty(PutElasticsearch5.BATCH_SIZE, "1"); + + runnerPut.setProperty(PutElasticsearch5.TYPE, TYPE1); + runnerPut.setProperty(PutElasticsearch5.ID_ATTRIBUTE, "id"); + runnerPut.assertValid(); + + runnerPut.enqueue(inputDocument, new HashMap() {{ + put("id", documentId); + }}); + + runnerPut.enqueue(inputDocument); + runnerPut.run(1, true, true); + + runnerPut.assertAllFlowFilesTransferred(PutElasticsearch5.REL_SUCCESS, 1); + + final TestRunner runnerDelete = TestRunners.newTestRunner(new DeleteElasticsearch5()); + runnerDelete.setValidateExpressionUsage(false); + + runnerDelete.setProperty(AbstractElasticsearch5TransportClientProcessor.CLUSTER_NAME, clusterName); + runnerDelete.setProperty(AbstractElasticsearch5TransportClientProcessor.HOSTS, "127.0.0.1:9300"); + runnerDelete.setProperty(AbstractElasticsearch5TransportClientProcessor.PING_TIMEOUT, "5s"); + runnerDelete.setProperty(AbstractElasticsearch5TransportClientProcessor.SAMPLER_INTERVAL, "5s"); + + runnerDelete.setProperty(DeleteElasticsearch5.INDEX, INDEX1); + + runnerDelete.setProperty(DeleteElasticsearch5.TYPE, TYPE1); + runnerDelete.setProperty(DeleteElasticsearch5.DOCUMENT_ID, "${documentId}"); + runnerDelete.assertValid(); + + runnerDelete.enqueue(new byte[] {}, new HashMap() {{ + put("documentId", documentId); + }}); + + runnerDelete.enqueue(new byte [] {}); + runnerDelete.run(1, true, true); + + runnerDelete.assertAllFlowFilesTransferred(PutElasticsearch5.REL_SUCCESS, 1); + } + + @Test + public void testDeleteIntegrationTestDocumentNotFound() { + final TestRunner runnerDelete = TestRunners.newTestRunner(new DeleteElasticsearch5()); + runnerDelete.setValidateExpressionUsage(false); + + runnerDelete.setProperty(AbstractElasticsearch5TransportClientProcessor.CLUSTER_NAME, clusterName); + runnerDelete.setProperty(AbstractElasticsearch5TransportClientProcessor.HOSTS, "127.0.0.1:9300"); + runnerDelete.setProperty(AbstractElasticsearch5TransportClientProcessor.PING_TIMEOUT, "5s"); + runnerDelete.setProperty(AbstractElasticsearch5TransportClientProcessor.SAMPLER_INTERVAL, "5s"); + + runnerDelete.setProperty(DeleteElasticsearch5.INDEX, INDEX1); + + runnerDelete.setProperty(DeleteElasticsearch5.TYPE, TYPE1); + runnerDelete.setProperty(DeleteElasticsearch5.DOCUMENT_ID, "${documentId}"); + runnerDelete.assertValid(); + + runnerDelete.enqueue(new byte[] {}, new HashMap() {{ + put("documentId", documentId); + }}); + + runnerDelete.enqueue(new byte [] {}); + runnerDelete.run(1, true, true); + + runnerDelete.assertAllFlowFilesTransferred(DeleteElasticsearch5.REL_NOT_FOUND, 1); + final MockFlowFile out = runnerDelete.getFlowFilesForRelationship(DeleteElasticsearch5.REL_NOT_FOUND).get(0); + out.assertAttributeEquals(DeleteElasticsearch5.ES_FILENAME, documentId); + out.assertAttributeEquals(DeleteElasticsearch5.ES_INDEX, INDEX1); + out.assertAttributeEquals(DeleteElasticsearch5.ES_TYPE, TYPE1); + } + + @Test + public void testDeleteIntegrationTestBadIndex() { + final TestRunner runnerDelete = TestRunners.newTestRunner(new DeleteElasticsearch5()); + runnerDelete.setValidateExpressionUsage(false); + + runnerDelete.setProperty(AbstractElasticsearch5TransportClientProcessor.CLUSTER_NAME, clusterName); + runnerDelete.setProperty(AbstractElasticsearch5TransportClientProcessor.HOSTS, "127.0.0.1:9300"); + runnerDelete.setProperty(AbstractElasticsearch5TransportClientProcessor.PING_TIMEOUT, "5s"); + runnerDelete.setProperty(AbstractElasticsearch5TransportClientProcessor.SAMPLER_INTERVAL, "5s"); + + String index = String.valueOf(System.currentTimeMillis()); + runnerDelete.setProperty(DeleteElasticsearch5.INDEX, index); + + runnerDelete.setProperty(DeleteElasticsearch5.TYPE, TYPE1); + runnerDelete.setProperty(DeleteElasticsearch5.DOCUMENT_ID, "${documentId}"); + runnerDelete.assertValid(); + + runnerDelete.enqueue(new byte[] {}, new HashMap() {{ + put("documentId", documentId); + }}); + + runnerDelete.enqueue(new byte [] {}); + runnerDelete.run(1, true, true); + runnerDelete.assertAllFlowFilesTransferred(DeleteElasticsearch5.REL_NOT_FOUND, 1); + final MockFlowFile out = runnerDelete.getFlowFilesForRelationship(DeleteElasticsearch5.REL_NOT_FOUND).get(0); + out.assertAttributeEquals(DeleteElasticsearch5.ES_FILENAME, documentId); + out.assertAttributeEquals(DeleteElasticsearch5.ES_INDEX, index); + out.assertAttributeEquals(DeleteElasticsearch5.ES_TYPE, TYPE1); + } + + @Test + public void testDeleteIntegrationTestBadType() { + final TestRunner runnerDelete = TestRunners.newTestRunner(new DeleteElasticsearch5()); + runnerDelete.setValidateExpressionUsage(false); + + runnerDelete.setProperty(AbstractElasticsearch5TransportClientProcessor.CLUSTER_NAME, clusterName); + runnerDelete.setProperty(AbstractElasticsearch5TransportClientProcessor.HOSTS, "127.0.0.1:9300"); + runnerDelete.setProperty(AbstractElasticsearch5TransportClientProcessor.PING_TIMEOUT, "5s"); + runnerDelete.setProperty(AbstractElasticsearch5TransportClientProcessor.SAMPLER_INTERVAL, "5s"); + + runnerDelete.setProperty(DeleteElasticsearch5.INDEX, INDEX1); + String type = String.valueOf(System.currentTimeMillis()); + runnerDelete.setProperty(DeleteElasticsearch5.TYPE, type); + runnerDelete.setProperty(DeleteElasticsearch5.DOCUMENT_ID, "${documentId}"); + runnerDelete.assertValid(); + + runnerDelete.enqueue(new byte[] {}, new HashMap() {{ + put("documentId", documentId); + }}); + + runnerDelete.enqueue(new byte [] {}); + runnerDelete.run(1, true, true); + + runnerDelete.assertAllFlowFilesTransferred(DeleteElasticsearch5.REL_NOT_FOUND, 1); + final MockFlowFile out = runnerDelete.getFlowFilesForRelationship(DeleteElasticsearch5.REL_NOT_FOUND).get(0); + out.assertAttributeEquals(DeleteElasticsearch5.ES_FILENAME, documentId); + out.assertAttributeEquals(DeleteElasticsearch5.ES_INDEX, INDEX1); + out.assertAttributeEquals(DeleteElasticsearch5.ES_TYPE, type); + } +} diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestDeleteElasticsearch5.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestDeleteElasticsearch5.java new file mode 100644 index 0000000000..83e8a2dde8 --- /dev/null +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestDeleteElasticsearch5.java @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.elasticsearch; + +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.elasticsearch.ElasticsearchTimeoutException; +import org.elasticsearch.action.delete.DeleteRequestBuilder; +import org.elasticsearch.action.delete.DeleteResponse; +import org.elasticsearch.rest.RestStatus; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.HashMap; +import java.util.concurrent.ExecutionException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +public class TestDeleteElasticsearch5 { + + private String documentId; + private static final String TYPE1 = "type1"; + private static final String INDEX1 = "index1"; + private TestRunner runner; + protected DeleteResponse deleteResponse; + protected RestStatus restStatus; + private DeleteElasticsearch5 mockDeleteProcessor; + long currentTimeMillis; + + @Before + public void setUp() throws IOException { + currentTimeMillis = System.currentTimeMillis(); + documentId = String.valueOf(currentTimeMillis); + mockDeleteProcessor = new DeleteElasticsearch5() { + + @Override + protected DeleteRequestBuilder prepareDeleteRequest(String index, String docId, String docType) { + return null; + } + + @Override + protected DeleteResponse doDelete(DeleteRequestBuilder requestBuilder) + throws InterruptedException, ExecutionException { + return deleteResponse; + } + + @Override + public void setup(ProcessContext context) { + } + + }; + + runner = TestRunners.newTestRunner(mockDeleteProcessor); + + runner.setValidateExpressionUsage(true); + runner.setProperty(AbstractElasticsearch5TransportClientProcessor.CLUSTER_NAME, "elasticsearch"); + runner.setProperty(AbstractElasticsearch5TransportClientProcessor.HOSTS, "127.0.0.1:9300"); + runner.setProperty(AbstractElasticsearch5TransportClientProcessor.PING_TIMEOUT, "5s"); + runner.setProperty(AbstractElasticsearch5TransportClientProcessor.SAMPLER_INTERVAL, "5s"); + + runner.setProperty(DeleteElasticsearch5.INDEX, INDEX1); + runner.assertNotValid(); + runner.setProperty(DeleteElasticsearch5.TYPE, TYPE1); + runner.assertNotValid(); + runner.setProperty(DeleteElasticsearch5.DOCUMENT_ID, "${documentId}"); + runner.assertValid(); + } + + @After + public void teardown() { + runner = null; + } + + @Test + public void testDeleteWithNoDocumentId() throws IOException { + + runner.enqueue(new byte [] {}); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(DeleteElasticsearch5.REL_FAILURE, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(DeleteElasticsearch5.REL_FAILURE).get(0); + assertNotNull(out); + assertEquals("Document id is required but was empty",out.getAttribute(DeleteElasticsearch5.ES_ERROR_MESSAGE)); + } + + @Test + public void testDeleteWithNoIndex() throws IOException { + runner.setProperty(DeleteElasticsearch5.INDEX, "${index}"); + + runner.enqueue(new byte [] {}); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(DeleteElasticsearch5.REL_FAILURE, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(DeleteElasticsearch5.REL_FAILURE).get(0); + assertNotNull(out); + assertEquals("Index is required but was empty",out.getAttribute(DeleteElasticsearch5.ES_ERROR_MESSAGE)); + } + + @Test + public void testDeleteWithNoType() throws IOException { + runner.setProperty(DeleteElasticsearch5.TYPE, "${type}"); + + runner.enqueue(new byte [] {}); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(DeleteElasticsearch5.REL_FAILURE, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(DeleteElasticsearch5.REL_FAILURE).get(0); + assertNotNull(out); + assertEquals("Document type is required but was empty",out.getAttribute(DeleteElasticsearch5.ES_ERROR_MESSAGE)); + } + + @Test + public void testDeleteSuccessful() throws IOException { + restStatus = RestStatus.OK; + deleteResponse = new DeleteResponse(null, TYPE1, documentId, 1, true) { + + @Override + public RestStatus status() { + return restStatus; + } + + }; + runner.enqueue(new byte [] {}, new HashMap() {{ + put("documentId", documentId); + }}); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(DeleteElasticsearch5.REL_SUCCESS, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(DeleteElasticsearch5.REL_SUCCESS).get(0); + assertNotNull(out); + assertEquals(null,out.getAttribute(DeleteElasticsearch5.ES_ERROR_MESSAGE)); + out.assertAttributeEquals(DeleteElasticsearch5.ES_FILENAME, documentId); + out.assertAttributeEquals(DeleteElasticsearch5.ES_INDEX, INDEX1); + out.assertAttributeEquals(DeleteElasticsearch5.ES_TYPE, TYPE1); + } + + @Test + public void testDeleteNotFound() throws IOException { + restStatus = RestStatus.NOT_FOUND; + deleteResponse = new DeleteResponse(null, TYPE1, documentId, 1, true) { + + @Override + public RestStatus status() { + return restStatus; + } + + }; + runner.enqueue(new byte [] {}, new HashMap() {{ + put("documentId", documentId); + }}); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(DeleteElasticsearch5.REL_NOT_FOUND, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(DeleteElasticsearch5.REL_NOT_FOUND).get(0); + assertNotNull(out); + assertEquals(DeleteElasticsearch5.UNABLE_TO_DELETE_DOCUMENT_MESSAGE,out.getAttribute(DeleteElasticsearch5.ES_ERROR_MESSAGE)); + out.assertAttributeEquals(DeleteElasticsearch5.ES_REST_STATUS, restStatus.toString()); + out.assertAttributeEquals(DeleteElasticsearch5.ES_FILENAME, documentId); + out.assertAttributeEquals(DeleteElasticsearch5.ES_INDEX, INDEX1); + out.assertAttributeEquals(DeleteElasticsearch5.ES_TYPE, TYPE1); + } + + @Test + public void testDeleteServerFailure() throws IOException { + restStatus = RestStatus.SERVICE_UNAVAILABLE; + deleteResponse = new DeleteResponse(null, TYPE1, documentId, 1, true) { + + @Override + public RestStatus status() { + return restStatus; + } + + }; + runner.enqueue(new byte [] {}, new HashMap() {{ + put("documentId", documentId); + }}); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(DeleteElasticsearch5.REL_FAILURE, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(DeleteElasticsearch5.REL_FAILURE).get(0); + assertNotNull(out); + assertEquals(DeleteElasticsearch5.UNABLE_TO_DELETE_DOCUMENT_MESSAGE,out.getAttribute(DeleteElasticsearch5.ES_ERROR_MESSAGE)); + out.assertAttributeEquals(DeleteElasticsearch5.ES_REST_STATUS, restStatus.toString()); + out.assertAttributeEquals(DeleteElasticsearch5.ES_FILENAME, documentId); + out.assertAttributeEquals(DeleteElasticsearch5.ES_INDEX, INDEX1); + out.assertAttributeEquals(DeleteElasticsearch5.ES_TYPE, TYPE1); + } + + @Test + public void testDeleteRetryableException() throws IOException { + mockDeleteProcessor = new DeleteElasticsearch5() { + + @Override + protected DeleteRequestBuilder prepareDeleteRequest(String index, String docId, String docType) { + return null; + } + + @Override + protected DeleteResponse doDelete(DeleteRequestBuilder requestBuilder) + throws InterruptedException, ExecutionException { + throw new ElasticsearchTimeoutException("timeout"); + } + + @Override + public void setup(ProcessContext context) { + } + + }; + runner = TestRunners.newTestRunner(mockDeleteProcessor); + + runner.setValidateExpressionUsage(true); + runner.setProperty(AbstractElasticsearch5TransportClientProcessor.CLUSTER_NAME, "elasticsearch"); + runner.setProperty(AbstractElasticsearch5TransportClientProcessor.HOSTS, "127.0.0.1:9300"); + runner.setProperty(AbstractElasticsearch5TransportClientProcessor.PING_TIMEOUT, "5s"); + runner.setProperty(AbstractElasticsearch5TransportClientProcessor.SAMPLER_INTERVAL, "5s"); + + runner.setProperty(DeleteElasticsearch5.INDEX, INDEX1); + runner.setProperty(DeleteElasticsearch5.TYPE, TYPE1); + runner.setProperty(DeleteElasticsearch5.DOCUMENT_ID, "${documentId}"); + runner.assertValid(); + + runner.enqueue(new byte [] {}, new HashMap() {{ + put("documentId", documentId); + }}); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(DeleteElasticsearch5.REL_RETRY, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(DeleteElasticsearch5.REL_RETRY).get(0); + assertNotNull(out); + assertEquals("timeout",out.getAttribute(DeleteElasticsearch5.ES_ERROR_MESSAGE)); + out.assertAttributeEquals(DeleteElasticsearch5.ES_REST_STATUS, null); + out.assertAttributeEquals(DeleteElasticsearch5.ES_FILENAME, documentId); + out.assertAttributeEquals(DeleteElasticsearch5.ES_INDEX, INDEX1); + out.assertAttributeEquals(DeleteElasticsearch5.ES_TYPE, TYPE1); + } + + @Test + public void testDeleteNonRetryableException() throws IOException { + mockDeleteProcessor = new DeleteElasticsearch5() { + + @Override + protected DeleteRequestBuilder prepareDeleteRequest(String index, String docId, String docType) { + return null; + } + + @Override + protected DeleteResponse doDelete(DeleteRequestBuilder requestBuilder) + throws InterruptedException, ExecutionException { + throw new InterruptedException("exception"); + } + + @Override + public void setup(ProcessContext context) { + } + + }; + runner = TestRunners.newTestRunner(mockDeleteProcessor); + + runner.setValidateExpressionUsage(true); + runner.setProperty(AbstractElasticsearch5TransportClientProcessor.CLUSTER_NAME, "elasticsearch"); + runner.setProperty(AbstractElasticsearch5TransportClientProcessor.HOSTS, "127.0.0.1:9300"); + runner.setProperty(AbstractElasticsearch5TransportClientProcessor.PING_TIMEOUT, "5s"); + runner.setProperty(AbstractElasticsearch5TransportClientProcessor.SAMPLER_INTERVAL, "5s"); + + runner.setProperty(DeleteElasticsearch5.INDEX, INDEX1); + runner.setProperty(DeleteElasticsearch5.TYPE, TYPE1); + runner.setProperty(DeleteElasticsearch5.DOCUMENT_ID, "${documentId}"); + runner.assertValid(); + + runner.enqueue(new byte [] {}, new HashMap() {{ + put("documentId", documentId); + }}); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(DeleteElasticsearch5.REL_FAILURE, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(DeleteElasticsearch5.REL_FAILURE).get(0); + assertNotNull(out); + assertEquals("exception",out.getAttribute(DeleteElasticsearch5.ES_ERROR_MESSAGE)); + out.assertAttributeEquals(DeleteElasticsearch5.ES_REST_STATUS, null); + out.assertAttributeEquals(DeleteElasticsearch5.ES_FILENAME, documentId); + out.assertAttributeEquals(DeleteElasticsearch5.ES_INDEX, INDEX1); + out.assertAttributeEquals(DeleteElasticsearch5.ES_TYPE, TYPE1); + } + +}