NIFI-13719 allow Elasticsearch response to contain Long values for the took field

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #9244.
This commit is contained in:
Chris Sampson 2024-09-07 13:22:21 +01:00 committed by Pierre Villard
parent 657e75c667
commit 2ead001d14
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
7 changed files with 114 additions and 15 deletions

View File

@ -46,7 +46,8 @@ public class IndexOperationResponse implements OperationResponse {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public static IndexOperationResponse fromJsonResponse(final String response) throws IOException { public static IndexOperationResponse fromJsonResponse(final String response) throws IOException {
final Map<String, Object> parsedResponse = OBJECT_MAPPER.readValue(response, Map.class); final Map<String, Object> parsedResponse = OBJECT_MAPPER.readValue(response, Map.class);
final int took = (int) parsedResponse.get("took"); // took should be an int, but could be a long (bg in Elasticsearch 8.15.0)
final long took = Long.parseLong(String.valueOf(parsedResponse.get("took")));
final boolean hasErrors = (boolean) parsedResponse.get("errors"); final boolean hasErrors = (boolean) parsedResponse.get("errors");
final List<Map<String, Object>> items = (List<Map<String, Object>>) parsedResponse.get("items"); final List<Map<String, Object>> items = (List<Map<String, Object>>) parsedResponse.get("items");

View File

@ -24,7 +24,7 @@ public class SearchResponse implements OperationResponse {
private final List<Map<String, Object>> hits; private final List<Map<String, Object>> hits;
private final Map<String, Object> aggregations; private final Map<String, Object> aggregations;
private final long numberOfHits; private final long numberOfHits;
private final int took; private final long took;
private final boolean timedOut; private final boolean timedOut;
private final String pitId; private final String pitId;
private final String scrollId; private final String scrollId;
@ -32,7 +32,7 @@ public class SearchResponse implements OperationResponse {
private final List<String> warnings; private final List<String> warnings;
public SearchResponse(final List<Map<String, Object>> hits, final Map<String, Object> aggregations, final String pitId, public SearchResponse(final List<Map<String, Object>> hits, final Map<String, Object> aggregations, final String pitId,
final String scrollId, final String searchAfter, final int numberOfHits, final int took, final boolean timedOut, final String scrollId, final String searchAfter, final int numberOfHits, final long took, final boolean timedOut,
final List<String> warnings) { final List<String> warnings) {
this.hits = hits; this.hits = hits;
this.aggregations = aggregations; this.aggregations = aggregations;

View File

@ -579,7 +579,7 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
final List<String> warnings = Arrays.stream(response.getHeaders()) final List<String> warnings = Arrays.stream(response.getHeaders())
.filter(h -> "Warning".equalsIgnoreCase(h.getName())) .filter(h -> "Warning".equalsIgnoreCase(h.getName()))
.map(Header::getValue) .map(Header::getValue)
.collect(Collectors.toList()); .toList();
warnings.forEach(w -> getLogger().warn("Elasticsearch Warning: {}", w)); warnings.forEach(w -> getLogger().warn("Elasticsearch Warning: {}", w));
@ -627,13 +627,11 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
final String header = buildBulkHeader(request); final String header = buildBulkHeader(request);
builder.append(header).append("\n"); builder.append(header).append("\n");
switch (request.getOperation()) { switch (request.getOperation()) {
case Index: case Index, Create:
case Create:
final String indexDocument = mapper.writeValueAsString(request.getFields()); final String indexDocument = mapper.writeValueAsString(request.getFields());
builder.append(indexDocument).append("\n"); builder.append(indexDocument).append("\n");
break; break;
case Update: case Update, Upsert:
case Upsert:
final Map<String, Object> updateBody = new HashMap<>(2, 1); final Map<String, Object> updateBody = new HashMap<>(2, 1);
if (request.getScript() != null && !request.getScript().isEmpty()) { if (request.getScript() != null && !request.getScript().isEmpty()) {
updateBody.put("script", request.getScript()); updateBody.put("script", request.getScript());
@ -945,7 +943,8 @@ public class ElasticSearchClientServiceImpl extends AbstractControllerService im
final Map<String, Object> parsed = parseResponse(response); final Map<String, Object> parsed = parseResponse(response);
final List<String> warnings = parseResponseWarningHeaders(response); final List<String> warnings = parseResponseWarningHeaders(response);
final int took = (Integer) parsed.get("took"); // took should be an int, but could be a long (bg in Elasticsearch 8.15.0)
final long took = Long.parseLong(String.valueOf(parsed.get("took")));
final boolean timedOut = (Boolean) parsed.get("timed_out"); final boolean timedOut = (Boolean) parsed.get("timed_out");
final String pitId = parsed.get("pit_id") != null ? (String) parsed.get("pit_id") : null; final String pitId = parsed.get("pit_id") != null ? (String) parsed.get("pit_id") : null;
final String scrollId = parsed.get("_scroll_id") != null ? (String) parsed.get("_scroll_id") : null; final String scrollId = parsed.get("_scroll_id") != null ? (String) parsed.get("_scroll_id") : null;

View File

@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.elasticsearch;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
class IndexOperationResponseTest {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@Test
void testTook() {
long took = 100;
final IndexOperationResponse response = new IndexOperationResponse(took);
assertEquals(took, response.getTook());
}
@Test
void testFromJson() throws Exception {
final long took = 100;
final boolean errors = false;
final List<Map<String, Object>> items = Collections.emptyList();
final Map<String, Object> responseMap = Map.of(
"took", Long.valueOf(took).intValue(),
"errors", errors,
"items", items
);
final String responseJson = OBJECT_MAPPER.writeValueAsString(responseMap);
final IndexOperationResponse response = IndexOperationResponse.fromJsonResponse(responseJson);
assertEquals(took, response.getTook());
assertEquals(errors, response.hasErrors());
assertEquals(items, response.getItems());
}
@Test
void testLongTookTime() {
final long took = 34493262031L; // example "took" from Elasticsearch 8.15.0
final IndexOperationResponse response = new IndexOperationResponse(took);
assertEquals(took, response.getTook());
}
@Test
void testFromJsonLongTook() throws Exception {
final long took = 34493262031L; // example "took" from Elasticsearch 8.15.0
final boolean errors = true;
final List<Map<String, Object>> items = Collections.singletonList(Collections.emptyMap());
final Map<String, Object> responseMap = Map.of(
"took", took,
"errors", errors,
"items", items
);
final String responseJson = OBJECT_MAPPER.writeValueAsString(responseMap);
final IndexOperationResponse response = IndexOperationResponse.fromJsonResponse(responseJson);
assertEquals(took, response.getTook());
assertEquals(errors, response.hasErrors());
assertEquals(items, response.getItems());
}
}

View File

@ -28,7 +28,7 @@ import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
public class SearchResponseTest { class SearchResponseTest {
@Test @Test
void test() { void test() {
final List<Map<String, Object>> results = new ArrayList<>(); final List<Map<String, Object>> results = new ArrayList<>();
@ -37,7 +37,7 @@ public class SearchResponseTest {
final String scrollId = "scrollId"; final String scrollId = "scrollId";
final String searchAfter = "searchAfter"; final String searchAfter = "searchAfter";
final int num = 10; final int num = 10;
final int took = 100; final long took = 100;
final boolean timeout = false; final boolean timeout = false;
final List<String> warnings = Collections.singletonList("auth"); final List<String> warnings = Collections.singletonList("auth");
final SearchResponse response = new SearchResponse(results, aggs, pitId, scrollId, searchAfter, num, took, timeout, warnings); final SearchResponse response = new SearchResponse(results, aggs, pitId, scrollId, searchAfter, num, took, timeout, warnings);
@ -61,4 +61,20 @@ public class SearchResponseTest {
assertTrue(str.contains("timedOut")); assertTrue(str.contains("timedOut"));
assertTrue(str.contains("warnings")); assertTrue(str.contains("warnings"));
} }
@Test
void testLongTookTime() {
final List<Map<String, Object>> results = new ArrayList<>();
final Map<String, Object> aggs = new HashMap<>();
final String pitId = "pitId";
final String scrollId = "scrollId";
final String searchAfter = "searchAfter";
final int num = 10;
final long took = 34493262031L; // example "took" from Elasticsearch 8.15.0
final boolean timeout = false;
final List<String> warnings = Collections.singletonList("auth");
final SearchResponse response = new SearchResponse(results, aggs, pitId, scrollId, searchAfter, num, took, timeout, warnings);
assertEquals(took, response.getTook());
}
} }

View File

@ -52,7 +52,7 @@ import static org.apache.http.auth.AuthScope.ANY;
public abstract class AbstractElasticsearchITBase { public abstract class AbstractElasticsearchITBase {
// default Elasticsearch version should (ideally) match that in the nifi-elasticsearch-bundle#pom.xml for the integration-tests profile // default Elasticsearch version should (ideally) match that in the nifi-elasticsearch-bundle#pom.xml for the integration-tests profile
protected static final DockerImageName IMAGE = DockerImageName protected static final DockerImageName IMAGE = DockerImageName
.parse(System.getProperty("elasticsearch.docker.image", "docker.elastic.co/elasticsearch/elasticsearch:8.13.3")); .parse(System.getProperty("elasticsearch.docker.image", "docker.elastic.co/elasticsearch/elasticsearch:8.15.1"));
protected static final String ELASTIC_USER_PASSWORD = System.getProperty("elasticsearch.elastic_user.password", RandomStringUtils.randomAlphanumeric(10, 20)); protected static final String ELASTIC_USER_PASSWORD = System.getProperty("elasticsearch.elastic_user.password", RandomStringUtils.randomAlphanumeric(10, 20));
private static final int PORT = 9200; private static final int PORT = 9200;
protected static final ElasticsearchContainer ELASTICSEARCH_CONTAINER = new ElasticsearchContainer(IMAGE) protected static final ElasticsearchContainer ELASTICSEARCH_CONTAINER = new ElasticsearchContainer(IMAGE)

View File

@ -33,7 +33,7 @@ language governing permissions and limitations under the License. -->
</modules> </modules>
<properties> <properties>
<elasticsearch.client.version>8.15.0</elasticsearch.client.version> <elasticsearch.client.version>8.15.1</elasticsearch.client.version>
</properties> </properties>
<dependencyManagement> <dependencyManagement>
@ -88,7 +88,7 @@ language governing permissions and limitations under the License. -->
</activation> </activation>
<properties> <properties>
<!-- also update the default Elasticsearch version in nifi-elasticsearch-test-utils#src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java--> <!-- also update the default Elasticsearch version in nifi-elasticsearch-test-utils#src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java-->
<elasticsearch_docker_image>8.15.0</elasticsearch_docker_image> <elasticsearch_docker_image>${elasticsearch.client.version}</elasticsearch_docker_image>
<elasticsearch.elastic.password>s3cret</elasticsearch.elastic.password> <elasticsearch.elastic.password>s3cret</elasticsearch.elastic.password>
</properties> </properties>
<build> <build>
@ -119,7 +119,7 @@ language governing permissions and limitations under the License. -->
<profile> <profile>
<id>elasticsearch7</id> <id>elasticsearch7</id>
<properties> <properties>
<elasticsearch_docker_image>7.17.21</elasticsearch_docker_image> <elasticsearch_docker_image>7.17.23</elasticsearch_docker_image>
</properties> </properties>
</profile> </profile>
</profiles> </profiles>