NIFI-11430 - PaginatedJsonQueryElasticsearch processors should not output empty FlowFile if hits have been found; PaginatedJsonQueryElasticsearch processors should be able to use _source and _meta only result formats when grouping by query

This closes #7163

Signed-off-by: Chris Sampson <chris.sampson82@gmail.com>
This commit is contained in:
Ryan Van Den Bos 2023-04-12 10:21:57 +01:00 committed by Chris Sampson
parent f7e36a07ac
commit 658f2547d8
No known key found for this signature in database
GPG Key ID: 546AEB0826587237
7 changed files with 199 additions and 105 deletions

View File

@ -367,7 +367,7 @@ public abstract class AbstractJsonQueryElasticsearch<Q extends JsonQueryParamete
}
@SuppressWarnings("unchecked")
private List<Map<String, Object>> formatHits(final List<Map<String, Object>> hits) {
List<Map<String, Object>> formatHits(final List<Map<String, Object>> hits) {
final List<Map<String, Object>> formattedHits;
if (hitFormat == SearchResultsFormat.METADATA_ONLY) {

View File

@ -75,6 +75,7 @@ public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJs
.build();
static final List<PropertyDescriptor> paginatedPropertyDescriptors;
static {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(QUERY_ATTRIBUTE);
@ -239,7 +240,7 @@ public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJs
private void combineHits(final List<Map<String, Object>> hits, final PaginatedJsonQueryParameters paginatedJsonQueryParameters,
final ProcessSession session, final FlowFile parent,
final Map<String, String> attributes, final List<FlowFile> hitsFlowFiles) {
final Map<String, String> attributes, final List<FlowFile> hitsFlowFiles, final boolean newQuery) {
if (hits != null && !hits.isEmpty()) {
final FlowFile hitFlowFile;
final boolean append = !hitsFlowFiles.isEmpty();
@ -251,7 +252,7 @@ public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJs
hitsFlowFiles.add(writeCombinedHitFlowFile(paginatedJsonQueryParameters.getHitCount() + hits.size(),
hits, session, hitFlowFile, attributes, append));
} else if (isOutputNoHits()) {
} else if (isOutputNoHits() && newQuery) {
final FlowFile hitFlowFile = createChildFlowFile(session, parent);
hitsFlowFiles.add(writeHitFlowFile(0, "", session, hitFlowFile, attributes));
}
@ -271,7 +272,9 @@ public abstract class AbstractPaginatedJsonQueryElasticsearch extends AbstractJs
attributes.put("page.number", Integer.toString(paginatedJsonQueryParameters.getPageCount()));
if (hitStrategy == ResultOutputStrategy.PER_QUERY) {
combineHits(hits, paginatedJsonQueryParameters, session, parent, attributes, hitsFlowFiles);
final List<Map<String, Object>> formattedHits = formatHits(hits);
combineHits(formattedHits, paginatedJsonQueryParameters, session, parent, attributes, hitsFlowFiles, newQuery);
// output results if it seems we've combined all available results (i.e. no hits in this page and therefore no more expected)
if (!hitsFlowFiles.isEmpty() && (hits == null || hits.isEmpty())) {

View File

@ -42,9 +42,8 @@ import static org.junit.jupiter.api.Assertions.assertInstanceOf
import static org.junit.jupiter.api.Assertions.assertNotNull
import static org.junit.jupiter.api.Assertions.assertThrows
import static org.junit.jupiter.api.Assertions.assertTrue
abstract class AbstractJsonQueryElasticsearchTest<P extends AbstractJsonQueryElasticsearch> {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
static final String INDEX_NAME = "messages"
@ -102,7 +101,8 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends AbstractJsonQueryEla
.collect(Collectors.joining(", "))
final String expectedAllowedSplitHits = processor instanceof AbstractPaginatedJsonQueryElasticsearch
? ResultOutputStrategy.values().collect { r -> r.getValue() }.join(", ")
: nonPaginatedResultOutputStrategies
: ResultOutputStrategy.getNonPaginatedResponseOutputStrategies().stream()
.map(r -> r.getValue()).collect(Collectors.joining(", "))
final AssertionError assertionError = assertThrows(AssertionError.class, runner.&run)
assertThat(assertionError.getMessage(), equalTo(String.format("Processor has 8 validation failures:\n" +
@ -217,8 +217,7 @@ abstract class AbstractJsonQueryElasticsearchTest<P extends AbstractJsonQueryEla
testCounts(runner, isInput() ? 1 : 0, 0, 0, 0)
assertThat(
runner.getProvenanceEvents().stream().filter({ pe ->
pe.getEventType() == ProvenanceEventType.RECEIVE &&
pe.getAttribute("uuid") == hits.getAttribute("uuid")
pe.getEventType() == ProvenanceEventType.RECEIVE
}).count(),
is(0L)
)

View File

@ -32,7 +32,9 @@ import static groovy.json.JsonOutput.toJson
import static org.hamcrest.CoreMatchers.equalTo
import static org.hamcrest.CoreMatchers.is
import static org.hamcrest.MatcherAssert.assertThat
import static org.junit.jupiter.api.Assertions.assertFalse
import static org.junit.jupiter.api.Assertions.assertThrows
import static org.junit.jupiter.api.Assertions.assertTrue
abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQueryElasticsearchTest<AbstractPaginatedJsonQueryElasticsearch> {
abstract boolean isInput()
@ -117,6 +119,97 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ
assertSendEvent(runner, input)
}
static void assertFormattedResult(final SearchResultsFormat searchResultsFormat, final Map<String, Object> hit) {
assertFalse(hit.isEmpty())
switch(searchResultsFormat) {
case SearchResultsFormat.SOURCE_ONLY:
assertFalse(hit.containsKey("_source"))
assertFalse(hit.containsKey("_index"))
break
case SearchResultsFormat.METADATA_ONLY:
assertFalse(hit.containsKey("_source"))
assertTrue(hit.containsKey("_index"))
break
case SearchResultsFormat.FULL:
assertTrue(hit.containsKey("_source"))
assertTrue(hit.containsKey("_index"))
break
default:
throw new IllegalArgumentException("Unknown SearchResultsFormat value: " + searchResultsFormat.toString())
}
}
private void assertResultsFormat(final TestRunner runner, final ResultOutputStrategy resultOutputStrategy, final SearchResultsFormat searchResultsFormat) {
int flowFileCount
String hitsCount
boolean ndjson = false
switch (resultOutputStrategy) {
case ResultOutputStrategy.PER_QUERY:
flowFileCount = 1
hitsCount = "10"
ndjson = true
break
case ResultOutputStrategy.PER_HIT:
flowFileCount = 10
hitsCount = "1"
break
case ResultOutputStrategy.PER_RESPONSE:
flowFileCount = 1
hitsCount = "10"
break
default:
throw new IllegalArgumentException("Unknown ResultOutputStrategy value: " + resultOutputStrategy.toString())
}
// Test Relationship counts
testCounts(runner, isInput() ? 1 : 0, flowFileCount, 0, 0)
// Per response outputs an array of values
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).forEach({ hit ->
hit.assertAttributeEquals("hit.count", hitsCount)
assertOutputContent(hit.getContent(), hitsCount as int, ndjson)
if (ResultOutputStrategy.PER_RESPONSE == resultOutputStrategy) {
OBJECT_MAPPER.readValue(hit.getContent(), ArrayList.class).forEach(h -> {
assertFormattedResult(searchResultsFormat, h as Map<String, Object>)
})
} else {
final Map<String, Object> h = OBJECT_MAPPER.readValue(hit.getContent(), Map.class)
assertFormattedResult(searchResultsFormat, h)
}
assertThat(
runner.getProvenanceEvents().stream().filter({ pe ->
pe.getEventType() == ProvenanceEventType.RECEIVE &&
pe.getAttribute("uuid") == hit.getAttribute("uuid")
}).count(),
is(1L)
)
})
}
@Test
void testResultsFormat() throws Exception {
for (final ResultOutputStrategy resultOutputStrategy : ResultOutputStrategy.values()) {
final TestRunner runner = createRunner(false)
runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([query: [match_all: [:]], "sort": [[message: [order: "asc"]]]])))
runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, resultOutputStrategy.getValue())
// Test against each results format
for (final SearchResultsFormat searchResultsFormat : SearchResultsFormat.values()) {
runner.setProperty(AbstractJsonQueryElasticsearch.SEARCH_RESULTS_FORMAT, searchResultsFormat.getValue())
// Test against each pagination type
for (final PaginationType paginationType : PaginationType.values()) {
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, paginationType.getValue())
runOnce(runner)
assertResultsFormat(runner, resultOutputStrategy, searchResultsFormat)
reset(runner)
}
}
}
}
@Test
void testScrollError() {
final TestRunner runner = createRunner(false)
@ -270,35 +363,20 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ
}
@Test
void testNoHitsFlowFileIsProducedForEachResultSplitSetup() {
void testEmptyHitsFlowFileIsProducedForEachResultSplitSetup() {
final TestRunner runner = createRunner(false)
final TestElasticsearchClientService service = getService(runner)
runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([query: [match_all: [:]]])))
runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([query: [match_all: [:]], "sort": [[message: [order: "asc"]]]])))
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.OUTPUT_NO_HITS, "true")
service.setMaxPages(0)
for (final PaginationType paginationType : PaginationType.values()) {
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, paginationType.getValue())
for (final ResultOutputStrategy resultOutputStrategy : ResultOutputStrategy.values()) {
// test that an empty flow file is produced for a per query setup
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, ResultOutputStrategy.PER_QUERY.getValue())
runOnce(runner)
testCounts(runner, isInput() ? 1 : 0, 1, 0, 0)
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count", "0")
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("page.number", "1")
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).getSize() == 0
reset(runner)
// test that an empty flow file is produced for a per hit setup
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, ResultOutputStrategy.PER_HIT.getValue())
runOnce(runner)
testCounts(runner, isInput() ? 1 : 0, 1, 0, 0)
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count", "0")
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("page.number", "1")
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).getSize() == 0
reset(runner)
// test that an empty flow file is produced for a per response setup
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, ResultOutputStrategy.PER_RESPONSE.getValue())
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, resultOutputStrategy.getValue())
runOnce(runner)
testCounts(runner, isInput() ? 1 : 0, 1, 0, 0)
@ -308,3 +386,7 @@ abstract class AbstractPaginatedJsonQueryElasticsearchTest extends AbstractJsonQ
reset(runner)
}
}
}
}

