Normalize schema fingerprint for column permutations (#17044)

Parent issue: #14989

It is possible for the order of columns to vary across segments especially during realtime ingestion.
Since, the schema fingerprint is sensitive to column order this leads to creation of a large number of segment schema in the metadata database for essentially the same set of columns.

This is wasteful, this patch fixes this problem by computing schema fingerprint on lexicographically sorted columns. This would result in creation of a single schema in the metadata database with the first observed column order for a given signature.
This commit is contained in:
Rishabh Singh 2024-09-18 11:37:06 +05:30 committed by GitHub
parent 2f50138af9
commit 43d790fdb7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 198 additions and 10 deletions

View File

@ -20,6 +20,7 @@
package org.apache.druid.segment.metadata;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
import com.google.common.io.BaseEncoding;
@ -30,11 +31,17 @@ import org.apache.druid.guice.LazySingleton;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.SchemaPayload;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
* Utility to generate fingerprint for an object.
* Utility to generate schema fingerprint which is used to ensure schema uniqueness in the metadata database.
* Note, that the generated fingerprint is independent of the column order.
*/
@LazySingleton
public class FingerprintGenerator
@ -53,12 +60,20 @@ public class FingerprintGenerator
* Generates fingerprint or hash string for an object using SHA-256 hash algorithm.
*/
@SuppressWarnings("UnstableApiUsage")
public String generateFingerprint(SchemaPayload schemaPayload, String dataSource, int version)
public String generateFingerprint(final SchemaPayload schemaPayload, final String dataSource, final int version)
{
// Sort the column names in lexicographic order
// The aggregator factories are column order independent since they are stored in a hashmap
// This ensures that all permutations of a given columns would result in the same fingerprint
// thus avoiding schema explosion in the metadata database
// Note that this signature is not persisted anywhere, it is only used for fingerprint computation
final RowSignature sortedSignature = getLexicographicallySortedSignature(schemaPayload.getRowSignature());
final SchemaPayload updatedPayload = new SchemaPayload(sortedSignature, schemaPayload.getAggregatorFactories());
try {
final Hasher hasher = Hashing.sha256().newHasher();
hasher.putBytes(objectMapper.writeValueAsBytes(schemaPayload));
hasher.putBytes(objectMapper.writeValueAsBytes(updatedPayload));
// add delimiter, inspired from org.apache.druid.metadata.PendingSegmentRecord.computeSequenceNamePrevIdSha1
hasher.putByte((byte) 0xff);
@ -82,4 +97,21 @@ public class FingerprintGenerator
);
}
}
@VisibleForTesting
protected RowSignature getLexicographicallySortedSignature(final RowSignature rowSignature)
{
final List<String> columns = new ArrayList<>(rowSignature.getColumnNames());
Collections.sort(columns);
final RowSignature.Builder sortedSignature = RowSignature.builder();
for (String column : columns) {
ColumnType type = rowSignature.getColumnType(column).orElse(null);
sortedSignature.add(column, type);
}
return sortedSignature.build();
}
}

View File

