mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-17 10:25:15 +00:00
add an integration test using date_nanos as timestamp (#51477)
add a test for using date_nanos as timestamp field in a continuous transform
This commit is contained in:
parent
bebce4b190
commit
61663b495e
@ -6,8 +6,12 @@
|
||||
|
||||
package org.elasticsearch.xpack.transform.integration;
|
||||
|
||||
import org.apache.http.entity.ContentType;
|
||||
import org.apache.http.entity.StringEntity;
|
||||
import org.elasticsearch.client.Request;
|
||||
import org.elasticsearch.client.ResponseException;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.support.XContentMapValues;
|
||||
import org.junit.Before;
|
||||
|
||||
@ -19,6 +23,7 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||
import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
@ -1022,4 +1027,143 @@ public class TransformPivotRestIT extends TransformRestTestCase {
|
||||
assertOnePivotValue(transformIndex + "/_search?q=reviewer:user_26", 3.918918918);
|
||||
deleteIndex(indexName);
|
||||
}
|
||||
|
||||
public void testContinuousDateNanos() throws Exception {
|
||||
String indexName = "nanos";
|
||||
createDateNanoIndex(indexName, 1000);
|
||||
String transformId = "nanos_continuous_pivot";
|
||||
String transformIndex = "pivot_nanos_continuous";
|
||||
setupDataAccessRole(DATA_ACCESS_ROLE, indexName, transformIndex);
|
||||
final Request createTransformRequest = createRequestWithAuth(
|
||||
"PUT",
|
||||
getTransformEndpoint() + transformId,
|
||||
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS
|
||||
);
|
||||
String config = "{"
|
||||
+ " \"source\": {\"index\":\""
|
||||
+ indexName
|
||||
+ "\"},"
|
||||
+ " \"dest\": {\"index\":\""
|
||||
+ transformIndex
|
||||
+ "\"},"
|
||||
+ " \"frequency\": \"1s\","
|
||||
+ " \"sync\": {\"time\": {\"field\": \"timestamp\", \"delay\": \"1s\"}},"
|
||||
+ " \"pivot\": {"
|
||||
+ " \"group_by\": {"
|
||||
+ " \"id\": {"
|
||||
+ " \"terms\": {"
|
||||
+ " \"field\": \"id\""
|
||||
+ " } } },"
|
||||
+ " \"aggregations\": {"
|
||||
+ " \"avg_rating\": {"
|
||||
+ " \"avg\": {"
|
||||
+ " \"field\": \"rating\""
|
||||
+ " } } } }"
|
||||
+ "}";
|
||||
createTransformRequest.setJsonEntity(config);
|
||||
Map<String, Object> createTransformResponse = entityAsMap(client().performRequest(createTransformRequest));
|
||||
assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
|
||||
|
||||
startAndWaitForContinuousTransform(transformId, transformIndex, null);
|
||||
assertTrue(indexExists(transformIndex));
|
||||
// get and check some ids
|
||||
assertOnePivotValue(transformIndex + "/_search?q=id:id_0", 2.97);
|
||||
assertOnePivotValue(transformIndex + "/_search?q=id:id_1", 2.99);
|
||||
assertOnePivotValue(transformIndex + "/_search?q=id:id_7", 2.97);
|
||||
assertOnePivotValue(transformIndex + "/_search?q=id:id_9", 3.01);
|
||||
|
||||
String nanoResolutionTimeStamp = Instant.now().minusSeconds(1).plusNanos(randomIntBetween(1, 1000000)).toString();
|
||||
|
||||
final StringBuilder bulk = new StringBuilder();
|
||||
for (int i = 0; i < 20; i++) {
|
||||
bulk.append("{\"index\":{\"_index\":\"" + indexName + "\"}}\n");
|
||||
bulk.append("{\"id\":\"")
|
||||
.append("id_")
|
||||
.append(i % 5)
|
||||
.append("\",\"rating\":")
|
||||
.append(7)
|
||||
.append(",\"timestamp\":")
|
||||
.append("\"" + nanoResolutionTimeStamp + "\"")
|
||||
.append("}\n");
|
||||
}
|
||||
bulk.append("\r\n");
|
||||
|
||||
final Request bulkRequest = new Request("POST", "/_bulk");
|
||||
bulkRequest.addParameter("refresh", "true");
|
||||
bulkRequest.setJsonEntity(bulk.toString());
|
||||
client().performRequest(bulkRequest);
|
||||
|
||||
waitForTransformCheckpoint(transformId, 2);
|
||||
|
||||
stopTransform(transformId, false);
|
||||
refreshIndex(transformIndex);
|
||||
|
||||
// assert changes
|
||||
assertOnePivotValue(transformIndex + "/_search?q=id:id_0", 3.125);
|
||||
assertOnePivotValue(transformIndex + "/_search?q=id:id_1", 3.144230769);
|
||||
|
||||
// assert unchanged
|
||||
assertOnePivotValue(transformIndex + "/_search?q=id:id_7", 2.97);
|
||||
assertOnePivotValue(transformIndex + "/_search?q=id:id_9", 3.01);
|
||||
|
||||
deleteIndex(indexName);
|
||||
}
|
||||
|
||||
private void createDateNanoIndex(String indexName, int numDocs) throws IOException {
|
||||
// create mapping
|
||||
try (XContentBuilder builder = jsonBuilder()) {
|
||||
builder.startObject();
|
||||
{
|
||||
builder.startObject("mappings")
|
||||
.startObject("properties")
|
||||
.startObject("timestamp")
|
||||
.field("type", "date_nanos")
|
||||
.field("format", "strict_date_optional_time_nanos")
|
||||
.endObject()
|
||||
.startObject("id")
|
||||
.field("type", "keyword")
|
||||
.endObject()
|
||||
.startObject("rating")
|
||||
.field("type", "integer")
|
||||
.endObject()
|
||||
.endObject()
|
||||
.endObject();
|
||||
}
|
||||
builder.endObject();
|
||||
final StringEntity entity = new StringEntity(Strings.toString(builder), ContentType.APPLICATION_JSON);
|
||||
Request req = new Request("PUT", indexName);
|
||||
req.setEntity(entity);
|
||||
client().performRequest(req);
|
||||
}
|
||||
|
||||
String randomNanos = "," + randomIntBetween(100000000, 999999999);
|
||||
final StringBuilder bulk = new StringBuilder();
|
||||
for (int i = 0; i < numDocs; i++) {
|
||||
bulk.append("{\"index\":{\"_index\":\"" + indexName + "\"}}\n");
|
||||
bulk.append("{\"id\":\"")
|
||||
.append("id_")
|
||||
.append(i % 10)
|
||||
.append("\",\"rating\":")
|
||||
.append(i % 7)
|
||||
.append(",\"timestamp\":")
|
||||
.append("\"2020-01-27T01:59:00" + randomNanos + "Z\"")
|
||||
.append("}\n");
|
||||
|
||||
if (i % 50 == 0) {
|
||||
bulk.append("\r\n");
|
||||
final Request bulkRequest = new Request("POST", "/_bulk");
|
||||
bulkRequest.addParameter("refresh", "true");
|
||||
bulkRequest.setJsonEntity(bulk.toString());
|
||||
client().performRequest(bulkRequest);
|
||||
// clear the builder
|
||||
bulk.setLength(0);
|
||||
}
|
||||
}
|
||||
bulk.append("\r\n");
|
||||
|
||||
final Request bulkRequest = new Request("POST", "/_bulk");
|
||||
bulkRequest.addParameter("refresh", "true");
|
||||
bulkRequest.setJsonEntity(bulk.toString());
|
||||
client().performRequest(bulkRequest);
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user