diff --git a/docs/content/querying/sql.md b/docs/content/querying/sql.md
index 19761ad156e..1ff33af5254 100644
--- a/docs/content/querying/sql.md
+++ b/docs/content/querying/sql.md
@@ -84,8 +84,8 @@ You can provide [connection context parameters](#connection-context) by adding a
 
 ### Metadata
 
-Druid brokers cache column type metadata for each dataSource and use it to plan SQL queries. This cache is updated
-on broker startup and also periodically in the background through
+Druid brokers infer table and column metadata for each dataSource from segments loaded in the cluster, and use this to
+plan SQL queries. This metadata is cached on broker startup and also updated periodically in the background through
 [SegmentMetadata queries](../querying/segmentmetadataquery.html). Background metadata refreshing is triggered by
 segments entering and exiting the cluster, and can also be throttled through configuration.
 
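To make the documented behavior concrete: after this patch, the broker asks for per-segment (rather than pre-merged) metadata, skips the query cache, and requests the INTERVAL analysis. Column names and types are always part of a segmentMetadata response; the interval analysis additionally reports each segment's time range, which the schema merge below uses as its recency signal. Here is a sketch of the refresh query as a hypothetical standalone helper: the constructor call is copied from the DruidSchema.java change below, the argument comments are our reading of the changed positions, and the two trailing arguments are passed through unchanged.

```java
import com.google.common.collect.ImmutableMap;
import io.druid.query.TableDataSource;
import io.druid.query.metadata.metadata.SegmentMetadataQuery;

import java.util.EnumSet;

// Hypothetical helper; the constructor call is copied from the DruidSchema.java change below.
public class MetadataRefreshQuerySketch
{
  public static SegmentMetadataQuery metadataRefreshQueryFor(final String dataSource)
  {
    return new SegmentMetadataQuery(
        new TableDataSource(dataSource),                                       // one query per dataSource
        null,
        null,
        false,                                                                 // merge: was true; segments now report
                                                                               //   individually, letting the broker do
                                                                               //   its own latest-wins merge
        ImmutableMap.<String, Object>of("useCache", false, "populateCache", false),  // context: bypass the result cache
        EnumSet.of(SegmentMetadataQuery.AnalysisType.INTERVAL),                // also analyze each segment's intervals
        null,
        true
    );
  }
}
```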
diff --git a/sql/src/main/java/io/druid/sql/calcite/schema/DruidSchema.java b/sql/src/main/java/io/druid/sql/calcite/schema/DruidSchema.java
index a22d58c20b3..8d318341226 100644
--- a/sql/src/main/java/io/druid/sql/calcite/schema/DruidSchema.java
+++ b/sql/src/main/java/io/druid/sql/calcite/schema/DruidSchema.java
@@ -23,7 +23,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -34,6 +33,7 @@ import io.druid.client.DruidDataSource;
 import io.druid.client.DruidServer;
 import io.druid.client.ServerView;
 import io.druid.client.TimelineServerView;
+import io.druid.common.utils.JodaUtils;
 import io.druid.guice.ManageLifecycle;
 import io.druid.java.util.common.concurrent.ScheduledExecutors;
 import io.druid.java.util.common.guava.Sequence;
@@ -280,9 +280,9 @@ public class DruidSchema extends AbstractSchema
         new TableDataSource(dataSource),
         null,
         null,
-        true,
-        null,
-        EnumSet.noneOf(SegmentMetadataQuery.AnalysisType.class),
+        false,
+        ImmutableMap.<String, Object>of("useCache", false, "populateCache", false),
+        EnumSet.of(SegmentMetadataQuery.AnalysisType.INTERVAL),
         null,
         true
     );
@@ -293,26 +293,47 @@
       return null;
     }
 
-    final Map<String, ColumnAnalysis> columnMetadata = Iterables.getOnlyElement(results).getColumns();
+    final Map<String, ValueType> columnTypes = Maps.newLinkedHashMap();
+
+    // Resolve conflicts by taking the latest metadata. This aids in gradual schema evolution.
+    long maxTimestamp = JodaUtils.MIN_INSTANT;
+
+    for (SegmentAnalysis analysis : results) {
+      final long timestamp;
+
+      if (analysis.getIntervals() != null && analysis.getIntervals().size() > 0) {
+        timestamp = analysis.getIntervals().get(analysis.getIntervals().size() - 1).getEndMillis();
+      } else {
+        timestamp = JodaUtils.MIN_INSTANT;
+      }
+
+      for (Map.Entry<String, ColumnAnalysis> entry : analysis.getColumns().entrySet()) {
+        if (entry.getValue().isError()) {
+          // Skip columns with analysis errors.
+          continue;
+        }
+
+        if (!columnTypes.containsKey(entry.getKey()) || timestamp >= maxTimestamp) {
+          ValueType valueType;
+          try {
+            valueType = ValueType.valueOf(entry.getValue().getType().toUpperCase());
+          }
+          catch (IllegalArgumentException e) {
+            // Assume unrecognized types are some flavor of COMPLEX. This throws away information about exactly
+            // what kind of complex column it is, which we may want to preserve some day.
+            valueType = ValueType.COMPLEX;
+          }
+
+          columnTypes.put(entry.getKey(), valueType);
+
+          maxTimestamp = timestamp;
+        }
+      }
+    }
+
     final RowSignature.Builder rowSignature = RowSignature.builder();
-
-    for (Map.Entry<String, ColumnAnalysis> entry : columnMetadata.entrySet()) {
-      if (entry.getValue().isError()) {
-        // Ignore columns with metadata consistency errors.
-        continue;
-      }
-
-      ValueType valueType;
-      try {
-        valueType = ValueType.valueOf(entry.getValue().getType().toUpperCase());
-      }
-      catch (IllegalArgumentException e) {
-        // Assume unrecognized types are some flavor of COMPLEX. This throws away information about exactly
-        // what kind of complex column it is, which we may want to preserve some day.
-        valueType = ValueType.COMPLEX;
-      }
-
-      rowSignature.add(entry.getKey(), valueType);
+    for (Map.Entry<String, ValueType> entry : columnTypes.entrySet()) {
+      rowSignature.add(entry.getKey(), entry.getValue());
     }
 
     return new DruidTable(
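The new merge logic is easiest to see in isolation. Below is a minimal, self-contained sketch of the same latest-wins rule over plain Java types; the Analysis class and all names here are illustrative stand-ins, not Druid's API. Every column ever observed in any segment is kept, and when segments disagree on a column's type, the segment whose interval ends latest wins (the `>=` comparison matches the patch, so ties go to the later-visited analysis).

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class LatestWinsMergeSketch
{
  // Illustrative stand-in for Druid's per-segment SegmentAnalysis: the segment's
  // interval end in millis, plus a map of column name -> type name.
  static class Analysis
  {
    final long intervalEndMillis;
    final Map<String, String> columnTypes;

    Analysis(long intervalEndMillis, Map<String, String> columnTypes)
    {
      this.intervalEndMillis = intervalEndMillis;
      this.columnTypes = columnTypes;
    }
  }

  // Keep every column ever seen; on a type conflict, the analysis from the
  // segment with the latest interval end wins.
  static Map<String, String> mergeLatestWins(List<Analysis> analyses)
  {
    final Map<String, String> merged = new LinkedHashMap<>();
    long maxTimestamp = Long.MIN_VALUE;

    for (Analysis analysis : analyses) {
      for (Map.Entry<String, String> entry : analysis.columnTypes.entrySet()) {
        if (!merged.containsKey(entry.getKey()) || analysis.intervalEndMillis >= maxTimestamp) {
          merged.put(entry.getKey(), entry.getValue());
          maxTimestamp = analysis.intervalEndMillis;
        }
      }
    }

    return merged;
  }

  public static void main(String[] args)
  {
    // Loosely mirrors the test below: a 2000 segment where m1 is a DOUBLE, and a
    // newer 2001 segment where m1 is a LONG and dim2 first appears.
    final Map<String, String> seg2000 = new LinkedHashMap<>();
    seg2000.put("m1", "DOUBLE");
    seg2000.put("dim1", "STRING");

    final Map<String, String> seg2001 = new LinkedHashMap<>();
    seg2001.put("m1", "LONG");
    seg2001.put("dim2", "STRING");

    final Map<String, String> merged = mergeLatestWins(Arrays.asList(
        new Analysis(978307200000L, seg2000),   // 2001-01-01T00Z, end of the 2000/P1Y interval
        new Analysis(1009843200000L, seg2001)   // 2002-01-01T00Z, end of the 2001/P1Y interval
    ));

    // Prints {m1=LONG, dim1=STRING, dim2=STRING}: the newer segment's LONG wins
    // for m1, while columns unique to either segment are all retained.
    System.out.println(merged);
  }
}
```

Keying recency off the interval end means a dataSource that migrates a column, say m1 from a double sum to a long sum, surfaces the new type as soon as newer segments are loaded; the updated test below exercises exactly that scenario.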
diff --git a/sql/src/test/java/io/druid/sql/calcite/schema/DruidSchemaTest.java b/sql/src/test/java/io/druid/sql/calcite/schema/DruidSchemaTest.java
index 093b0c7d309..489c60bedfd 100644
--- a/sql/src/test/java/io/druid/sql/calcite/schema/DruidSchemaTest.java
+++ b/sql/src/test/java/io/druid/sql/calcite/schema/DruidSchemaTest.java
@@ -19,19 +19,33 @@
 
 package io.druid.sql.calcite.schema;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import io.druid.data.input.InputRow;
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.query.aggregation.CountAggregatorFactory;
+import io.druid.query.aggregation.DoubleSumAggregatorFactory;
+import io.druid.query.aggregation.LongSumAggregatorFactory;
+import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
+import io.druid.segment.IndexBuilder;
+import io.druid.segment.QueryableIndex;
+import io.druid.segment.TestHelper;
+import io.druid.segment.incremental.IncrementalIndexSchema;
 import io.druid.sql.calcite.planner.Calcites;
 import io.druid.sql.calcite.planner.PlannerConfig;
 import io.druid.sql.calcite.table.DruidTable;
 import io.druid.sql.calcite.util.CalciteTests;
 import io.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
 import io.druid.sql.calcite.util.TestServerInventoryView;
-import org.apache.calcite.jdbc.CalciteConnection;
+import io.druid.timeline.DataSegment;
+import io.druid.timeline.partition.LinearShardSpec;
 import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.schema.Table;
 import org.apache.calcite.sql.type.SqlTypeName;
+import org.joda.time.Interval;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -39,34 +53,97 @@
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-import java.sql.Connection;
-import java.sql.DriverManager;
+import java.io.File;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 
 public class DruidSchemaTest
 {
   private static final PlannerConfig PLANNER_CONFIG_DEFAULT = new PlannerConfig();
 
+  public static final List<InputRow> ROWS1 = ImmutableList.of(
+      CalciteTests.createRow(ImmutableMap.of("t", "2000-01-01", "m1", "1.0", "dim1", "")),
+      CalciteTests.createRow(ImmutableMap.of("t", "2000-01-02", "m1", "2.0", "dim1", "10.1")),
+      CalciteTests.createRow(ImmutableMap.of("t", "2000-01-03", "m1", "3.0", "dim1", "2"))
+  );
+
+  public static final List<InputRow> ROWS2 = ImmutableList.of(
+      CalciteTests.createRow(ImmutableMap.of("t", "2001-01-01", "m1", "4.0", "dim2", ImmutableList.of("a"))),
+      CalciteTests.createRow(ImmutableMap.of("t", "2001-01-02", "m1", "5.0", "dim2", ImmutableList.of("abc"))),
+      CalciteTests.createRow(ImmutableMap.of("t", "2001-01-03", "m1", "6.0"))
+  );
+
   @Rule
   public TemporaryFolder temporaryFolder = new TemporaryFolder();
 
   private SpecificSegmentsQuerySegmentWalker walker = null;
   private DruidSchema schema = null;
-  private Connection connection = null;
 
   @Before
   public void setUp() throws Exception
   {
     Calcites.setSystemProperties();
-    walker = CalciteTests.createMockWalker(temporaryFolder.newFolder());
-    Properties props = new Properties();
-    props.setProperty("caseSensitive", "true");
-    props.setProperty("unquotedCasing", "UNCHANGED");
-    connection = DriverManager.getConnection("jdbc:calcite:", props);
-    CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
+    final File tmpDir = temporaryFolder.newFolder();
+    final QueryableIndex index1 = IndexBuilder.create()
+                                              .tmpDir(new File(tmpDir, "1"))
+                                              .indexMerger(TestHelper.getTestIndexMergerV9())
+                                              .schema(
+                                                  new IncrementalIndexSchema.Builder()
+                                                      .withMetrics(
+                                                          new AggregatorFactory[]{
+                                                              new CountAggregatorFactory("cnt"),
+                                                              new DoubleSumAggregatorFactory("m1", "m1"),
+                                                              new HyperUniquesAggregatorFactory("unique_dim1", "dim1")
+                                                          }
+                                                      )
+                                                      .withRollup(false)
+                                                      .build()
+                                              )
+                                              .rows(ROWS1)
+                                              .buildMMappedIndex();
+
+    final QueryableIndex index2 = IndexBuilder.create()
+                                              .tmpDir(new File(tmpDir, "2"))
+                                              .indexMerger(TestHelper.getTestIndexMergerV9())
+                                              .schema(
+                                                  new IncrementalIndexSchema.Builder()
+                                                      .withMetrics(
+                                                          new AggregatorFactory[]{
+                                                              new LongSumAggregatorFactory("m1", "m1")
+                                                          }
+                                                      )
+                                                      .withRollup(false)
+                                                      .build()
+                                              )
+                                              .rows(ROWS2)
+                                              .buildMMappedIndex();
+
+    walker = new SpecificSegmentsQuerySegmentWalker(CalciteTests.queryRunnerFactoryConglomerate()).add(
+        DataSegment.builder()
+                   .dataSource(CalciteTests.DATASOURCE1)
+                   .interval(new Interval("2000/P1Y"))
+                   .version("1")
+                   .shardSpec(new LinearShardSpec(0))
+                   .build(),
+        index1
+    ).add(
+        DataSegment.builder()
+                   .dataSource(CalciteTests.DATASOURCE1)
+                   .interval(new Interval("2001/P1Y"))
+                   .version("1")
+                   .shardSpec(new LinearShardSpec(0))
+                   .build(),
+        index2
+    ).add(
+        DataSegment.builder()
+                   .dataSource(CalciteTests.DATASOURCE2)
+                   .interval(index2.getDataInterval())
+                   .version("1")
+                   .shardSpec(new LinearShardSpec(0))
+                   .build(),
+        index2
+    );
 
     schema = new DruidSchema(
         walker,
@@ -74,7 +151,6 @@ public class DruidSchemaTest
         PLANNER_CONFIG_DEFAULT
     );
 
-    calciteConnection.getRootSchema().add("s", schema);
     schema.start();
     schema.awaitInitialization();
   }
@@ -84,7 +160,6 @@
   {
     schema.stop();
     walker.close();
-    connection.close();
   }
 
   @Test
@@ -110,13 +185,13 @@
     Assert.assertEquals("dim1", fields.get(2).getName());
     Assert.assertEquals(SqlTypeName.VARCHAR, fields.get(2).getType().getSqlTypeName());
 
-    Assert.assertEquals("dim2", fields.get(3).getName());
-    Assert.assertEquals(SqlTypeName.VARCHAR, fields.get(3).getType().getSqlTypeName());
+    Assert.assertEquals("m1", fields.get(3).getName());
+    Assert.assertEquals(SqlTypeName.BIGINT, fields.get(3).getType().getSqlTypeName());
 
-    Assert.assertEquals("m1", fields.get(4).getName());
-    Assert.assertEquals(SqlTypeName.FLOAT, fields.get(4).getType().getSqlTypeName());
+    Assert.assertEquals("unique_dim1", fields.get(4).getName());
+    Assert.assertEquals(SqlTypeName.OTHER, fields.get(4).getType().getSqlTypeName());
 
-    Assert.assertEquals("unique_dim1", fields.get(5).getName());
-    Assert.assertEquals(SqlTypeName.OTHER, fields.get(5).getType().getSqlTypeName());
+    Assert.assertEquals("dim2", fields.get(5).getName());
+    Assert.assertEquals(SqlTypeName.VARCHAR, fields.get(5).getType().getSqlTypeName());
   }
 }