@ -19,6 +19,7 @@
package org.apache.druid.metadata;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
@ -48,6 +49,7 @@ import org.skife.jdbi.v2.Handle;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
@ -281,6 +283,112 @@ public class IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest extends
segmentSchemaTestUtils.verifySegmentSchema(segmentIdSchemaMap);
}
@Test
public void testSchemaPermutation() throws JsonProcessingException
{
Set<DataSegment> segments = new HashSet<>();
SegmentSchemaMapping segmentSchemaMapping = new SegmentSchemaMapping(CentralizedDatasourceSchemaConfig.SCHEMA_VERSION);
// Store the first observed column order for each segment for verification purpose
Map<String, Pair<SchemaPayload, Integer>> segmentIdSchemaMap = new HashMap<>();
RowSignature originalOrder =
RowSignature.builder()
.add("d7", ColumnType.LONG_ARRAY)
.add("b1", ColumnType.FLOAT)
.add("a5", ColumnType.DOUBLE)
.build();
// column permutations
List<List<String>> permutations = Arrays.asList(
Arrays.asList("d7", "a5", "b1"),
Arrays.asList("a5", "b1", "d7"),
Arrays.asList("a5", "d7", "b1"),
Arrays.asList("b1", "d7", "a5"),
Arrays.asList("b1", "a5", "d7"),
Arrays.asList("d7", "a5", "b1")
);
boolean first = true;
Random random = ThreadLocalRandom.current();
Random permutationRandom = ThreadLocalRandom.current();
for (int i = 0; i < 105; i++) {
DataSegment segment = new DataSegment(
"fooDataSource",
Intervals.of("2015-01-01T00Z/2015-01-02T00Z"),
"version",
ImmutableMap.of(),
ImmutableList.of("dim1"),
ImmutableList.of("m1"),
new LinearShardSpec(i),
9,
100
);
segments.add(segment);
int randomNum = random.nextInt();
RowSignature rowSignature;
if (first) {
rowSignature = originalOrder;
} else {
RowSignature.Builder builder = RowSignature.builder();
List<String> columns = permutations.get(permutationRandom.nextInt(permutations.size()));
for (String column : columns) {
builder.add(column, originalOrder.getColumnType(column).get());
}
rowSignature = builder.build();
}
SchemaPayload schemaPayload = new SchemaPayload(rowSignature);
segmentIdSchemaMap.put(segment.getId().toString(), Pair.of(new SchemaPayload(originalOrder), randomNum));
segmentSchemaMapping.addSchema(
segment.getId(),
new SchemaPayloadPlus(schemaPayload, (long) randomNum),
fingerprintGenerator.generateFingerprint(
schemaPayload,
segment.getDataSource(),
CentralizedDatasourceSchemaConfig.SCHEMA_VERSION
)
);
if (first) {
coordinator.commitSegments(segments, segmentSchemaMapping);
first = false;
}
}
coordinator.commitSegments(segments, segmentSchemaMapping);
for (DataSegment segment : segments) {
Assert.assertArrayEquals(
mapper.writeValueAsString(segment).getBytes(StandardCharsets.UTF_8),
derbyConnector.lookup(
derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(),
"id",
"payload",
segment.getId().toString()
)
);
}
List<String> segmentIds = segments.stream()
.map(segment -> segment.getId().toString())
.sorted(Comparator.naturalOrder())
.collect(Collectors.toList());
Assert.assertEquals(segmentIds, retrieveUsedSegmentIds(derbyConnectorRule.metadataTablesConfigSupplier().get()));
// Should not update dataSource metadata.
Assert.assertEquals(0, metadataUpdateCounter.get());
// verify that only a single schema is created
segmentSchemaTestUtils.verifySegmentSchema(segmentIdSchemaMap);
}
@Test
public void testAnnounceHistoricalSegments_schemaExists() throws IOException
{

View File

@ -22,6 +22,7 @@ package org.apache.druid.segment.metadata;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.any.StringAnyAggregatorFactory;
import org.apache.druid.query.aggregation.firstlast.first.LongFirstAggregatorFactory;
import org.apache.druid.segment.SchemaPayload;
import org.apache.druid.segment.TestHelper;
@ -30,7 +31,9 @@ import org.apache.druid.segment.column.RowSignature;
import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class FingerprintGeneratorTest
@ -45,13 +48,20 @@ public class FingerprintGeneratorTest
@Test
public void testGenerateFingerprint_precalculatedHash()
{
RowSignature rowSignature = RowSignature.builder().add("c1", ColumnType.FLOAT).build();
RowSignature rowSignature =
RowSignature.builder()
.add("c1", ColumnType.LONG)
.add("c0", ColumnType.STRING)
.add("c2", ColumnType.FLOAT)
.add("c3", ColumnType.DOUBLE)
.build();
Map<String, AggregatorFactory> aggregatorFactoryMap = new HashMap<>();
aggregatorFactoryMap.put("longFirst", new LongFirstAggregatorFactory("longFirst", "long-col", null));
aggregatorFactoryMap.put("longFirst", new LongFirstAggregatorFactory("longFirst", "c1", null));
aggregatorFactoryMap.put("stringAny", new StringAnyAggregatorFactory("stringAny", "c0", 1024, true));
SchemaPayload schemaPayload = new SchemaPayload(rowSignature, aggregatorFactoryMap);
String expected = "DEE5E8F59833102F0FA5B10F8B8884EA15220D1D2A5F6097A93D8309132E1039";
String expected = "82E774457D26D0B8D481B6C39872070B25EA3C72C6EFC107B346FA42641740E1";
Assert.assertEquals(expected, fingerprintGenerator.generateFingerprint(schemaPayload, "ds", 0));
}
@ -60,25 +70,38 @@ public class FingerprintGeneratorTest
{
RowSignature rowSignature =
RowSignature.builder()
.add("c1", ColumnType.FLOAT)
.add("c2", ColumnType.LONG)
.add("c1", ColumnType.FLOAT)
.add("c3", ColumnType.DOUBLE)
.add("c0", ColumnType.STRING)
.build();
Map<String, AggregatorFactory> aggregatorFactoryMap = new HashMap<>();
aggregatorFactoryMap.put("longFirst", new LongFirstAggregatorFactory("longFirst", "long-col", null));
aggregatorFactoryMap.put("longFirst", new LongFirstAggregatorFactory("longFirst", "c2", null));
aggregatorFactoryMap.put("stringAny", new StringAnyAggregatorFactory("stringAny", "c0", 1024, true));
SchemaPayload schemaPayload = new SchemaPayload(rowSignature, aggregatorFactoryMap);
RowSignature rowSignaturePermutation =
RowSignature.builder()
.add("c2", ColumnType.LONG)
.add("c0", ColumnType.STRING)
.add("c3", ColumnType.DOUBLE)
.add("c1", ColumnType.FLOAT)
.build();
SchemaPayload schemaPayloadNew = new SchemaPayload(rowSignaturePermutation, aggregatorFactoryMap);
Assert.assertNotEquals(
Map<String, AggregatorFactory> aggregatorFactoryMapForPermutation = new HashMap<>();
aggregatorFactoryMapForPermutation.put(
"stringAny",
new StringAnyAggregatorFactory("stringAny", "c0", 1024, true)
);
aggregatorFactoryMapForPermutation.put(
"longFirst",
new LongFirstAggregatorFactory("longFirst", "c2", null)
);
SchemaPayload schemaPayloadNew = new SchemaPayload(rowSignaturePermutation, aggregatorFactoryMapForPermutation);
Assert.assertEquals(
fingerprintGenerator.generateFingerprint(schemaPayload, "ds", 0),
fingerprintGenerator.generateFingerprint(schemaPayloadNew, "ds", 0)
);
@ -125,4 +148,29 @@ public class FingerprintGeneratorTest
fingerprintGenerator.generateFingerprint(schemaPayload, "ds", 1)
);
}
@Test
public void testRowSignatureIsSorted()
{
RowSignature rowSignature =
RowSignature.builder()
.add("c5", ColumnType.STRING)
.add("c1", ColumnType.FLOAT)
.add("b2", ColumnType.LONG)
.add("d3", ColumnType.DOUBLE)
.add("a1", ColumnType.STRING)
.build();
RowSignature sortedSignature = fingerprintGenerator.getLexicographicallySortedSignature(rowSignature);
Assert.assertNotEquals(rowSignature, sortedSignature);
List<String> columnNames = sortedSignature.getColumnNames();
List<String> sortedOrder = Arrays.asList("a1", "b2", "c1", "c5", "d3");
Assert.assertEquals(sortedOrder, columnNames);
for (String column : sortedOrder) {
Assert.assertEquals(sortedSignature.getColumnType(column), rowSignature.getColumnType(column));
}
}
}