[Rollup] Validate field mapping before creating Rollup Job (elastic/x-pack-elasticsearch#4274)

This PR adds logic to ensure that the fields (and field types) configured
in the Rollup Job are present in the index/indices specified by the job's
index pattern.  If a field is missing or is not aggregatable, the request
fails with an exception before the job is created.

This is important for user-friendliness: otherwise the user only
discovers a mapping issue after the job has started and fails to roll
up correctly (which is only noticeable by inspecting the logs, since
it is a runtime failure).

Original commit: elastic/x-pack-elasticsearch@686cd03072
This commit is contained in:
Zachary Tong 2018-04-04 15:32:26 -07:00 committed by GitHub
parent 3ae8c71f16
commit 3852b41330
27 changed files with 791 additions and 11 deletions

View File

@ -569,9 +569,37 @@ setups['sensor_index'] = '''
type: float
node:
type: keyword
load:
type: double
net_in:
type: long
net_out:
type: long
hostname:
type: keyword
datacenter:
type: keyword
'''
setups['sensor_prefab_data'] = '''
- do:
indices.create:
index: sensor-1
body:
settings:
number_of_shards: 1
number_of_replicas: 0
mappings:
data:
properties:
timestamp:
type: date
temperature:
type: long
voltage:
type: float
node:
type: keyword
- do:
indices.create:
index: sensor_rollup

View File

@ -83,6 +83,7 @@ PUT _xpack/rollup/job/sensor
}
--------------------------------------------------
// CONSOLE
// TEST[setup:sensor_index]
When the job is created, you receive the following results:

View File

@ -45,6 +45,7 @@ PUT _xpack/rollup/job/sensor
}
--------------------------------------------------
// CONSOLE
// TEST[setup:sensor_index]
==== Logistical Details

View File

@ -77,7 +77,7 @@ PUT _xpack/rollup/job/sensor
}
--------------------------------------------------
// CONSOLE
// NOTEST
// TEST[setup:sensor_index]
This rolls up the `sensor-*` pattern and stores the results in `sensor_rollup`. To search this rolled up data, we
need to use the `_rollup_search` endpoint. However, you'll notice that we can use regular query DSL to search the

View File

@ -55,6 +55,7 @@ PUT _xpack/rollup/job/sensor
}
--------------------------------------------------
// CONSOLE
// TEST[setup:sensor_index]
We give the job the ID of "sensor" (in the url: `PUT _xpack/rollup/job/sensor`), and tell it to rollup the index pattern `"sensor-*"`.
This job will find and rollup any index that matches that pattern. Rollup summaries are then stored in the `"sensor_rollup"` index.

View File

@ -21,6 +21,7 @@ public class RollupField {
public static final String FILTER = "filter";
public static final String NAME = "rollup";
public static final String AGG = "agg";
public static final String ROLLUP_MISSING = "ROLLUP_MISSING_40710B25931745D4B0B8B310F6912A69";
/**
* Format to the appropriate Rollup field name convention

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.core.rollup.action;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.fieldcaps.FieldCapabilities;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
@ -21,6 +22,7 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.core.rollup.job.RollupJobConfig;
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
public class PutRollupJobAction extends Action<PutRollupJobAction.Request, PutRollupJobAction.Response,
@ -86,6 +88,19 @@ public class PutRollupJobAction extends Action<PutRollupJobAction.Request, PutRo
return null;
}
/**
 * Validates the job's configured fields against the field-caps response gathered from the
 * indices matched by the job's index pattern.
 *
 * @param fieldCapsResponse field capabilities keyed by field name, then by mapping type
 * @return the accumulated validation errors, or {@code null} if all mappings are valid
 */
public ActionRequestValidationException validateMappings(Map<String, Map<String, FieldCapabilities>> fieldCapsResponse) {
    ActionRequestValidationException validationException = new ActionRequestValidationException();
    if (fieldCapsResponse.isEmpty()) {
        // No capabilities at all: none of the configured fields exist in the target indices.
        validationException.addValidationError("Could not find any fields in the index/index-pattern that were configured in job");
        return validationException;
    }
    config.validateMappings(fieldCapsResponse, validationException);
    if (validationException.validationErrors().isEmpty() == false) {
        return validationException;
    }
    return null;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return this.config.toXContent(builder, params);

View File

@ -5,6 +5,8 @@
*/
package org.elasticsearch.xpack.core.rollup.job;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.fieldcaps.FieldCapabilities;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
@ -180,6 +182,28 @@ public class DateHistoGroupConfig implements Writeable, ToXContentFragment {
return Collections.singletonMap(RollupField.formatMetaField(RollupField.INTERVAL), interval.toString());
}
/**
 * Checks that the configured date_histogram field is mapped as an aggregatable {@code date}
 * type across every index matched by the job's index pattern, adding errors to
 * {@code validationException} otherwise.
 */
public void validateMappings(Map<String, Map<String, FieldCapabilities>> fieldCapsResponse,
                             ActionRequestValidationException validationException) {
    Map<String, FieldCapabilities> fieldCaps = fieldCapsResponse.get(field);
    if (fieldCaps != null && fieldCaps.isEmpty() == false) {
        if (fieldCaps.containsKey("date") && fieldCaps.size() == 1) {
            if (fieldCaps.get("date").isAggregatable()) {
                return;
            } else {
                validationException.addValidationError("The field [" + field + "] must be aggregatable across all indices, " +
                    "but is not.");
            }
        } else {
            // Non-date type, or conflicting types across the matched indices.
            validationException.addValidationError("The field referenced by a date_histo group must be a [date] type across all " +
                "indices in the index pattern. Found: " + fieldCaps.keySet().toString() + " for field [" + field + "]");
        }
        // BUG FIX: previously control fell through to the "could not find" error below, so a field
        // that exists but is mis-typed or non-aggregatable was reported twice with contradictory
        // messages. The field was found; stop here.
        return;
    }
    validationException.addValidationError("Could not find a [date] field with name [" + field + "] in any of the indices matching " +
        "the index pattern.");
}
@Override
public boolean equals(Object other) {
if (this == other) {

View File

@ -5,6 +5,8 @@
*/
package org.elasticsearch.xpack.core.rollup.job;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.fieldcaps.FieldCapabilities;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
@ -16,7 +18,10 @@ import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
/**
* The configuration object for the groups section in the rollup config.
@ -72,6 +77,29 @@ public class GroupConfig implements Writeable, ToXContentObject {
return terms;
}
/**
 * Returns the name of every field used by this group config: the mandatory date_histogram
 * field plus any configured histogram and terms fields.
 */
public Set<String> getAllFields() {
    Set<String> allFields = new HashSet<>();
    allFields.add(dateHisto.getField());
    if (histo != null) {
        allFields.addAll(histo.getAllFields());
    }
    if (terms != null) {
        allFields.addAll(terms.getAllFields());
    }
    return allFields;
}
/**
 * Delegates mapping validation to each configured sub-group.  The date_histogram group is
 * always present; histogram and terms groups are optional.  Errors accumulate in
 * {@code validationException}; the delegation order here fixes the order of reported errors.
 */
public void validateMappings(Map<String, Map<String, FieldCapabilities>> fieldCapsResponse,
                             ActionRequestValidationException validationException) {
    dateHisto.validateMappings(fieldCapsResponse, validationException);
    if (histo != null) {
        histo.validateMappings(fieldCapsResponse, validationException);
    }
    if (terms != null) {
        terms.validateMappings(fieldCapsResponse, validationException);
    }
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {

View File

@ -5,6 +5,8 @@
*/
package org.elasticsearch.xpack.core.rollup.job;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.fieldcaps.FieldCapabilities;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
@ -13,6 +15,7 @@ import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.mapper.NumberFieldMapper;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeValuesSourceBuilder;
import org.elasticsearch.search.aggregations.bucket.composite.HistogramValuesSourceBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder;
@ -25,7 +28,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* The configuration object for the histograms in the rollup config
@ -46,6 +51,10 @@ public class HistoGroupConfig implements Writeable, ToXContentFragment {
private static final ParseField INTERVAL = new ParseField("interval");
private static final ParseField FIELDS = new ParseField("fields");
private static final List<String> MAPPER_TYPES = Stream.of(NumberFieldMapper.NumberType.values())
.map(NumberFieldMapper.NumberType::typeName)
.collect(Collectors.toList());
private final long interval;
private final String[] fields;
@ -105,6 +114,34 @@ public class HistoGroupConfig implements Writeable, ToXContentFragment {
return Collections.singletonMap(RollupField.formatMetaField(RollupField.INTERVAL), interval);
}
/** Returns the configured histogram field names as a set. */
public Set<String> getAllFields() {
    return Stream.of(fields).collect(Collectors.toSet());
}
/**
 * Verifies that each configured histogram field is mapped as an aggregatable numeric type in
 * the matched indices, collecting any problems into {@code validationException}.
 */
public void validateMappings(Map<String, Map<String, FieldCapabilities>> fieldCapsResponse,
                             ActionRequestValidationException validationException) {
    for (String field : fields) {
        Map<String, FieldCapabilities> fieldCaps = fieldCapsResponse.get(field);
        if (fieldCaps == null || fieldCaps.isEmpty()) {
            validationException.addValidationError("Could not find a [numeric] field with name [" + field
                + "] in any of the indices matching the index pattern.");
            continue;
        }
        fieldCaps.forEach((type, caps) -> {
            if (MAPPER_TYPES.contains(type) == false) {
                validationException.addValidationError("The field referenced by a histo group must be a [numeric] type, " +
                    "but found " + fieldCaps.keySet().toString() + " for field [" + field + "]");
            } else if (caps.isAggregatable() == false) {
                validationException.addValidationError("The field [" + field + "] must be aggregatable across all indices, " +
                    "but is not.");
            }
        });
    }
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(INTERVAL.getPreferredName(), interval);

View File

@ -5,6 +5,8 @@
*/
package org.elasticsearch.xpack.core.rollup.job;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.fieldcaps.FieldCapabilities;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
@ -13,6 +15,7 @@ import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.mapper.NumberFieldMapper;
import org.elasticsearch.search.aggregations.metrics.avg.AvgAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.max.MaxAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.min.MinAggregationBuilder;
@ -30,6 +33,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* The configuration object for the metrics portion of a rollup job config
@ -63,6 +67,9 @@ public class MetricConfig implements Writeable, ToXContentFragment {
private static final ParseField AVG = new ParseField("avg");
private static List<String> METRIC_WHITELIST = Arrays.asList("min", "max", "sum", "avg");
private static final List<String> MAPPER_TYPES = Stream.of(NumberFieldMapper.NumberType.values())
.map(NumberFieldMapper.NumberType::typeName)
.collect(Collectors.toList());
public static final ConstructingObjectParser<MetricConfig, Void> PARSER = new ConstructingObjectParser<>(
NAME, a -> new MetricConfig((String)a[0], (List<String>) a[1]));
@ -132,6 +139,28 @@ public class MetricConfig implements Writeable, ToXContentFragment {
return metrics.stream().map(metric -> Collections.singletonMap("agg", (Object)metric)).collect(Collectors.toList());
}
/**
 * Verifies that this metric's field is mapped as an aggregatable numeric type in every matched
 * index, accumulating any problems into {@code validationException}.
 */
public void validateMappings(Map<String, Map<String, FieldCapabilities>> fieldCapsResponse,
                             ActionRequestValidationException validationException) {
    Map<String, FieldCapabilities> fieldCaps = fieldCapsResponse.get(field);
    if (fieldCaps == null || fieldCaps.isEmpty()) {
        validationException.addValidationError("Could not find a [numeric] field with name [" + field + "] in any of the " +
            "indices matching the index pattern.");
        return;
    }
    for (Map.Entry<String, FieldCapabilities> entry : fieldCaps.entrySet()) {
        if (MAPPER_TYPES.contains(entry.getKey()) == false) {
            validationException.addValidationError("The field referenced by a metric group must be a [numeric] type, but found " +
                fieldCaps.keySet().toString() + " for field [" + field + "]");
        } else if (entry.getValue().isAggregatable() == false) {
            validationException.addValidationError("The field [" + field + "] must be aggregatable across all indices, " +
                "but is not.");
        }
    }
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(FIELD.getPreferredName(), field);

View File

@ -5,6 +5,8 @@
*/
package org.elasticsearch.xpack.core.rollup.job;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.fieldcaps.FieldCapabilities;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.NamedWriteable;
@ -22,8 +24,12 @@ import org.elasticsearch.xpack.core.scheduler.Cron;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
/**
* This class holds the configuration details of a rollup job, such as the groupings, metrics, what
@ -127,6 +133,20 @@ public class RollupJobConfig implements NamedWriteable, ToXContentObject {
return NAME;
}
/** Returns every field name referenced by this job: all group fields plus all metric fields. */
public Set<String> getAllFields() {
    Set<String> referenced = new HashSet<>(groupConfig.getAllFields());
    for (MetricConfig metric : metricsConfig) {
        referenced.add(metric.getField());
    }
    return referenced;
}
/**
 * Asks the group configuration and every metric configuration to validate its fields against
 * the field-caps response, accumulating errors in {@code validationException}.
 */
public void validateMappings(Map<String, Map<String, FieldCapabilities>> fieldCapsResponse,
                             ActionRequestValidationException validationException) {
    groupConfig.validateMappings(fieldCapsResponse, validationException);
    metricsConfig.forEach(metric -> metric.validateMappings(fieldCapsResponse, validationException));
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject();

View File

@ -5,6 +5,8 @@
*/
package org.elasticsearch.xpack.core.rollup.job;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.fieldcaps.FieldCapabilities;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
@ -13,6 +15,8 @@ import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.mapper.KeywordFieldMapper;
import org.elasticsearch.index.mapper.TextFieldMapper;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeValuesSourceBuilder;
import org.elasticsearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
@ -24,6 +28,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
/**
@ -42,7 +47,8 @@ public class TermsGroupConfig implements Writeable, ToXContentFragment {
public static final ObjectParser<TermsGroupConfig.Builder, Void> PARSER = new ObjectParser<>(NAME, TermsGroupConfig.Builder::new);
private static final ParseField FIELDS = new ParseField("fields");
private static final List<String> FLOAT_TYPES = Arrays.asList("half_float", "float", "double", "scaled_float");
private static final List<String> NATURAL_TYPES = Arrays.asList("byte", "short", "integer", "long");
private final String[] fields;
static {
@ -91,6 +97,44 @@ public class TermsGroupConfig implements Writeable, ToXContentFragment {
return Collections.emptyMap();
}
/** Returns the configured terms field names as a set. */
public Set<String> getAllFields() {
    return Arrays.asList(fields).stream().collect(Collectors.toSet());
}
/**
 * Verifies that each configured terms field is mapped as an aggregatable keyword/text or
 * numeric type across the matched indices, collecting problems into {@code validationException}.
 */
public void validateMappings(Map<String, Map<String, FieldCapabilities>> fieldCapsResponse,
                             ActionRequestValidationException validationException) {
    for (String field : fields) {
        Map<String, FieldCapabilities> fieldCaps = fieldCapsResponse.get(field);
        if (fieldCaps == null || fieldCaps.isEmpty()) {
            validationException.addValidationError("Could not find a [numeric] or [keyword/text] field with name [" + field
                + "] in any of the indices matching the index pattern.");
            continue;
        }
        fieldCaps.forEach((type, caps) -> {
            // keyword/text and all numeric types are equally acceptable; the aggregatable check
            // was previously copy-pasted into three identical branches, collapsed here into one.
            boolean supportedType = type.equals(KeywordFieldMapper.CONTENT_TYPE)
                || type.equals(TextFieldMapper.CONTENT_TYPE)
                || FLOAT_TYPES.contains(type)
                || NATURAL_TYPES.contains(type);
            if (supportedType == false) {
                validationException.addValidationError("The field referenced by a terms group must be a [numeric] or " +
                    "[keyword/text] type, but found " + fieldCaps.keySet().toString() + " for field [" + field + "]");
            } else if (caps.isAggregatable() == false) {
                validationException.addValidationError("The field [" + field + "] must be aggregatable across all indices, " +
                    "but is not.");
            }
        });
    }
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(FIELDS.getPreferredName(), fields);

View File

@ -5,12 +5,22 @@
*/
package org.elasticsearch.xpack.core.rollup.job;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.fieldcaps.FieldCapabilities;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.test.AbstractSerializingTestCase;
import org.elasticsearch.xpack.core.rollup.ConfigTestHelpers;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class DateHistoGroupConfigSerializingTests extends AbstractSerializingTestCase<DateHistoGroupConfig> {
@Override
@ -27,4 +37,109 @@ public class DateHistoGroupConfigSerializingTests extends AbstractSerializingTes
// Random DateHistoGroupConfig instance for the serialization round-trip tests.
protected DateHistoGroupConfig createTestInstance() {
    return ConfigTestHelpers.getDateHisto().build();
}
// An empty field-caps response must report that the configured field could not be found.
public void testValidateNoMapping() throws IOException {
    ActionRequestValidationException e = new ActionRequestValidationException();
    Map<String, Map<String, FieldCapabilities>> responseMap = new HashMap<>();

    DateHistoGroupConfig config = new DateHistoGroupConfig.Builder()
        .setField("my_field")
        .setInterval(new DateHistogramInterval("1d"))
        .build();
    config.validateMappings(responseMap, e);
    assertThat(e.validationErrors().get(0), equalTo("Could not find a [date] field with name [my_field] in any of the " +
        "indices matching the index pattern."));
}
// Caps exist for a different field only; the configured field must still be reported missing.
public void testValidateNomatchingField() throws IOException {
    ActionRequestValidationException e = new ActionRequestValidationException();
    Map<String, Map<String, FieldCapabilities>> responseMap = new HashMap<>();

    // Have to mock fieldcaps because the ctor's aren't public...
    FieldCapabilities fieldCaps = mock(FieldCapabilities.class);
    responseMap.put("some_other_field", Collections.singletonMap("date", fieldCaps));

    DateHistoGroupConfig config = new DateHistoGroupConfig.Builder()
        .setField("my_field")
        .setInterval(new DateHistogramInterval("1d"))
        .build();
    config.validateMappings(responseMap, e);
    assertThat(e.validationErrors().get(0), equalTo("Could not find a [date] field with name [my_field] in any of the " +
        "indices matching the index pattern."));
}
// A non-date mapping (keyword) for the date_histo field must produce the wrong-type error.
public void testValidateFieldWrongType() throws IOException {
    ActionRequestValidationException e = new ActionRequestValidationException();
    Map<String, Map<String, FieldCapabilities>> responseMap = new HashMap<>();

    // Have to mock fieldcaps because the ctor's aren't public...
    FieldCapabilities fieldCaps = mock(FieldCapabilities.class);
    responseMap.put("my_field", Collections.singletonMap("keyword", fieldCaps));

    DateHistoGroupConfig config = new DateHistoGroupConfig.Builder()
        .setField("my_field")
        .setInterval(new DateHistogramInterval("1d"))
        .build();
    config.validateMappings(responseMap, e);
    assertThat(e.validationErrors().get(0), equalTo("The field referenced by a date_histo group must be a [date] type across all " +
        "indices in the index pattern. Found: [keyword] for field [my_field]"));
}
// Conflicting types across indices (date in one, keyword in another) must be rejected.
// NOTE(review): the expected "Found: [date, keyword]" rendering relies on HashMap iteration
// order happening to be stable for these two keys — confirm this is intentional.
public void testValidateFieldMixtureTypes() throws IOException {
    ActionRequestValidationException e = new ActionRequestValidationException();
    Map<String, Map<String, FieldCapabilities>> responseMap = new HashMap<>();

    // Have to mock fieldcaps because the ctor's aren't public...
    FieldCapabilities fieldCaps = mock(FieldCapabilities.class);
    Map<String, FieldCapabilities> types = new HashMap<>(2);
    types.put("date", fieldCaps);
    types.put("keyword", fieldCaps);
    responseMap.put("my_field", types);

    DateHistoGroupConfig config = new DateHistoGroupConfig.Builder()
        .setField("my_field")
        .setInterval(new DateHistogramInterval("1d"))
        .build();
    config.validateMappings(responseMap, e);
    assertThat(e.validationErrors().get(0), equalTo("The field referenced by a date_histo group must be a [date] type across all " +
        "indices in the index pattern. Found: [date, keyword] for field [my_field]"));
}
// A correctly-typed date field that is not aggregatable must be rejected.
public void testValidateFieldMatchingNotAggregatable() throws IOException {
    ActionRequestValidationException e = new ActionRequestValidationException();
    Map<String, Map<String, FieldCapabilities>> responseMap = new HashMap<>();

    // Have to mock fieldcaps because the ctor's aren't public...
    FieldCapabilities fieldCaps = mock(FieldCapabilities.class);
    when(fieldCaps.isAggregatable()).thenReturn(false);
    responseMap.put("my_field", Collections.singletonMap("date", fieldCaps));

    DateHistoGroupConfig config = new DateHistoGroupConfig.Builder()
        .setField("my_field")
        .setInterval(new DateHistogramInterval("1d"))
        .build();
    config.validateMappings(responseMap, e);
    assertThat(e.validationErrors().get(0), equalTo("The field [my_field] must be aggregatable across all indices, but is not."));
}
// Happy path: an aggregatable date field produces no validation errors.
public void testValidateMatchingField() throws IOException {
    ActionRequestValidationException e = new ActionRequestValidationException();
    Map<String, Map<String, FieldCapabilities>> responseMap = new HashMap<>();

    // Have to mock fieldcaps because the ctor's aren't public...
    FieldCapabilities fieldCaps = mock(FieldCapabilities.class);
    when(fieldCaps.isAggregatable()).thenReturn(true);
    responseMap.put("my_field", Collections.singletonMap("date", fieldCaps));

    DateHistoGroupConfig config = new DateHistoGroupConfig.Builder()
        .setField("my_field")
        .setInterval(new DateHistogramInterval("1d"))
        .build();
    config.validateMappings(responseMap, e);
    assertThat(e.validationErrors().size(), equalTo(0));
}
}

View File

@ -5,12 +5,21 @@
*/
package org.elasticsearch.xpack.core.rollup.job;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.fieldcaps.FieldCapabilities;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractSerializingTestCase;
import org.elasticsearch.xpack.core.rollup.ConfigTestHelpers;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class HistoGroupConfigSerializingTests extends AbstractSerializingTestCase<HistoGroupConfig> {
@Override
@ -27,4 +36,88 @@ public class HistoGroupConfigSerializingTests extends AbstractSerializingTestCas
// Random HistoGroupConfig instance for the serialization round-trip tests.
protected HistoGroupConfig createTestInstance() {
    return ConfigTestHelpers.getHisto().build();
}
// An empty field-caps response must report that the configured field could not be found.
public void testValidateNoMapping() throws IOException {
    ActionRequestValidationException e = new ActionRequestValidationException();
    Map<String, Map<String, FieldCapabilities>> responseMap = new HashMap<>();

    HistoGroupConfig config = new HistoGroupConfig.Builder()
        .setFields(Collections.singletonList("my_field"))
        .setInterval(123)
        .build();
    config.validateMappings(responseMap, e);
    assertThat(e.validationErrors().get(0), equalTo("Could not find a [numeric] field with name [my_field] in any of the " +
        "indices matching the index pattern."));
}
// Caps exist for a different field only; the configured field must still be reported missing.
public void testValidateNomatchingField() throws IOException {
    ActionRequestValidationException e = new ActionRequestValidationException();
    Map<String, Map<String, FieldCapabilities>> responseMap = new HashMap<>();

    // Have to mock fieldcaps because the ctor's aren't public...
    FieldCapabilities fieldCaps = mock(FieldCapabilities.class);
    responseMap.put("some_other_field", Collections.singletonMap("long", fieldCaps));

    HistoGroupConfig config = new HistoGroupConfig.Builder()
        .setFields(Collections.singletonList("my_field"))
        .setInterval(123)
        .build();
    config.validateMappings(responseMap, e);
    assertThat(e.validationErrors().get(0), equalTo("Could not find a [numeric] field with name [my_field] in any of the " +
        "indices matching the index pattern."));
}
// A non-numeric mapping (keyword) for a histo field must produce the wrong-type error.
public void testValidateFieldWrongType() throws IOException {
    ActionRequestValidationException e = new ActionRequestValidationException();
    Map<String, Map<String, FieldCapabilities>> responseMap = new HashMap<>();

    // Have to mock fieldcaps because the ctor's aren't public...
    FieldCapabilities fieldCaps = mock(FieldCapabilities.class);
    responseMap.put("my_field", Collections.singletonMap("keyword", fieldCaps));

    HistoGroupConfig config = new HistoGroupConfig.Builder()
        .setFields(Collections.singletonList("my_field"))
        .setInterval(123)
        .build();
    config.validateMappings(responseMap, e);
    assertThat(e.validationErrors().get(0), equalTo("The field referenced by a histo group must be a [numeric] type, but " +
        "found [keyword] for field [my_field]"));
}
// A correctly-typed numeric field that is not aggregatable must be rejected.
public void testValidateFieldMatchingNotAggregatable() throws IOException {
    ActionRequestValidationException e = new ActionRequestValidationException();
    Map<String, Map<String, FieldCapabilities>> responseMap = new HashMap<>();

    // Have to mock fieldcaps because the ctor's aren't public...
    FieldCapabilities fieldCaps = mock(FieldCapabilities.class);
    when(fieldCaps.isAggregatable()).thenReturn(false);
    responseMap.put("my_field", Collections.singletonMap("long", fieldCaps));

    HistoGroupConfig config = new HistoGroupConfig.Builder()
        .setFields(Collections.singletonList("my_field"))
        .setInterval(123)
        .build();
    config.validateMappings(responseMap, e);
    assertThat(e.validationErrors().get(0), equalTo("The field [my_field] must be aggregatable across all indices, but is not."));
}
// Happy path: an aggregatable numeric field produces no validation errors.
public void testValidateMatchingField() throws IOException {
    ActionRequestValidationException e = new ActionRequestValidationException();
    Map<String, Map<String, FieldCapabilities>> responseMap = new HashMap<>();

    // Have to mock fieldcaps because the ctor's aren't public...
    FieldCapabilities fieldCaps = mock(FieldCapabilities.class);
    when(fieldCaps.isAggregatable()).thenReturn(true);
    responseMap.put("my_field", Collections.singletonMap("long", fieldCaps));

    HistoGroupConfig config = new HistoGroupConfig.Builder()
        .setFields(Collections.singletonList("my_field"))
        .setInterval(123)
        .build();
    config.validateMappings(responseMap, e);
    assertThat(e.validationErrors().size(), equalTo(0));
}
}

View File

@ -5,12 +5,21 @@
*/
package org.elasticsearch.xpack.core.rollup.job;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.fieldcaps.FieldCapabilities;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractSerializingTestCase;
import org.elasticsearch.xpack.core.rollup.ConfigTestHelpers;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class MetricsConfigSerializingTests extends AbstractSerializingTestCase<MetricConfig> {
@Override
@ -27,4 +36,89 @@ public class MetricsConfigSerializingTests extends AbstractSerializingTestCase<M
// Random MetricConfig instance for the serialization round-trip tests.
protected MetricConfig createTestInstance() {
    return ConfigTestHelpers.getMetricConfig().build();
}
// An empty field-caps response must report that the configured field could not be found.
public void testValidateNoMapping() throws IOException {
    ActionRequestValidationException e = new ActionRequestValidationException();
    Map<String, Map<String, FieldCapabilities>> responseMap = new HashMap<>();

    MetricConfig config = new MetricConfig.Builder()
        .setField("my_field")
        .setMetrics(Collections.singletonList("max"))
        .build();
    config.validateMappings(responseMap, e);
    assertThat(e.validationErrors().get(0), equalTo("Could not find a [numeric] field with name [my_field] in any of the " +
        "indices matching the index pattern."));
}
// Caps exist for a different field only; the configured field must still be reported missing.
public void testValidateNomatchingField() throws IOException {
    ActionRequestValidationException e = new ActionRequestValidationException();
    Map<String, Map<String, FieldCapabilities>> responseMap = new HashMap<>();

    // Have to mock fieldcaps because the ctor's aren't public...
    FieldCapabilities fieldCaps = mock(FieldCapabilities.class);
    responseMap.put("some_other_field", Collections.singletonMap("date", fieldCaps));

    MetricConfig config = new MetricConfig.Builder()
        .setField("my_field")
        .setMetrics(Collections.singletonList("max"))
        .build();
    config.validateMappings(responseMap, e);
    assertThat(e.validationErrors().get(0), equalTo("Could not find a [numeric] field with name [my_field] in any of the " +
        "indices matching the index pattern."));
}
// A non-numeric mapping (keyword) for a metric field must produce the wrong-type error.
public void testValidateFieldWrongType() throws IOException {
    ActionRequestValidationException e = new ActionRequestValidationException();
    Map<String, Map<String, FieldCapabilities>> responseMap = new HashMap<>();

    // Have to mock fieldcaps because the ctor's aren't public...
    FieldCapabilities fieldCaps = mock(FieldCapabilities.class);
    responseMap.put("my_field", Collections.singletonMap("keyword", fieldCaps));

    MetricConfig config = new MetricConfig.Builder()
        .setField("my_field")
        .setMetrics(Collections.singletonList("max"))
        .build();
    config.validateMappings(responseMap, e);
    assertThat(e.validationErrors().get(0), equalTo("The field referenced by a metric group must be a [numeric] type, " +
        "but found [keyword] for field [my_field]"));
}
// A correctly-typed numeric field that is not aggregatable must be rejected.
public void testValidateFieldMatchingNotAggregatable() throws IOException {
    ActionRequestValidationException e = new ActionRequestValidationException();
    Map<String, Map<String, FieldCapabilities>> responseMap = new HashMap<>();

    // Have to mock fieldcaps because the ctor's aren't public...
    FieldCapabilities fieldCaps = mock(FieldCapabilities.class);
    when(fieldCaps.isAggregatable()).thenReturn(false);
    responseMap.put("my_field", Collections.singletonMap("long", fieldCaps));

    MetricConfig config = new MetricConfig.Builder()
        .setField("my_field")
        .setMetrics(Collections.singletonList("max"))
        .build();
    config.validateMappings(responseMap, e);
    assertThat(e.validationErrors().get(0), equalTo("The field [my_field] must be aggregatable across all indices, but is not."));
}
public void testValidateMatchingField() throws IOException {
    // Happy path: an aggregatable numeric field with the right name should
    // produce no validation errors at all.
    FieldCapabilities caps = mock(FieldCapabilities.class); // ctor isn't public, hence the mock
    when(caps.isAggregatable()).thenReturn(true);
    Map<String, Map<String, FieldCapabilities>> fieldCapsResponse = new HashMap<>();
    fieldCapsResponse.put("my_field", Collections.singletonMap("long", caps));
    MetricConfig config = new MetricConfig.Builder()
        .setField("my_field")
        .setMetrics(Collections.singletonList("max"))
        .build();
    ActionRequestValidationException validationException = new ActionRequestValidationException();
    config.validateMappings(fieldCapsResponse, validationException);
    assertThat(validationException.validationErrors().size(), equalTo(0));
}
}

View File

@ -5,14 +5,32 @@
*/
package org.elasticsearch.xpack.core.rollup.job;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.fieldcaps.FieldCapabilities;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeValuesSourceBuilder;
import org.elasticsearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder;
import org.elasticsearch.test.AbstractSerializingTestCase;
import org.elasticsearch.xpack.core.rollup.ConfigTestHelpers;
import org.elasticsearch.xpack.core.rollup.RollupField;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TermsGroupConfigSerializingTests extends AbstractSerializingTestCase<TermsGroupConfig> {
private static final List<String> FLOAT_TYPES = Arrays.asList("half_float", "float", "double", "scaled_float");
private static final List<String> NATURAL_TYPES = Arrays.asList("byte", "short", "integer", "long");
@Override
protected TermsGroupConfig doParseInstance(XContentParser parser) throws IOException {
return TermsGroupConfig.PARSER.apply(parser, null).build();
@ -27,4 +45,114 @@ public class TermsGroupConfigSerializingTests extends AbstractSerializingTestCas
protected TermsGroupConfig createTestInstance() {
// Builds a randomized terms group config for serialization round-trip tests.
return ConfigTestHelpers.getTerms().build();
}
public void testValidateNoMapping() throws IOException {
    // An empty field caps response means no index matched (or none had the
    // field) — validation must fail with the "could not find" message.
    Map<String, Map<String, FieldCapabilities>> emptyResponse = new HashMap<>();
    TermsGroupConfig config = new TermsGroupConfig.Builder()
        .setFields(Collections.singletonList("my_field"))
        .build();
    ActionRequestValidationException validationException = new ActionRequestValidationException();
    config.validateMappings(emptyResponse, validationException);
    String firstError = validationException.validationErrors().get(0);
    assertThat(firstError, equalTo("Could not find a [numeric] or [keyword/text] field with name " +
        "[my_field] in any of the indices matching the index pattern."));
}
public void testValidateNomatchingField() throws IOException {
    // Field caps only report an unrelated keyword field, so the terms group's
    // configured field cannot be found and validation must record an error.
    FieldCapabilities caps = mock(FieldCapabilities.class); // ctor isn't public, hence the mock
    Map<String, Map<String, FieldCapabilities>> fieldCapsResponse = new HashMap<>();
    fieldCapsResponse.put("some_other_field", Collections.singletonMap("keyword", caps));
    TermsGroupConfig config = new TermsGroupConfig.Builder()
        .setFields(Collections.singletonList("my_field"))
        .build();
    ActionRequestValidationException validationException = new ActionRequestValidationException();
    config.validateMappings(fieldCapsResponse, validationException);
    assertThat(validationException.validationErrors().get(0),
        equalTo("Could not find a [numeric] or [keyword/text] field with name " +
            "[my_field] in any of the indices matching the index pattern."));
}
public void testValidateFieldWrongType() throws IOException {
    // geo_point is neither numeric nor keyword/text, so the terms group must
    // reject it with a type-mismatch error naming the offending type.
    FieldCapabilities caps = mock(FieldCapabilities.class); // ctor isn't public, hence the mock
    Map<String, Map<String, FieldCapabilities>> fieldCapsResponse = new HashMap<>();
    fieldCapsResponse.put("my_field", Collections.singletonMap("geo_point", caps));
    TermsGroupConfig config = new TermsGroupConfig.Builder()
        .setFields(Collections.singletonList("my_field"))
        .build();
    ActionRequestValidationException validationException = new ActionRequestValidationException();
    config.validateMappings(fieldCapsResponse, validationException);
    String firstError = validationException.validationErrors().get(0);
    assertThat(firstError, equalTo("The field referenced by a terms group must be a [numeric] or " +
        "[keyword/text] type, but found [geo_point] for field [my_field]"));
}
public void testValidateFieldMatchingNotAggregatable() throws IOException {
    // Right name and an acceptable (random) type, but not aggregatable —
    // validation should surface the aggregatability error.
    FieldCapabilities caps = mock(FieldCapabilities.class); // ctor isn't public, hence the mock
    when(caps.isAggregatable()).thenReturn(false);
    Map<String, Map<String, FieldCapabilities>> fieldCapsResponse = new HashMap<>();
    fieldCapsResponse.put("my_field", Collections.singletonMap(getRandomType(), caps));
    TermsGroupConfig config = new TermsGroupConfig.Builder()
        .setFields(Collections.singletonList("my_field"))
        .build();
    ActionRequestValidationException validationException = new ActionRequestValidationException();
    config.validateMappings(fieldCapsResponse, validationException);
    assertThat(validationException.validationErrors().get(0),
        equalTo("The field [my_field] must be aggregatable across all indices, but is not."));
}
public void testValidateMatchingField() throws IOException {
    // Happy path: an aggregatable field of any supported type should validate
    // cleanly and translate into exactly one composite values-source builder.
    String randomType = getRandomType();
    FieldCapabilities caps = mock(FieldCapabilities.class); // ctor isn't public, hence the mock
    when(caps.isAggregatable()).thenReturn(true);
    Map<String, Map<String, FieldCapabilities>> fieldCapsResponse = new HashMap<>();
    fieldCapsResponse.put("my_field", Collections.singletonMap(randomType, caps));
    TermsGroupConfig config = new TermsGroupConfig.Builder()
        .setFields(Collections.singletonList("my_field"))
        .build();
    ActionRequestValidationException validationException = new ActionRequestValidationException();
    config.validateMappings(fieldCapsResponse, validationException);
    if (validationException.validationErrors().isEmpty() == false) {
        fail(validationException.getMessage());
    }
    List<CompositeValuesSourceBuilder<?>> builders = config.toBuilders();
    assertThat(builders.size(), equalTo(1));
}
private String getRandomType() {
    // Every mapping type a terms group accepts: keyword/text plus all the
    // numeric types. Index chosen uniformly, mirroring randomIntBetween(0, 8)
    // over the original if/else chain (both bounds inclusive).
    String[] acceptedTypes = {
        "keyword", "text", "long", "integer", "short",
        "float", "double", "scaled_float", "half_float"
    };
    return acceptedTypes[randomIntBetween(0, acceptedTypes.length - 1)];
}
}

View File

@ -10,6 +10,7 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.admin.indices.create.CreateIndexAction;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsAction;
@ -17,6 +18,8 @@ import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingAction;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.Client;
@ -33,9 +36,9 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.persistent.PersistentTasksService;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.XPackField;
@ -53,6 +56,7 @@ public class TransportPutRollupJobAction extends TransportMasterNodeAction<PutRo
private final XPackLicenseState licenseState;
private final PersistentTasksService persistentTasksService;
private final Client client;
private final IndexNameExpressionResolver indexNameExpressionResolver;
@Inject
@ -65,6 +69,7 @@ public class TransportPutRollupJobAction extends TransportMasterNodeAction<PutRo
this.licenseState = licenseState;
this.persistentTasksService = persistentTasksService;
this.client = client;
this.indexNameExpressionResolver = indexNameExpressionResolver;
}
@Override
@ -85,11 +90,32 @@ public class TransportPutRollupJobAction extends TransportMasterNodeAction<PutRo
listener.onFailure(LicenseUtils.newComplianceException(XPackField.ROLLUP));
return;
}
RollupJob job = createRollupJob(request.getConfig(), threadPool);
createIndex(job, listener, persistentTasksService, client, logger);
FieldCapabilitiesRequest fieldCapsRequest = new FieldCapabilitiesRequest()
.indices(request.getConfig().getIndexPattern())
.fields(request.getConfig().getAllFields().toArray(new String[0]));
client.fieldCaps(fieldCapsRequest, new ActionListener<FieldCapabilitiesResponse>() {
@Override
public void onResponse(FieldCapabilitiesResponse fieldCapabilitiesResponse) {
ActionRequestValidationException validationException = request.validateMappings(fieldCapabilitiesResponse.get());
if (validationException != null) {
listener.onFailure(validationException);
return;
}
RollupJob job = createRollupJob(request.getConfig(), threadPool);
createIndex(job, listener, persistentTasksService, client, logger);
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}
static RollupJob createRollupJob(RollupJobConfig config, ThreadPool threadPool) {
private static RollupJob createRollupJob(RollupJobConfig config, ThreadPool threadPool) {
// ensure we only filter for the allowed headers
Map<String, String> filteredHeaders = threadPool.getThreadContext().getHeaders().entrySet().stream()
.filter(e -> Rollup.HEADER_FILTERS.contains(e.getKey()))

View File

@ -397,7 +397,9 @@ public abstract class RollupIndexer {
CompositeAggregationBuilder composite = new CompositeAggregationBuilder(AGGREGATION_NAME, builders);
config.getMetricsConfig().forEach(m -> m.toBuilders().forEach(composite::subAggregation));
composite.setMetaData(metadata);
if (metadata.isEmpty() == false) {
composite.setMetaData(metadata);
}
composite.size(config.getSize());
return composite;
}

View File

@ -234,7 +234,7 @@ public class RollupIT extends ESIntegTestCase {
.build();
DateHistoGroupConfig.Builder datehistoGroupConfig = new DateHistoGroupConfig.Builder();
datehistoGroupConfig.setField("timestamp");
datehistoGroupConfig.setField("date_histo");
datehistoGroupConfig.setInterval(new DateHistogramInterval("1d"));
GroupConfig.Builder groupConfig = new GroupConfig.Builder();
@ -374,7 +374,7 @@ public class RollupIT extends ESIntegTestCase {
client().admin().indices().prepareRefresh("test-verify").get();
MetricConfig metricConfig = new MetricConfig.Builder()
.setField("foo")
.setField("thefield")
.setMetrics(Arrays.asList("sum", "min", "max", "avg"))
.build();

View File

@ -1,4 +1,16 @@
setup:
- do:
indices.create:
index: foo
body:
mappings:
doc:
properties:
the_field:
type: date
value_field:
type: integer
- do:
headers:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser

View File

@ -1,3 +1,16 @@
setup:
- do:
indices.create:
index: foo
body:
mappings:
doc:
properties:
the_field:
type: date
value_field:
type: integer
---
"Test basic get_jobs":

View File

@ -1,4 +1,27 @@
setup:
- do:
indices.create:
index: foo
body:
mappings:
doc:
properties:
the_field:
type: date
value_field:
type: integer
- do:
indices.create:
index: foo2
body:
mappings:
doc:
properties:
the_field:
type: date
value_field:
type: integer
- do:
headers:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser

View File

@ -1,3 +1,16 @@
setup:
- do:
indices.create:
index: foo
body:
mappings:
doc:
properties:
the_field:
type: date
value_field:
type: integer
---
"Test basic put_job":

View File

@ -1,4 +1,18 @@
setup:
- do:
indices.create:
index: foo
body:
mappings:
doc:
properties:
timestamp:
type: date
partition:
type: keyword
price:
type: integer
- do:
headers:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser

View File

@ -1,4 +1,16 @@
setup:
- do:
indices.create:
index: foo
body:
mappings:
doc:
properties:
the_field:
type: date
value_field:
type: integer
- do:
headers:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser

View File

@ -1,9 +1,15 @@
setup:
- do:
headers:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
indices.create:
index: foo
body:
mappings:
doc:
properties:
the_field:
type: date
value_field:
type: integer
- do:
headers: