[ML] Make datafeeds work with nanosecond time fields (#51180)
Allows ML datafeeds to work with time fields that have the "date_nanos" type _and make use of the extra precision_. (Previously, datafeeds only worked with time field values that were exact multiples of milliseconds, so they worked with "date_nanos" only if the extra precision over "date" was not used.)

Relates #49889
This commit is contained in: parent 0513d8dca3, commit 0fa7db9a95
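To make the change concrete, here is a hedged sketch (not part of the commit) of what a datafeed's source index can now look like. It mirrors the integration test changed below: it creates an index whose time field is mapped as "date_nanos" with the "strict_date_optional_time_nanos" format and indexes a document whose timestamp has sub-millisecond precision. The host, index name, and field names are illustrative.

```java
import org.apache.http.HttpHost;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RestClient;

// Hedged sketch mirroring the changed integration test; nothing here is mandated by the commit.
public class DateNanosIndexingSketch {
    public static void main(String[] args) throws Exception {
        try (RestClient client = RestClient.builder(new HttpHost("localhost", 9200, "http")).build()) {
            // Map the time field as date_nanos so sub-millisecond precision is preserved.
            Request createIndex = new Request("PUT", "/airline-data");
            createIndex.setJsonEntity("{"
                + " \"mappings\": {"
                + "   \"properties\": {"
                + "     \"time\": { \"type\":\"date_nanos\", \"format\":\"strict_date_optional_time_nanos\" },"
                + "     \"airline\": { \"type\":\"keyword\" },"
                + "     \"responsetime\": { \"type\":\"float\" }"
                + "   }"
                + " }"
                + "}");
            client.performRequest(createIndex);

            // A timestamp with a fraction finer than milliseconds; a datafeed over
            // this index can now consume it without losing the job's data ordering.
            Request indexDoc = new Request("PUT", "/airline-data/_doc/1");
            indexDoc.setJsonEntity(
                "{\"time\":\"2016-06-01T00:00:00,123456789Z\",\"airline\":\"AAA\",\"responsetime\":135.22}");
            client.performRequest(indexDoc);
        }
    }
}
```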
MlBasicMultiNodeIT.java

```diff
@@ -17,6 +17,7 @@ import org.elasticsearch.xpack.ml.MachineLearning;
 import org.yaml.snakeyaml.util.UriEncoder;
 
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -36,7 +37,7 @@ public class MlBasicMultiNodeIT extends ESRestTestCase {
         assertTrue((Boolean) ml.get("enabled"));
     }
 
-    public void testInvalidJob() throws Exception {
+    public void testInvalidJob() {
         // The job name is invalid because it contains a space
         String jobId = "invalid job";
         ResponseException e = expectThrows(ResponseException.class, () -> createFarequoteJob(jobId));
@@ -103,11 +104,15 @@ public class MlBasicMultiNodeIT extends ESRestTestCase {
     }
 
     public void testMiniFarequoteWithDatafeeder() throws Exception {
+        boolean datesHaveNanoSecondResolution = randomBoolean();
+        String dateMappingType = datesHaveNanoSecondResolution ? "date_nanos" : "date";
+        String dateFormat = datesHaveNanoSecondResolution ? "strict_date_optional_time_nanos" : "strict_date_optional_time";
+        String randomNanos = datesHaveNanoSecondResolution ? "," + randomIntBetween(100000000, 999999999) : "";
         Request createAirlineDataRequest = new Request("PUT", "/airline-data");
         createAirlineDataRequest.setJsonEntity("{"
             + " \"mappings\": {"
             + "   \"properties\": {"
-            + "     \"time\": { \"type\":\"date\"},"
+            + "     \"time\": { \"type\":\"" + dateMappingType + "\", \"format\":\"" + dateFormat + "\"},"
             + "     \"airline\": { \"type\":\"keyword\"},"
             + "     \"responsetime\": { \"type\":\"float\"}"
             + "   }"
@@ -115,10 +120,10 @@ public class MlBasicMultiNodeIT extends ESRestTestCase {
             + "}");
         client().performRequest(createAirlineDataRequest);
         Request airlineData1 = new Request("PUT", "/airline-data/_doc/1");
-        airlineData1.setJsonEntity("{\"time\":\"2016-06-01T00:00:00Z\",\"airline\":\"AAA\",\"responsetime\":135.22}");
+        airlineData1.setJsonEntity("{\"time\":\"2016-06-01T00:00:00" + randomNanos + "Z\",\"airline\":\"AAA\",\"responsetime\":135.22}");
         client().performRequest(airlineData1);
         Request airlineData2 = new Request("PUT", "/airline-data/_doc/2");
-        airlineData2.setJsonEntity("{\"time\":\"2016-06-01T01:59:00Z\",\"airline\":\"AAA\",\"responsetime\":541.76}");
+        airlineData2.setJsonEntity("{\"time\":\"2016-06-01T01:59:00" + randomNanos + "Z\",\"airline\":\"AAA\",\"responsetime\":541.76}");
         client().performRequest(airlineData2);
 
         // Ensure all data is searchable
@@ -147,7 +152,7 @@ public class MlBasicMultiNodeIT extends ESRestTestCase {
                 assertEquals(2, dataCountsDoc.get("input_record_count"));
                 assertEquals(2, dataCountsDoc.get("processed_record_count"));
             } catch (IOException e) {
-                throw new RuntimeException(e);
+                throw new UncheckedIOException(e);
             }
         });
 
@@ -233,7 +238,7 @@ public class MlBasicMultiNodeIT extends ESRestTestCase {
         assertEquals(1000, responseBody2.get("bucket_count"));
 
         // unintuitive: should return the earliest record timestamp of this feed???
-        assertEquals(null, responseBody2.get("earliest_record_timestamp"));
+        assertNull(responseBody2.get("earliest_record_timestamp"));
         assertEquals(1407082000000L, responseBody2.get("latest_record_timestamp"));
 
         assertEquals(Collections.singletonMap("closed", true),
```
TimeBasedExtractedFields.java

```diff
@@ -61,7 +61,7 @@ public class TimeBasedExtractedFields extends ExtractedFields {
         List<String> remainingFields = job.allInputFields().stream().filter(f -> !f.equals(timeField)).collect(Collectors.toList());
         List<ExtractedField> allExtractedFields = new ArrayList<>(remainingFields.size() + 1);
         allExtractedFields.add(timeExtractedField);
-        remainingFields.stream().forEach(field -> allExtractedFields.add(extractionMethodDetector.detect(field)));
+        remainingFields.forEach(field -> allExtractedFields.add(extractionMethodDetector.detect(field)));
 
         return new TimeBasedExtractedFields(timeExtractedField, allExtractedFields);
     }
```
ExtractedFields.java

```diff
@@ -94,7 +94,7 @@ public class ExtractedFields {
     }
 
     private ExtractedField detectNonScriptField(String field) {
-        if (isFieldOfType(field, TimeField.TYPE) && isAggregatable(field)) {
+        if (isFieldOfTypes(field, TimeField.TYPES) && isAggregatable(field)) {
             return new TimeField(field, ExtractedField.Method.DOC_VALUE);
         }
         if (isFieldOfType(field, GeoPointField.TYPE)) {
@@ -129,9 +129,14 @@ public class ExtractedFields {
     }
 
     private boolean isFieldOfType(String field, String type) {
+        return isFieldOfTypes(field, Collections.singleton(type));
+    }
+
+    private boolean isFieldOfTypes(String field, Set<String> types) {
+        assert types.isEmpty() == false;
         Map<String, FieldCapabilities> fieldCaps = fieldsCapabilities.getField(field);
-        if (fieldCaps != null && fieldCaps.size() == 1) {
-            return fieldCaps.containsKey(type);
+        if (fieldCaps != null && fieldCaps.isEmpty() == false) {
+            return types.containsAll(fieldCaps.keySet());
         }
         return false;
     }
```
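The reworked check above accepts a field as a time field only when every mapping type reported for it by field capabilities is in the accepted set, and at least one mapping exists. A minimal standalone sketch of that set logic, assuming the per-index mapping types have already been collected into a set (the class and `isTimeField` helper are hypothetical, not the actual `ExtractedFields` code):

```java
import java.util.Set;

// Hypothetical sketch of the membership test: a field qualifies as a time field only if
// every mapping type reported for it across the matched indices is "date" or "date_nanos".
public class TimeFieldTypeCheck {

    private static final Set<String> TIME_TYPES = Set.of("date", "date_nanos");

    // reportedTypes models fieldCaps.keySet(): the distinct mapping types the field
    // has across all indices matched by the datafeed's index pattern.
    static boolean isTimeField(Set<String> reportedTypes) {
        if (reportedTypes.isEmpty()) {
            return false; // field is unmapped everywhere
        }
        return TIME_TYPES.containsAll(reportedTypes);
    }

    public static void main(String[] args) {
        System.out.println(isTimeField(Set.of("date")));               // true
        System.out.println(isTimeField(Set.of("date_nanos")));         // true
        System.out.println(isTimeField(Set.of("date", "date_nanos"))); // true: mixed indices are accepted
        System.out.println(isTimeField(Set.of("date", "keyword")));    // false: one index maps it as keyword
    }
}
```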
TimeField.java

```diff
@@ -5,6 +5,7 @@
  */
 package org.elasticsearch.xpack.ml.extractor;
 
+import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.search.SearchHit;
 
 import java.util.Collections;
@@ -13,15 +14,17 @@ import java.util.Set;
 
 public class TimeField extends AbstractField {
 
-    static final String TYPE = "date";
-
-    private static final Set<String> TYPES = Collections.unmodifiableSet(Sets.newHashSet("date", "date_nanos"));
+    static final Set<String> TYPES = Collections.unmodifiableSet(Sets.newHashSet("date", "date_nanos"));
 
     private static final String EPOCH_MILLIS_FORMAT = "epoch_millis";
 
     private final Method method;
 
     public TimeField(String name, Method method) {
+        // This class intentionally reports the possible types rather than the types reported by
+        // field caps at the point of construction. This means that it will continue to work if,
+        // for example, a newly created index has a "date_nanos" time field when in all the indices
+        // that matched the pattern when this constructor was called the field had type "date".
         super(name, TYPES);
         if (method == Method.SOURCE) {
             throw new IllegalArgumentException("time field [" + name + "] cannot be extracted from source");
@@ -41,7 +44,23 @@ public class TimeField extends AbstractField {
             return value;
         }
         if (value[0] instanceof String) { // doc_value field with the epoch_millis format
-            value[0] = Long.parseLong((String) value[0]);
+            // Since nanosecond support was added epoch_millis timestamps may have a fractional component.
+            // We discard this, taking just whole milliseconds. Arguably it would be better to retain the
+            // precision here and let the downstream component decide whether it wants the accuracy, but
+            // that makes it hard to pass around the value as a number. The double type doesn't have
+            // enough digits of accuracy, and obviously long cannot store the fraction. BigDecimal would
+            // work, but that isn't supported by the JSON parser if the number gets round-tripped through
+            // JSON. So String is really the only format that could be used, but the ML consumers of time
+            // are expecting a number.
+            String strVal0 = (String) value[0];
+            int dotPos = strVal0.indexOf('.');
+            if (dotPos == -1) {
+                value[0] = Long.parseLong(strVal0);
+            } else if (dotPos > 0) {
+                value[0] = Long.parseLong(strVal0.substring(0, dotPos));
+            } else {
+                value[0] = 0L;
+            }
         } else if (value[0] instanceof Long == false) { // pre-6.0 field
             throw new IllegalStateException("Unexpected value for a time field: " + value[0].getClass());
         }
```
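With a "date_nanos" mapping, the "epoch_millis" doc_value format can return a string with a fractional part, for example "1464739200000.123456" for 2016-06-01T00:00:00Z plus 123456 nanoseconds; the extractor keeps only the whole milliseconds. A self-contained sketch of that truncation rule (hypothetical class, mirroring but not reusing the TimeField code):

```java
// Standalone sketch of the truncation applied in TimeField.value: a fractional
// epoch_millis string is reduced to whole milliseconds before being handed to ML.
public class EpochMillisTruncation {

    static long toWholeMillis(String epochMillis) {
        int dotPos = epochMillis.indexOf('.');
        if (dotPos == -1) {
            return Long.parseLong(epochMillis);                       // "1464739200000" -> 1464739200000
        } else if (dotPos > 0) {
            return Long.parseLong(epochMillis.substring(0, dotPos));  // "1464739200000.123456" -> 1464739200000
        } else {
            return 0L;                                                // degenerate ".5" -> 0
        }
    }

    public static void main(String[] args) {
        // 2016-06-01T00:00:00Z stored in a "date" field: whole milliseconds only.
        System.out.println(toWholeMillis("1464739200000"));
        // The same instant plus 123456 nanoseconds in a "date_nanos" field: the fraction is dropped.
        System.out.println(toWholeMillis("1464739200000.123456"));
    }
}
```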
TimeFieldTests.java

```diff
@@ -5,33 +5,61 @@
  */
 package org.elasticsearch.xpack.ml.extractor;
 
+import org.elasticsearch.common.time.DateFormatter;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xpack.ml.test.SearchHitBuilder;
 
-import static org.hamcrest.Matchers.contains;
+import java.time.Instant;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.startsWith;
 
 public class TimeFieldTests extends ESTestCase {
 
-    public void testDocValueWithStringValue() {
-        long millis = randomLong();
-        SearchHit hit = new SearchHitBuilder(randomInt()).addField("time", Long.toString(millis)).build();
+    public void testDocValueWithWholeMillisecondStringValue() {
+        long millis = randomNonNegativeLong();
+        Instant time = Instant.ofEpochMilli(millis);
+        DateFormatter formatter = DateFormatter.forPattern("epoch_millis");
+        String timeAsString = formatter.format(time);
+        SearchHit hit = new SearchHitBuilder(randomInt()).addField("time", timeAsString).build();
 
         ExtractedField timeField = new TimeField("time", ExtractedField.Method.DOC_VALUE);
 
         assertThat(timeField.value(hit), equalTo(new Object[] { millis }));
         assertThat(timeField.getName(), equalTo("time"));
         assertThat(timeField.getSearchField(), equalTo("time"));
-        assertThat(timeField.getTypes(), contains("date"));
+        assertThat(timeField.getTypes(), containsInAnyOrder("date", "date_nanos"));
         assertThat(timeField.getMethod(), equalTo(ExtractedField.Method.DOC_VALUE));
         assertThat(timeField.getDocValueFormat(), equalTo("epoch_millis"));
         assertThat(timeField.supportsFromSource(), is(false));
-        expectThrows(UnsupportedOperationException.class, () -> timeField.newFromSource());
+        expectThrows(UnsupportedOperationException.class, timeField::newFromSource);
         assertThat(timeField.isMultiField(), is(false));
-        expectThrows(UnsupportedOperationException.class, () -> timeField.getParentField());
+        expectThrows(UnsupportedOperationException.class, timeField::getParentField);
+    }
+
+    public void testDocValueWithFractionalMillisecondStringValue() {
+        long millis = randomNonNegativeLong();
+        int extraNanos = randomIntBetween(1, 999999);
+        Instant time = Instant.ofEpochMilli(millis).plusNanos(extraNanos);
+        DateFormatter formatter = DateFormatter.forPattern("epoch_millis");
+        String timeAsString = formatter.format(time);
+        SearchHit hit = new SearchHitBuilder(randomInt()).addField("time", timeAsString).build();
+
+        ExtractedField timeField = new TimeField("time", ExtractedField.Method.DOC_VALUE);
+
+        assertThat(timeField.value(hit), equalTo(new Object[] { millis }));
+        assertThat(timeField.getName(), equalTo("time"));
+        assertThat(timeField.getSearchField(), equalTo("time"));
+        assertThat(timeField.getTypes(), containsInAnyOrder("date", "date_nanos"));
+        assertThat(timeField.getMethod(), equalTo(ExtractedField.Method.DOC_VALUE));
+        assertThat(timeField.getDocValueFormat(), equalTo("epoch_millis"));
+        assertThat(timeField.supportsFromSource(), is(false));
+        expectThrows(UnsupportedOperationException.class, timeField::newFromSource);
+        assertThat(timeField.isMultiField(), is(false));
+        expectThrows(UnsupportedOperationException.class, timeField::getParentField);
     }
 
     public void testScriptWithLongValue() {
@@ -43,13 +71,13 @@ public class TimeFieldTests extends ESTestCase {
         assertThat(timeField.value(hit), equalTo(new Object[] { millis }));
         assertThat(timeField.getName(), equalTo("time"));
         assertThat(timeField.getSearchField(), equalTo("time"));
-        assertThat(timeField.getTypes(), contains("date"));
+        assertThat(timeField.getTypes(), containsInAnyOrder("date", "date_nanos"));
         assertThat(timeField.getMethod(), equalTo(ExtractedField.Method.SCRIPT_FIELD));
-        expectThrows(UnsupportedOperationException.class, () -> timeField.getDocValueFormat());
+        expectThrows(UnsupportedOperationException.class, timeField::getDocValueFormat);
         assertThat(timeField.supportsFromSource(), is(false));
-        expectThrows(UnsupportedOperationException.class, () -> timeField.newFromSource());
+        expectThrows(UnsupportedOperationException.class, timeField::newFromSource);
         assertThat(timeField.isMultiField(), is(false));
-        expectThrows(UnsupportedOperationException.class, () -> timeField.getParentField());
+        expectThrows(UnsupportedOperationException.class, timeField::getParentField);
     }
 
     public void testUnknownFormat() {
```