* [ML] properly nesting objects in document source * Throw exception on agg extraction failure, cause it to fail df * throwing error to stop df if unsupported agg is found
This commit is contained in:
parent
9944fdf237
commit
0931815355
|
@ -41,6 +41,7 @@ import org.elasticsearch.xpack.core.scheduler.SchedulerEngine.Event;
|
||||||
import org.elasticsearch.xpack.dataframe.checkpoint.DataFrameTransformsCheckpointService;
|
import org.elasticsearch.xpack.dataframe.checkpoint.DataFrameTransformsCheckpointService;
|
||||||
import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor;
|
import org.elasticsearch.xpack.dataframe.notifications.DataFrameAuditor;
|
||||||
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
|
import org.elasticsearch.xpack.dataframe.persistence.DataFrameTransformsConfigManager;
|
||||||
|
import org.elasticsearch.xpack.dataframe.transforms.pivot.AggregationResultUtils;
|
||||||
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -636,7 +637,7 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean isIrrecoverableFailure(Exception e) {
|
private boolean isIrrecoverableFailure(Exception e) {
|
||||||
return e instanceof IndexNotFoundException;
|
return e instanceof IndexNotFoundException || e instanceof AggregationResultUtils.AggregationExtractionException;
|
||||||
}
|
}
|
||||||
|
|
||||||
synchronized void handleFailure(Exception e) {
|
synchronized void handleFailure(Exception e) {
|
||||||
|
|
|
@ -8,6 +8,7 @@ package org.elasticsearch.xpack.dataframe.transforms.pivot;
|
||||||
|
|
||||||
import org.apache.logging.log4j.LogManager;
|
import org.apache.logging.log4j.LogManager;
|
||||||
import org.apache.logging.log4j.Logger;
|
import org.apache.logging.log4j.Logger;
|
||||||
|
import org.elasticsearch.ElasticsearchException;
|
||||||
import org.elasticsearch.search.aggregations.Aggregation;
|
import org.elasticsearch.search.aggregations.Aggregation;
|
||||||
import org.elasticsearch.search.aggregations.AggregationBuilder;
|
import org.elasticsearch.search.aggregations.AggregationBuilder;
|
||||||
import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
|
import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
|
||||||
|
@ -29,7 +30,7 @@ import java.util.stream.Stream;
|
||||||
|
|
||||||
import static org.elasticsearch.xpack.dataframe.transforms.pivot.SchemaUtil.isNumericType;
|
import static org.elasticsearch.xpack.dataframe.transforms.pivot.SchemaUtil.isNumericType;
|
||||||
|
|
||||||
final class AggregationResultUtils {
|
public final class AggregationResultUtils {
|
||||||
private static final Logger logger = LogManager.getLogger(AggregationResultUtils.class);
|
private static final Logger logger = LogManager.getLogger(AggregationResultUtils.class);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -77,17 +78,18 @@ final class AggregationResultUtils {
|
||||||
// gather the `value` type, otherwise utilize `getValueAsString` so we don't lose formatted outputs.
|
// gather the `value` type, otherwise utilize `getValueAsString` so we don't lose formatted outputs.
|
||||||
if (isNumericType(fieldType) ||
|
if (isNumericType(fieldType) ||
|
||||||
(aggResultSingleValue.getValueAsString().equals(String.valueOf(aggResultSingleValue.value())))) {
|
(aggResultSingleValue.getValueAsString().equals(String.valueOf(aggResultSingleValue.value())))) {
|
||||||
document.put(aggName, aggResultSingleValue.value());
|
updateDocument(document, aggName, aggResultSingleValue.value());
|
||||||
} else {
|
} else {
|
||||||
document.put(aggName, aggResultSingleValue.getValueAsString());
|
updateDocument(document, aggName, aggResultSingleValue.getValueAsString());
|
||||||
}
|
}
|
||||||
} else if (aggResult instanceof ScriptedMetric) {
|
} else if (aggResult instanceof ScriptedMetric) {
|
||||||
document.put(aggName, ((ScriptedMetric) aggResult).aggregation());
|
updateDocument(document, aggName, ((ScriptedMetric) aggResult).aggregation());
|
||||||
} else {
|
} else {
|
||||||
// Execution should never reach this point!
|
// Execution should never reach this point!
|
||||||
// Creating transforms with unsupported aggregations shall not be possible
|
// Creating transforms with unsupported aggregations shall not be possible
|
||||||
logger.error("Dataframe Internal Error: unsupported aggregation ["+ aggResult.getName() +"], ignoring");
|
throw new AggregationExtractionException("unsupported aggregation [{}] with name [{}]",
|
||||||
assert false;
|
aggResult.getType(),
|
||||||
|
aggResult.getName());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -97,4 +99,49 @@ final class AggregationResultUtils {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
static void updateDocument(Map<String, Object> document, String fieldName, Object value) {
|
||||||
|
String[] fieldTokens = fieldName.split("\\.");
|
||||||
|
if (fieldTokens.length == 1) {
|
||||||
|
document.put(fieldName, value);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Map<String, Object> internalMap = document;
|
||||||
|
for (int i = 0; i < fieldTokens.length; i++) {
|
||||||
|
String token = fieldTokens[i];
|
||||||
|
if (i == fieldTokens.length - 1) {
|
||||||
|
if (internalMap.containsKey(token)) {
|
||||||
|
if (internalMap.get(token) instanceof Map) {
|
||||||
|
throw new AggregationExtractionException("mixed object types of nested and non-nested fields [{}]",
|
||||||
|
fieldName);
|
||||||
|
} else {
|
||||||
|
throw new AggregationExtractionException("duplicate key value pairs key [{}] old value [{}] duplicate value [{}]",
|
||||||
|
fieldName,
|
||||||
|
internalMap.get(token),
|
||||||
|
value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
internalMap.put(token, value);
|
||||||
|
} else {
|
||||||
|
if (internalMap.containsKey(token)) {
|
||||||
|
if (internalMap.get(token) instanceof Map) {
|
||||||
|
internalMap = (Map<String, Object>)internalMap.get(token);
|
||||||
|
} else {
|
||||||
|
throw new AggregationExtractionException("mixed object types of nested and non-nested fields [{}]",
|
||||||
|
fieldName);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
Map<String, Object> newMap = new HashMap<>();
|
||||||
|
internalMap.put(token, newMap);
|
||||||
|
internalMap = newMap;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class AggregationExtractionException extends ElasticsearchException {
|
||||||
|
AggregationExtractionException(String msg, Object... args) {
|
||||||
|
super(msg, args);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -66,6 +66,7 @@ import java.util.Set;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import static java.util.Arrays.asList;
|
import static java.util.Arrays.asList;
|
||||||
|
import static org.hamcrest.CoreMatchers.equalTo;
|
||||||
|
|
||||||
public class AggregationResultUtilsTests extends ESTestCase {
|
public class AggregationResultUtilsTests extends ESTestCase {
|
||||||
|
|
||||||
|
@ -736,6 +737,51 @@ public class AggregationResultUtilsTests extends ESTestCase {
|
||||||
assertEquals(documentIdsFirstRun, documentIdsSecondRun);
|
assertEquals(documentIdsFirstRun, documentIdsSecondRun);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public void testUpdateDocument() {
|
||||||
|
Map<String, Object> document = new HashMap<>();
|
||||||
|
|
||||||
|
AggregationResultUtils.updateDocument(document, "foo.bar.baz", 1000L);
|
||||||
|
AggregationResultUtils.updateDocument(document, "foo.bar.baz2", 2000L);
|
||||||
|
AggregationResultUtils.updateDocument(document, "bar.field1", 1L);
|
||||||
|
AggregationResultUtils.updateDocument(document, "metric", 10L);
|
||||||
|
|
||||||
|
assertThat(document.get("metric"), equalTo(10L));
|
||||||
|
|
||||||
|
Map<String, Object> bar = (Map<String, Object>)document.get("bar");
|
||||||
|
|
||||||
|
assertThat(bar.get("field1"), equalTo(1L));
|
||||||
|
|
||||||
|
Map<String, Object> foo = (Map<String, Object>)document.get("foo");
|
||||||
|
Map<String, Object> foobar = (Map<String, Object>)foo.get("bar");
|
||||||
|
|
||||||
|
assertThat(foobar.get("baz"), equalTo(1000L));
|
||||||
|
assertThat(foobar.get("baz2"), equalTo(2000L));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testUpdateDocumentWithDuplicate() {
|
||||||
|
Map<String, Object> document = new HashMap<>();
|
||||||
|
|
||||||
|
AggregationResultUtils.updateDocument(document, "foo.bar.baz", 1000L);
|
||||||
|
AggregationResultUtils.AggregationExtractionException exception =
|
||||||
|
expectThrows(AggregationResultUtils.AggregationExtractionException.class,
|
||||||
|
() -> AggregationResultUtils.updateDocument(document, "foo.bar.baz", 2000L));
|
||||||
|
assertThat(exception.getMessage(),
|
||||||
|
equalTo("duplicate key value pairs key [foo.bar.baz] old value [1000] duplicate value [2000]"));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testUpdateDocumentWithObjectAndNotObject() {
|
||||||
|
Map<String, Object> document = new HashMap<>();
|
||||||
|
|
||||||
|
AggregationResultUtils.updateDocument(document, "foo.bar.baz", 1000L);
|
||||||
|
AggregationResultUtils.AggregationExtractionException exception =
|
||||||
|
expectThrows(AggregationResultUtils.AggregationExtractionException.class,
|
||||||
|
() -> AggregationResultUtils.updateDocument(document, "foo.bar", 2000L));
|
||||||
|
assertThat(exception.getMessage(),
|
||||||
|
equalTo("mixed object types of nested and non-nested fields [foo.bar]"));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
private void executeTest(GroupConfig groups,
|
private void executeTest(GroupConfig groups,
|
||||||
Collection<AggregationBuilder> aggregationBuilders,
|
Collection<AggregationBuilder> aggregationBuilders,
|
||||||
Collection<PipelineAggregationBuilder> pipelineAggregationBuilders,
|
Collection<PipelineAggregationBuilder> pipelineAggregationBuilders,
|
||||||
|
|
|
@ -76,18 +76,28 @@ setup:
|
||||||
"group_by": {
|
"group_by": {
|
||||||
"airline": {"terms": {"field": "airline"}},
|
"airline": {"terms": {"field": "airline"}},
|
||||||
"by-hour": {"date_histogram": {"interval": "1h", "field": "time", "format": "yyyy-MM-DD HH"}}},
|
"by-hour": {"date_histogram": {"interval": "1h", "field": "time", "format": "yyyy-MM-DD HH"}}},
|
||||||
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
|
"aggs": {
|
||||||
|
"avg_response": {"avg": {"field": "responsetime"}},
|
||||||
|
"time.max": {"max": {"field": "time"}},
|
||||||
|
"time.min": {"min": {"field": "time"}}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
- match: { preview.0.airline: foo }
|
- match: { preview.0.airline: foo }
|
||||||
- match: { preview.0.by-hour: "2017-02-49 00" }
|
- match: { preview.0.by-hour: "2017-02-49 00" }
|
||||||
- match: { preview.0.avg_response: 1.0 }
|
- match: { preview.0.avg_response: 1.0 }
|
||||||
|
- match: { preview.0.time.max: "2017-02-18T00:30:00.000Z" }
|
||||||
|
- match: { preview.0.time.min: "2017-02-18T00:00:00.000Z" }
|
||||||
- match: { preview.1.airline: bar }
|
- match: { preview.1.airline: bar }
|
||||||
- match: { preview.1.by-hour: "2017-02-49 01" }
|
- match: { preview.1.by-hour: "2017-02-49 01" }
|
||||||
- match: { preview.1.avg_response: 42.0 }
|
- match: { preview.1.avg_response: 42.0 }
|
||||||
|
- match: { preview.1.time.max: "2017-02-18T01:00:00.000Z" }
|
||||||
|
- match: { preview.1.time.min: "2017-02-18T01:00:00.000Z" }
|
||||||
- match: { preview.2.airline: foo }
|
- match: { preview.2.airline: foo }
|
||||||
- match: { preview.2.by-hour: "2017-02-49 01" }
|
- match: { preview.2.by-hour: "2017-02-49 01" }
|
||||||
- match: { preview.2.avg_response: 42.0 }
|
- match: { preview.2.avg_response: 42.0 }
|
||||||
|
- match: { preview.2.time.max: "2017-02-18T01:01:00.000Z" }
|
||||||
|
- match: { preview.2.time.min: "2017-02-18T01:01:00.000Z" }
|
||||||
|
|
||||||
---
|
---
|
||||||
"Test preview transform with invalid config":
|
"Test preview transform with invalid config":
|
||||||
|
|
Loading…
Reference in New Issue