diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/GetElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/GetElasticsearch.java index ae75d08310..45d818afcf 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/GetElasticsearch.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/GetElasticsearch.java @@ -35,8 +35,10 @@ import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.util.StopWatch; +import org.apache.nifi.util.StringUtils; import java.util.Arrays; import java.util.Collections; @@ -161,6 +163,10 @@ public class GetElasticsearch extends AbstractProcessor implements Elasticsearch final String attributeName = context.getProperty(ATTRIBUTE_NAME).evaluateAttributeExpressions(input).getValue(); try { + if (StringUtils.isBlank(id)) { + throw new ProcessException(ID.getDisplayName() + " is blank (after evaluating attribute expressions), cannot GET document"); + } + final StopWatch stopWatch = new StopWatch(true); final Map doc = clientService.get(index, type, id, getUrlQueryParameters(context, input)); @@ -182,22 +188,7 @@ public class GetElasticsearch extends AbstractProcessor implements Elasticsearch session.getProvenanceReporter().receive(documentFlowFile, clientService.getTransitUrl(index, type), stopWatch.getElapsed(TimeUnit.MILLISECONDS)); session.transfer(documentFlowFile, REL_DOC); } catch (final ElasticsearchException ese) { - if (ese.isNotFound()) { - if (input != null) { - session.transfer(input, REL_NOT_FOUND); - } else { - getLogger().warn("Document with _id {} not found in index {} (and type {})", id, index, type); - } - } else { - final String msg = String.format("Encountered a server-side problem with Elasticsearch. %s", - ese.isElastic() ? "Routing to retry." : "Routing to failure"); - getLogger().error(msg, ese); - if (input != null) { - session.penalize(input); - input = session.putAttribute(input, "elasticsearch.get.error", ese.getMessage()); - session.transfer(input, ese.isElastic() ? REL_RETRY : REL_FAILURE); - } - } + handleElasticsearchException(ese, input, session, index, type, id); } catch (final Exception ex) { getLogger().error("Could not fetch document.", ex); if (input != null) { @@ -207,4 +198,24 @@ public class GetElasticsearch extends AbstractProcessor implements Elasticsearch context.yield(); } } + + private void handleElasticsearchException(final ElasticsearchException ese, FlowFile input, final ProcessSession session, + final String index, final String type, final String id) { + if (ese.isNotFound()) { + if (input != null) { + session.transfer(input, REL_NOT_FOUND); + } else { + getLogger().warn("Document with _id {} not found in index {} (and type {})", id, index, type); + } + } else { + final String msg = String.format("Encountered a server-side problem with Elasticsearch. %s", + ese.isElastic() ? "Routing to retry." : "Routing to failure"); + getLogger().error(msg, ese); + if (input != null) { + session.penalize(input); + input = session.putAttribute(input, "elasticsearch.get.error", ese.getMessage()); + session.transfer(input, ese.isElastic() ? REL_RETRY : REL_FAILURE); + } + } + } } diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/GetElasticsearchTest.groovy b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/GetElasticsearchTest.groovy index 4828c30bc3..2a459cf035 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/GetElasticsearchTest.groovy +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/GetElasticsearchTest.groovy @@ -191,6 +191,18 @@ class GetElasticsearchTest { @Test void testRequestParameters() { + final TestRunner runner = createRunner() + runner.setProperty(GetElasticsearch.ID, "\${noAttribute}") + + runProcessor(runner) + testCounts(runner, 0, 1, 0, 0) + final FlowFile failed = runner.getFlowFilesForRelationship(GetElasticsearch.REL_FAILURE).get(0) + failed.assertAttributeEquals("elasticsearch.get.error", GetElasticsearch.ID.getDisplayName() + " is blank (after evaluating attribute expressions), cannot GET document") + reset(runner) + } + + @Test + void testEmptyId() { final TestRunner runner = createRunner() runner.setProperty("refresh", "true") runner.setProperty("_source", '${source}')