SQL: Resolve column type conflicts in favor of newer segments. (#3930)

* SQL: Resolve column type conflicts in favor of newer segments.

Helps with gradual schema evolution, e.g. a column changing from long -> float, which is
supported on the query side. (A short sketch of the merge rule follows these notes.)

* Take columns from highest timestamp instead of max segment id.

* Fixes and docs.
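
The heart of the change is a "newest analysis wins" merge over per-segment metadata: each analysis is stamped with the end of its latest interval, and an analysis at least as new as anything seen so far may overwrite the column type recorded earlier. Below is a minimal, self-contained sketch of that rule; the NewestWinsMerge/Analysis names and String-typed columns are illustrative stand-ins, not from the patch, and the real loop in DruidSchema further down additionally skips errored columns and maps unrecognized types to COMPLEX.

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class NewestWinsMerge
{
  // Illustrative stand-in for a per-segment analysis: the end millis of the segment's
  // latest interval, plus the column types that segment reports.
  static class Analysis
  {
    final long timestamp;
    final Map<String, String> columnTypes;

    Analysis(long timestamp, Map<String, String> columnTypes)
    {
      this.timestamp = timestamp;
      this.columnTypes = columnTypes;
    }
  }

  // A column's type is taken from the first analysis that mentions it, then overwritten
  // whenever an analysis at least as new as the newest seen so far reports it again.
  static Map<String, String> merge(List<Analysis> analyses)
  {
    final Map<String, String> merged = new LinkedHashMap<>();
    long maxTimestamp = Long.MIN_VALUE;

    for (Analysis analysis : analyses) {
      for (Map.Entry<String, String> column : analysis.columnTypes.entrySet()) {
        if (!merged.containsKey(column.getKey()) || analysis.timestamp >= maxTimestamp) {
          merged.put(column.getKey(), column.getValue());
          maxTimestamp = analysis.timestamp;
        }
      }
    }
    return merged;
  }
}

Given a 2000 segment that reports m1 as a double and a 2001 segment that reports it as a long, the merge keeps the long, which is what the updated DruidSchemaTest below asserts (m1 surfaces as BIGINT in SQL).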
Gian Merlino 2017-02-15 17:48:49 -08:00 committed by Jonathan Wei
parent 16ef513c7d
commit ca6053d045
3 changed files with 140 additions and 44 deletions

View File

@@ -84,8 +84,8 @@ You can provide [connection context parameters](#connection-context) by adding a
 ### Metadata
 
-Druid brokers cache column type metadata for each dataSource and use it to plan SQL queries. This cache is updated
-on broker startup and also periodically in the background through
+Druid brokers infer table and column metadata for each dataSource from segments loaded in the cluster, and use this to
+plan SQL queries. This metadata is cached on broker startup and also updated periodically in the background through
 [SegmentMetadata queries](../querying/segmentmetadataquery.html). Background metadata refreshing is triggered by
 segments entering and exiting the cluster, and can also be throttled through configuration.
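
Concretely, each background refresh issues one segmentMetadata query per dataSource; the sketch below mirrors how this commit constructs that query in DruidSchema. The inline argument comments are our reading of the SegmentMetadataQuery constructor of that era and are not part of the patch.

final SegmentMetadataQuery segmentMetadataQuery = new SegmentMetadataQuery(
    new TableDataSource(dataSource),
    null,                         // querySegmentSpec
    null,                         // toInclude (null: analyze all columns)
    false,                        // merge: false, so one analysis per segment comes back
    ImmutableMap.<String, Object>of("useCache", false, "populateCache", false),
    EnumSet.of(SegmentMetadataQuery.AnalysisType.INTERVAL),
    null,                         // usingDefaultInterval
    true                          // lenientAggregatorMerge
);

Leaving merge off gives the broker per-segment column types and intervals, which is what lets it prefer the newest segment's type when segments disagree.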

View File

@@ -23,7 +23,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -34,6 +33,7 @@ import io.druid.client.DruidDataSource;
 import io.druid.client.DruidServer;
 import io.druid.client.ServerView;
 import io.druid.client.TimelineServerView;
+import io.druid.common.utils.JodaUtils;
 import io.druid.guice.ManageLifecycle;
 import io.druid.java.util.common.concurrent.ScheduledExecutors;
 import io.druid.java.util.common.guava.Sequence;
@@ -280,9 +280,9 @@ public class DruidSchema extends AbstractSchema
         new TableDataSource(dataSource),
         null,
         null,
-        true,
-        null,
-        EnumSet.noneOf(SegmentMetadataQuery.AnalysisType.class),
+        false,
+        ImmutableMap.<String, Object>of("useCache", false, "populateCache", false),
+        EnumSet.of(SegmentMetadataQuery.AnalysisType.INTERVAL),
         null,
         true
     );
@@ -293,26 +293,47 @@
       return null;
     }
-    final Map<String, ColumnAnalysis> columnMetadata = Iterables.getOnlyElement(results).getColumns();
+    final Map<String, ValueType> columnTypes = Maps.newLinkedHashMap();
+    // Resolve conflicts by taking the latest metadata. This aids in gradual schema evolution.
+    long maxTimestamp = JodaUtils.MIN_INSTANT;
+    for (SegmentAnalysis analysis : results) {
+      final long timestamp;
+      if (analysis.getIntervals() != null && analysis.getIntervals().size() > 0) {
+        timestamp = analysis.getIntervals().get(analysis.getIntervals().size() - 1).getEndMillis();
+      } else {
+        timestamp = JodaUtils.MIN_INSTANT;
+      }
+      for (Map.Entry<String, ColumnAnalysis> entry : analysis.getColumns().entrySet()) {
+        if (entry.getValue().isError()) {
+          // Skip columns with analysis errors.
+          continue;
+        }
+        if (!columnTypes.containsKey(entry.getKey()) || timestamp >= maxTimestamp) {
+          ValueType valueType;
+          try {
+            valueType = ValueType.valueOf(entry.getValue().getType().toUpperCase());
+          }
+          catch (IllegalArgumentException e) {
+            // Assume unrecognized types are some flavor of COMPLEX. This throws away information about exactly
+            // what kind of complex column it is, which we may want to preserve some day.
+            valueType = ValueType.COMPLEX;
+          }
+          columnTypes.put(entry.getKey(), valueType);
+          maxTimestamp = timestamp;
+        }
+      }
+    }
     final RowSignature.Builder rowSignature = RowSignature.builder();
-    for (Map.Entry<String, ColumnAnalysis> entry : columnMetadata.entrySet()) {
-      if (entry.getValue().isError()) {
-        // Ignore columns with metadata consistency errors.
-        continue;
-      }
-      ValueType valueType;
-      try {
-        valueType = ValueType.valueOf(entry.getValue().getType().toUpperCase());
-      }
-      catch (IllegalArgumentException e) {
-        // Assume unrecognized types are some flavor of COMPLEX. This throws away information about exactly
-        // what kind of complex column it is, which we may want to preserve some day.
-        valueType = ValueType.COMPLEX;
-      }
-      rowSignature.add(entry.getKey(), valueType);
+    for (Map.Entry<String, ValueType> entry : columnTypes.entrySet()) {
+      rowSignature.add(entry.getKey(), entry.getValue());
     }
     return new DruidTable(

View File

@@ -19,19 +19,33 @@
 package io.druid.sql.calcite.schema;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import io.druid.data.input.InputRow;
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.query.aggregation.CountAggregatorFactory;
+import io.druid.query.aggregation.DoubleSumAggregatorFactory;
+import io.druid.query.aggregation.LongSumAggregatorFactory;
+import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
+import io.druid.segment.IndexBuilder;
+import io.druid.segment.QueryableIndex;
+import io.druid.segment.TestHelper;
+import io.druid.segment.incremental.IncrementalIndexSchema;
 import io.druid.sql.calcite.planner.Calcites;
 import io.druid.sql.calcite.planner.PlannerConfig;
 import io.druid.sql.calcite.table.DruidTable;
 import io.druid.sql.calcite.util.CalciteTests;
 import io.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
 import io.druid.sql.calcite.util.TestServerInventoryView;
-import org.apache.calcite.jdbc.CalciteConnection;
+import io.druid.timeline.DataSegment;
+import io.druid.timeline.partition.LinearShardSpec;
 import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.schema.Table;
 import org.apache.calcite.sql.type.SqlTypeName;
+import org.joda.time.Interval;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -39,34 +53,97 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
-import java.sql.Connection;
-import java.sql.DriverManager;
+import java.io.File;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 public class DruidSchemaTest
 {
   private static final PlannerConfig PLANNER_CONFIG_DEFAULT = new PlannerConfig();
+  public static final List<InputRow> ROWS1 = ImmutableList.of(
+      CalciteTests.createRow(ImmutableMap.of("t", "2000-01-01", "m1", "1.0", "dim1", "")),
+      CalciteTests.createRow(ImmutableMap.of("t", "2000-01-02", "m1", "2.0", "dim1", "10.1")),
+      CalciteTests.createRow(ImmutableMap.of("t", "2000-01-03", "m1", "3.0", "dim1", "2"))
+  );
+  public static final List<InputRow> ROWS2 = ImmutableList.of(
+      CalciteTests.createRow(ImmutableMap.of("t", "2001-01-01", "m1", "4.0", "dim2", ImmutableList.of("a"))),
+      CalciteTests.createRow(ImmutableMap.of("t", "2001-01-02", "m1", "5.0", "dim2", ImmutableList.of("abc"))),
+      CalciteTests.createRow(ImmutableMap.of("t", "2001-01-03", "m1", "6.0"))
+  );
   @Rule
   public TemporaryFolder temporaryFolder = new TemporaryFolder();
   private SpecificSegmentsQuerySegmentWalker walker = null;
   private DruidSchema schema = null;
-  private Connection connection = null;
   @Before
   public void setUp() throws Exception
   {
     Calcites.setSystemProperties();
-    walker = CalciteTests.createMockWalker(temporaryFolder.newFolder());
-    Properties props = new Properties();
-    props.setProperty("caseSensitive", "true");
-    props.setProperty("unquotedCasing", "UNCHANGED");
-    connection = DriverManager.getConnection("jdbc:calcite:", props);
-    CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
+    final File tmpDir = temporaryFolder.newFolder();
+    final QueryableIndex index1 = IndexBuilder.create()
+        .tmpDir(new File(tmpDir, "1"))
+        .indexMerger(TestHelper.getTestIndexMergerV9())
+        .schema(
+            new IncrementalIndexSchema.Builder()
+                .withMetrics(
+                    new AggregatorFactory[]{
+                        new CountAggregatorFactory("cnt"),
+                        new DoubleSumAggregatorFactory("m1", "m1"),
+                        new HyperUniquesAggregatorFactory("unique_dim1", "dim1")
+                    }
+                )
+                .withRollup(false)
+                .build()
+        )
+        .rows(ROWS1)
+        .buildMMappedIndex();
+    final QueryableIndex index2 = IndexBuilder.create()
+        .tmpDir(new File(tmpDir, "2"))
+        .indexMerger(TestHelper.getTestIndexMergerV9())
+        .schema(
+            new IncrementalIndexSchema.Builder()
+                .withMetrics(
+                    new AggregatorFactory[]{
+                        new LongSumAggregatorFactory("m1", "m1")
+                    }
+                )
+                .withRollup(false)
+                .build()
+        )
+        .rows(ROWS2)
+        .buildMMappedIndex();
+    walker = new SpecificSegmentsQuerySegmentWalker(CalciteTests.queryRunnerFactoryConglomerate()).add(
+        DataSegment.builder()
+            .dataSource(CalciteTests.DATASOURCE1)
+            .interval(new Interval("2000/P1Y"))
+            .version("1")
+            .shardSpec(new LinearShardSpec(0))
+            .build(),
+        index1
+    ).add(
+        DataSegment.builder()
+            .dataSource(CalciteTests.DATASOURCE1)
+            .interval(new Interval("2001/P1Y"))
+            .version("1")
+            .shardSpec(new LinearShardSpec(0))
+            .build(),
+        index2
+    ).add(
+        DataSegment.builder()
+            .dataSource(CalciteTests.DATASOURCE2)
+            .interval(index2.getDataInterval())
+            .version("1")
+            .shardSpec(new LinearShardSpec(0))
+            .build(),
+        index2
+    );
     schema = new DruidSchema(
         walker,
@@ -74,7 +151,6 @@ public class DruidSchemaTest
         PLANNER_CONFIG_DEFAULT
     );
 
-    calciteConnection.getRootSchema().add("s", schema);
     schema.start();
     schema.awaitInitialization();
   }
@@ -84,7 +160,6 @@
   {
     schema.stop();
     walker.close();
-    connection.close();
   }
 
   @Test
@@ -110,13 +185,13 @@
     Assert.assertEquals("dim1", fields.get(2).getName());
     Assert.assertEquals(SqlTypeName.VARCHAR, fields.get(2).getType().getSqlTypeName());
 
-    Assert.assertEquals("dim2", fields.get(3).getName());
-    Assert.assertEquals(SqlTypeName.VARCHAR, fields.get(3).getType().getSqlTypeName());
+    Assert.assertEquals("m1", fields.get(3).getName());
+    Assert.assertEquals(SqlTypeName.BIGINT, fields.get(3).getType().getSqlTypeName());
 
-    Assert.assertEquals("m1", fields.get(4).getName());
-    Assert.assertEquals(SqlTypeName.FLOAT, fields.get(4).getType().getSqlTypeName());
+    Assert.assertEquals("unique_dim1", fields.get(4).getName());
+    Assert.assertEquals(SqlTypeName.OTHER, fields.get(4).getType().getSqlTypeName());
 
-    Assert.assertEquals("unique_dim1", fields.get(5).getName());
-    Assert.assertEquals(SqlTypeName.OTHER, fields.get(5).getType().getSqlTypeName());
+    Assert.assertEquals("dim2", fields.get(5).getName());
+    Assert.assertEquals(SqlTypeName.VARCHAR, fields.get(5).getType().getSqlTypeName());
   }
 }