diff --git a/server/src/main/java/org/apache/druid/segment/metadata/FingerprintGenerator.java b/server/src/main/java/org/apache/druid/segment/metadata/FingerprintGenerator.java index f1cb8ea0a50..7d35b180c62 100644 --- a/server/src/main/java/org/apache/druid/segment/metadata/FingerprintGenerator.java +++ b/server/src/main/java/org/apache/druid/segment/metadata/FingerprintGenerator.java @@ -20,6 +20,7 @@ package org.apache.druid.segment.metadata; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; import com.google.common.hash.Hasher; import com.google.common.hash.Hashing; import com.google.common.io.BaseEncoding; @@ -30,11 +31,17 @@ import org.apache.druid.guice.LazySingleton; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.segment.SchemaPayload; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; /** - * Utility to generate fingerprint for an object. + * Utility to generate schema fingerprint which is used to ensure schema uniqueness in the metadata database. + * Note, that the generated fingerprint is independent of the column order. */ @LazySingleton public class FingerprintGenerator @@ -53,12 +60,20 @@ public class FingerprintGenerator * Generates fingerprint or hash string for an object using SHA-256 hash algorithm. */ @SuppressWarnings("UnstableApiUsage") - public String generateFingerprint(SchemaPayload schemaPayload, String dataSource, int version) + public String generateFingerprint(final SchemaPayload schemaPayload, final String dataSource, final int version) { + // Sort the column names in lexicographic order + // The aggregator factories are column order independent since they are stored in a hashmap + // This ensures that all permutations of a given columns would result in the same fingerprint + // thus avoiding schema explosion in the metadata database + // Note that this signature is not persisted anywhere, it is only used for fingerprint computation + final RowSignature sortedSignature = getLexicographicallySortedSignature(schemaPayload.getRowSignature()); + final SchemaPayload updatedPayload = new SchemaPayload(sortedSignature, schemaPayload.getAggregatorFactories()); try { + final Hasher hasher = Hashing.sha256().newHasher(); - hasher.putBytes(objectMapper.writeValueAsBytes(schemaPayload)); + hasher.putBytes(objectMapper.writeValueAsBytes(updatedPayload)); // add delimiter, inspired from org.apache.druid.metadata.PendingSegmentRecord.computeSequenceNamePrevIdSha1 hasher.putByte((byte) 0xff); @@ -82,4 +97,21 @@ public class FingerprintGenerator ); } } + + @VisibleForTesting + protected RowSignature getLexicographicallySortedSignature(final RowSignature rowSignature) + { + final List columns = new ArrayList<>(rowSignature.getColumnNames()); + + Collections.sort(columns); + + final RowSignature.Builder sortedSignature = RowSignature.builder(); + + for (String column : columns) { + ColumnType type = rowSignature.getColumnType(column).orElse(null); + sortedSignature.add(column, type); + } + + return sortedSignature.build(); + } } diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest.java index 143f8917d78..fc99af76321 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest.java @@ -19,6 +19,7 @@ package org.apache.druid.metadata; +import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; @@ -48,6 +49,7 @@ import org.skife.jdbi.v2.Handle; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; @@ -281,6 +283,112 @@ public class IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest extends segmentSchemaTestUtils.verifySegmentSchema(segmentIdSchemaMap); } + @Test + public void testSchemaPermutation() throws JsonProcessingException + { + Set segments = new HashSet<>(); + SegmentSchemaMapping segmentSchemaMapping = new SegmentSchemaMapping(CentralizedDatasourceSchemaConfig.SCHEMA_VERSION); + // Store the first observed column order for each segment for verification purpose + Map> segmentIdSchemaMap = new HashMap<>(); + + RowSignature originalOrder = + RowSignature.builder() + .add("d7", ColumnType.LONG_ARRAY) + .add("b1", ColumnType.FLOAT) + .add("a5", ColumnType.DOUBLE) + .build(); + + // column permutations + List> permutations = Arrays.asList( + Arrays.asList("d7", "a5", "b1"), + Arrays.asList("a5", "b1", "d7"), + Arrays.asList("a5", "d7", "b1"), + Arrays.asList("b1", "d7", "a5"), + Arrays.asList("b1", "a5", "d7"), + Arrays.asList("d7", "a5", "b1") + ); + + boolean first = true; + + Random random = ThreadLocalRandom.current(); + Random permutationRandom = ThreadLocalRandom.current(); + + for (int i = 0; i < 105; i++) { + DataSegment segment = new DataSegment( + "fooDataSource", + Intervals.of("2015-01-01T00Z/2015-01-02T00Z"), + "version", + ImmutableMap.of(), + ImmutableList.of("dim1"), + ImmutableList.of("m1"), + new LinearShardSpec(i), + 9, + 100 + ); + segments.add(segment); + + int randomNum = random.nextInt(); + + RowSignature rowSignature; + + if (first) { + rowSignature = originalOrder; + } else { + RowSignature.Builder builder = RowSignature.builder(); + List columns = permutations.get(permutationRandom.nextInt(permutations.size())); + + for (String column : columns) { + builder.add(column, originalOrder.getColumnType(column).get()); + } + + rowSignature = builder.build(); + } + + SchemaPayload schemaPayload = new SchemaPayload(rowSignature); + segmentIdSchemaMap.put(segment.getId().toString(), Pair.of(new SchemaPayload(originalOrder), randomNum)); + segmentSchemaMapping.addSchema( + segment.getId(), + new SchemaPayloadPlus(schemaPayload, (long) randomNum), + fingerprintGenerator.generateFingerprint( + schemaPayload, + segment.getDataSource(), + CentralizedDatasourceSchemaConfig.SCHEMA_VERSION + ) + ); + + if (first) { + coordinator.commitSegments(segments, segmentSchemaMapping); + first = false; + } + } + + coordinator.commitSegments(segments, segmentSchemaMapping); + for (DataSegment segment : segments) { + Assert.assertArrayEquals( + mapper.writeValueAsString(segment).getBytes(StandardCharsets.UTF_8), + derbyConnector.lookup( + derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(), + "id", + "payload", + segment.getId().toString() + ) + ); + } + + List segmentIds = segments.stream() + .map(segment -> segment.getId().toString()) + .sorted(Comparator.naturalOrder()) + .collect(Collectors.toList()); + + Assert.assertEquals(segmentIds, retrieveUsedSegmentIds(derbyConnectorRule.metadataTablesConfigSupplier().get())); + + // Should not update dataSource metadata. + Assert.assertEquals(0, metadataUpdateCounter.get()); + + // verify that only a single schema is created + segmentSchemaTestUtils.verifySegmentSchema(segmentIdSchemaMap); + } + @Test public void testAnnounceHistoricalSegments_schemaExists() throws IOException { diff --git a/server/src/test/java/org/apache/druid/segment/metadata/FingerprintGeneratorTest.java b/server/src/test/java/org/apache/druid/segment/metadata/FingerprintGeneratorTest.java index 09358550802..c0100ffe151 100644 --- a/server/src/test/java/org/apache/druid/segment/metadata/FingerprintGeneratorTest.java +++ b/server/src/test/java/org/apache/druid/segment/metadata/FingerprintGeneratorTest.java @@ -22,6 +22,7 @@ package org.apache.druid.segment.metadata; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.druid.common.config.NullHandling; import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.any.StringAnyAggregatorFactory; import org.apache.druid.query.aggregation.firstlast.first.LongFirstAggregatorFactory; import org.apache.druid.segment.SchemaPayload; import org.apache.druid.segment.TestHelper; @@ -30,7 +31,9 @@ import org.apache.druid.segment.column.RowSignature; import org.junit.Assert; import org.junit.Test; +import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; public class FingerprintGeneratorTest @@ -45,13 +48,20 @@ public class FingerprintGeneratorTest @Test public void testGenerateFingerprint_precalculatedHash() { - RowSignature rowSignature = RowSignature.builder().add("c1", ColumnType.FLOAT).build(); + RowSignature rowSignature = + RowSignature.builder() + .add("c1", ColumnType.LONG) + .add("c0", ColumnType.STRING) + .add("c2", ColumnType.FLOAT) + .add("c3", ColumnType.DOUBLE) + .build(); Map aggregatorFactoryMap = new HashMap<>(); - aggregatorFactoryMap.put("longFirst", new LongFirstAggregatorFactory("longFirst", "long-col", null)); + aggregatorFactoryMap.put("longFirst", new LongFirstAggregatorFactory("longFirst", "c1", null)); + aggregatorFactoryMap.put("stringAny", new StringAnyAggregatorFactory("stringAny", "c0", 1024, true)); SchemaPayload schemaPayload = new SchemaPayload(rowSignature, aggregatorFactoryMap); - String expected = "DEE5E8F59833102F0FA5B10F8B8884EA15220D1D2A5F6097A93D8309132E1039"; + String expected = "82E774457D26D0B8D481B6C39872070B25EA3C72C6EFC107B346FA42641740E1"; Assert.assertEquals(expected, fingerprintGenerator.generateFingerprint(schemaPayload, "ds", 0)); } @@ -60,25 +70,38 @@ public class FingerprintGeneratorTest { RowSignature rowSignature = RowSignature.builder() - .add("c1", ColumnType.FLOAT) .add("c2", ColumnType.LONG) + .add("c1", ColumnType.FLOAT) .add("c3", ColumnType.DOUBLE) + .add("c0", ColumnType.STRING) .build(); Map aggregatorFactoryMap = new HashMap<>(); - aggregatorFactoryMap.put("longFirst", new LongFirstAggregatorFactory("longFirst", "long-col", null)); + aggregatorFactoryMap.put("longFirst", new LongFirstAggregatorFactory("longFirst", "c2", null)); + aggregatorFactoryMap.put("stringAny", new StringAnyAggregatorFactory("stringAny", "c0", 1024, true)); SchemaPayload schemaPayload = new SchemaPayload(rowSignature, aggregatorFactoryMap); RowSignature rowSignaturePermutation = RowSignature.builder() .add("c2", ColumnType.LONG) + .add("c0", ColumnType.STRING) .add("c3", ColumnType.DOUBLE) .add("c1", ColumnType.FLOAT) .build(); - SchemaPayload schemaPayloadNew = new SchemaPayload(rowSignaturePermutation, aggregatorFactoryMap); - Assert.assertNotEquals( + Map aggregatorFactoryMapForPermutation = new HashMap<>(); + aggregatorFactoryMapForPermutation.put( + "stringAny", + new StringAnyAggregatorFactory("stringAny", "c0", 1024, true) + ); + aggregatorFactoryMapForPermutation.put( + "longFirst", + new LongFirstAggregatorFactory("longFirst", "c2", null) + ); + + SchemaPayload schemaPayloadNew = new SchemaPayload(rowSignaturePermutation, aggregatorFactoryMapForPermutation); + Assert.assertEquals( fingerprintGenerator.generateFingerprint(schemaPayload, "ds", 0), fingerprintGenerator.generateFingerprint(schemaPayloadNew, "ds", 0) ); @@ -125,4 +148,29 @@ public class FingerprintGeneratorTest fingerprintGenerator.generateFingerprint(schemaPayload, "ds", 1) ); } + + @Test + public void testRowSignatureIsSorted() + { + RowSignature rowSignature = + RowSignature.builder() + .add("c5", ColumnType.STRING) + .add("c1", ColumnType.FLOAT) + .add("b2", ColumnType.LONG) + .add("d3", ColumnType.DOUBLE) + .add("a1", ColumnType.STRING) + .build(); + + RowSignature sortedSignature = fingerprintGenerator.getLexicographicallySortedSignature(rowSignature); + + Assert.assertNotEquals(rowSignature, sortedSignature); + + List columnNames = sortedSignature.getColumnNames(); + List sortedOrder = Arrays.asList("a1", "b2", "c1", "c5", "d3"); + Assert.assertEquals(sortedOrder, columnNames); + + for (String column : sortedOrder) { + Assert.assertEquals(sortedSignature.getColumnType(column), rowSignature.getColumnType(column)); + } + } }