mirror of https://github.com/apache/nifi.git
NIFI-10845 - JsonQueryElasticsearch processors are not outputting an empty flow file for a combined response with output_no_hits set to true
Signed-off-by: Chris Sampson <chris.sampson82@gmail.com> This closes #6701
This commit is contained in:
parent
2393785765
commit
2ad33eea80
|
@ -105,6 +105,10 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete
|
||||||
private String splitUpAggregations;
|
private String splitUpAggregations;
|
||||||
private boolean outputNoHits;
|
private boolean outputNoHits;
|
||||||
|
|
||||||
|
boolean getOutputNoHits() {
|
||||||
|
return outputNoHits;
|
||||||
|
}
|
||||||
|
|
||||||
final ObjectMapper mapper = new ObjectMapper();
|
final ObjectMapper mapper = new ObjectMapper();
|
||||||
|
|
||||||
final AtomicReference<ElasticSearchClientService> clientService = new AtomicReference<>(null);
|
final AtomicReference<ElasticSearchClientService> clientService = new AtomicReference<>(null);
|
||||||
|
@ -277,8 +281,8 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private FlowFile writeHitFlowFile(final int count, final String json, final ProcessSession session,
|
FlowFile writeHitFlowFile(final int count, final String json, final ProcessSession session,
|
||||||
final FlowFile hitFlowFile, final Map<String, String> attributes) {
|
final FlowFile hitFlowFile, final Map<String, String> attributes) {
|
||||||
final FlowFile ff = session.write(hitFlowFile, out -> out.write(json.getBytes()));
|
final FlowFile ff = session.write(hitFlowFile, out -> out.write(json.getBytes()));
|
||||||
attributes.put("hit.count", Integer.toString(count));
|
attributes.put("hit.count", Integer.toString(count));
|
||||||
|
|
||||||
|
|
|
@ -273,6 +273,9 @@ public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJs
|
||||||
|
|
||||||
hitsFlowFiles.add(writeCombinedHitFlowFile(paginatedJsonQueryParameters.getHitCount() + hits.size(),
|
hitsFlowFiles.add(writeCombinedHitFlowFile(paginatedJsonQueryParameters.getHitCount() + hits.size(),
|
||||||
hits, session, hitFlowFile, attributes, append));
|
hits, session, hitFlowFile, attributes, append));
|
||||||
|
} else if (getOutputNoHits()) {
|
||||||
|
final FlowFile hitFlowFile = createChildFlowFile(session, parent);
|
||||||
|
hitsFlowFiles.add(writeHitFlowFile(0, "", session, hitFlowFile, attributes));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -268,4 +268,43 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ
|
||||||
assertThat(runner.getProvenanceEvents().stream().filter({ pe -> pe.getEventType() == ProvenanceEventType.SEND}).count(), is(0L))
|
assertThat(runner.getProvenanceEvents().stream().filter({ pe -> pe.getEventType() == ProvenanceEventType.SEND}).count(), is(0L))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testNoHitsFlowFileIsProducedForEachResultSplitSetup() {
|
||||||
|
final TestRunner runner = createRunner(false)
|
||||||
|
final TestElasticsearchClientService service = getService(runner)
|
||||||
|
runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([query: [match_all: [:]]])))
|
||||||
|
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.OUTPUT_NO_HITS, "true")
|
||||||
|
service.setMaxPages(0)
|
||||||
|
|
||||||
|
// test that an empty flow file is produced for a per query setup
|
||||||
|
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, AbstractPaginatedJsonQueryElasticsearch.FLOWFILE_PER_QUERY)
|
||||||
|
runOnce(runner)
|
||||||
|
testCounts(runner, isInput() ? 1 : 0, 1, 0, 0)
|
||||||
|
|
||||||
|
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count", "0")
|
||||||
|
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("page.number", "1")
|
||||||
|
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).getSize() == 0
|
||||||
|
reset(runner)
|
||||||
|
|
||||||
|
// test that an empty flow file is produced for a per hit setup
|
||||||
|
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, AbstractPaginatedJsonQueryElasticsearch.FLOWFILE_PER_HIT)
|
||||||
|
runOnce(runner)
|
||||||
|
testCounts(runner, isInput() ? 1 : 0, 1, 0, 0)
|
||||||
|
|
||||||
|
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count", "0")
|
||||||
|
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("page.number", "1")
|
||||||
|
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).getSize() == 0
|
||||||
|
reset(runner)
|
||||||
|
|
||||||
|
// test that an empty flow file is produced for a per response setup
|
||||||
|
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, AbstractPaginatedJsonQueryElasticsearch.FLOWFILE_PER_RESPONSE)
|
||||||
|
runOnce(runner)
|
||||||
|
testCounts(runner, isInput() ? 1 : 0, 1, 0, 0)
|
||||||
|
|
||||||
|
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count", "0")
|
||||||
|
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("page.number", "1")
|
||||||
|
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).getSize() == 0
|
||||||
|
reset(runner)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue