From 61663b495e4708382cedc155d956791dd2366416 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Tue, 28 Jan 2020 10:09:49 +0100 Subject: [PATCH] add an integration test using date_nanos as timestamp (#51477) add a test for using date_nanos as timestamp field in a continuous transform --- .../integration/TransformPivotRestIT.java | 144 ++++++++++++++++++ 1 file changed, 144 insertions(+) diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java b/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java index ca8251e5043..fb8c1c2c719 100644 --- a/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java +++ b/x-pack/plugin/transform/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestIT.java @@ -6,8 +6,12 @@ package org.elasticsearch.xpack.transform.integration; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; import org.elasticsearch.client.Request; import org.elasticsearch.client.ResponseException; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.junit.Before; @@ -19,6 +23,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -1022,4 +1027,143 @@ public class TransformPivotRestIT extends TransformRestTestCase { assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_26", 3.918918918); deleteIndex(indexName); } + + public void testContinuousDateNanos() throws Exception { + String indexName = "nanos"; + createDateNanoIndex(indexName, 1000); + String transformId = "nanos_continuous_pivot"; + String transformIndex = "pivot_nanos_continuous"; + setupDataAccessRole(DATA_ACCESS_ROLE, indexName, transformIndex); + final Request createTransformRequest = createRequestWithAuth( + "PUT", + getTransformEndpoint() + transformId, + BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS + ); + String config = "{" + + " \"source\": {\"index\":\"" + + indexName + + "\"}," + + " \"dest\": {\"index\":\"" + + transformIndex + + "\"}," + + " \"frequency\": \"1s\"," + + " \"sync\": {\"time\": {\"field\": \"timestamp\", \"delay\": \"1s\"}}," + + " \"pivot\": {" + + " \"group_by\": {" + + " \"id\": {" + + " \"terms\": {" + + " \"field\": \"id\"" + + " } } }," + + " \"aggregations\": {" + + " \"avg_rating\": {" + + " \"avg\": {" + + " \"field\": \"rating\"" + + " } } } }" + + "}"; + createTransformRequest.setJsonEntity(config); + Map createTransformResponse = entityAsMap(client().performRequest(createTransformRequest)); + assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE)); + + startAndWaitForContinuousTransform(transformId, transformIndex, null); + assertTrue(indexExists(transformIndex)); + // get and check some ids + assertOnePivotValue(transformIndex + "/_search?q=id:id_0", 2.97); + assertOnePivotValue(transformIndex + "/_search?q=id:id_1", 2.99); + assertOnePivotValue(transformIndex + "/_search?q=id:id_7", 2.97); + assertOnePivotValue(transformIndex + "/_search?q=id:id_9", 3.01); + + String nanoResolutionTimeStamp = Instant.now().minusSeconds(1).plusNanos(randomIntBetween(1, 1000000)).toString(); + + final StringBuilder bulk = new StringBuilder(); + for (int i = 0; i < 20; i++) { + bulk.append("{\"index\":{\"_index\":\"" + indexName + "\"}}\n"); + bulk.append("{\"id\":\"") + .append("id_") + .append(i % 5) + .append("\",\"rating\":") + .append(7) + .append(",\"timestamp\":") + .append("\"" + nanoResolutionTimeStamp + "\"") + .append("}\n"); + } + bulk.append("\r\n"); + + final Request bulkRequest = new Request("POST", "/_bulk"); + bulkRequest.addParameter("refresh", "true"); + bulkRequest.setJsonEntity(bulk.toString()); + client().performRequest(bulkRequest); + + waitForTransformCheckpoint(transformId, 2); + + stopTransform(transformId, false); + refreshIndex(transformIndex); + + // assert changes + assertOnePivotValue(transformIndex + "/_search?q=id:id_0", 3.125); + assertOnePivotValue(transformIndex + "/_search?q=id:id_1", 3.144230769); + + // assert unchanged + assertOnePivotValue(transformIndex + "/_search?q=id:id_7", 2.97); + assertOnePivotValue(transformIndex + "/_search?q=id:id_9", 3.01); + + deleteIndex(indexName); + } + + private void createDateNanoIndex(String indexName, int numDocs) throws IOException { + // create mapping + try (XContentBuilder builder = jsonBuilder()) { + builder.startObject(); + { + builder.startObject("mappings") + .startObject("properties") + .startObject("timestamp") + .field("type", "date_nanos") + .field("format", "strict_date_optional_time_nanos") + .endObject() + .startObject("id") + .field("type", "keyword") + .endObject() + .startObject("rating") + .field("type", "integer") + .endObject() + .endObject() + .endObject(); + } + builder.endObject(); + final StringEntity entity = new StringEntity(Strings.toString(builder), ContentType.APPLICATION_JSON); + Request req = new Request("PUT", indexName); + req.setEntity(entity); + client().performRequest(req); + } + + String randomNanos = "," + randomIntBetween(100000000, 999999999); + final StringBuilder bulk = new StringBuilder(); + for (int i = 0; i < numDocs; i++) { + bulk.append("{\"index\":{\"_index\":\"" + indexName + "\"}}\n"); + bulk.append("{\"id\":\"") + .append("id_") + .append(i % 10) + .append("\",\"rating\":") + .append(i % 7) + .append(",\"timestamp\":") + .append("\"2020-01-27T01:59:00" + randomNanos + "Z\"") + .append("}\n"); + + if (i % 50 == 0) { + bulk.append("\r\n"); + final Request bulkRequest = new Request("POST", "/_bulk"); + bulkRequest.addParameter("refresh", "true"); + bulkRequest.setJsonEntity(bulk.toString()); + client().performRequest(bulkRequest); + // clear the builder + bulk.setLength(0); + } + } + bulk.append("\r\n"); + + final Request bulkRequest = new Request("POST", "/_bulk"); + bulkRequest.addParameter("refresh", "true"); + bulkRequest.setJsonEntity(bulk.toString()); + client().performRequest(bulkRequest); + } }