[ML] Improve uniqueness of result document IDs (#50644)
Switch from a 32-bit Java hash to a 128-bit Murmur hash for creating document IDs from by/over/partition field values. The 32-bit Java hash was not sufficiently unique and could produce identical hashes for relatively common combinations of by/partition field values, such as L018/128 and L017/228. Fixes #50613
parent 46d600c446
commit 35453e2b0e
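For context, the collision described in the commit message is easy to reproduce outside Elasticsearch. Below is a minimal standalone sketch (not part of this commit; the class and method names are illustrative) of the old ID suffix, a 32-bit Objects.hash of the field values followed by their combined length, showing that the by/partition pairs L018/128 and L017/228 (with no over field) end up with the same suffix:

import java.util.Objects;

// Minimal sketch (not part of this commit) of the pre-change ID suffix:
// a 32-bit Objects.hash of the field values followed by their combined length.
public class OldIdCollisionDemo {

    // Mirrors the suffix built by the old getId() implementations shown below.
    static String oldSuffix(String byFieldValue, String overFieldValue, String partitionFieldValue) {
        int valuesHash = Objects.hash(byFieldValue, overFieldValue, partitionFieldValue);
        int length = (byFieldValue == null ? 0 : byFieldValue.length())
                + (overFieldValue == null ? 0 : overFieldValue.length())
                + (partitionFieldValue == null ? 0 : partitionFieldValue.length());
        return valuesHash + "_" + length;
    }

    public static void main(String[] args) {
        // Both lines print -2073230751_7, so two distinct results would have received the same document ID.
        System.out.println(oldSuffix("L018", null, "128"));
        System.out.println(oldSuffix("L017", null, "228"));
    }
}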
MachineLearningField.java
@@ -5,10 +5,18 @@
  */
 package org.elasticsearch.xpack.core.ml;
 
+import org.elasticsearch.common.Numbers;
+import org.elasticsearch.common.hash.MurmurHash3;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
 
+import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
 public final class MachineLearningField {
     public static final Setting<Boolean> AUTODETECT_PROCESS =
             Setting.boolSetting("xpack.ml.autodetect_process", true, Setting.Property.NodeScope);
@@ -19,4 +27,13 @@ public final class MachineLearningField {
 
     private MachineLearningField() {}
 
+    public static String valuesToId(String... values) {
+        String combined = Arrays.stream(values).filter(Objects::nonNull).collect(Collectors.joining());
+        byte[] bytes = combined.getBytes(StandardCharsets.UTF_8);
+        MurmurHash3.Hash128 hash = MurmurHash3.hash128(bytes, 0, bytes.length, 0, new MurmurHash3.Hash128());
+        byte[] hashedBytes = new byte[16];
+        System.arraycopy(Numbers.longToBytes(hash.h1), 0, hashedBytes, 0, 8);
+        System.arraycopy(Numbers.longToBytes(hash.h2), 0, hashedBytes, 8, 8);
+        return new BigInteger(hashedBytes) + "_" + combined.length();
+    }
 }
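The fragment returned by valuesToId is the signed decimal form of the 128-bit hash, an underscore, and the combined length of the non-null values. The following self-contained sketch reproduces that encoding for illustration only; it substitutes MD5 (also a 128-bit hash) for the Elasticsearch MurmurHash3 and Numbers helpers, which are assumed unavailable outside the codebase, and the class name is made up:

import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.Objects;
import java.util.stream.Collectors;

// Sketch of the valuesToId encoding scheme: join the non-null values, hash them to 128 bits,
// read the 16 hash bytes as a signed BigInteger (the fragment can therefore be negative),
// and append "_" plus the combined length. MD5 stands in for MurmurHash3 here.
public class ValuesToIdSketch {

    static String valuesToId(String... values) throws NoSuchAlgorithmException {
        String combined = Arrays.stream(values).filter(Objects::nonNull).collect(Collectors.joining());
        byte[] hashedBytes = MessageDigest.getInstance("MD5").digest(combined.getBytes(StandardCharsets.UTF_8)); // 16 bytes
        return new BigInteger(hashedBytes) + "_" + combined.length();
    }

    public static void main(String[] args) throws NoSuchAlgorithmException {
        // The pairs that collided under the 32-bit scheme now map to clearly distinct fragments.
        System.out.println(valuesToId("L018", null, "128"));
        System.out.println(valuesToId("L017", null, "228"));
    }
}

Hashing to 128 bits makes accidental collisions across by/over/partition combinations vanishingly unlikely while keeping the ID length bounded.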
AnomalyRecord.java
@@ -15,6 +15,7 @@ import org.elasticsearch.common.xcontent.ConstructingObjectParser;
 import org.elasticsearch.common.xcontent.ObjectParser.ValueType;
 import org.elasticsearch.common.xcontent.ToXContentObject;
 import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.xpack.core.ml.MachineLearningField;
 import org.elasticsearch.xpack.core.ml.job.config.Detector;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
 import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
@@ -358,12 +359,13 @@ public class AnomalyRecord implements ToXContentObject, Writeable {
      * Data store ID of this record.
      */
     public String getId() {
-        int valuesHash = Objects.hash(byFieldValue, overFieldValue, partitionFieldValue);
-        int length = (byFieldValue == null ? 0 : byFieldValue.length()) +
-                (overFieldValue == null ? 0 : overFieldValue.length()) +
-                (partitionFieldValue == null ? 0 : partitionFieldValue.length());
-
-        return jobId + "_record_" + timestamp.getTime() + "_" + bucketSpan + "_" + detectorIndex + "_" + valuesHash + "_" + length;
+        return buildId(jobId, timestamp, bucketSpan, detectorIndex, byFieldValue, overFieldValue, partitionFieldValue);
+    }
+
+    static String buildId(String jobId, Date timestamp, long bucketSpan, int detectorIndex,
+                          String byFieldValue, String overFieldValue, String partitionFieldValue) {
+        return jobId + "_record_" + timestamp.getTime() + "_" + bucketSpan + "_" + detectorIndex + "_"
+            + MachineLearningField.valuesToId(byFieldValue, overFieldValue, partitionFieldValue);
     }
 
     public int getDetectorIndex() {
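With this change a record ID has the shape jobId + "_record_" + epochMillis + "_" + bucketSpan + "_" + detectorIndex + "_" + hashAsSignedDecimal + "_" + combinedValueLength. Pulling the string construction into the package-private static buildId also lets tests assemble worst-case IDs without building a full AnomalyRecord, which the new testIdLength test further down relies on.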
Forecast.java
@@ -13,6 +13,7 @@ import org.elasticsearch.common.xcontent.ConstructingObjectParser;
 import org.elasticsearch.common.xcontent.ObjectParser.ValueType;
 import org.elasticsearch.common.xcontent.ToXContentObject;
 import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.xpack.core.ml.MachineLearningField;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
 import org.elasticsearch.xpack.core.common.time.TimeUtils;
 
@@ -165,12 +166,9 @@ public class Forecast implements ToXContentObject, Writeable {
     }
 
     public String getId() {
-        int valuesHash = Objects.hash(byFieldValue, partitionFieldValue);
-        int length = (byFieldValue == null ? 0 : byFieldValue.length()) +
-                (partitionFieldValue == null ? 0 : partitionFieldValue.length());
         return jobId + "_model_forecast_" + forecastId + "_" + timestamp.getTime()
             + "_" + bucketSpan + "_" + detectorIndex + "_"
-            + valuesHash + "_" + length;
+            + MachineLearningField.valuesToId(byFieldValue, partitionFieldValue);
     }
 
     public Date getTimestamp() {
Influencer.java
@@ -13,6 +13,7 @@ import org.elasticsearch.common.xcontent.ConstructingObjectParser;
 import org.elasticsearch.common.xcontent.ObjectParser.ValueType;
 import org.elasticsearch.common.xcontent.ToXContentObject;
 import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.xpack.core.ml.MachineLearningField;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
 import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
 import org.elasticsearch.xpack.core.common.time.TimeUtils;
@@ -134,7 +135,7 @@ public class Influencer implements ToXContentObject, Writeable {
 
     public String getId() {
         return jobId + "_influencer_" + timestamp.getTime() + "_" + bucketSpan + "_" +
-                influenceField + "_" + influenceValue.hashCode() + "_" + influenceValue.length();
+                influenceField + "_" + MachineLearningField.valuesToId(influenceValue);
     }
 
     public double getProbability() {
ModelPlot.java
@@ -14,6 +14,7 @@ import org.elasticsearch.common.xcontent.ConstructingObjectParser;
 import org.elasticsearch.common.xcontent.ObjectParser.ValueType;
 import org.elasticsearch.common.xcontent.ToXContentObject;
 import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.xpack.core.ml.MachineLearningField;
 import org.elasticsearch.xpack.core.ml.job.config.Job;
 import org.elasticsearch.xpack.core.common.time.TimeUtils;
 
@@ -205,12 +206,8 @@ public class ModelPlot implements ToXContentObject, Writeable {
     }
 
     public String getId() {
-        int valuesHash = Objects.hash(byFieldValue, overFieldValue, partitionFieldValue);
-        int length = (byFieldValue == null ? 0 : byFieldValue.length()) +
-                (overFieldValue == null ? 0 : overFieldValue.length()) +
-                (partitionFieldValue == null ? 0 : partitionFieldValue.length());
         return jobId + "_model_plot_" + timestamp.getTime() + "_" + bucketSpan
-            + "_" + detectorIndex + "_" + valuesHash + "_" + length;
+            + "_" + detectorIndex + "_" + MachineLearningField.valuesToId(byFieldValue, overFieldValue, partitionFieldValue);
     }
 
     public Date getTimestamp() {
AnomalyRecordTests.java
@@ -14,17 +14,20 @@ import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.common.xcontent.json.JsonXContent;
 import org.elasticsearch.test.AbstractSerializingTestCase;
+import org.elasticsearch.xpack.core.ml.MachineLearningField;
+import org.elasticsearch.xpack.core.ml.utils.MlStrings;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 
 import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
 
 public class AnomalyRecordTests extends AbstractSerializingTestCase<AnomalyRecord> {
 
@@ -174,28 +177,23 @@ public class AnomalyRecordTests extends AbstractSerializingTestCase<AnomalyRecord> {
         String overFieldValue = null;
         String partitionFieldValue = null;
 
-        int valuesHash = Objects.hash(byFieldValue, overFieldValue, partitionFieldValue);
-        assertEquals("test-job_record_1000_60_0_" + valuesHash + "_0", record.getId());
+        assertEquals("test-job_record_1000_60_0_0_0", record.getId());
 
-        int length = 0;
         if (randomBoolean()) {
             byFieldValue = randomAlphaOfLength(10);
-            length += byFieldValue.length();
             record.setByFieldValue(byFieldValue);
         }
         if (randomBoolean()) {
             overFieldValue = randomAlphaOfLength(10);
-            length += overFieldValue.length();
             record.setOverFieldValue(overFieldValue);
         }
         if (randomBoolean()) {
             partitionFieldValue = randomAlphaOfLength(10);
-            length += partitionFieldValue.length();
             record.setPartitionFieldValue(partitionFieldValue);
         }
 
-        valuesHash = Objects.hash(byFieldValue, overFieldValue, partitionFieldValue);
-        assertEquals("test-job_record_1000_60_0_" + valuesHash + "_" + length, record.getId());
+        String valuesPart = MachineLearningField.valuesToId(byFieldValue, overFieldValue, partitionFieldValue);
+        assertEquals("test-job_record_1000_60_0_" + valuesPart, record.getId());
     }
 
     public void testStrictParser_IsLenientOnTopLevelFields() throws IOException {
@@ -222,4 +220,18 @@ public class AnomalyRecordTests extends AbstractSerializingTestCase<AnomalyRecord> {
             AnomalyRecord.LENIENT_PARSER.apply(parser, null);
         }
     }
+
+    public void testIdLength() {
+        String jobId = randomAlphaOfLength(MlStrings.ID_LENGTH_LIMIT);
+        Date timestamp = new Date(Long.MAX_VALUE);
+        long bucketSpan = Long.MAX_VALUE;
+        int detectorIndex = Integer.MAX_VALUE;
+        String byFieldValue = randomAlphaOfLength(randomIntBetween(100, 1000));
+        String overFieldValue = randomAlphaOfLength(randomIntBetween(100, 1000));
+        String partitionFieldValue = randomAlphaOfLength(randomIntBetween(100, 1000));
+
+        String id = AnomalyRecord.buildId(jobId, timestamp, bucketSpan, detectorIndex, byFieldValue, overFieldValue, partitionFieldValue);
+        // 512 comes from IndexRequest.validate()
+        assertThat(id.getBytes(StandardCharsets.UTF_8).length, lessThanOrEqualTo(512));
+    }
 }
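Rough arithmetic behind the 512-byte assertion: the job ID is capped at MlStrings.ID_LENGTH_LIMIT (64) characters, "_record_" adds 8, Long.MAX_VALUE for the timestamp and bucket span is 19 digits each, Integer.MAX_VALUE is 10 digits, a signed 128-bit value printed in decimal is at most 40 characters, and the combined length of three 1000-character values is 4 digits, so with the separating underscores the ID stays under about 170 bytes, comfortably inside the 512-byte document ID limit enforced by IndexRequest.validate().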
InfluencerTests.java
@@ -12,10 +12,10 @@ import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.common.xcontent.json.JsonXContent;
 import org.elasticsearch.test.AbstractSerializingTestCase;
+import org.elasticsearch.xpack.core.ml.MachineLearningField;
 
 import java.io.IOException;
 import java.util.Date;
-import java.util.Objects;
 
 public class InfluencerTests extends AbstractSerializingTestCase<Influencer> {
 
@@ -64,8 +64,8 @@ public class InfluencerTests extends AbstractSerializingTestCase<Influencer> {
     public void testId() {
         String influencerFieldValue = "wopr";
         Influencer influencer = new Influencer("job-foo", "host", influencerFieldValue, new Date(1000), 300L);
-        int valueHash = Objects.hashCode(influencerFieldValue);
-        assertEquals("job-foo_influencer_1000_300_host_" + valueHash + "_" + influencerFieldValue.length(), influencer.getId());
+        String valuePart = MachineLearningField.valuesToId(influencerFieldValue);
+        assertEquals("job-foo_influencer_1000_300_host_" + valuePart, influencer.getId());
     }
 
     public void testLenientParser() throws IOException {
ForecastTests.java
@@ -9,11 +9,11 @@ import org.elasticsearch.common.io.stream.Writeable.Reader;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.json.JsonXContent;
 import org.elasticsearch.test.AbstractSerializingTestCase;
+import org.elasticsearch.xpack.core.ml.MachineLearningField;
 import org.elasticsearch.xpack.core.ml.job.results.Forecast;
 
 import java.io.IOException;
 import java.util.Date;
-import java.util.Objects;
 
 import static org.hamcrest.Matchers.containsString;
 
@@ -72,23 +72,19 @@ public class ForecastTests extends AbstractSerializingTestCase<Forecast> {
         String byFieldValue = null;
         String partitionFieldValue = null;
 
-        int valuesHash = Objects.hash(byFieldValue, partitionFieldValue);
-        assertEquals("job-foo_model_forecast_222_100_60_2_" + valuesHash + "_0", forecast.getId());
+        assertEquals("job-foo_model_forecast_222_100_60_2_0_0", forecast.getId());
 
-        int length = 0;
         if (randomBoolean()) {
             byFieldValue = randomAlphaOfLength(10);
-            length += byFieldValue.length();
             forecast.setByFieldValue(byFieldValue);
         }
         if (randomBoolean()) {
             partitionFieldValue = randomAlphaOfLength(10);
-            length += partitionFieldValue.length();
             forecast.setPartitionFieldValue(partitionFieldValue);
         }
 
-        valuesHash = Objects.hash(byFieldValue, partitionFieldValue);
-        assertEquals("job-foo_model_forecast_222_100_60_2_" + valuesHash + "_" + length, forecast.getId());
+        String valuesPart = MachineLearningField.valuesToId(byFieldValue, partitionFieldValue);
+        assertEquals("job-foo_model_forecast_222_100_60_2_" + valuesPart, forecast.getId());
     }
 
     public void testStrictParser() throws IOException {
ModelPlotTests.java
@@ -12,11 +12,15 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.json.JsonXContent;
 import org.elasticsearch.test.AbstractSerializingTestCase;
+import org.elasticsearch.xpack.core.ml.MachineLearningField;
 import org.elasticsearch.xpack.core.ml.job.results.ModelPlot;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Date;
-import java.util.Objects;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.not;
@@ -221,28 +225,23 @@ public class ModelPlotTests extends AbstractSerializingTestCase<ModelPlot> {
         String overFieldValue = null;
         String partitionFieldValue = null;
 
-        int valuesHash = Objects.hash(byFieldValue, overFieldValue, partitionFieldValue);
-        assertEquals("job-foo_model_plot_100_60_33_" + valuesHash + "_0", plot.getId());
+        assertEquals("job-foo_model_plot_100_60_33_0_0", plot.getId());
 
-        int length = 0;
         if (randomBoolean()) {
             byFieldValue = randomAlphaOfLength(10);
-            length += byFieldValue.length();
             plot.setByFieldValue(byFieldValue);
         }
        if (randomBoolean()) {
             overFieldValue = randomAlphaOfLength(10);
-            length += overFieldValue.length();
             plot.setOverFieldValue(overFieldValue);
         }
         if (randomBoolean()) {
             partitionFieldValue = randomAlphaOfLength(10);
-            length += partitionFieldValue.length();
             plot.setPartitionFieldValue(partitionFieldValue);
         }
 
-        valuesHash = Objects.hash(byFieldValue, overFieldValue, partitionFieldValue);
-        assertEquals("job-foo_model_plot_100_60_33_" + valuesHash + "_" + length, plot.getId());
+        String valuesPart = MachineLearningField.valuesToId(byFieldValue, overFieldValue, partitionFieldValue);
+        assertEquals("job-foo_model_plot_100_60_33_" + valuesPart, plot.getId());
     }
 
     public void testStrictParser() throws IOException {
@@ -262,6 +261,43 @@ public class ModelPlotTests extends AbstractSerializingTestCase<ModelPlot> {
         }
     }
 
+    public void testIdUniqueness() {
+        ModelPlot modelPlot = new ModelPlot("foo", new Date(), 3600, 0);
+
+        String[] partitionFieldValues = { "730", "132", "358", "552", "888", "236", "224", "674",
+                                          "438", "128", "722", "560", "228", "628", "226", "656" };
+        String[] byFieldValues = { "S000", "S001", "S002", "S003", "S004", "S005", "S006", "S007", "S008", "S009",
+                                   "S010", "S011", "S012", "S013", "S014", "S015", "S016", "S017", "S018", "S019",
+                                   "S020", "S021", "S022", "S023", "S024", "S025", "S026", "S027", "S028", "S029",
+                                   "S057", "S058", "S059", "M020", "M021", "M026", "M027", "M028", "M029", "M030",
+                                   "M031", "M032", "M033", "M056", "M057", "M058", "M059", "M060", "M061", "M062",
+                                   "M063", "M086", "M087", "M088", "M089", "M090", "M091", "M092", "M093", "M116",
+                                   "M117", "M118", "M119", "L012", "L013", "L014", "L017", "L018", "L019", "L023",
+                                   "L024", "L025", "L029", "L030", "L031" };
+
+        Map<String, List<String>> uniqueIds = new HashMap<>();
+
+        for (String partitionFieldValue : partitionFieldValues) {
+            modelPlot.setPartitionFieldValue(partitionFieldValue);
+            for (String byFieldValue : byFieldValues) {
+                modelPlot.setByFieldValue(byFieldValue);
+                String id = modelPlot.getId();
+                uniqueIds.compute(id, (k, v) -> {
+                    if (v == null) {
+                        v = new ArrayList<>();
+                    }
+                    v.add(partitionFieldValue + "/" + byFieldValue);
+                    if (v.size() > 1) {
+                        logger.error("Duplicates for ID [" + id + "]: " + v);
+                    }
+                    return v;
+                });
+            }
+        }
+
+        assertEquals(partitionFieldValues.length * byFieldValues.length, uniqueIds.size());
+    }
+
     private ModelPlot createFullyPopulated() {
         ModelPlot modelPlot = new ModelPlot("foo", new Date(12345678L), 360L, 22);
         modelPlot.setByFieldName("by");