NIFI-9715 add option to output empty FlowFile from Elasticsearch REST API Json Query processors when there are no hits from query

This closes #5786

Signed-off-by: Mike Thomsen <mthomsen@apache.org>
This commit is contained in:
Chris Sampson 2022-02-21 21:12:24 +00:00 committed by Mike Thomsen
parent 4141ed29ec
commit 07131a66ea
No known key found for this signature in database
GPG Key ID: 88511C3D4CAD246F
3 changed files with 70 additions and 6 deletions

View File

@ -86,6 +86,17 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();
public static final PropertyDescriptor OUTPUT_NO_HITS = new PropertyDescriptor.Builder()
.name("el-rest-output-no-hits")
.displayName("Output No Hits")
.description("Output a \"" + REL_HITS.getName() + "\" flowfile even if no hits found for query. " +
"If true, an empty \"" + REL_HITS.getName() + "\" flowfile will be output even if \"" +
REL_AGGREGATIONS.getName() + "\" are output.")
.allowableValues("true", "false")
.defaultValue("false")
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();
private static final Set<Relationship> relationships;
private static final List<PropertyDescriptor> propertyDescriptors;
@ -93,6 +104,7 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete
AtomicReference<ElasticSearchClientService> clientService;
String splitUpHits;
private String splitUpAggregations;
private boolean outputNoHits;
final ObjectMapper mapper = new ObjectMapper();
@ -112,6 +124,7 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete
descriptors.add(CLIENT_SERVICE);
descriptors.add(SEARCH_RESULTS_SPLIT);
descriptors.add(AGGREGATION_RESULTS_SPLIT);
descriptors.add(OUTPUT_NO_HITS);
propertyDescriptors = Collections.unmodifiableList(descriptors);
}
@ -143,6 +156,8 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete
splitUpHits = context.getProperty(SEARCH_RESULTS_SPLIT).getValue();
splitUpAggregations = context.getProperty(AGGREGATION_RESULTS_SPLIT).getValue();
outputNoHits = context.getProperty(OUTPUT_NO_HITS).asBoolean();
}
@OnStopped
@ -271,7 +286,7 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete
* for paginated queries, the List could contain one (or more) FlowFiles, to which further hits may be appended when the next
* SearchResponse is processed, i.e. this approach allows recursion for paginated queries, but is unnecessary for single-response queries.
*/
List<FlowFile> handleHits(final List<Map<String, Object>> hits, final Q queryJsonParameters, final ProcessSession session,
List<FlowFile> handleHits(final List<Map<String, Object>> hits, final boolean newQuery, final Q queryJsonParameters, final ProcessSession session,
final FlowFile parent, final Map<String, String> attributes, final List<FlowFile> hitsFlowFiles,
final String transitUri, final StopWatch stopWatch) throws IOException {
if (hits != null && !hits.isEmpty()) {
@ -286,6 +301,9 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete
final String json = mapper.writeValueAsString(hits);
hitsFlowFiles.add(writeHitFlowFile(hits.size(), json, session, hitFlowFile, attributes));
}
} else if (newQuery && outputNoHits) {
final FlowFile hitFlowFile = createChildFlowFile(session, parent);
hitsFlowFiles.add(writeHitFlowFile(0, "", session, hitFlowFile, attributes));
}
transferResultFlowFiles(session, hitsFlowFiles, transitUri, stopWatch);
@ -319,7 +337,7 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete
handleAggregations(response.getAggregations(), session, input, attributes, transitUri, stopWatch);
}
final List<FlowFile> resultFlowFiles = handleHits(response.getHits(), queryJsonParameters, session, input,
final List<FlowFile> resultFlowFiles = handleHits(response.getHits(), newQuery, queryJsonParameters, session, input,
attributes, hitsFlowFiles, transitUri, stopWatch);
queryJsonParameters.addHitCount(response.getHits().size());

View File

@ -109,6 +109,7 @@ public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJs
descriptors.add(AGGREGATION_RESULTS_SPLIT);
descriptors.add(PAGINATION_TYPE);
descriptors.add(PAGINATION_KEEP_ALIVE);
descriptors.add(OUTPUT_NO_HITS);
paginatedPropertyDescriptors = Collections.unmodifiableList(descriptors);
}
@ -282,7 +283,7 @@ public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJs
* SearchResponse is processed, i.e. this approach allows recursion for paginated queries, but is unnecessary for single-response queries.
*/
@Override
List<FlowFile> handleHits(final List<Map<String, Object>> hits, final PaginatedJsonQueryParameters paginatedJsonQueryParameters,
List<FlowFile> handleHits(final List<Map<String, Object>> hits, final boolean newQuery, final PaginatedJsonQueryParameters paginatedJsonQueryParameters,
final ProcessSession session, final FlowFile parent, final Map<String, String> attributes,
final List<FlowFile> hitsFlowFiles, final String transitUri, final StopWatch stopWatch) throws IOException {
paginatedJsonQueryParameters.incrementPageCount();
@ -298,7 +299,7 @@ public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJs
hitsFlowFiles.clear();
}
} else {
super.handleHits(hits, paginatedJsonQueryParameters, session, parent, attributes, hitsFlowFiles, transitUri, stopWatch);
super.handleHits(hits, newQuery, paginatedJsonQueryParameters, session, parent, attributes, hitsFlowFiles, transitUri, stopWatch);
}
return hitsFlowFiles;

View File

