NIFI-10303 route GetElasticsearch to failure if _id is blank after attribute evaluation

This closes #6573

Signed-off-by: Mike Thomsen <mthomsen@apache.org>
This commit is contained in:
Chris Sampson 2022-10-24 20:44:17 +01:00 committed by Mike Thomsen
parent db49a861b3
commit b6026f5709
2 changed files with 39 additions and 16 deletions

View File

@ -35,8 +35,10 @@ import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.StringUtils;
import java.util.Arrays;
import java.util.Collections;
@ -161,6 +163,10 @@ public class GetElasticsearch extends AbstractProcessor implements Elasticsearch
final String attributeName = context.getProperty(ATTRIBUTE_NAME).evaluateAttributeExpressions(input).getValue();
try {
if (StringUtils.isBlank(id)) {
throw new ProcessException(ID.getDisplayName() + " is blank (after evaluating attribute expressions), cannot GET document");
}
final StopWatch stopWatch = new StopWatch(true);
final Map<String, Object> doc = clientService.get(index, type, id, getUrlQueryParameters(context, input));
@ -182,22 +188,7 @@ public class GetElasticsearch extends AbstractProcessor implements Elasticsearch
session.getProvenanceReporter().receive(documentFlowFile, clientService.getTransitUrl(index, type), stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(documentFlowFile, REL_DOC);
} catch (final ElasticsearchException ese) {
if (ese.isNotFound()) {
if (input != null) {
session.transfer(input, REL_NOT_FOUND);
} else {
getLogger().warn("Document with _id {} not found in index {} (and type {})", id, index, type);
}
} else {
final String msg = String.format("Encountered a server-side problem with Elasticsearch. %s",
ese.isElastic() ? "Routing to retry." : "Routing to failure");
getLogger().error(msg, ese);
if (input != null) {
session.penalize(input);
input = session.putAttribute(input, "elasticsearch.get.error", ese.getMessage());
session.transfer(input, ese.isElastic() ? REL_RETRY : REL_FAILURE);
}
}
handleElasticsearchException(ese, input, session, index, type, id);
} catch (final Exception ex) {
getLogger().error("Could not fetch document.", ex);
if (input != null) {
@ -207,4 +198,24 @@ public class GetElasticsearch extends AbstractProcessor implements Elasticsearch
context.yield();
}
}
private void handleElasticsearchException(final ElasticsearchException ese, FlowFile input, final ProcessSession session,
final String index, final String type, final String id) {
if (ese.isNotFound()) {
if (input != null) {
session.transfer(input, REL_NOT_FOUND);
} else {
getLogger().warn("Document with _id {} not found in index {} (and type {})", id, index, type);
}
} else {
final String msg = String.format("Encountered a server-side problem with Elasticsearch. %s",
ese.isElastic() ? "Routing to retry." : "Routing to failure");
getLogger().error(msg, ese);
if (input != null) {
session.penalize(input);
input = session.putAttribute(input, "elasticsearch.get.error", ese.getMessage());
session.transfer(input, ese.isElastic() ? REL_RETRY : REL_FAILURE);
}
}
}
}

View File

@ -191,6 +191,18 @@ class GetElasticsearchTest {
@Test
void testRequestParameters() {
final TestRunner runner = createRunner()
runner.setProperty(GetElasticsearch.ID, "\${noAttribute}")
runProcessor(runner)
testCounts(runner, 0, 1, 0, 0)
final FlowFile failed = runner.getFlowFilesForRelationship(GetElasticsearch.REL_FAILURE).get(0)
failed.assertAttributeEquals("elasticsearch.get.error", GetElasticsearch.ID.getDisplayName() + " is blank (after evaluating attribute expressions), cannot GET document")
reset(runner)
}
@Test
void testEmptyId() {
final TestRunner runner = createRunner()
runner.setProperty("refresh", "true")
runner.setProperty("_source", '${source}')