[ML] generate unique doc ids for data frame (#40382)

create and use unique, deterministic document ids based on the grouping values.

This is a pre-requisite for updating documents as well as preventing duplicates after a hard failure during indexing.
This commit is contained in:
Hendrik Muhs 2019-03-27 08:25:49 +01:00
parent 524e0273ae
commit f4e56118c2
7 changed files with 349 additions and 12 deletions

View File

@ -52,6 +52,9 @@ public final class DataFrameField {
public static final String FOR_INTERNAL_STORAGE = "for_internal_storage";
// internal document id
public static String DOCUMENT_ID_FIELD = "_id";
private DataFrameField() {

View File

@ -82,7 +82,13 @@ public class TransportPreviewDataFrameTransformAction extends
r -> {
final CompositeAggregation agg = r.getAggregations().get(COMPOSITE_AGGREGATION_NAME);
DataFrameIndexerTransformStats stats = new DataFrameIndexerTransformStats();
listener.onResponse(pivot.extractResults(agg, deducedMappings, stats).collect(Collectors.toList()));
// remove all internal fields
List<Map<String, Object>> results = pivot.extractResults(agg, deducedMappings, stats)
.map(record -> {
record.keySet().removeIf(k -> k.startsWith("_"));
return record;

View File

@ -14,6 +14,7 @@ import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.indexing.AsyncTwoPhaseIndexer;
@ -73,15 +74,28 @@ public abstract class DataFrameIndexer extends AsyncTwoPhaseIndexer<Map<String,
String indexName = transformConfig.getDestination().getIndex();
return pivot.extractResults(agg, getFieldMappings(), getStats()).map(document -> {
String id = (String) document.get(DataFrameField.DOCUMENT_ID_FIELD);
if (id == null) {
throw new RuntimeException("Expected a document id but got null.");
XContentBuilder builder;
try {
builder = jsonBuilder();
for (Map.Entry<String, ?> value : document.entrySet()) {
// skip all internal fields
if (value.getKey().startsWith("_") == false) {
builder.field(value.getKey(), value.getValue());
} catch (IOException e) {
throw new UncheckedIOException(e);
IndexRequest request = new IndexRequest(indexName).source(builder);
IndexRequest request = new IndexRequest(indexName).source(builder).id(id);
return request;

View File

@ -0,0 +1,96 @@
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
package org.elasticsearch.xpack.dataframe.transforms;
import org.apache.lucene.util.BytesRefBuilder;
import org.elasticsearch.common.Numbers;
import org.elasticsearch.common.hash.MurmurHash3;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.TreeMap;
* ID Generator for creating unique but deterministic document ids.
* uses MurmurHash with 128 bits
public final class IDGenerator {
private static final byte[] NULL_VALUE = "__NULL_VALUE__".getBytes(StandardCharsets.UTF_8);
private static final byte DELIM = '$';
private static final long SEED = 19;
private static final int MAX_FIRST_BYTES = 5;
private final TreeMap<String, Object> objectsForIDGeneration = new TreeMap<>();
public IDGenerator() {
* Add a value to the generator
* @param key object identifier, to be used for consistent sorting
* @param value the value
public void add(String key, Object value) {
if (objectsForIDGeneration.containsKey(key)) {
throw new IllegalArgumentException("Keys must be unique");
objectsForIDGeneration.put(key, value);
* Create a document id based on the input objects
* @return a document id as string
public String getID() {
if (objectsForIDGeneration.size() == 0) {
throw new RuntimeException("Add at least 1 object before generating the ID");
BytesRefBuilder buffer = new BytesRefBuilder();
BytesRefBuilder hashedBytes = new BytesRefBuilder();
for (Object value : objectsForIDGeneration.values()) {
byte[] v = getBytes(value);
buffer.append(v, 0, v.length);
// keep the 1st byte of every object
if (hashedBytes.length() <= MAX_FIRST_BYTES) {
MurmurHash3.Hash128 hasher = MurmurHash3.hash128(buffer.bytes(), 0, buffer.length(), SEED, new MurmurHash3.Hash128());
hashedBytes.append(Numbers.longToBytes(hasher.h1), 0, 8);
hashedBytes.append(Numbers.longToBytes(hasher.h2), 0, 8);
return Base64.getUrlEncoder().withoutPadding().encodeToString(hashedBytes.bytes());
* Turns objects into byte arrays, only supporting types returned groupBy
* @param value the value as object
* @return a byte representation of the input object
private static byte[] getBytes(Object value) {
if (value == null) {
return NULL_VALUE;
} else if (value instanceof String) {
return ((String) value).getBytes(StandardCharsets.UTF_8);
} else if (value instanceof Long) {
return Numbers.longToBytes((Long) value);
} else if (value instanceof Double) {
return Numbers.doubleToBytes((Double) value);
} else if (value instanceof Integer) {
return Numbers.intToBytes((Integer) value);
throw new IllegalArgumentException("Value of type [" + value.getClass() + "] is not supported");

View File

@ -13,8 +13,10 @@ import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation;
import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation;
import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation.SingleValue;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
import org.elasticsearch.xpack.core.dataframe.transforms.pivot.GroupConfig;
import org.elasticsearch.xpack.dataframe.transforms.IDGenerator;
import java.util.Collection;
import java.util.HashMap;
@ -43,10 +45,17 @@ final class AggregationResultUtils {
DataFrameIndexerTransformStats stats) {
return agg.getBuckets().stream().map(bucket -> {
Map<String, Object> document = new HashMap<>();
groups.getGroups().keySet().forEach(destinationFieldName ->
document.put(destinationFieldName, bucket.getKey().get(destinationFieldName)));
// generator to create unique but deterministic document ids, so we
// - do not create duplicates if we re-run after failure
// - update documents
IDGenerator idGen = new IDGenerator();
groups.getGroups().keySet().forEach(destinationFieldName -> {
Object value = bucket.getKey().get(destinationFieldName);
idGen.add(destinationFieldName, value);
document.put(destinationFieldName, value);
for (AggregationBuilder aggregationBuilder : aggregationBuilders) {
String aggName = aggregationBuilder.getName();
@ -71,6 +80,9 @@ final class AggregationResultUtils {
assert false;
document.put(DataFrameField.DOCUMENT_ID_FIELD, idGen.getID());
return document;

View File

@ -0,0 +1,63 @@
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
package org.elasticsearch.xpack.dataframe.transforms;
import org.elasticsearch.test.ESTestCase;
public class IDGeneratorTests extends ESTestCase {
public void testSupportedTypes() {
IDGenerator idGen = new IDGenerator();
idGen.add("key1", "value1");
String id = idGen.getID();
idGen.add("key2", null);
assertNotEquals(id, idGen.getID());
id = idGen.getID();
idGen.add("key3", "value3");
assertNotEquals(id, idGen.getID());
id = idGen.getID();
idGen.add("key4", 12L);
assertNotEquals(id, idGen.getID());
id = idGen.getID();
idGen.add("key5", 44.444);
assertNotEquals(id, idGen.getID());
idGen.add("key6", 13);
assertNotEquals(id, idGen.getID());
public void testOrderIndependence() {
IDGenerator idGen = new IDGenerator();
idGen.add("key1", "value1");
idGen.add("key2", "value2");
String id1 = idGen.getID();
idGen = new IDGenerator();
idGen.add("key2", "value2");
idGen.add("key1", "value1");
String id2 = idGen.getID();
assertEquals(id1, id2);
public void testEmptyThrows() {
IDGenerator idGen = new IDGenerator();
RuntimeException e = expectThrows(RuntimeException.class, () -> idGen.getID());
assertEquals("Add at least 1 object before generating the ID", e.getMessage());
public void testDuplicatedKeyThrows() {
IDGenerator idGen = new IDGenerator();
idGen.add("key1", "value1");
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> idGen.add("key1", "some_other_value"));
assertEquals("Keys must be unique", e.getMessage());

View File

@ -44,6 +44,7 @@ import org.elasticsearch.search.aggregations.metrics.ValueCountAggregationBuilde
import org.elasticsearch.search.aggregations.pipeline.ParsedStatsBucket;
import org.elasticsearch.search.aggregations.pipeline.StatsBucketPipelineAggregationBuilder;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameIndexerTransformStats;
import org.elasticsearch.xpack.core.dataframe.transforms.pivot.GroupConfig;
@ -51,8 +52,10 @@ import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import static java.util.Arrays.asList;
@ -147,7 +150,7 @@ public class AggregationResultUtilsTests extends ESTestCase {
executeTest(groupBy, aggregationBuilders, input, fieldTypeMap, expected, 20);
public void testExtractCompositeAggregationResultsMultiSources() throws IOException {
public void testExtractCompositeAggregationResultsMultipleGroups() throws IOException {
String targetField = randomAlphaOfLengthBetween(5, 10);
String targetField2 = randomAlphaOfLengthBetween(5, 10) + "_2";
@ -406,19 +409,159 @@ public class AggregationResultUtilsTests extends ESTestCase {
executeTest(groupBy, aggregationBuilders, input, fieldTypeMap, expected, 10);
public void testExtractCompositeAggregationResultsDocIDs() throws IOException {
String targetField = randomAlphaOfLengthBetween(5, 10);
String targetField2 = randomAlphaOfLengthBetween(5, 10) + "_2";
GroupConfig groupBy = parseGroupConfig("{"
+ "\"" + targetField + "\" : {"
+ " \"terms\" : {"
+ " \"field\" : \"doesn't_matter_for_this_test\""
+ " } },"
+ "\"" + targetField2 + "\" : {"
+ " \"terms\" : {"
+ " \"field\" : \"doesn't_matter_for_this_test\""
+ " } }"
+ "}");
String aggName = randomAlphaOfLengthBetween(5, 10);
String aggTypedName = "avg#" + aggName;
Collection<AggregationBuilder> aggregationBuilders = Collections.singletonList(AggregationBuilders.avg(aggName));
Map<String, Object> inputFirstRun = asMap(
KEY, asMap(
targetField, "ID1",
targetField2, "ID1_2"
aggTypedName, asMap(
"value", 42.33),
KEY, asMap(
targetField, "ID1",
targetField2, "ID2_2"
aggTypedName, asMap(
"value", 8.4),
KEY, asMap(
targetField, "ID2",
targetField2, "ID1_2"
aggTypedName, asMap(
"value", 28.99),
KEY, asMap(
targetField, "ID3",
targetField2, "ID2_2"
aggTypedName, asMap(
"value", 12.55),
Map<String, Object> inputSecondRun = asMap(
KEY, asMap(
targetField, "ID1",
targetField2, "ID1_2"
aggTypedName, asMap(
"value", 433.33),
KEY, asMap(
targetField, "ID1",
targetField2, "ID2_2"
aggTypedName, asMap(
"value", 83.4),
KEY, asMap(
targetField, "ID2",
targetField2, "ID1_2"
aggTypedName, asMap(
"value", 21.99),
KEY, asMap(
targetField, "ID3",
targetField2, "ID2_2"
aggTypedName, asMap(
"value", 122.55),
DataFrameIndexerTransformStats stats = new DataFrameIndexerTransformStats();
Map<String, String> fieldTypeMap = asStringMap(
aggName, "double",
targetField, "keyword",
targetField2, "keyword"
List<Map<String, Object>> resultFirstRun = runExtraction(groupBy, aggregationBuilders, inputFirstRun, fieldTypeMap, stats);
List<Map<String, Object>> resultSecondRun = runExtraction(groupBy, aggregationBuilders, inputSecondRun, fieldTypeMap, stats);
assertNotEquals(resultFirstRun, resultSecondRun);
Set<String> documentIdsFirstRun = new HashSet<>();
resultFirstRun.forEach(m -> {
documentIdsFirstRun.add((String) m.get(DataFrameField.DOCUMENT_ID_FIELD));
assertEquals(4, documentIdsFirstRun.size());
Set<String> documentIdsSecondRun = new HashSet<>();
resultSecondRun.forEach(m -> {
documentIdsSecondRun.add((String) m.get(DataFrameField.DOCUMENT_ID_FIELD));
assertEquals(4, documentIdsSecondRun.size());
assertEquals(documentIdsFirstRun, documentIdsSecondRun);
private void executeTest(GroupConfig groups, Collection<AggregationBuilder> aggregationBuilders, Map<String, Object> input,
Map<String, String> fieldTypeMap, List<Map<String, Object>> expected, long expectedDocCounts) throws IOException {
DataFrameIndexerTransformStats stats = new DataFrameIndexerTransformStats();
XContentBuilder builder = XContentFactory.contentBuilder(randomFrom(XContentType.values()));
try (XContentParser parser = createParser(builder)) {
CompositeAggregation agg = ParsedComposite.fromXContent(parser, "my_feature");
List<Map<String, Object>> result = AggregationResultUtils
.extractCompositeAggregationResults(agg, groups, aggregationBuilders, fieldTypeMap, stats).collect(Collectors.toList());
List<Map<String, Object>> result = runExtraction(groups, aggregationBuilders, input, fieldTypeMap, stats);
// remove the document ids and test uniqueness
Set<String> documentIds = new HashSet<>();
result.forEach(m -> {
documentIds.add((String) m.remove(DataFrameField.DOCUMENT_ID_FIELD));
assertEquals(result.size(), documentIds.size());
assertEquals(expected, result);
assertEquals(expectedDocCounts, stats.getNumDocuments());
private List<Map<String, Object>> runExtraction(GroupConfig groups, Collection<AggregationBuilder> aggregationBuilders,
Map<String, Object> input, Map<String, String> fieldTypeMap, DataFrameIndexerTransformStats stats) throws IOException {
XContentBuilder builder = XContentFactory.contentBuilder(randomFrom(XContentType.values()));
try (XContentParser parser = createParser(builder)) {
CompositeAggregation agg = ParsedComposite.fromXContent(parser, "my_feature");
return AggregationResultUtils.extractCompositeAggregationResults(agg, groups, aggregationBuilders, fieldTypeMap, stats)