NIFI-7036: This closes #3993. Adding 'Append as Attributes' routing strategy to QueryElasticsearchHttp

Signed-off-by: Joe Witt <joewitt@apache.org>
This commit is contained in:
Joe Gresock 2020-01-16 20:30:49 +00:00 committed by Joe Witt
parent c1301e196c
commit e51ab8c9d6
No known key found for this signature in database
GPG Key ID: 9093BF854F811A1A
2 changed files with 82 additions and 10 deletions

View File

@ -91,7 +91,8 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
// Strategies that decide when Query Info is produced after a query and how it is delivered.
public enum QueryInfoRouteStrategy {
// Never route Query Info.
NEVER,
// Always route Query Info.
ALWAYS,
// Route Query Info only when the query returns no hits.
NOHIT
NOHIT,
// Always append Query Info as attributes on the existing relationships; the Query Info
// relationship is not added for this strategy.
APPEND_AS_ATTRIBUTES
}
private static final String FROM_QUERY_PARAM = "from";
@ -103,6 +104,8 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
// Allowable values offered for the Query Info routing strategy property. Each one uses the name()
// of the matching QueryInfoRouteStrategy constant as its value, paired with a display label and a
// description string.
static final AllowableValue ALWAYS = new AllowableValue(QueryInfoRouteStrategy.ALWAYS.name(), "Always", "Always route Query Info");
static final AllowableValue NEVER = new AllowableValue(QueryInfoRouteStrategy.NEVER.name(), "Never", "Never route Query Info");
static final AllowableValue NO_HITS = new AllowableValue(QueryInfoRouteStrategy.NOHIT.name(), "No Hits", "Route Query Info if the Query returns no hits");
// Unlike the other values, this one delivers Query Info as attributes rather than through the
// Query Info relationship.
static final AllowableValue APPEND_AS_ATTRIBUTES = new AllowableValue(QueryInfoRouteStrategy.APPEND_AS_ATTRIBUTES.name(), "Append as Attributes",
"Always append Query Info as attributes, using the existing relationships (does not add the Query Info relationship).");
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description(
@ -221,7 +224,7 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
.displayName("Routing Strategy for Query Info")
.description("Specifies when to generate and route Query Info after a successful query")
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.allowableValues(ALWAYS, NEVER, NO_HITS)
.allowableValues(ALWAYS, NEVER, NO_HITS, APPEND_AS_ATTRIBUTES)
.defaultValue(NEVER.getValue())
.required(false)
.build();
@ -399,9 +402,9 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
if ( (hits.size() == 0 && priorResultCount == 0 && queryInfoRouteStrategy == QueryInfoRouteStrategy.NOHIT)
|| queryInfoRouteStrategy == QueryInfoRouteStrategy.ALWAYS) {
FlowFile queryInfo = flowFile == null ? session.create() : session.create(flowFile);
session.putAttribute(queryInfo, "es.query.url", url.toExternalForm());
session.putAttribute(queryInfo, "es.query.hitcount", String.valueOf(hits.size()));
session.putAttribute(queryInfo, MIME_TYPE.key(), "application/json");
queryInfo = session.putAttribute(queryInfo, "es.query.url", url.toExternalForm());
queryInfo = session.putAttribute(queryInfo, "es.query.hitcount", String.valueOf(hits.size()));
queryInfo = session.putAttribute(queryInfo, MIME_TYPE.key(), "application/json");
session.transfer(queryInfo,REL_QUERY_INFO);
}
@ -418,6 +421,10 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
documentFlowFile = session.create();
}
if (queryInfoRouteStrategy == QueryInfoRouteStrategy.APPEND_AS_ATTRIBUTES) {
documentFlowFile = session.putAttribute(documentFlowFile, "es.query.hitcount", String.valueOf(hits.size()));
}
JsonNode source = hit.get("_source");
documentFlowFile = session.putAttribute(documentFlowFile, "es.id", retrievedId);
documentFlowFile = session.putAttribute(documentFlowFile, "es.index", retrievedIndex);
@ -451,9 +458,20 @@ public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
}
page.add(documentFlowFile);
}
logger.debug("Elasticsearch retrieved " + responseJson.size() + " documents, routing to success");
logger.debug("Elasticsearch retrieved " + responseJson.size() + " documents, routing to success");
// If we want to append query info as attributes but there were no hits,
// pass along the original, if present.
if (queryInfoRouteStrategy == QueryInfoRouteStrategy.APPEND_AS_ATTRIBUTES && page.isEmpty()
&& flowFile != null) {
FlowFile documentFlowFile = null;
documentFlowFile = targetIsContent ? session.create(flowFile) : session.clone(flowFile);
documentFlowFile = session.putAttribute(documentFlowFile, "es.query.hitcount", String.valueOf(hits.size()));
documentFlowFile = session.putAttribute(documentFlowFile, "es.query.url", url.toExternalForm());
session.transfer(documentFlowFile, REL_SUCCESS);
} else {
session.transfer(page, REL_SUCCESS);
}
} else {
try {
// 5xx -> RETRY, but a server error might last a while, so yield

View File

@ -189,9 +189,56 @@ public class TestQueryElasticsearchHttpNoHits {
runAndVerify(3,3,2,true);
}
/**
 * APPEND_AS_ATTRIBUTES with an incoming connection: expects exactly one FlowFile on the success
 * relationship whose {@code es.query.hitcount} attribute is {@code 0}. The Query Info relationship
 * is not inspected because {@code expectHitCountOnQueryInfo} is passed as {@code false}.
 *
 * NOTE(review): the processor is built with {@code false}, which presumably selects the mocked
 * "no hits" response (confirm against QueryElasticsearchHttpTestProcessor); if so, the "Hits"
 * part of this test's name describes the opposite scenario.
 */
@Test
public void testQueryElasticsearchOnTrigger_Hits_AppendAsAttributes() throws IOException {
    configureAppendAsAttributesRunner(false);
    runner.setIncomingConnection(true);
    runAndVerify(1,0,0,false, false);
}

/**
 * APPEND_AS_ATTRIBUTES without an incoming connection: expects three FlowFiles on the success
 * relationship, the first of which carries {@code es.query.hitcount} = {@code 2} and the expected
 * {@code filename} attribute (checked because {@code targetIsContent} is {@code true}).
 *
 * NOTE(review): the processor is built with {@code true}, which presumably selects the mocked
 * response that contains hits (confirm); if so, the "noHits" suffix of this test's name
 * describes the opposite scenario.
 */
@Test
public void testQueryElasticsearchOnTrigger_Hits_AppendAsAttributes_noHits() throws IOException {
    configureAppendAsAttributesRunner(true);
    runner.setIncomingConnection(false);
    runAndVerify(3,3,2,true, false);
}

/**
 * Creates the shared {@code runner} and applies the property setup common to the
 * APPEND_AS_ATTRIBUTES tests, checking validity after each step exactly as the individual tests
 * previously did inline.
 *
 * @param processorFlag value handed unchanged to the {@code QueryElasticsearchHttpTestProcessor}
 *                      constructor
 * @throws IOException declared so the helper can stand in for the inline setup of tests that
 *                     declare the same exception
 */
private void configureAppendAsAttributesRunner(final boolean processorFlag) throws IOException {
    runner = TestRunners.newTestRunner(new QueryElasticsearchHttpTestProcessor(processorFlag));
    runner.setValidateExpressionUsage(true);
    runner.setProperty(AbstractElasticsearchHttpProcessor.ES_URL, "http://127.0.0.1:9200");

    // The processor stays invalid until index, type and query have all been supplied.
    runner.setProperty(QueryElasticsearchHttp.INDEX, "doc");
    runner.assertNotValid();
    runner.setProperty(QueryElasticsearchHttp.TYPE, "status");
    runner.assertNotValid();
    runner.setProperty(QueryElasticsearchHttp.QUERY,
            "source:Twitter AND identifier:\"${identifier}\"");
    runner.assertValid();

    runner.setProperty(QueryElasticsearchHttp.PAGE_SIZE, "2");
    runner.assertValid();
    runner.setProperty(QueryElasticsearchHttp.ROUTING_QUERY_INFO_STRATEGY,
            QueryElasticsearchHttp.QueryInfoRouteStrategy.APPEND_AS_ATTRIBUTES.name());
    runner.assertValid();
}
/**
 * Convenience overload that runs the verification with {@code expectHitCountOnQueryInfo} fixed
 * to {@code true}; every other argument is forwarded unchanged to the five-argument overload.
 */
private void runAndVerify(int expectedResults,int expectedQueryInfoResults,int expectedHits, boolean targetIsContent) {
    final boolean hitCountOnQueryInfo = true;
    runAndVerify(expectedResults, expectedQueryInfoResults, expectedHits, targetIsContent, hitCountOnQueryInfo);
}
private void runAndVerify(int expectedResults,int expectedQueryInfoResults,int expectedHits, boolean targetIsContent,
boolean expectHitCountOnQueryInfo) {
runner.enqueue("blah".getBytes(), new HashMap<String, String>() {
{
put("identifier", "28039652140");
@ -201,20 +248,27 @@ public class TestQueryElasticsearchHttpNoHits {
// Running once should page through the no hit doc
runner.run(1, true, true);
if (expectHitCountOnQueryInfo) {
runner.assertTransferCount(QueryElasticsearchHttp.REL_QUERY_INFO, expectedQueryInfoResults);
if (expectedQueryInfoResults > 0) {
final MockFlowFile out = runner.getFlowFilesForRelationship(QueryElasticsearchHttp.REL_QUERY_INFO).get(0);
assertNotNull(out);
if (targetIsContent) {
if (expectHitCountOnQueryInfo) {
out.assertAttributeEquals("es.query.hitcount", String.valueOf(expectedHits));
}
Assert.assertTrue(out.getAttribute("es.query.url").startsWith("http://127.0.0.1:9200/doc/status/_search?q=source:Twitter%20AND%20identifier:%22%22&size=2"));
}
}
}
runner.assertTransferCount(QueryElasticsearchHttp.REL_SUCCESS, expectedResults);
if (expectedResults > 0) {
final MockFlowFile out = runner.getFlowFilesForRelationship(QueryElasticsearchHttp.REL_SUCCESS).get(0);
assertNotNull(out);
if (!expectHitCountOnQueryInfo) {
out.assertAttributeEquals("es.query.hitcount", String.valueOf(expectedHits));
}
if (targetIsContent) {
out.assertAttributeEquals("filename", "abc-97b-ASVsZu_" + "vShwtGCJpGOObmuSqUJRUC3L_-SEND-S3");
}