View File

@ -17,7 +17,6 @@
package org.apache.nifi.processors.elasticsearch
import org.apache.nifi.processors.elasticsearch.api.PaginationType
import org.apache.nifi.processors.elasticsearch.api.ResultOutputStrategy
import org.apache.nifi.util.TestRunner
@ -40,15 +39,9 @@ class PaginatedJsonQueryElasticsearchTest extends AbstractPaginatedJsonQueryElas
return true
}
void testPagination(final PaginationType paginationType) {
// test flowfile per page
final TestRunner runner = createRunner(false)
final TestElasticsearchClientService service = getService(runner)
service.setMaxPages(2)
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.PAGINATION_TYPE, paginationType.getValue())
runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([size: 10, sort: [ msg: "desc"], query: [ match_all: [:] ]])))
runOnce(runner)
static void validatePagination(final TestRunner runner, final ResultOutputStrategy resultOutputStrategy) {
switch (resultOutputStrategy) {
case ResultOutputStrategy.PER_RESPONSE:
testCounts(runner, 1, 2, 0, 0)
int page = 1
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).forEach(
@ -57,13 +50,18 @@ class PaginatedJsonQueryElasticsearchTest extends AbstractPaginatedJsonQueryElas
hit.assertAttributeEquals("page.number", Integer.toString(page++))
}
)
runner.getStateManager().assertStateNotSet()
reset(runner)
// test hits splitting
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, ResultOutputStrategy.PER_HIT.getValue())
runOnce(runner)
break
case ResultOutputStrategy.PER_QUERY:
testCounts(runner, 1, 1, 0, 0)
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count", "20")
// the "last" page.number is used, so 2 here because there were 2 pages of hits
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("page.number", "2")
assertThat(
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).getContent().split("\n").length,
is(20)
)
break
case ResultOutputStrategy.PER_HIT:
testCounts(runner, 1, 20, 0, 0)
int count = 0
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).forEach(
@ -73,21 +71,33 @@ class PaginatedJsonQueryElasticsearchTest extends AbstractPaginatedJsonQueryElas
hit.assertAttributeEquals("page.number", Integer.toString(Math.ceil(++count / 10) as int))
}
)
break
default:
throw new IllegalArgumentException("Unknown ResultOutputStrategy value: " + resultOutputStrategy)
}
}
void testPagination(final PaginationType paginationType) {
final TestRunner runner = createRunner(false)
final TestElasticsearchClientService service = getService(runner)
service.setMaxPages(2)
runner.setProperty(AbstractJsonQueryElasticsearch.QUERY, prettyPrint(toJson([size: 10, sort: [msg: "desc"], query: [match_all: [:]]])))
for (final ResultOutputStrategy resultOutputStrategy : ResultOutputStrategy.values()) {
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, resultOutputStrategy.getValue())
runOnce(runner)
validatePagination(runner, resultOutputStrategy)
runner.getStateManager().assertStateNotSet()
reset(runner)
// test hits combined
runner.setProperty(AbstractPaginatedJsonQueryElasticsearch.SEARCH_RESULTS_SPLIT, ResultOutputStrategy.PER_QUERY.getValue())
// Check that OUTPUT_NO_HITS true doesn't have any adverse effects on pagination
runner.setProperty(AbstractJsonQueryElasticsearch.OUTPUT_NO_HITS, "true")
runOnce(runner)
testCounts(runner, 1, 1, 0, 0)
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("hit.count", "20")
// the "last" page.number is used, so 2 here because there were 2 pages of hits
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).assertAttributeEquals("page.number", "2")
assertThat(
runner.getFlowFilesForRelationship(AbstractJsonQueryElasticsearch.REL_HITS).get(0).getContent().split("\n").length,
is(20)
)
runner.getStateManager().assertStateNotSet()
validatePagination(runner, resultOutputStrategy)
// Unset OUTPUT_NO_HITS
runner.setProperty(AbstractJsonQueryElasticsearch.OUTPUT_NO_HITS, "false")
reset(runner)
}
}
}