@ -52,6 +52,7 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends AbstractJsonQueryEla
runner.removeProperty(AbstractJsonQueryElasticsearch.QUERY_ATTRIBUTE)
runner.removeProperty(AbstractJsonQueryElasticsearch.AGGREGATION_RESULTS_SPLIT)
runner.removeProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT)
runner.removeProperty(AbstractJsonQueryElasticsearch.OUTPUT_NO_HITS)
final AssertionError assertionError = assertThrows(AssertionError.class, runner.&run)
if (processor instanceof SearchElasticsearch) {
@ -82,13 +83,14 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends AbstractJsonQueryEla
runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, "not-json")
runner.setProperty(AbstractJsonQueryElasticsearch.AGGREGATION_RESULTS_SPLIT, "not-enum")
runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, "not-enum2")
runner.setProperty(AbstractJsonQueryElasticsearch.OUTPUT_NO_HITS, "not-boolean")
final String expectedAllowedSplitHits = processor instanceof AbstractPaginatedJsonQueryElasticsearch
? [AbstractJsonQueryElasticsearch.FLOWFILE_PER_RESPONSE, AbstractJsonQueryElasticsearch.FLOWFILE_PER_HIT, AbstractPaginatedJsonQueryElasticsearch.FLOWFILE_PER_QUERY].join(", ")
: [AbstractJsonQueryElasticsearch.FLOWFILE_PER_RESPONSE, AbstractJsonQueryElasticsearch.FLOWFILE_PER_HIT].join(", ")
final AssertionError assertionError = assertThrows(AssertionError.class, runner.&run)
assertThat(assertionError.getMessage(), equalTo(String.format("Processor has 7 validation failures:\n" +
assertThat(assertionError.getMessage(), equalTo(String.format("Processor has 8 validation failures:\n" +
"'%s' validated against 'not-json' is invalid because %s is not a valid JSON representation due to Unrecognized token 'not': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')\n" +
" at [Source: (String)\"not-json\"; line: 1, column: 4]\n" +
"'%s' validated against '' is invalid because %s cannot be empty\n" +
@ -96,6 +98,7 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends AbstractJsonQueryEla
"'%s' validated against 'not-a-service' is invalid because Property references a Controller Service that does not exist\n" +
"'%s' validated against 'not-enum2' is invalid because Given value not found in allowed set '%s'\n" +
"'%s' validated against 'not-enum' is invalid because Given value not found in allowed set '%s'\n" +
"'%s' validated against 'not-boolean' is invalid because Given value not found in allowed set 'true, false'\n" +
"'%s' validated against 'not-a-service' is invalid because Invalid Controller Service: not-a-service is not a valid Controller Service Identifier\n",
AbstractJsonQueryElasticsearch.QUERY.getName(), AbstractJsonQueryElasticsearch.QUERY.getName(),
AbstractJsonQueryElasticsearch.INDEX.getName(), AbstractJsonQueryElasticsearch.INDEX.getName(),
@ -103,6 +106,7 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends AbstractJsonQueryEla
AbstractJsonQueryElasticsearch.CLIENT_SERVICE.getDisplayName(),
AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT.getName(), expectedAllowedSplitHits,
AbstractJsonQueryElasticsearch.AGGREGATION_RESULTS_SPLIT.getName(), [AbstractJsonQueryElasticsearch.FLOWFILE_PER_RESPONSE, AbstractJsonQueryElasticsearch.FLOWFILE_PER_HIT].join(", "),
AbstractJsonQueryElasticsearch.OUTPUT_NO_HITS.getName(),
AbstractJsonQueryElasticsearch.CLIENT_SERVICE.getDisplayName()
)))
}
@ -146,6 +150,45 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends AbstractJsonQueryEla
)
}
@Test
void testNoHits() throws Exception {
// test no hits (no output)
final TestRunner runner = createRunner(false)
final TestElasticsearchClientService service = getService(runner)
service.setMaxPages(0)
runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([query: [ match_all: [:] ]])))
runner.setProperty(AbstractJsonQueryElasticsearch.OUTPUT_NO_HITS, "false")
runOnce(runner)
testCounts(runner, isInput() ? 1 : 0, 0, 0, 0)
assertThat(
runner.getProvenanceEvents().stream().filter({ pe ->
pe.getEventType() == ProvenanceEventType.RECEIVE &&
pe.getAttribute("uuid") == hits.getAttribute("uuid")
}).count(),
is(0L)
)
reset(runner)
// test not hits (with output)
runner.setProperty(AbstractJsonQueryElasticsearch.OUTPUT_NO_HITS, "true")
runOnce(runner)
testCounts(runner, isInput() ? 1 : 0, 1, 0, 0)
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).forEach(
{ hit ->
hit.assertAttributeEquals("hit.count", "0")
assertOutputContent(hit.getContent(), 0, false)
assertThat(
runner.getProvenanceEvents().stream().filter({ pe ->
pe.getEventType() == ProvenanceEventType.RECEIVE &&
pe.getAttribute("uuid") == hit.getAttribute("uuid")
}).count(),
is(1L)
)
}
)
}
@Test
void testAggregations() throws Exception {
String query = prettyPrint(toJson([
@ -302,7 +345,9 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends AbstractJsonQueryEla
if (ndjson) {
assertThat(content.split("\n").length, is(count))
} else {
if (count == 1) {
if (count == 0) {
assertThat(content, is(""))
} else if (count == 1) {
assertThat(content.startsWith("{") && content.endsWith("}"), is(true))
} else {
assertThat(content.startsWith("[") && content.endsWith("]"), is(true))