mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-25 22:36:20 +00:00
[Transform] fix transform failure case for percentiles and sparse data (#54202)
Index null if percentiles could not be calculated due to sparse data. Fixes #54201.
This commit is contained in:
parent
70b378cd1b
commit
cb0ecafdd8
@ -6,7 +6,11 @@
|
||||
|
||||
package org.elasticsearch.xpack.transform.integration;
|
||||
|
||||
import org.apache.http.entity.ContentType;
|
||||
import org.apache.http.entity.StringEntity;
|
||||
import org.elasticsearch.client.Request;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.support.XContentMapValues;
|
||||
import org.junit.Before;
|
||||
|
||||
@ -14,6 +18,7 @@ import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
public class TransformPivotRestSpecialCasesIT extends TransformRestTestCase {
|
||||
@ -102,4 +107,110 @@ public class TransformPivotRestSpecialCasesIT extends TransformRestTestCase {
|
||||
Number actual = (Number) ((List<?>) XContentMapValues.extractValue("hits.hits._source.rating.avg", searchResult)).get(0);
|
||||
assertEquals(3.878048780, actual.doubleValue(), 0.000001);
|
||||
}
|
||||
|
||||
public void testSparseDataPercentiles() throws Exception {
|
||||
String indexName = "cpu-utilization";
|
||||
String transformIndex = "pivot-cpu";
|
||||
String transformId = "pivot-cpu";
|
||||
|
||||
try (XContentBuilder builder = jsonBuilder()) {
|
||||
builder.startObject();
|
||||
{
|
||||
builder.startObject("mappings")
|
||||
.startObject("properties")
|
||||
.startObject("host")
|
||||
.field("type", "keyword")
|
||||
.endObject()
|
||||
.startObject("cpu")
|
||||
.field("type", "integer")
|
||||
.endObject()
|
||||
.endObject()
|
||||
.endObject();
|
||||
}
|
||||
builder.endObject();
|
||||
final StringEntity entity = new StringEntity(Strings.toString(builder), ContentType.APPLICATION_JSON);
|
||||
Request req = new Request("PUT", indexName);
|
||||
req.setEntity(entity);
|
||||
client().performRequest(req);
|
||||
}
|
||||
|
||||
final StringBuilder bulk = new StringBuilder();
|
||||
bulk.append("{\"index\":{\"_index\":\"" + indexName + "\"}}\n");
|
||||
bulk.append("{\"host\":\"host-1\",\"cpu\": 22}\n");
|
||||
bulk.append("{\"index\":{\"_index\":\"" + indexName + "\"}}\n");
|
||||
bulk.append("{\"host\":\"host-1\",\"cpu\": 55}\n");
|
||||
bulk.append("{\"index\":{\"_index\":\"" + indexName + "\"}}\n");
|
||||
bulk.append("{\"host\":\"host-1\",\"cpu\": 23}\n");
|
||||
bulk.append("{\"index\":{\"_index\":\"" + indexName + "\"}}\n");
|
||||
bulk.append("{\"host\":\"host-2\",\"cpu\": 0}\n");
|
||||
bulk.append("{\"index\":{\"_index\":\"" + indexName + "\"}}\n");
|
||||
bulk.append("{\"host\":\"host-2\",\"cpu\": 99}\n");
|
||||
bulk.append("{\"index\":{\"_index\":\"" + indexName + "\"}}\n");
|
||||
bulk.append("{\"host\":\"host-1\",\"cpu\": 28}\n");
|
||||
bulk.append("{\"index\":{\"_index\":\"" + indexName + "\"}}\n");
|
||||
bulk.append("{\"host\":\"host-1\",\"cpu\": 77}\n");
|
||||
|
||||
// missing value for cpu
|
||||
bulk.append("{\"index\":{\"_index\":\"" + indexName + "\"}}\n");
|
||||
bulk.append("{\"host\":\"host-3\"}\n");
|
||||
bulk.append("\r\n");
|
||||
final Request bulkRequest = new Request("POST", "/_bulk");
|
||||
bulkRequest.addParameter("refresh", "true");
|
||||
bulkRequest.setJsonEntity(bulk.toString());
|
||||
client().performRequest(bulkRequest);
|
||||
|
||||
final Request createTransformRequest = new Request("PUT", getTransformEndpoint() + transformId);
|
||||
|
||||
String config = "{" + " \"source\": {\"index\":\"" + indexName + "\"}," + " \"dest\": {\"index\":\"" + transformIndex + "\"},";
|
||||
|
||||
config += " \"pivot\": {"
|
||||
+ " \"group_by\": {"
|
||||
+ " \"host\": {"
|
||||
+ " \"terms\": {"
|
||||
+ " \"field\": \"host\""
|
||||
+ " } } },"
|
||||
+ " \"aggregations\": {"
|
||||
+ " \"p\": {"
|
||||
+ " \"percentiles\": {"
|
||||
+ " \"field\": \"cpu\""
|
||||
+ " } }"
|
||||
+ " } }"
|
||||
+ "}";
|
||||
|
||||
createTransformRequest.setJsonEntity(config);
|
||||
Map<String, Object> createTransformResponse = entityAsMap(client().performRequest(createTransformRequest));
|
||||
assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
|
||||
|
||||
startAndWaitForTransform(transformId, transformIndex);
|
||||
assertTrue(indexExists(transformIndex));
|
||||
|
||||
Map<String, Object> indexStats = getAsMap(transformIndex + "/_stats");
|
||||
assertEquals(3, XContentMapValues.extractValue("_all.total.docs.count", indexStats));
|
||||
|
||||
// get and check some data
|
||||
Map<String, Object> searchResult = getAsMap(transformIndex + "/_search?q=host:host-1");
|
||||
|
||||
assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, Object> percentiles = (Map<String, Object>) ((List<?>) XContentMapValues.extractValue(
|
||||
"hits.hits._source.p",
|
||||
searchResult
|
||||
)).get(0);
|
||||
|
||||
assertEquals(28.0, (double) percentiles.get("50"), 0.000001);
|
||||
assertEquals(77.0, (double) percentiles.get("99"), 0.000001);
|
||||
|
||||
searchResult = getAsMap(transformIndex + "/_search?q=host:host-3");
|
||||
assertEquals(1, XContentMapValues.extractValue("hits.total.value", searchResult));
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, Object> percentilesEmpty = (Map<String, Object>) ((List<?>) XContentMapValues.extractValue(
|
||||
"hits.hits._source.p",
|
||||
searchResult
|
||||
)).get(0);
|
||||
assertTrue(percentilesEmpty.containsKey("50"));
|
||||
assertNull(percentilesEmpty.get("50"));
|
||||
assertTrue(percentilesEmpty.containsKey("99"));
|
||||
assertNull(percentilesEmpty.get("99"));
|
||||
}
|
||||
}
|
||||
|
@ -205,11 +205,16 @@ public final class AggregationResultUtils {
|
||||
@Override
|
||||
public Object value(Aggregation agg, Map<String, String> fieldTypeMap, String lookupFieldPrefix) {
|
||||
Percentiles aggregation = (Percentiles) agg;
|
||||
|
||||
HashMap<String, Double> percentiles = new HashMap<>();
|
||||
|
||||
for (Percentile p : aggregation) {
|
||||
percentiles.put(OutputFieldNameConverter.fromDouble(p.getPercent()), p.getValue());
|
||||
// in case of sparse data percentiles might not have data, in this case it returns NaN,
|
||||
// we need to guard the output and set null in this case
|
||||
if (Numbers.isValidDouble(p.getValue()) == false) {
|
||||
percentiles.put(OutputFieldNameConverter.fromDouble(p.getPercent()), null);
|
||||
} else {
|
||||
percentiles.put(OutputFieldNameConverter.fromDouble(p.getPercent()), p.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
return percentiles;
|
||||
|
@ -798,6 +798,11 @@ public class AggregationResultUtilsTests extends ESTestCase {
|
||||
);
|
||||
}
|
||||
|
||||
public void testPercentilesAggExtractorNaN() {
|
||||
Aggregation agg = createPercentilesAgg("p_agg", Arrays.asList(new Percentile(1, Double.NaN), new Percentile(50, Double.NaN)));
|
||||
assertThat(AggregationResultUtils.getExtractor(agg).value(agg, Collections.emptyMap(), ""), equalTo(asMap("1", null, "50", null)));
|
||||
}
|
||||
|
||||
public static SingleBucketAggregation createSingleBucketAgg(String name, long docCount, Aggregation... subAggregations) {
|
||||
SingleBucketAggregation agg = mock(SingleBucketAggregation.class);
|
||||
when(agg.getDocCount()).thenReturn(docCount);
|
||||
|
Loading…
x
Reference in New Issue
Block a user