From e51ab8c9d6b678dc4e9549562d7c9f58f0445a04 Mon Sep 17 00:00:00 2001 From: Joe Gresock Date: Thu, 16 Jan 2020 20:30:49 +0000 Subject: [PATCH] NIFI-7036: This closes #3993. Adding 'Append to Attributes' to QueryElasticsearchHttp Signed-off-by: Joe Witt --- .../elasticsearch/QueryElasticsearchHttp.java | 32 +++++++--- .../TestQueryElasticsearchHttpNoHits.java | 60 ++++++++++++++++++- 2 files changed, 82 insertions(+), 10 deletions(-) diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java index 7cb1fb40b3..ea350ad59c 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.java @@ -91,7 +91,8 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor { public enum QueryInfoRouteStrategy { NEVER, ALWAYS, - NOHIT + NOHIT, + APPEND_AS_ATTRIBUTES } private static final String FROM_QUERY_PARAM = "from"; @@ -103,6 +104,8 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor { static final AllowableValue ALWAYS = new AllowableValue(QueryInfoRouteStrategy.ALWAYS.name(), "Always", "Always route Query Info"); static final AllowableValue NEVER = new AllowableValue(QueryInfoRouteStrategy.NEVER.name(), "Never", "Never route Query Info"); static final AllowableValue NO_HITS = new AllowableValue(QueryInfoRouteStrategy.NOHIT.name(), "No Hits", "Route Query Info if the Query returns no hits"); + static final AllowableValue APPEND_AS_ATTRIBUTES = new AllowableValue(QueryInfoRouteStrategy.APPEND_AS_ATTRIBUTES.name(), "Append as Attributes", + "Always append Query Info as attributes, using the existing relationships (does not add the Query Info relationship)."); public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") .description( @@ -221,7 +224,7 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor { .displayName("Routing Strategy for Query Info") .description("Specifies when to generate and route Query Info after a successful query") .expressionLanguageSupported(ExpressionLanguageScope.NONE) - .allowableValues(ALWAYS, NEVER, NO_HITS) + .allowableValues(ALWAYS, NEVER, NO_HITS, APPEND_AS_ATTRIBUTES) .defaultValue(NEVER.getValue()) .required(false) .build(); @@ -399,9 +402,9 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor { if ( (hits.size() == 0 && priorResultCount == 0 && queryInfoRouteStrategy == QueryInfoRouteStrategy.NOHIT) || queryInfoRouteStrategy == QueryInfoRouteStrategy.ALWAYS) { FlowFile queryInfo = flowFile == null ? session.create() : session.create(flowFile); - session.putAttribute(queryInfo, "es.query.url", url.toExternalForm()); - session.putAttribute(queryInfo, "es.query.hitcount", String.valueOf(hits.size())); - session.putAttribute(queryInfo, MIME_TYPE.key(), "application/json"); + queryInfo = session.putAttribute(queryInfo, "es.query.url", url.toExternalForm()); + queryInfo = session.putAttribute(queryInfo, "es.query.hitcount", String.valueOf(hits.size())); + queryInfo = session.putAttribute(queryInfo, MIME_TYPE.key(), "application/json"); session.transfer(queryInfo,REL_QUERY_INFO); } @@ -418,6 +421,10 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor { documentFlowFile = session.create(); } + if (queryInfoRouteStrategy == QueryInfoRouteStrategy.APPEND_AS_ATTRIBUTES) { + documentFlowFile = session.putAttribute(documentFlowFile, "es.query.hitcount", String.valueOf(hits.size())); + } + JsonNode source = hit.get("_source"); documentFlowFile = session.putAttribute(documentFlowFile, "es.id", retrievedId); documentFlowFile = session.putAttribute(documentFlowFile, "es.index", retrievedIndex); @@ -451,9 +458,20 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor { } page.add(documentFlowFile); } - logger.debug("Elasticsearch retrieved " + responseJson.size() + " documents, routing to success"); - session.transfer(page, REL_SUCCESS); + logger.debug("Elasticsearch retrieved " + responseJson.size() + " documents, routing to success"); + // If we want to append query info as attributes but there were no hits, + // pass along the original, if present. + if (queryInfoRouteStrategy == QueryInfoRouteStrategy.APPEND_AS_ATTRIBUTES && page.isEmpty() + && flowFile != null) { + FlowFile documentFlowFile = null; + documentFlowFile = targetIsContent ? session.create(flowFile) : session.clone(flowFile); + documentFlowFile = session.putAttribute(documentFlowFile, "es.query.hitcount", String.valueOf(hits.size())); + documentFlowFile = session.putAttribute(documentFlowFile, "es.query.url", url.toExternalForm()); + session.transfer(documentFlowFile, REL_SUCCESS); + } else { + session.transfer(page, REL_SUCCESS); + } } else { try { // 5xx -> RETRY, but a server error might last a while, so yield diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttpNoHits.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttpNoHits.java index d2113bded1..b648ec8dfa 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttpNoHits.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/test/java/org/apache/nifi/processors/elasticsearch/TestQueryElasticsearchHttpNoHits.java @@ -189,9 +189,56 @@ public class TestQueryElasticsearchHttpNoHits { runAndVerify(3,3,2,true); } + @Test + public void testQueryElasticsearchOnTrigger_Hits_AppendAsAttributes() throws IOException { + runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor(false)); + runner.setValidateExpressionUsage(true); + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + runner.setProperty(QueryElasticsearchHttp.INDEX, "doc"); + runner.assertNotValid(); + runner.setProperty(QueryElasticsearchHttp.TYPE, "status"); + runner.assertNotValid(); + runner.setProperty(QueryElasticsearchHttp.QUERY, + "source:Twitter AND identifier:\"${identifier}\""); + runner.assertValid(); + runner.setProperty(QueryElasticsearchHttp.PAGE_SIZE, "2"); + runner.assertValid(); + runner.setProperty(QueryElasticsearchHttp.ROUTING_QUERY_INFO_STRATEGY, QueryElasticsearchHttp.QueryInfoRouteStrategy.APPEND_AS_ATTRIBUTES.name()); + runner.assertValid(); + + runner.setIncomingConnection(true); + runAndVerify(1,0,0,false, false); + } + + @Test + public void testQueryElasticsearchOnTrigger_Hits_AppendAsAttributes_noHits() throws IOException { + runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor(true)); + runner.setValidateExpressionUsage(true); + runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200"); + + runner.setProperty(QueryElasticsearchHttp.INDEX, "doc"); + runner.assertNotValid(); + runner.setProperty(QueryElasticsearchHttp.TYPE, "status"); + runner.assertNotValid(); + runner.setProperty(QueryElasticsearchHttp.QUERY, + "source:Twitter AND identifier:\"${identifier}\""); + runner.assertValid(); + runner.setProperty(QueryElasticsearchHttp.PAGE_SIZE, "2"); + runner.assertValid(); + runner.setProperty(QueryElasticsearchHttp.ROUTING_QUERY_INFO_STRATEGY, QueryElasticsearchHttp.QueryInfoRouteStrategy.APPEND_AS_ATTRIBUTES.name()); + runner.assertValid(); + + runner.setIncomingConnection(false); + runAndVerify(3,3,2,true, false); + } private void runAndVerify(int expectedResults,int expectedQueryInfoResults,int expectedHits, boolean targetIsContent) { + runAndVerify(expectedResults, expectedQueryInfoResults, expectedHits, targetIsContent, true); + } + + private void runAndVerify(int expectedResults,int expectedQueryInfoResults,int expectedHits, boolean targetIsContent, + boolean expectHitCountOnQueryInfo) { runner.enqueue("blah".getBytes(), new HashMap() { { put("identifier", "28039652140"); @@ -201,20 +248,27 @@ public class TestQueryElasticsearchHttpNoHits { // Running once should page through the no hit doc runner.run(1, true, true); - runner.assertTransferCount(QueryElasticsearchHttp.REL_QUERY_INFO, expectedQueryInfoResults); - if (expectedQueryInfoResults > 0) { + if (expectHitCountOnQueryInfo) { + runner.assertTransferCount(QueryElasticsearchHttp.REL_QUERY_INFO, expectedQueryInfoResults); + if (expectedQueryInfoResults > 0) { final MockFlowFile out = runner.getFlowFilesForRelationship(QueryElasticsearchHttp.REL_QUERY_INFO).get(0); assertNotNull(out); if (targetIsContent) { + if (expectHitCountOnQueryInfo) { out.assertAttributeEquals("es.query.hitcount", String.valueOf(expectedHits)); - Assert.assertTrue(out.getAttribute("es.query.url").startsWith("http://127.0.0.1:9200/doc/status/_search?q=source:Twitter%20AND%20identifier:%22%22&size=2")); + } + Assert.assertTrue(out.getAttribute("es.query.url").startsWith("http://127.0.0.1:9200/doc/status/_search?q=source:Twitter%20AND%20identifier:%22%22&size=2")); } + } } runner.assertTransferCount(QueryElasticsearchHttp.REL_SUCCESS, expectedResults); if (expectedResults > 0) { final MockFlowFile out = runner.getFlowFilesForRelationship(QueryElasticsearchHttp.REL_SUCCESS).get(0); assertNotNull(out); + if (!expectHitCountOnQueryInfo) { + out.assertAttributeEquals("es.query.hitcount", String.valueOf(expectedHits)); + } if (targetIsContent) { out.assertAttributeEquals("filename", "abc-97b-ASVsZu_" + "vShwtGCJpGOObmuSqUJRUC3L_-SEND-S3"); }