View File

@ -51,7 +51,7 @@ import static org.apache.http.auth.AuthScope.ANY;
public abstract class AbstractElasticsearchITBase {
// default Elasticsearch version should (ideally) match that in the nifi-elasticsearch-bundle#pom.xml for the integration-tests profile
protected static final DockerImageName IMAGE = DockerImageName
.parse(System.getProperty("elasticsearch.docker.image", "docker.elastic.co/elasticsearch/elasticsearch:8.7.0"));
.parse(System.getProperty("elasticsearch.docker.image", "docker.elastic.co/elasticsearch/elasticsearch:8.7.1"));
protected static final String ELASTIC_USER_PASSWORD = System.getProperty("elasticsearch.elastic_user.password", RandomStringUtils.randomAlphanumeric(10, 20));
private static final int PORT = 9200;
protected static final ElasticsearchContainer ELASTICSEARCH_CONTAINER = new ElasticsearchContainer(IMAGE)

View File

@ -101,7 +101,7 @@ language governing permissions and limitations under the License. -->
</activation>
<properties>
<!-- also update the default Elasticsearch version in nifi-elasticsearch-test-utils#src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java-->
<elasticsearch_docker_image>8.7.0</elasticsearch_docker_image>
<elasticsearch_docker_image>8.7.1</elasticsearch_docker_image>
<elasticsearch.elastic.password>s3cret</elasticsearch.elastic.password>
</properties>
<build>
@ -132,7 +132,7 @@ language governing permissions and limitations under the License. -->
<profile>
<id>elasticsearch7</id>
<properties>
<elasticsearch_docker_image>7.17.9</elasticsearch_docker_image>
<elasticsearch_docker_image>7.17.10</elasticsearch_docker_image>
</properties>
</profile>
</profiles>